RabbitMQ Fundamentals - Message Queuing, Routing, and Building a Real-World NestJS Application

Master RabbitMQ from core concepts to production. Learn exchanges, queues, routing keys, and dead letter handling, then build a complete order processing system with NestJS.

AI Agent
February 22, 2026
13 min read

Introduction

Modern applications rarely work in isolation. When a user places an order, multiple systems need to coordinate: payment processing, inventory management, email notifications, analytics tracking, and shipping integration. Chaining these as synchronous calls creates bottlenecks and cascading failures: one slow service can bring down the whole system.

RabbitMQ addresses this by decoupling services through asynchronous message passing. It is a message broker that sits between producers and consumers and holds messages until they can be delivered, even when a service is temporarily unavailable. It is widely deployed in production systems that handle millions of messages per day.

In this article, we explore RabbitMQ's architecture, walk through its features from basic queues to advanced routing patterns, and build a realistic order processing system with NestJS that demonstrates production-grade patterns.

Why RabbitMQ Exists

The Synchronous Problem

Traditional request-response architecture has fundamental limitations:

plaintext
User → API → Payment Service → Inventory Service → Email Service → Response

If any service in the chain is slow or down, the whole request fails. This leads to:

  • Tight coupling: Services depend on each other's availability
  • Cascading failures: One slow service slows down everything
  • Poor scalability: Services cannot absorb traffic spikes independently
  • Lost work: If a service crashes mid-request, the in-flight data is lost

The Asynchronous Solution

RabbitMQ enables decoupled, resilient systems:

plaintext
User → API → RabbitMQ → (Payment, Inventory, Email, Analytics all process independently)

Benefits:

  • Loose coupling: Services do not need to know about each other
  • Resilience: A service can go down without affecting the others
  • Scalability: Each service scales independently
  • Reliable delivery: With durable queues, persistent messages, and manual acknowledgments, messages are kept until they are processed
  • Load leveling: Traffic spikes are absorbed by queuing messages

RabbitMQ Core Architecture

The AMQP Model

RabbitMQ implements AMQP 0-9-1 (Advanced Message Queuing Protocol), a standardized messaging protocol. The core components are:

  • Producer: Application that sends messages
  • Exchange: Receives messages from producers and routes them to queues
  • Queue: Stores messages until consumers process them
  • Consumer: Application that receives and processes messages
  • Binding: Link between an exchange and a queue, with routing rules

How Messages Flow

plaintext
Producer → Exchange → Binding (with routing key) → Queue → Consumer

  1. The producer publishes a message to an exchange with a routing key
  2. The exchange examines the routing key and message properties
  3. The exchange routes the message to matching queues based on its bindings
  4. Consumers subscribe to queues and process messages
  5. The consumer acknowledges successful processing
  6. The message is removed from the queue

RabbitMQ Features & Use Cases

1. Exchanges

Exchanges are message routers. They receive messages from producers and decide which queues get them.

Exchange Types:

Direct Exchange

Routes messages to queues whose binding key exactly matches the routing key.

Use Case: Task distribution where specific workers handle specific tasks.

Direct Exchange Example
# The producer sends a message with routing key "payment.process"
# The exchange routes it to the queue bound with routing key "payment.process"
 
# Setup (rabbitmqadmin is the management plugin's CLI; rabbitmqctl cannot declare these)
rabbitmqadmin declare exchange name=payment_exchange type=direct
rabbitmqadmin declare queue name=payment_queue
rabbitmqadmin declare binding source=payment_exchange destination=payment_queue routing_key=payment.process

Real-World Example: Payment processing where different payment methods (credit_card, paypal, crypto) go to different queues.
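
To make the exact-match rule concrete, here is a minimal in-memory sketch of a direct exchange. It does not talk to a broker; the class name DirectExchange and the queue names credit_card_queue and paypal_queue are made up for illustration, following the payment-method example above.

```typescript
// Minimal in-memory model of a direct exchange (illustration only, no broker involved).
type Binding = { queue: string; routingKey: string };

class DirectExchange {
  private bindings: Binding[] = [];
  readonly queues = new Map<string, string[]>();

  bind(queue: string, routingKey: string): void {
    if (!this.queues.has(queue)) this.queues.set(queue, []);
    this.bindings.push({ queue, routingKey });
  }

  // Delivers to every queue whose binding key equals the routing key exactly.
  // Returns the names of the queues that received the message.
  publish(routingKey: string, body: string): string[] {
    const delivered: string[] = [];
    for (const b of this.bindings) {
      if (b.routingKey === routingKey) {
        this.queues.get(b.queue)!.push(body);
        delivered.push(b.queue);
      }
    }
    return delivered;
  }
}

const payments = new DirectExchange();
payments.bind("credit_card_queue", "payment.credit_card");
payments.bind("paypal_queue", "payment.paypal");

const hit = payments.publish("payment.paypal", "order-1");
const miss = payments.publish("payment.crypto", "order-2"); // no binding for this key
```

A routing key with no matching binding reaches no queue at all, which is why `miss` is empty. On a real broker such a message is discarded unless the publisher asks for it to be returned or an alternate exchange is configured.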

Topic Exchange

Routes messages by matching the routing key against binding patterns with wildcards: * matches exactly one word and # matches zero or more words.

Use Case: Event distribution where subscribers care about specific event types.

Topic Exchange Example
# Routing keys: user.created, user.updated, user.deleted
# Binding patterns: user.*, user.created, *.updated
 
# Matches
user.created matches "user.created" and "user.*"
user.updated matches "user.*" and "*.updated"
user.deleted matches "user.*"

Real-World Example: User events where different services subscribe to different event types.
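
The wildcard rules can be sketched as a small matcher. This is an illustrative reimplementation of the matching semantics (`*` for exactly one word, `#` for zero or more words), not RabbitMQ's own code, and `topicMatches` is a hypothetical helper name.

```typescript
// Returns true when a topic binding pattern matches a routing key.
// "*" matches exactly one word, "#" matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  function match(pi: number, ki: number): boolean {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === "#") {
      // "#" may absorb zero words (advance the pattern) or one more word (advance the key)
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    return (p[pi] === "*" || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  }

  return match(0, 0);
}

const oneWord = topicMatches("user.*", "user.created");      // true
const tooDeep = topicMatches("user.*", "user.email.sent");   // false: "*" is one word only
const anyDepth = topicMatches("user.#", "user.email.sent");  // true
```

The difference between `tooDeep` and `anyDepth` is the usual source of confusion: a pattern ending in `*` stops at one level, while `#` also covers deeper keys such as user.email.sent.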

Fanout Exchange

Routes messages to all bound queues regardless of the routing key.

Use Case: Broadcasting where all subscribers need the same message.

Fanout Exchange Example
# The producer sends one message
# Every queue bound to the exchange receives a copy

Real-World Example: System notifications, cache invalidation, or real-time updates sent to all connected clients.

Headers Exchange

Routes based on message headers rather than routing keys.

Use Case: Complex routing logic based on message metadata.

Headers Exchange Example
# Message headers: priority=high, department=sales
# Binding: x-match=all (every listed header must match) or x-match=any (at least one must match)

Real-World Example: Priority-based routing where high-priority messages go to dedicated queues.
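
The all/any distinction is easy to get wrong, so here is a minimal sketch of how a headers binding could be evaluated. It models only the `all` and `any` modes and skips binding keys that start with `x-`; `headersMatch` and the sample header values are assumptions made for this example.

```typescript
type Headers = Record<string, string | number | boolean>;

// Binding arguments for a headers exchange: "x-match" selects all/any semantics,
// every other entry is a header that must (all) or may (any) match.
function headersMatch(bindingArgs: Headers, messageHeaders: Headers): boolean {
  const mode = bindingArgs["x-match"] === "any" ? "any" : "all"; // "all" is the default
  const required = Object.keys(bindingArgs).filter((key) => !key.startsWith("x-"));
  const hits = required.filter((key) => messageHeaders[key] === bindingArgs[key]);
  return mode === "any" ? hits.length > 0 : hits.length === required.length;
}

const allBinding: Headers = { "x-match": "all", priority: "high", department: "sales" };
const anyBinding: Headers = { "x-match": "any", priority: "high", department: "sales" };
const message: Headers = { priority: "high", department: "support" };

const matchesAll = headersMatch(allBinding, message); // false: department differs
const matchesAny = headersMatch(anyBinding, message); // true: priority matches
```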

2. Queues

Queues store messages until consumers process them.

Queue Properties:

Durable Queues

The queue definition survives broker restarts. The messages in it survive only if they were also published as persistent.

Use Case: Critical messages that must not be lost.

Durable Queue
# The queue still exists after RabbitMQ restarts
rabbitmqadmin declare queue name=order_queue durable=true

Real-World Example: Order processing queues where losing an order is not acceptable.

Exclusive Queues

Accessible only by the connection that declared them. Deleted when that connection closes.

Use Case: Temporary queues for request-reply patterns.

Exclusive Queue
# The queue exists only for this connection
# It is deleted automatically when the connection closes

Real-World Example: RPC-style communication where each request gets a temporary reply queue.

Auto-Delete Queues

Deleted when the last consumer disconnects.

Use Case: Temporary queues for specific consumers.

Real-World Example: Fanout subscriptions where a queue is created per subscriber.

Queue Arguments

Configure queue behavior with optional arguments.

Queue Arguments
# Message TTL - messages expire after 1 hour
x-message-ttl: 3600000
 
# Max length - keep only the most recent 10000 messages (oldest are dropped first)
x-max-length: 10000
 
# Dead letter exchange - where expired/rejected messages are routed
x-dead-letter-exchange: dlx
 
# Delivery limit (quorum queues only) - stop redelivering after 3 attempts
x-delivery-limit: 3
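
The effect of x-max-length is easier to see in a small model. The sketch below imitates the default overflow behaviour, where the oldest message is dropped to make room; `BoundedQueue` is a hypothetical class for illustration and has nothing to do with the broker's implementation.

```typescript
// In-memory sketch of x-max-length with the default "drop-head" overflow behaviour:
// when the queue is full, the oldest message is removed to make room.
class BoundedQueue<T> {
  private items: T[] = [];
  readonly dropped: T[] = []; // with a DLX configured, these would be dead-lettered
  private readonly maxLength: number;

  constructor(maxLength: number) {
    this.maxLength = maxLength;
  }

  enqueue(item: T): void {
    this.items.push(item);
    while (this.items.length > this.maxLength) {
      this.dropped.push(this.items.shift() as T);
    }
  }

  snapshot(): T[] {
    return this.items.slice();
  }
}

const bounded = new BoundedQueue<number>(3);
[1, 2, 3, 4, 5].forEach((n) => bounded.enqueue(n));
// bounded.snapshot() is now [3, 4, 5]; messages 1 and 2 were pushed out
```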

3. Routing Keys

Routing keys are strings that determine how a message is routed. By convention they are hierarchical, with words separated by dots.

Patterns:

plaintext
order.created
order.payment.success
order.payment.failed
user.email.sent
user.email.failed

Use Cases:

  1. Service-Specific Routing: Different services handle different message types
  2. Priority Routing: High-priority messages go to dedicated queues
  3. Conditional Processing: Route based on message type or source
  4. Multi-Tenant Systems: Route by tenant ID

Example: E-Commerce Routing

plaintext
# Order events (routing key → consuming services)
order.created → inventory_service, payment_service, notification_service
order.shipped → notification_service, analytics_service
order.cancelled → inventory_service, refund_service
 
# Payment events
payment.success → order_service, notification_service
payment.failed → order_service, retry_service

4. Dead Letter Exchange (DLX)

A DLX receives messages that a queue gives up on: rejected by a consumer, expired, or pushed out by a length limit.

When Messages Go to the DLX:

  1. Negative Acknowledgment: A consumer rejects or nacks the message with requeue=false
  2. TTL Expiration: The message expires before it is processed
  3. Queue Length Exceeded: The queue reaches its max length
  4. Delivery Limit: The message exceeds the delivery limit of a quorum queue

Use Case: Handling failed messages without losing them.

Dead Letter Exchange Setup
# Dead letter exchange
rabbitmqadmin declare exchange name=order_dlx type=direct
 
# Main queue with DLX configuration
rabbitmqadmin declare queue name=order_queue durable=true \
  arguments='{"x-dead-letter-exchange":"order_dlx","x-dead-letter-routing-key":"order.dead-letter"}'
 
# Dead letter queue
rabbitmqadmin declare queue name=order_dlq durable=true
 
# Bind the DLX to the DLQ
rabbitmqadmin declare binding source=order_dlx destination=order_dlq routing_key=order.dead-letter

Real-World Example: Order processing where failed orders go to a dead letter queue for manual review.
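
When a message is dead-lettered, the broker records why in the `x-death` header, an array with one entry per queue and reason. The helper below is a small sketch of how a reviewer might summarise it; `describeDeath` is a hypothetical name, and the interface lists only the fields this example reads.

```typescript
// Shape of one entry in the "x-death" header array (only the fields used here).
interface XDeathEntry {
  queue: string;
  reason: string; // e.g. "rejected", "expired", "maxlen"
  count: number;
}

// Summarises why and how often a message was dead-lettered.
function describeDeath(headers: { [key: string]: unknown } | undefined): string {
  const raw = headers ? headers["x-death"] : undefined;
  const deaths = Array.isArray(raw) ? (raw as XDeathEntry[]) : [];
  if (deaths.length === 0) return "never dead-lettered";
  return deaths.map((d) => `${d.queue}: ${d.reason} x${d.count}`).join("; ");
}

const summary = describeDeath({
  "x-death": [{ queue: "order_queue", reason: "rejected", count: 2 }],
});
// summary === "order_queue: rejected x2"
```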

5. Message Acknowledgment

Consumers acknowledge successful processing. With manual acknowledgment, RabbitMQ removes a message only after it has been acknowledged.

Modes:

Auto-Ack: The message is removed as soon as it is delivered (risky).

bash
# The message is removed even if the consumer crashes
# Risk: Message loss

Manual Ack: The consumer explicitly acknowledges after processing (safe).

bash
# The message stays in the queue until the consumer acknowledges it
# If the consumer crashes, the message is requeued and redelivered, possibly to another consumer
# Safe: A consumer crash does not lose the message

Use Case: Critical operations where message loss is not acceptable.
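
The manual-ack lifecycle can be modelled in a few lines. The sketch below is an in-memory stand-in for a queue, not broker code: a delivered message stays unacknowledged until `ack` is called, and everything still unacknowledged is put back when the consumer is lost. `AckQueue` and its method names are invented for this example.

```typescript
// In-memory sketch of manual acknowledgment.
class AckQueue {
  private ready: string[] = [];
  private unacked = new Map<number, string>();
  private nextTag = 1;

  publish(body: string): void {
    this.ready.push(body);
  }

  // Hands out the next message together with a delivery tag, or undefined if empty.
  deliver(): { tag: number; body: string } | undefined {
    const body = this.ready.shift();
    if (body === undefined) return undefined;
    const tag = this.nextTag++;
    this.unacked.set(tag, body);
    return { tag, body };
  }

  ack(tag: number): void {
    this.unacked.delete(tag);
  }

  // Called when the consumer's channel closes: unacked messages go back to the front.
  consumerLost(): void {
    const requeued: string[] = [];
    this.unacked.forEach((body) => requeued.push(body));
    this.ready = requeued.concat(this.ready);
    this.unacked.clear();
  }

  get readyCount(): number {
    return this.ready.length;
  }
}

const ackQueue = new AckQueue();
ackQueue.publish("order-1");
ackQueue.publish("order-2");

const first = ackQueue.deliver(); // order-1 is now unacked
ackQueue.ack(first!.tag);         // processed successfully: removed for good
ackQueue.deliver();               // order-2 delivered, consumer crashes before ack
ackQueue.consumerLost();          // order-2 goes back to the queue
```

With auto-ack the second message would already have been removed at delivery time, so the crash would have lost it.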

6. Message Persistence

Messages can be marked as persistent so they survive broker restarts.

  • Persistent Messages: Written to disk; survive restarts when stored in a durable queue.
  • Transient Messages: May be kept in memory only; lost on restart.

Use Case: Production systems where durability is critical.

Persistent Message
# Message marked as persistent
# Survives a RabbitMQ restart (in a durable queue)
delivery_mode: 2

7. Consumer Prefetch (QoS)

Limits how many unacknowledged messages the broker will deliver to a consumer at once.

Use Case: Preventing consumer overload and enabling fair distribution.

Prefetch Configuration
# The consumer handles 1 message at a time
prefetch_count: 1
 
# The consumer may have up to 10 messages in flight
prefetch_count: 10

Real-World Example: CPU-intensive tasks where prefetch=1 prevents overload.
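
How prefetch shapes distribution can be shown with a toy dispatcher. In this sketch each consumer has a window of unacknowledged deliveries, and messages are handed out round-robin to consumers whose window is not full. `PrefetchWindow` and `dispatch` are hypothetical names, and the model ignores acknowledgments arriving during dispatch.

```typescript
// Tracks how many deliveries a consumer has in flight. The broker only sends
// another message while the unacknowledged count is below the prefetch limit.
class PrefetchWindow {
  private inFlight = 0;
  private readonly limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  canDeliver(): boolean {
    return this.inFlight < this.limit;
  }

  onDeliver(): void {
    if (!this.canDeliver()) throw new Error("prefetch limit reached");
    this.inFlight++;
  }

  onAck(): void {
    if (this.inFlight > 0) this.inFlight--;
  }
}

// Hands out messages round-robin, skipping consumers whose window is full.
// Returns how many messages each consumer received; the rest stay queued.
function dispatch(messageCount: number, windows: PrefetchWindow[]): number[] {
  const delivered = windows.map(() => 0);
  let remaining = messageCount;
  let progressed = true;
  while (remaining > 0 && progressed) {
    progressed = false;
    windows.forEach((w, i) => {
      if (remaining > 0 && w.canDeliver()) {
        w.onDeliver();
        delivered[i] += 1;
        remaining--;
        progressed = true;
      }
    });
  }
  return delivered;
}

const fair = dispatch(5, [new PrefetchWindow(1), new PrefetchWindow(1)]);    // [1, 1]
const greedy = dispatch(5, [new PrefetchWindow(10), new PrefetchWindow(1)]); // [4, 1]
```

In the `fair` case three messages remain in the queue until one of the consumers acknowledges, so a slow consumer cannot hoard work that an idle one could take.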

8. Message TTL (Time To Live)

Messages expire automatically if they are not consumed within the TTL.

Use Case: Time-sensitive messages that become stale.

Message TTL
# Messages expire after 1 hour
x-message-ttl: 3600000
 
# Expired messages go to the DLX
x-dead-letter-exchange: dlx

Real-World Example: Promotional offers that expire after 24 hours.
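
A TTL can be set on the queue (x-message-ttl) and also on an individual message through its expiration property; when both are present, the lower value applies. The helpers below sketch that rule. `effectiveTtlMs` and `isExpired` are hypothetical names, and timestamps are plain millisecond numbers to keep the example deterministic.

```typescript
// When both a per-queue TTL and a per-message TTL are set, the lower one applies.
function effectiveTtlMs(queueTtlMs?: number, messageTtlMs?: number): number | undefined {
  if (queueTtlMs === undefined) return messageTtlMs;
  if (messageTtlMs === undefined) return queueTtlMs;
  return Math.min(queueTtlMs, messageTtlMs);
}

// A message is expired once it has been queued for longer than its TTL.
function isExpired(enqueuedAtMs: number, nowMs: number, ttlMs: number | undefined): boolean {
  return ttlMs !== undefined && nowMs - enqueuedAtMs > ttlMs;
}

const ttl = effectiveTtlMs(3600000, 60000);   // 60000: the per-message TTL is lower
const stale = isExpired(0, 3600001, 3600000); // true
const fresh = isExpired(0, 1000, 3600000);    // false
```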

Building a Real-World Order Processing System with NestJS

Now let's build an order processing system that demonstrates these RabbitMQ patterns. The system handles:

  • Order creation and validation
  • Payment processing with retries
  • Inventory management
  • Email notifications
  • Dead letter handling for failed orders

Project Setup

Create the NestJS project
npm i -g @nestjs/cli
nest new rabbitmq-order-system
cd rabbitmq-order-system
npm install @nestjs/microservices amqplib amqp-connection-manager
npm install class-validator class-transformer

Step 1: RabbitMQ Configuration Module

src/rabbitmq/rabbitmq.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
 
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RABBITMQ_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672'],
          queue: 'main_queue',
          queueOptions: {
            durable: true,
          },
          prefetchCount: 1,
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class RabbitMQModule {}

Step 2: RabbitMQ Service for Queue Setup

src/rabbitmq/rabbitmq.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as amqp from 'amqplib';
 
@Injectable()
export class RabbitMQService implements OnModuleInit {
  private connection: amqp.Connection;
  private channel: amqp.Channel;
 
  async onModuleInit() {
    await this.connect();
    await this.setupExchangesAndQueues();
  }
 
  private async connect() {
    this.connection = await amqp.connect(
      process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672',
    );
 
    this.channel = await this.connection.createChannel();
    await this.channel.prefetch(1);
 
    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
      setTimeout(() => this.connect(), 5000);
    });
 
    console.log('Connected to RabbitMQ');
  }
 
  private async setupExchangesAndQueues() {
    // Declare exchanges
    await this.channel.assertExchange('orders', 'topic', { durable: true });
    await this.channel.assertExchange('payments', 'direct', { durable: true });
    await this.channel.assertExchange('notifications', 'fanout', { durable: true });
    await this.channel.assertExchange('dlx', 'direct', { durable: true });
 
    // Main queues
    await this.channel.assertQueue('order.created', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'order.dead-letter',
      },
    });
 
    await this.channel.assertQueue('payment.process', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'payment.dead-letter',
        'x-message-ttl': 300000, // 5 minutes
      },
    });
 
    await this.channel.assertQueue('inventory.update', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx',
      },
    });
 
    await this.channel.assertQueue('notification.send', {
      durable: true,
    });
 
    // Dead letter queue
    await this.channel.assertQueue('dead_letter_queue', { durable: true });
 
    // Bindings
    await this.channel.bindQueue('order.created', 'orders', 'order.created');
    await this.channel.bindQueue('payment.process', 'payments', 'process');
    await this.channel.bindQueue('inventory.update', 'orders', 'order.created');
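    // A fanout exchange ignores the binding key, so an empty key is enough
    await this.channel.bindQueue('notification.send', 'notifications', '');
    // 'dlx' is a direct exchange, so wildcards do not apply: bind each dead-letter key explicitly
    // (inventory.update sets no dead-letter routing key, so its messages keep 'order.created')
    for (const key of ['order.dead-letter', 'payment.dead-letter', 'order.created']) {
      await this.channel.bindQueue('dead_letter_queue', 'dlx', key);
    }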
  }
 
  getChannel(): amqp.Channel {
    return this.channel;
  }
 
  async publishMessage(
    exchange: string,
    routingKey: string,
    message: any,
    options?: any,
  ) {
    const messageBuffer = Buffer.from(JSON.stringify(message));
    this.channel.publish(exchange, routingKey, messageBuffer, {
      persistent: true,
      contentType: 'application/json',
      ...options,
    });
  }
 
  async consumeMessage(
    queue: string,
    callback: (msg: amqp.ConsumeMessage) => Promise<void>,
  ) {
    await this.channel.consume(queue, async (msg) => {
      if (msg) {
        try {
          await callback(msg);
          this.channel.ack(msg);
        } catch (error) {
          console.error(`Error processing message from ${queue}:`, error);
          // Negative acknowledgment without requeue - the message goes to the DLX
          this.channel.nack(msg, false, false);
        }
      }
    });
  }
 
  async close() {
    await this.channel.close();
    await this.connection.close();
  }
}

Step 3: Order Service

src/orders/orders.service.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { RabbitMQService } from '../rabbitmq/rabbitmq.service';
 
interface Order {
  id: string;
  userId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  totalAmount: number;
  status: string;
  createdAt: Date;
}
 
@Injectable()
export class OrdersService implements OnApplicationBootstrap {
  private orders: Map<string, Order> = new Map();
 
  constructor(private readonly rabbitmq: RabbitMQService) {}
 
  // Start consuming only after RabbitMQService.onModuleInit has opened the channel
  onApplicationBootstrap() {
    this.setupConsumers();
  }
 
  private setupConsumers() {
    // Listen for order creation
    this.rabbitmq.consumeMessage('order.created', async (msg) => {
      const order = JSON.parse(msg.content.toString());
      console.log('Processing order:', order.id);
      
      // Store order
      this.orders.set(order.id, { ...order, status: 'pending' });
 
      // Publish to the payment service
      await this.rabbitmq.publishMessage('payments', 'process', {
        orderId: order.id,
        amount: order.totalAmount,
        userId: order.userId,
      });
 
      // No republish for inventory: the inventory.update queue is bound to the
      // same 'order.created' key and has already received this event. Publishing
      // 'order.created' again from here would feed this consumer in an endless loop.
    });
  }
 
  async createOrder(userId: string, items: any[]): Promise<Order> {
    const orderId = `order_${Date.now()}`;
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
 
    const order: Order = {
      id: orderId,
      userId,
      items,
      totalAmount,
      status: 'created',
      createdAt: new Date(),
    };
 
    // Publish the order.created event
    await this.rabbitmq.publishMessage('orders', 'order.created', order);
 
    return order;
  }
 
  async updateOrderStatus(orderId: string, status: string): Promise<void> {
    const order = this.orders.get(orderId);
    if (order) {
      order.status = status;
    }
  }
 
  getOrder(orderId: string): Order | undefined {
    return this.orders.get(orderId);
  }
 
  getAllOrders(): Order[] {
    return Array.from(this.orders.values());
  }
}
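
The service above accepts items as-is and sums prices as floating-point numbers. As a hedged sketch of the validation step listed in the system overview, the helpers below check the items and compute the total in integer cents. `validateItems` and `totalInCents` are hypothetical helpers, not part of the article's code, and the rules they enforce are assumptions.

```typescript
interface OrderItem {
  productId: string;
  quantity: number;
  price: number; // unit price in major currency units, e.g. 29.99
}

// Returns a list of problems; an empty list means the items are acceptable.
function validateItems(items: OrderItem[]): string[] {
  const errors: string[] = [];
  if (items.length === 0) errors.push("order has no items");
  items.forEach((item, i) => {
    if (!Number.isInteger(item.quantity) || item.quantity <= 0) {
      errors.push(`items[${i}].quantity must be a positive integer`);
    }
    if (!Number.isFinite(item.price) || item.price < 0) {
      errors.push(`items[${i}].price must be a non-negative number`);
    }
  });
  return errors;
}

// Sums line totals in integer cents so that rounding errors do not accumulate.
function totalInCents(items: OrderItem[]): number {
  return items.reduce((sum, item) => sum + Math.round(item.price * 100) * item.quantity, 0);
}

const sampleItems: OrderItem[] = [
  { productId: "product1", quantity: 2, price: 29.99 },
  { productId: "product2", quantity: 1, price: 49.99 },
];
const cents = totalInCents(sampleItems); // 10997, i.e. 109.97
```

Rejecting a bad order before it is published keeps malformed messages out of the queues, where they would otherwise fail in every consumer and end up in the dead letter queue.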

Step 4: Payment Service

src/payments/payments.service.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { RabbitMQService } from '../rabbitmq/rabbitmq.service';
import { OrdersService } from '../orders/orders.service';
 
@Injectable()
export class PaymentsService implements OnApplicationBootstrap {
  private retryCount: Map<string, number> = new Map();
  private readonly MAX_RETRIES = 3;
 
  constructor(
    private readonly rabbitmq: RabbitMQService,
    private readonly ordersService: OrdersService,
  ) {}
 
  // Start consuming only after RabbitMQService.onModuleInit has opened the channel
  onApplicationBootstrap() {
    this.setupConsumers();
  }
 
  private setupConsumers() {
    this.rabbitmq.consumeMessage('payment.process', async (msg) => {
      const payment = JSON.parse(msg.content.toString());
      console.log('Processing payment for order:', payment.orderId);
 
      try {
        // Simulate payment processing
        const success = await this.processPayment(payment);
 
        if (success) {
          // Payment successful
          await this.ordersService.updateOrderStatus(payment.orderId, 'paid');
 
          // Publish success event
          await this.rabbitmq.publishMessage('notifications', '*', {
            type: 'payment_success',
            orderId: payment.orderId,
            userId: payment.userId,
            amount: payment.amount,
          });
 
          console.log('Payment successful for order:', payment.orderId);
        } else {
          throw new Error('Payment processing failed');
        }
      } catch (error) {
        console.error('Payment error:', error);
 
        // Retry logic
        const retries = this.retryCount.get(payment.orderId) || 0;
        if (retries < this.MAX_RETRIES) {
          this.retryCount.set(payment.orderId, retries + 1);
          
          // Republish after a delay
          setTimeout(() => {
            this.rabbitmq.publishMessage('payments', 'process', payment);
          }, 5000 * 2 ** retries); // Exponential backoff: 5s, 10s, 20s
        } else {
          // Max retries exceeded - rethrow so the message is nacked and goes to the DLX
          throw error;
        }
      }
    });
  }
 
  private async processPayment(payment: any): Promise<boolean> {
    // Simulate payment gateway call
    return new Promise((resolve) => {
      setTimeout(() => {
        // 90% success rate for the demo
        resolve(Math.random() > 0.1);
      }, 1000);
    });
  }
}
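
The retry delay is worth isolating so it can be tested and capped. The sketch below computes an exponential backoff with an upper bound, and shows one way to carry the attempt count in a message header instead of the in-memory Map used above, which is lost on restart and not shared between instances. `backoffDelayMs`, `nextRetryCount`, and the `x-retry-count` header are all assumptions made for this example, not RabbitMQ built-ins.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs: number = 5000, maxMs: number = 60000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Reads a retry counter from message headers and returns the next value.
// "x-retry-count" is a custom header invented for this sketch.
function nextRetryCount(headers: { [key: string]: unknown } | undefined): number {
  const current = headers ? headers["x-retry-count"] : undefined;
  return typeof current === "number" ? current + 1 : 1;
}

const delays = [0, 1, 2, 3, 4].map((attempt) => backoffDelayMs(attempt));
// delays: 5000, 10000, 20000, 40000, 60000 (the last one is capped)
```

A republished message would then include the incremented counter in its headers, so any consumer instance can decide whether to retry again or let the message go to the DLX.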

Step 5: Inventory Service

src/inventory/inventory.service.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { RabbitMQService } from '../rabbitmq/rabbitmq.service';
import { OrdersService } from '../orders/orders.service';
 
@Injectable()
export class InventoryService implements OnApplicationBootstrap {
  private inventory: Map<string, number> = new Map([
    ['product1', 100],
    ['product2', 50],
    ['product3', 200],
  ]);
 
  constructor(
    private readonly rabbitmq: RabbitMQService,
    private readonly ordersService: OrdersService,
  ) {}
 
  // Start consuming only after RabbitMQService.onModuleInit has opened the channel
  onApplicationBootstrap() {
    this.setupConsumers();
  }
 
  private setupConsumers() {
    this.rabbitmq.consumeMessage('inventory.update', async (msg) => {
      const raw = JSON.parse(msg.content.toString());
      // The queue receives the full order event, whose key is `id`; accept `orderId` too
      const event = { ...raw, orderId: raw.orderId ?? raw.id };
      console.log('Updating inventory for order:', event.orderId);
 
      try {
        // Check and reserve inventory
        const canReserve = this.reserveInventory(event.items);
 
        if (canReserve) {
          await this.ordersService.updateOrderStatus(event.orderId, 'inventory_reserved');
 
          // Publish success event
          await this.rabbitmq.publishMessage('notifications', '*', {
            type: 'inventory_reserved',
            orderId: event.orderId,
            items: event.items,
          });
 
          console.log('Inventory reserved for order:', event.orderId);
        } else {
          throw new Error('Insufficient inventory');
        }
      } catch (error) {
        console.error('Inventory error:', error);
        
        // Publish failure event
        await this.rabbitmq.publishMessage('notifications', '*', {
          type: 'inventory_failed',
          orderId: event.orderId,
          reason: error.message,
        });
 
        throw error;
      }
    });
  }
 
  private reserveInventory(items: any[]): boolean {
    for (const item of items) {
      const current = this.inventory.get(item.productId) || 0;
      if (current < item.quantity) {
        return false;
      }
    }
 
    // Reserve inventory
    for (const item of items) {
      const current = this.inventory.get(item.productId) || 0;
      this.inventory.set(item.productId, current - item.quantity);
    }
 
    return true;
  }
 
  getInventory(): Record<string, number> {
    const result: Record<string, number> = {};
    this.inventory.forEach((value, key) => {
      result[key] = value;
    });
    return result;
  }
}

Step 6: Notification Service

src/notifications/notifications.service.ts
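import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { RabbitMQService } from '../rabbitmq/rabbitmq.service';
 
@Injectable()
export class NotificationsService implements OnApplicationBootstrap {
  private notifications: any[] = [];
 
  constructor(private readonly rabbitmq: RabbitMQService) {}
 
  // Start consuming only after RabbitMQService.onModuleInit has opened the channel
  onApplicationBootstrap() {
    this.setupConsumers();
  }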
 
  private setupConsumers() {
    this.rabbitmq.consumeMessage('notification.send', async (msg) => {
      const notification = JSON.parse(msg.content.toString());
      console.log('Sending notification:', notification.type);
 
      // Simulate email/SMS sending
      this.notifications.push({
        ...notification,
        sentAt: new Date(),
      });
 
      console.log(`Notification sent: ${notification.type} for order ${notification.orderId}`);
    });
  }
 
  getNotifications(): any[] {
    return this.notifications;
  }
}

Step 7: Dead Letter Handler

src/dead-letter/dead-letter.service.ts
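import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { RabbitMQService } from '../rabbitmq/rabbitmq.service';
 
@Injectable()
export class DeadLetterService implements OnApplicationBootstrap {
  private deadLetters: any[] = [];
 
  constructor(private readonly rabbitmq: RabbitMQService) {}
 
  // Start consuming only after RabbitMQService.onModuleInit has opened the channel
  onApplicationBootstrap() {
    this.setupConsumers();
  }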
 
  private setupConsumers() {
    this.rabbitmq.consumeMessage('dead_letter_queue', async (msg) => {
      const deadLetter = {
        content: JSON.parse(msg.content.toString()),
        headers: msg.properties.headers,
        timestamp: new Date(),
        retryCount: msg.properties.headers?.['x-death']?.[0]?.count || 0,
      };
 
      console.error('Message in dead letter queue:', deadLetter);
 
      // Store for manual review
      this.deadLetters.push(deadLetter);
 
      // Alert monitoring system
      console.error('ALERT: Dead letter received - manual intervention may be required');
    });
  }
 
  getDeadLetters(): any[] {
    return this.deadLetters;
  }
 
  getDeadLetterCount(): number {
    return this.deadLetters.length;
  }
}

Step 8: Orders Controller

src/orders/orders.controller.ts
import { Controller, Post, Get, Body, Param } from '@nestjs/common';
import { OrdersService } from './orders.service';
 
@Controller('orders')
export class OrdersController {
  constructor(private readonly ordersService: OrdersService) {}
 
  @Post()
  async createOrder(
    @Body() createOrderDto: { userId: string; items: any[] },
  ) {
    const order = await this.ordersService.createOrder(
      createOrderDto.userId,
      createOrderDto.items,
    );
 
    return {
      message: 'Order created and queued for processing',
      order,
    };
  }
 
  @Get()
  async getAllOrders() {
    return this.ordersService.getAllOrders();
  }
 
  @Get(':id')
  async getOrder(@Param('id') id: string) {
    const order = this.ordersService.getOrder(id);
    if (!order) {
      return { error: 'Order not found' };
    }
    return order;
  }
}

Step 9: Main Application Module

src/app.module.ts
import { Module } from '@nestjs/common';
import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
import { RabbitMQService } from './rabbitmq/rabbitmq.service';
import { OrdersService } from './orders/orders.service';
import { OrdersController } from './orders/orders.controller';
import { PaymentsService } from './payments/payments.service';
import { InventoryService } from './inventory/inventory.service';
import { NotificationsService } from './notifications/notifications.service';
import { DeadLetterService } from './dead-letter/dead-letter.service';
 
@Module({
  imports: [RabbitMQModule],
  controllers: [OrdersController],
  providers: [
    RabbitMQService,
    OrdersService,
    PaymentsService,
    InventoryService,
    NotificationsService,
    DeadLetterService,
  ],
})
export class AppModule {}

Step 10: Docker Compose Setup

docker-compose.yml
version: '3.8'
 
services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    ports:
      - '5672:5672'
      - '15672:15672'
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 10s
      retries: 5
 
volumes:
  rabbitmq_data:

Step 11: Running the Application

Start services
# Start RabbitMQ
docker-compose up -d
 
# Install dependencies
npm install
 
# Run application
npm run start:dev
 
# Access RabbitMQ Management UI
# http://localhost:15672 (guest/guest)

Step 12: Testing the System

Test endpoints
# Create an order
curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{
    "userId": "user123",
    "items": [
      {"productId": "product1", "quantity": 2, "price": 29.99},
      {"productId": "product2", "quantity": 1, "price": 49.99}
    ]
  }'
 
# Response: {"message":"Order created and queued for processing","order":{...}}
 
# Get all orders
curl http://localhost:3000/orders
 
# Get a specific order (use the id returned by the POST request)
curl http://localhost:3000/orders/order_1708600000000
 
# Monitor RabbitMQ
# Open http://localhost:15672
# Username: guest
# Password: guest
# View queues, messages, and connections

Common Mistakes & Pitfalls

1. Not Using Manual Acknowledgment

Auto-ack removes messages as soon as they are delivered, risking data loss.

ts
// ❌ Wrong - auto-ack (noAck: true)
channel.consume(queue, (msg) => {
  // If the process crashes here, the message is lost
  processMessage(msg);
}, { noAck: true });
 
// ✅ Right - manual ack (amqplib's default, noAck: false)
channel.consume(queue, async (msg) => {
  if (!msg) return; // null means the consumer was cancelled by the broker
  try {
    await processMessage(msg);
    channel.ack(msg);
  } catch (error) {
    channel.nack(msg, false, false); // Send to the DLX
  }
});

2. Not Setting Queue Durability

Non-durable queues are lost when the broker restarts.

ts
// ❌ Wrong - the queue is lost on restart
channel.assertQueue('orders', { durable: false });
 
// ✅ Right - the queue survives restarts
channel.assertQueue('orders', { durable: true });

3. Ignoring Dead Letter Exchanges

Without a DLX, rejected and expired messages are discarded.

ts
// ❌ Wrong - no DLX, failed messages are discarded
channel.assertQueue('orders', { durable: true });
 
// ✅ Right - the DLX captures failed messages
channel.assertQueue('orders', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'dlx',
    'x-dead-letter-routing-key': 'order.dead-letter',
  },
});

4. Not Handling Connection Failures

RabbitMQ connections can fail. Implement reconnection logic.

ts
// ✅ Proper error handling
connection.on('error', (err) => {
  // Log only: amqplib emits 'close' after 'error', so reconnect in one place
  console.error('Connection error:', err);
});
 
connection.on('close', () => {
  console.log('Connection closed, reconnecting in 5s');
  setTimeout(() => reconnect(), 5000);
});

5. Unbounded Prefetch

A high prefetch can overload the consumer.

ts
// ❌ Wrong - the consumer can receive 1000 messages at once
channel.prefetch(1000);
 
// ✅ Right - process one at a time
channel.prefetch(1);
 
// Or for parallel processing
channel.prefetch(10);

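6. Not Handling Poison Pills

Messages that always fail cause infinite redelivery loops.

ts
// ✅ Requeue at most once
// Note: x-death only counts dead-lettering events; a plain requeue never
// increments it, so the redelivered flag is used here instead.
if (msg.fields.redelivered) {
  // Already retried once: send to the DLX instead of requeueing again
  channel.nack(msg, false, false);
} else {
  // First failure: requeue for one more attempt
  channel.nack(msg, false, true);
}
// For a counted limit, use a quorum queue with x-delivery-limit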

Best Practices

1. Use Topic Exchanges for Event Distribution

Topic exchanges provide flexible routing through pattern matching.

ts
// Publish events with hierarchical routing keys
await rabbitmq.publishMessage('events', 'order.created', orderData);
await rabbitmq.publishMessage('events', 'order.shipped', orderData);
await rabbitmq.publishMessage('events', 'payment.success', paymentData);
 
// Consumers subscribe to patterns
await channel.bindQueue('queue1', 'events', 'order.*');
await channel.bindQueue('queue2', 'events', '*.success');

2. Implement Idempotent Consumers

Messages may be delivered more than once. Make sure processing is idempotent.

ts
// ✅ Idempotent processing (db stands for a Prisma-style client)
async function processOrder(order) {
  // Check whether it has already been processed
  const existing = await db.orders.findUnique({ where: { id: order.id } });
  if (existing) {
    return; // Already processed
  }
 
  // Process the order; a unique constraint on id guards against concurrent duplicates
  await db.orders.create({ data: order });
}
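
The same idea can be tried without a database. In the sketch below a Set stands in for the table with a unique key, and the handler is skipped for any message ID that has already been processed. `IdempotentHandler` is a hypothetical class; a real service would keep the processed IDs in durable, shared storage.

```typescript
// In-memory idempotency guard: the set stands in for a table with a unique key.
class IdempotentHandler<T extends { id: string }> {
  private readonly seen = new Set<string>();
  private readonly handler: (message: T) => void;

  constructor(handler: (message: T) => void) {
    this.handler = handler;
  }

  // Returns true when the message was processed, false when it was a duplicate.
  handle(message: T): boolean {
    if (this.seen.has(message.id)) return false;
    this.handler(message);
    this.seen.add(message.id); // record only after the handler succeeded
    return true;
  }
}

const processedIds: string[] = [];
const orderHandler = new IdempotentHandler<{ id: string }>((order) => {
  processedIds.push(order.id);
});

const results = [
  orderHandler.handle({ id: "order_1" }),
  orderHandler.handle({ id: "order_1" }), // redelivery of the same message
  orderHandler.handle({ id: "order_2" }),
];
// results: true, false, true
```

Recording the ID only after the handler returns means a failed attempt can be retried; the trade-off is that the handler itself must tolerate being interrupted halfway.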

3. Use Correlation IDs for Tracing

Track a message across services for debugging.

ts
// Publish with a correlation ID
const correlationId = uuid();
await rabbitmq.publishMessage('orders', 'order.created', order, {
  headers: { 'x-correlation-id': correlationId },
});
 
// Log with the correlation ID
logger.info('Processing order', { correlationId, orderId: order.id });

4. Implement the Circuit Breaker Pattern

Prevent cascading failures when a downstream service is down.

ts
// ✅ Circuit breaker
// CircuitBreaker stands for a library or in-house class; option names and
// error codes differ between implementations.
const breaker = new CircuitBreaker(async (order) => {
  return await paymentService.process(order);
}, {
  threshold: 5, // Open after 5 errors
  timeout: 60000, // Reset after 60 seconds
});
 
try {
  await breaker.fire(order);
} catch (error) {
  if (error.code === 'CIRCUIT_BREAKER_OPEN') {
    // Service is down, queue the order for a later retry
    await rabbitmq.publishMessage('payments', 'process', order);
  }
}
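
To show what such a breaker does internally, here is a minimal synchronous sketch with the three usual states: closed, open, and half-open. It is not the API of any particular library; `SimpleCircuitBreaker`, its constructor parameters, and the injected clock are assumptions made so the example is self-contained and deterministic. A production version would wrap promises instead of plain function calls.

```typescript
type BreakerState = "closed" | "open" | "half-open";

class SimpleCircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = "closed";
  private readonly threshold: number;
  private readonly resetMs: number;
  private readonly now: () => number;

  constructor(threshold: number, resetMs: number, now: () => number = () => Date.now()) {
    this.threshold = threshold;
    this.resetMs = resetMs;
    this.now = now;
  }

  call<T>(action: () => T): T {
    if (this.state === "open") {
      if (this.now() - this.openedAt < this.resetMs) throw new Error("circuit open");
      this.state = "half-open"; // reset period elapsed: allow one trial call
    }
    try {
      const result = action();
      this.failures = 0;
      this.state = "closed";
      return result;
    } catch (err) {
      this.failures++;
      // A failed trial call, or too many consecutive failures, opens the circuit
      if (this.state === "half-open" || this.failures >= this.threshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}

let clock = 0; // fake clock so the example is deterministic
const breaker = new SimpleCircuitBreaker(2, 1000, () => clock);
const failing = (): string => {
  throw new Error("payment service down");
};

function attempt(action: () => string): string {
  try {
    return breaker.call(action);
  } catch (err) {
    return (err as Error).message;
  }
}

const outcomes: string[] = [];
outcomes.push(attempt(failing));      // first failure
outcomes.push(attempt(failing));      // second failure opens the circuit
outcomes.push(attempt(() => "paid")); // rejected without calling the service
clock = 1000;                         // reset period elapsed
outcomes.push(attempt(() => "paid")); // trial call succeeds and closes the circuit
```

While the circuit is open the payment service is not called at all, which is the point: the consumer can requeue or delay the message immediately instead of waiting for another timeout.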

5. Monitor Queue Depth

Alert when queues grow unexpectedly.

ts
// Monitor queue depth
setInterval(async () => {
  const queue = await channel.checkQueue('orders');
  if (queue.messageCount > 1000) {
    // Replace with a call to your alerting system
    console.error('Order queue depth critical');
  }
}, 30000);

6. Use Separate Exchanges for Different Concerns

Organize exchanges by domain.

ts
// Domain-specific exchanges
await channel.assertExchange('orders', 'topic', { durable: true });
await channel.assertExchange('payments', 'direct', { durable: true });
await channel.assertExchange('notifications', 'fanout', { durable: true });
await channel.assertExchange('dlx', 'direct', { durable: true });

7. Implement Graceful Shutdown

Close connections properly on application shutdown.

ts
// ✅ Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully');
  await channel.close();
  await connection.close();
  process.exit(0);
});

When NOT to Use RabbitMQ

1. Simple Request-Response

For simple synchronous operations, HTTP is simpler.

ts
// ✅ Use HTTP for simple requests
GET /users/123
 
// ❌ Don't use RabbitMQ for simple requests
// Adds unnecessary complexity

2. Real-Time Bidirectional Communication

Use WebSockets instead.

ts
// ✅ Use WebSockets for real-time chat
// ❌ Don't use RabbitMQ for chat
// Message ordering and latency issues

3. High-Frequency Trading

Ultra-low-latency workloads need specialized messaging systems. Kafka is the better fit when the need is high-throughput streaming and replay rather than lower latency.

ts
// ✅ Use specialized low-latency messaging for high-frequency trading
// ❌ A general-purpose broker such as RabbitMQ adds too much latency

4. Simple Caching

Use Redis instead.

ts
// ✅ Use Redis for caching
// ❌ Don't use RabbitMQ for caching
// Wrong tool for the job

Conclusion

RabbitMQ is a powerful tool for building decoupled, resilient systems. Understanding the core concepts (exchanges, queues, routing keys, and dead letter handling) lets you design systems that scale and recover gracefully.

The order processing example demonstrates production patterns:

  • Topic exchanges for event distribution
  • Dead letter exchanges for failed message handling
  • Manual acknowledgment for reliability
  • Retry logic with exponential backoff
  • Graceful error handling

Key takeaways:

  1. Use RabbitMQ for asynchronous, decoupled communication
  2. Always use durable queues and persistent messages for critical data
  3. Implement manual acknowledgment and dead letter handling
  4. Monitor queue depth and set up alerting
  5. Design idempotent consumers
  6. Use correlation IDs for tracing

Start with simple patterns such as direct exchanges and single consumers. As complexity grows, explore topic exchanges, competing consumers, and advanced routing. RabbitMQ's flexibility makes it suitable for systems ranging from simple task queues to complex event-driven architectures.

Next steps:

  1. Set up RabbitMQ locally with Docker
  2. Build a simple task queue
  3. Add dead letter handling
  4. Implement retry logic
  5. Monitor and optimize performance

RabbitMQ changes how you think about system design: from tightly coupled synchronous calls to loosely coupled asynchronous events. Master it, and you will build systems that scale.

