Master Kafka from core concepts to production. Learn topics, partitions, consumer groups, and exactly-once semantics, and build a complete real-time analytics platform with NestJS.

Modern data-driven applications generate massive volumes of events: user clicks, transactions, sensor readings, system logs. Processing this data in real time is critical for competitive advantage. Netflix uses Kafka to process more than 1 trillion events every day. Uber tracks millions of rides in real time. LinkedIn powers its entire data infrastructure with Kafka.
Unlike traditional message brokers such as RabbitMQ, which focus on message delivery, Kafka is a distributed event streaming platform designed for high-throughput, low-latency data pipelines. It is not just a queue: it is a commit log that stores events durably, enabling replay, time-travel analysis, and multiple independent consumers.
In this article, we will explore Kafka's architecture, walk through every core concept from topics to exactly-once semantics, and build a production-ready real-time analytics platform with NestJS that demonstrates event streaming at scale.
RabbitMQ and similar brokers work well for task queues but have limitations:
Limited Throughput: Optimized for reliability, not speed. Typical throughput: 50K-100K messages/second.
Message Deletion: Once consumed, messages are deleted. You cannot replay or analyze historical data.
Single-Consumer Model: Each message goes to one consumer. Broadcasting requires fanout exchanges.
Scaling Challenges: Adding consumers does not improve throughput beyond the queue's limits, and rebalancing is complex.
Kafka was built for different requirements:
Massive Throughput: Millions of messages per second on commodity hardware.
Durable Storage: Events are stored on disk, indefinitely if configured. Replay anytime.
Multiple Consumers: The same event can be consumed by multiple independent services.
Horizontal Scaling: Add partitions and consumers to scale linearly.
Exactly-Once Semantics: Guarantees no data loss and no duplication.
Broker: A Kafka server that stores and serves events. A cluster of brokers provides redundancy.
Topic: A named stream of events, similar to a table in a database.
Partition: An ordered, immutable sequence of events within a topic. Partitions enable parallelism.
Producer: An application that publishes events to topics.
Consumer: An application that reads events from topics.
Consumer Group: A set of consumers that work together to consume a topic.
Offset: A position in a partition. Consumers track offsets to know what they have already read.
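These concepts can be illustrated with a toy in-memory model (illustrative TypeScript only, not a Kafka client): a partition is an append-only log, and a consumer tracks its position with an offset.

```typescript
// Toy model of one partition: an append-only log plus a consumer offset.
// Illustrative only -- this is not a Kafka client.
const partitionLog: string[] = []; // ordered, immutable sequence of events

function append(event: string): number {
  partitionLog.push(event);
  return partitionLog.length - 1; // the offset of the appended event
}

let committedOffset = 0; // the consumer's position in the partition

function pollNext(): string | undefined {
  if (committedOffset >= partitionLog.length) return undefined; // caught up
  const event = partitionLog[committedOffset];
  committedOffset += 1; // commit the offset after reading
  return event;
}

append('user_123:login');    // stored at offset 0
append('user_123:purchase'); // stored at offset 1
console.log(pollNext()); // user_123:login
console.log(pollNext()); // user_123:purchase
console.log(pollNext()); // undefined -- no new events yet
```

Because the log is never mutated, a consumer can replay history simply by resetting `committedOffset`, which is exactly what offset resets do in Kafka.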
Producer → Topic (Partition 0, 1, 2, ...) → Consumer Group (Consumer A, B, C, ...)

Kafka replicates each partition across multiple brokers:
Leader: The broker that handles reads and writes for a partition.
Replicas: Copies on other brokers for fault tolerance.
In-Sync Replicas (ISR): Replicas that are caught up with the leader.
If the leader fails, an in-sync replica is promoted to leader with no data loss.
Topics are named event streams. Think of them as tables in a database.
Topic Characteristics:
Use Cases:
Example Topic Structure:
# E-commerce events
user-events
order-events
payment-events
inventory-events
# Analytics
page-views
user-clicks
search-queries
# System
application-logs
error-events
performance-metrics

Partitions enable parallelism and scalability. Each partition is an ordered log.
Partition Key: Determines which partition an event goes to.
# Events with the same key go to the same partition
Key: user_123 → Partition 0
Key: user_456 → Partition 1
Key: user_789 → Partition 0
# This guarantees ordering per key:
# all user_123 events arrive in order

Benefits:
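The key-to-partition mapping shown above can be written out in a few lines. The hash function here is a simple stand-in for Kafka's murmur2, chosen only for illustration:

```typescript
// The same key always hashes to the same partition, which is what
// guarantees per-key ordering. Simple stand-in hash for illustration.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // keep it unsigned 32-bit
  }
  return hash % numPartitions;
}

const numPartitions = 3;
console.log(partitionFor('user_123', numPartitions)); // deterministic:
console.log(partitionFor('user_123', numPartitions)); // same key, same partition
console.log(partitionFor('user_456', numPartitions)); // may land elsewhere
```

Note that this mapping only holds while the partition count is fixed; adding partitions changes where keys land, which is why partition counts are usually chosen generously up front.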
Partition Count Considerations:
Throughput needed: 1M events/second
Per-partition capacity: 100K events/second
Partitions needed: 1M / 100K = 10 partitions
Consumers: 5
Partitions per consumer: 10 / 5 = 2

Consumer groups enable multiple independent consumers to read the same topic.
How It Works:
Topic: user-events (3 partitions)
Consumer Group A:
Consumer A1 → Partition 0
Consumer A2 → Partition 1
Consumer A3 → Partition 2
Consumer Group B:
Consumer B1 → Partition 0
Consumer B2 → Partition 1
Consumer B3 → Partition 2

Each consumer group maintains its own offsets, so the same event is consumed independently by every group.
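The layout above, where both groups read the full topic, can be simulated with per-group offsets over one shared log (a toy sketch; the group names are made up):

```typescript
// Each consumer group keeps its own committed offset into the same log,
// so every group independently sees every event. Toy model only.
const sharedLog: string[] = ['login', 'view', 'purchase'];
const groupOffsets = new Map<string, number>();

function consumeAll(groupId: string): string[] {
  const from = groupOffsets.get(groupId) ?? 0;
  const events = sharedLog.slice(from); // events this group has not seen yet
  groupOffsets.set(groupId, sharedLog.length); // commit up to the end
  return events;
}

console.log(consumeAll('analytics-group')); // [ 'login', 'view', 'purchase' ]
console.log(consumeAll('billing-group'));   // [ 'login', 'view', 'purchase' ]
console.log(consumeAll('analytics-group')); // [] -- already caught up
```

This is the property that distinguishes Kafka from a classic queue: consuming an event does not remove it, it only advances the consuming group's offset.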
Use Cases:
Rebalancing:
When a consumer joins or leaves the group, partitions are reassigned:
Before: Consumer A (P0, P1), Consumer B (P2)
Add Consumer C:
After: Consumer A (P0), Consumer B (P1), Consumer C (P2)

Offsets track consumer progress and are critical for exactly-once semantics.
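The Before/After reassignment in the rebalancing example behaves like Kafka's range assignor: contiguous chunks of partitions per consumer, with earlier members taking the larger chunks. A simplified sketch (real assignors are pluggable):

```typescript
// Range-style partition assignment, roughly what a rebalance computes:
// split the partition list into contiguous chunks per consumer.
function assignPartitions(
  partitions: number[],
  consumers: string[],
): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  const perConsumer = Math.floor(partitions.length / consumers.length);
  const extra = partitions.length % consumers.length;
  let index = 0;
  consumers.forEach((c, i) => {
    const count = perConsumer + (i < extra ? 1 : 0);
    assignment.set(c, partitions.slice(index, index + count));
    index += count;
  });
  return assignment;
}

console.log(assignPartitions([0, 1, 2], ['A', 'B']));
// A -> [0, 1], B -> [2] -- the "Before" state

console.log(assignPartitions([0, 1, 2], ['A', 'B', 'C']));
// A -> [0], B -> [1], C -> [2] -- the "After" state once C joins
```

Group offsets survive a rebalance unchanged; only partition ownership moves, which is why the new owner resumes from the last committed offset.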
Offset Storage:
# Kafka stores offsets in the internal __consumer_offsets topic
# Consumer Group: analytics-group
# Topic: user-events
# Partition 0: offset 1000
# Partition 1: offset 950
# Partition 2: offset 1050

Offset Strategies:
Earliest: Start from the beginning of the topic
# Replay all historical events
auto.offset.reset=earliest

Latest: Start from the newest events
# Process only new events
auto.offset.reset=latest

Specific Offset: Start from a known position
# Resume from a checkpoint
consumer.seek(partition, offset)

With the right configuration, Kafka guarantees exactly-once processing: no data loss and no duplication.
How It Works:
Configuration:
# Producer
enable.idempotence=true
acks=all
# Consumer
isolation.level=read_committed
enable.auto.commit=false

Processing Guarantees:
At-Most-Once: A message is processed 0 or 1 times (data loss possible)
At-Least-Once: A message is processed 1 or more times (duplication possible)
Exactly-Once: A message is processed exactly once (no loss, no duplication)

Kafka retains events according to configurable policies.
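The practical difference between these guarantees shows up on redelivery. In this small simulation (a toy sketch), the broker redelivers one message, and deduplicating by message id is what turns at-least-once into effectively-once processing:

```typescript
// Simulated redelivery: m2 arrives twice, e.g. because the consumer
// crashed after processing it but before committing its offset.
const deliveries = ['m1', 'm2', 'm2', 'm3'];

// At-least-once: every delivery is processed, duplicates included.
const atLeastOnce = [...deliveries];
console.log(atLeastOnce.length); // 4 -- m2 was processed twice

// Effectively exactly-once: deduplicate by message id before processing.
const seen = new Set<string>();
const processedOnce: string[] = [];
for (const id of deliveries) {
  if (seen.has(id)) continue; // duplicate delivery -- skip it
  seen.add(id);
  processedOnce.push(id);
}
console.log(processedOnce); // [ 'm1', 'm2', 'm3' ] -- each processed once
```

Kafka's idempotent producer and read_committed consumers apply the same idea at the protocol level, using producer ids and sequence numbers instead of an application-side set.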
Time-Based Retention:
# Keep events for 7 days
retention.ms=604800000
# Keep events for 30 days
retention.ms=2592000000

Size-Based Retention:
# Keep the last 1GB of events
retention.bytes=1073741824

Compacted Topics:
# Keep only the latest value per key
cleanup.policy=compact
# Use case: State snapshots
# Key: user_123
# Values: {name: "John"} → {name: "John", age: 30} → {name: "John", age: 31}
# Compacted: {name: "John", age: 31}

Use Cases:
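The compaction shown above (only the newest record per key survives) can be modeled directly; the real broker does this incrementally in background cleaner threads. A sketch:

```typescript
// Log compaction keeps only the latest value for each key.
interface KeyedRecord {
  key: string;
  value: string;
}

function compact(records: KeyedRecord[]): KeyedRecord[] {
  const latest = new Map<string, string>();
  for (const { key, value } of records) latest.set(key, value); // later writes win
  return Array.from(latest, ([key, value]) => ({ key, value }));
}

const topicLog: KeyedRecord[] = [
  { key: 'user_123', value: '{"name":"John"}' },
  { key: 'user_456', value: '{"name":"Jane"}' },
  { key: 'user_123', value: '{"name":"John","age":31}' },
];

console.log(compact(topicLog));
// user_123 keeps only its latest value; user_456 is untouched
```

This is why compacted topics work well as state snapshots: replaying the compacted log from the beginning rebuilds the latest state per key.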
Kafka can compress events to reduce storage and network usage.
Compression Algorithms:
# No compression
compression.type=none
# Snappy (fast, moderate compression)
compression.type=snappy
# LZ4 (very fast, low compression)
compression.type=lz4
# GZIP (slow, high compression)
compression.type=gzip
# Zstandard (balanced)
compression.type=zstd

Compression Ratio (typical JSON event):
Raw JSON: 500 bytes
Snappy: 150 bytes (70% reduction)
GZIP: 80 bytes (84% reduction)

Kafka transactions ensure atomic writes across partitions.
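Ratios like the ones above are easy to sanity-check in Node with the built-in zlib module. The exact numbers depend on the payload; repetitive JSON, typical of event streams, compresses especially well:

```typescript
import { gzipSync } from 'node:zlib';

// Repetitive JSON, typical of event payloads, compresses very well.
const events = Array.from({ length: 100 }, (_, i) => ({
  userId: `user_${i % 10}`,
  eventType: 'page_view',
  url: '/products',
}));

const raw = Buffer.from(JSON.stringify(events));
const compressed = gzipSync(raw);

console.log(`raw:  ${raw.length} bytes`);
console.log(`gzip: ${compressed.length} bytes`);
console.log(
  `saved: ${(100 * (1 - compressed.length / raw.length)).toFixed(0)}%`,
);
```

Kafka compresses whole record batches rather than individual messages, so larger producer batches generally compress even better than this single-buffer example suggests.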
Use Case: Exactly-once processing with side effects.
# Read from the input topic
# Process the event
# Write to the output topic
# Update the database
# All steps succeed or all fail

Configuration:
transactional.id=unique-id
enable.idempotence=true

Kafka itself does not enforce schemas; a Schema Registry adds schema management.
Benefits:
Schema Formats:
Critical metrics for Kafka operations:
Producer Metrics:
# Records sent per second
kafka.producer:type=producer-metrics,client-id=*,metric-name=record-send-rate
# Average latency
kafka.producer:type=producer-metrics,client-id=*,metric-name=record-send-total-time-ms
# Failed sends
kafka.producer:type=producer-metrics,client-id=*,metric-name=record-error-rate

Consumer Metrics:
# Records consumed per second
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,metric-name=records-consumed-rate
# Consumer lag (how far behind the log end)
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,metric-name=records-lag-max
# Offset commit latency
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,metric-name=commit-latency-avg
# Partitions currently assigned to this consumer
kafka.consumer:type=consumer-coordinator-metrics,client-id=*,metric-name=assigned-partitions

Now let's build a production-ready real-time analytics platform that demonstrates these Kafka patterns. The system handles user events, page views, batch publishing, and real-time analytics queries.
npm i -g @nestjs/cli
nest new kafka-analytics-platform
cd kafka-analytics-platform
npm install @nestjs/microservices kafkajs class-validator class-transformer
npm install @nestjs/common @nestjs/core

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'analytics-app',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
},
consumer: {
groupId: 'analytics-consumer-group',
allowAutoTopicCreation: true,
},
},
},
]),
],
exports: [ClientsModule],
})
export class KafkaModule {}

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Kafka, Producer, Consumer, Admin } from 'kafkajs';
@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
private kafka: Kafka;
private producer: Producer;
private consumer: Consumer;
private admin: Admin;
async onModuleInit() {
this.kafka = new Kafka({
clientId: 'analytics-app',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
retry: {
initialRetryTime: 100,
retries: 8,
},
});
this.producer = this.kafka.producer({
idempotent: true,
// kafkajs allows at most one in-flight request per connection
// when idempotence is enabled
maxInFlightRequests: 1,
// note: in kafkajs, compression is chosen per send() call
// (e.g. compression: CompressionTypes.GZIP), not in producer config
});
this.admin = this.kafka.admin();
this.consumer = this.kafka.consumer({
groupId: 'analytics-consumer-group',
sessionTimeout: 30000,
rebalanceTimeout: 60000,
});
await this.producer.connect();
await this.admin.connect();
await this.consumer.connect();
await this.setupTopics();
console.log('Kafka connected');
}
private async setupTopics() {
const topics = [
{
name: 'user-events',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // 7 days
{ name: 'compression.type', value: 'gzip' },
],
},
{
name: 'page-views',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{ name: 'retention.ms', value: '86400000' }, // 1 day
],
},
{
name: 'analytics-events',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' },
],
},
{
name: 'error-events',
numPartitions: 1,
replicationFactor: 1,
},
];
try {
await this.admin.createTopics({
topics,
validateOnly: false,
timeout: 30000,
});
console.log('Topics created successfully');
} catch (error) {
if (error.message.includes('already exists')) {
console.log('Topics already exist');
} else {
throw error;
}
}
}
async publishEvent(topic: string, event: any, key?: string) {
await this.producer.send({
topic,
messages: [
{
key: key || null,
value: JSON.stringify(event),
timestamp: Date.now().toString(),
headers: {
'correlation-id': this.generateCorrelationId(),
'source': 'analytics-app',
},
},
],
});
}
async publishBatch(topic: string, events: any[]) {
await this.producer.send({
topic,
messages: events.map((event) => ({
key: event.key || null,
value: JSON.stringify(event),
timestamp: Date.now().toString(),
})),
});
}
async subscribe(
topic: string,
callback: (message: any) => Promise<void>,
fromBeginning: boolean = false,
groupId: string = `analytics-consumer-group-${topic}`,
) {
// kafkajs allows consumer.run() only once per consumer instance,
// so each subscription gets a dedicated consumer. Independent
// subscribers to the same topic should pass distinct groupIds.
const consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
rebalanceTimeout: 60000,
});
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
if (!message.value) return; // tombstone or empty payload
const event = JSON.parse(message.value.toString());
await callback(event);
} catch (error) {
console.error(`Error processing message from ${topic}:`, error);
throw error;
}
},
});
return consumer; // caller may disconnect it on shutdown
}
async subscribeToMultiple(
topics: string[],
callback: (message: any, topic: string) => Promise<void>,
groupId: string = `analytics-consumer-group-${topics.join('-')}`,
) {
// Dedicated consumer per subscription: kafkajs does not allow
// subscribing again after run() has started.
const consumer = this.kafka.consumer({ groupId, sessionTimeout: 30000 });
await consumer.connect();
await consumer.subscribe({ topics, fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
if (!message.value) return;
const event = JSON.parse(message.value.toString());
await callback(event, topic);
} catch (error) {
console.error(`Error processing message from ${topic}:`, error);
throw error;
}
},
});
return consumer;
}
async getConsumerGroupOffsets(groupId: string) {
const offsets = await this.admin.fetchOffsets({ groupId });
return offsets;
}
async resetConsumerGroupOffset(groupId: string, topic: string, partition: number, offset: number) {
await this.admin.setOffsets({
groupId,
topic,
partitions: [{ partition, offset: offset.toString() }],
});
}
private generateCorrelationId(): string {
return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
async onModuleDestroy() {
await this.producer.disconnect();
await this.consumer.disconnect();
await this.admin.disconnect();
}
}

import { Injectable } from '@nestjs/common';
import { KafkaService } from '../kafka/kafka.service';
interface UserEvent {
userId: string;
eventType: 'login' | 'logout' | 'purchase' | 'view';
timestamp: Date;
metadata?: Record<string, any>;
}
interface PageView {
userId: string;
url: string;
referrer?: string;
timestamp: Date;
duration: number;
}
@Injectable()
export class EventsService {
private eventStats = {
totalEvents: 0,
eventsByType: {} as Record<string, number>,
eventsByUser: {} as Record<string, number>,
};
constructor(private readonly kafka: KafkaService) {
this.setupConsumers();
}
private setupConsumers() {
// Consume user events for real-time stats; subscription errors are
// surfaced via the catch handlers instead of being silently dropped
this.kafka
.subscribe('user-events', async (event: UserEvent) => {
this.updateStats(event);
console.log('User event processed:', event.eventType, event.userId);
})
.catch((err) => console.error('user-events subscription failed:', err));
// Consume page views for analytics
this.kafka
.subscribe('page-views', async (pageView: PageView) => {
console.log('Page view processed:', pageView.url, pageView.userId);
})
.catch((err) => console.error('page-views subscription failed:', err));
}
private updateStats(event: UserEvent) {
this.eventStats.totalEvents++;
this.eventStats.eventsByType[event.eventType] =
(this.eventStats.eventsByType[event.eventType] || 0) + 1;
this.eventStats.eventsByUser[event.userId] =
(this.eventStats.eventsByUser[event.userId] || 0) + 1;
}
async publishUserEvent(event: UserEvent): Promise<void> {
await this.kafka.publishEvent('user-events', event, event.userId);
}
async publishPageView(pageView: PageView): Promise<void> {
await this.kafka.publishEvent('page-views', pageView, pageView.userId);
}
async publishBatchEvents(events: UserEvent[]): Promise<void> {
await this.kafka.publishBatch(
'user-events',
events.map((e) => ({ ...e, key: e.userId })),
);
}
getStats() {
return this.eventStats;
}
resetStats() {
this.eventStats = {
totalEvents: 0,
eventsByType: {},
eventsByUser: {},
};
}
}

import { Injectable } from '@nestjs/common';
import { KafkaService } from '../kafka/kafka.service';
interface AnalyticsData {
timestamp: Date;
activeUsers: Set<string>;
eventCounts: Record<string, number>;
topPages: Map<string, number>;
userSessions: Map<string, { loginTime: Date; lastActivity: Date }>;
}
@Injectable()
export class AnalyticsService {
private analytics: AnalyticsData = {
timestamp: new Date(),
activeUsers: new Set(),
eventCounts: {},
topPages: new Map(),
userSessions: new Map(),
};
constructor(private readonly kafka: KafkaService) {
this.setupAnalyticsConsumers();
}
private setupAnalyticsConsumers() {
// Real-time analytics consumers; errors surfaced via catch handlers
this.kafka
.subscribe('user-events', async (event: any) => {
this.updateAnalytics(event);
})
.catch((err) => console.error('user-events subscription failed:', err));
this.kafka
.subscribe('page-views', async (pageView: any) => {
this.updatePageAnalytics(pageView);
})
.catch((err) => console.error('page-views subscription failed:', err));
}
private updateAnalytics(event: any) {
this.analytics.activeUsers.add(event.userId);
this.analytics.eventCounts[event.eventType] =
(this.analytics.eventCounts[event.eventType] || 0) + 1;
if (event.eventType === 'login') {
this.analytics.userSessions.set(event.userId, {
loginTime: new Date(event.timestamp),
lastActivity: new Date(event.timestamp),
});
} else if (event.eventType === 'logout') {
this.analytics.userSessions.delete(event.userId);
} else {
const session = this.analytics.userSessions.get(event.userId);
if (session) {
session.lastActivity = new Date(event.timestamp);
}
}
}
private updatePageAnalytics(pageView: any) {
const count = this.analytics.topPages.get(pageView.url) || 0;
this.analytics.topPages.set(pageView.url, count + 1);
}
getActiveUsers(): number {
return this.analytics.activeUsers.size;
}
getEventCounts(): Record<string, number> {
return this.analytics.eventCounts;
}
getTopPages(limit: number = 10): Array<[string, number]> {
return Array.from(this.analytics.topPages.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, limit);
}
getUserSessions(): number {
return this.analytics.userSessions.size;
}
getAnalyticsSummary() {
return {
timestamp: this.analytics.timestamp,
activeUsers: this.analytics.activeUsers.size,
activeSessions: this.analytics.userSessions.size,
eventCounts: this.analytics.eventCounts,
topPages: this.getTopPages(5),
};
}
}

import { Controller, Post, Get, Body } from '@nestjs/common';
import { EventsService } from './events.service';
import { AnalyticsService } from '../analytics/analytics.service';
@Controller('events')
export class EventsController {
constructor(
private readonly eventsService: EventsService,
private readonly analyticsService: AnalyticsService,
) {}
@Post('user-event')
async publishUserEvent(@Body() event: any) {
await this.eventsService.publishUserEvent({
userId: event.userId,
eventType: event.eventType,
timestamp: new Date(),
metadata: event.metadata,
});
return { message: 'User event published', event };
}
@Post('page-view')
async publishPageView(@Body() pageView: any) {
await this.eventsService.publishPageView({
userId: pageView.userId,
url: pageView.url,
referrer: pageView.referrer,
timestamp: new Date(),
duration: pageView.duration || 0,
});
return { message: 'Page view published', pageView };
}
@Post('batch-events')
async publishBatchEvents(@Body() events: any[]) {
await this.eventsService.publishBatchEvents(
events.map((e) => ({
userId: e.userId,
eventType: e.eventType,
timestamp: new Date(),
metadata: e.metadata,
})),
);
return { message: `${events.length} events published` };
}
@Get('stats')
getStats() {
return this.eventsService.getStats();
}
@Get('analytics')
getAnalytics() {
return this.analyticsService.getAnalyticsSummary();
}
@Get('analytics/active-users')
getActiveUsers() {
return {
activeUsers: this.analyticsService.getActiveUsers(),
};
}
@Get('analytics/top-pages')
getTopPages() {
return {
topPages: this.analyticsService.getTopPages(10),
};
}
@Get('analytics/event-counts')
getEventCounts() {
return {
eventCounts: this.analyticsService.getEventCounts(),
};
}
}

import { Module } from '@nestjs/common';
import { KafkaModule } from './kafka/kafka.module';
import { KafkaService } from './kafka/kafka.service';
import { EventsService } from './events/events.service';
import { EventsController } from './events/events.controller';
import { AnalyticsService } from './analytics/analytics.service';
@Module({
imports: [KafkaModule],
controllers: [EventsController],
providers: [KafkaService, EventsService, AnalyticsService],
})
export class AppModule {}

version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- '2181:2181'
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- '9092:9092'
- '29092:29092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- kafka_data:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- '8080:8080'
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
volumes:
kafka_data:

# Start Kafka and Zookeeper
docker-compose up -d
# Install dependencies
npm install
# Run application
npm run start:dev
# Access Kafka UI
# http://localhost:8080

# Publish user event
curl -X POST http://localhost:3000/events/user-event \
-H "Content-Type: application/json" \
-d '{
"userId": "user123",
"eventType": "login",
"metadata": {"ip": "192.168.1.1"}
}'
# Publish page view
curl -X POST http://localhost:3000/events/page-view \
-H "Content-Type: application/json" \
-d '{
"userId": "user123",
"url": "/products",
"referrer": "/home",
"duration": 5000
}'
# Publish batch events
curl -X POST http://localhost:3000/events/batch-events \
-H "Content-Type: application/json" \
-d '[
{"userId": "user123", "eventType": "purchase"},
{"userId": "user456", "eventType": "login"},
{"userId": "user789", "eventType": "view"}
]'
# Get event stats
curl http://localhost:3000/events/stats
# Get analytics summary
curl http://localhost:3000/events/analytics
# Get active users
curl http://localhost:3000/events/analytics/active-users
# Get top pages
curl http://localhost:3000/events/analytics/top-pages
# Get event counts
curl http://localhost:3000/events/analytics/event-counts

A wrong partition count limits throughput and scalability.
// ❌ Wrong - a single partition becomes a bottleneck
const topic = 'events';
numPartitions: 1;
// ✅ Correct - size the partition count to throughput needs
const topic = 'events';
numPartitions: 10; // 1M events/sec ÷ 100K per partition

High consumer lag indicates processing issues.
// ❌ Wrong - no lag monitoring
// Consumers fall behind and data piles up unnoticed
// ✅ Correct - monitor lag and alert on it
const lag = logEndOffset - committedOffset;
if (lag > 10000) {
alert('Consumer lag critical');
}

Incorrect offset management causes message loss or duplication.
// ❌ Risky - auto-commit can commit offsets before processing finishes
enable.auto.commit=true;
auto.commit.interval.ms=5000;
// ✅ Correct - commit manually after processing succeeds
await processMessage(message);
await consumer.commitOffsets([{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString(),
}]);

Rebalancing without proper handling causes duplicate or lost processing.
// ❌ Wrong - no visibility into rebalances
await consumer.subscribe({ topic });
// ✅ Correct - observe lifecycle and rebalance events
await consumer.subscribe({
topic,
fromBeginning: false,
});
consumer.on(consumer.events.GROUP_JOIN, () => {
console.log('Group joined - partitions (re)assigned');
});
consumer.on(consumer.events.DISCONNECT, () => {
console.log('Consumer disconnected');
});

The wrong compression choice wastes CPU or storage.
// ❌ Wrong - no compression for high-volume data
compression.type=none;
// ✅ Correct - choose based on trade-offs
// High throughput: compression.type=lz4
// Storage optimization: compression.type=gzip
// Balanced: compression.type=snappy

Unhandled errors cause silent failures.
// ❌ Wrong - errors are ignored
await consumer.run({
eachMessage: async ({ message }) => {
processMessage(message); // not awaited: errors fail silently
},
});
// ✅ Correct - proper error handling
await consumer.run({
eachMessage: async ({ message }) => {
try {
await processMessage(message);
} catch (error) {
console.error('Processing error:', error);
// Route to an error topic or dead-letter queue (DLQ)
await kafka.publishEvent('error-events', {
originalMessage: message,
error: error.message,
});
}
},
});

Organize topics by business domain for clarity.
// ✅ Domain-organized topics
// User domain
user-events
user-registrations
user-deletions
// Order domain
order-events
order-payments
order-shipments
// Analytics domain
analytics-events
analytics-aggregations

Partition keys determine ordering and distribution.
// ✅ Good partition key - preserves ordering per user
await kafka.publishEvent('user-events', event, event.userId);
// ❌ Bad partition key - random keys destroy per-entity ordering
await kafka.publishEvent('user-events', event, String(Math.random()));
// ✅ Good partition key - preserves ordering per order
await kafka.publishEvent('order-events', event, event.orderId);

Handle duplicate messages gracefully.
// ✅ Idempotent processing
async function processEvent(event) {
// Skip if this message was already processed
const existing = await db.events.findUnique({
where: { messageId: event.messageId },
});
if (existing) {
return; // already processed
}
// Process the event
await db.events.create({ data: event });
}

Track how far behind your consumers are.
// ✅ Monitor lag: committed offsets vs the latest log-end offsets
setInterval(async () => {
// committed offsets per topic/partition for the group
const committed = await kafka.getConsumerGroupOffsets('analytics-group');
for (const { topic, partitions } of committed) {
// fetchTopicOffsets (kafkajs Admin) returns the high watermark per partition
const latest = await admin.fetchTopicOffsets(topic);
for (const { partition, offset } of partitions) {
const high = latest.find((p) => p.partition === partition)?.high ?? '0';
const lag = Number(high) - Number(offset);
console.log(`Topic: ${topic}, Partition: ${partition}, Lag: ${lag}`);
if (lag > 10000) {
alert('High consumer lag detected');
}
}
}
}, 60000);

Multiple consumers process partitions in parallel.
// ✅ Scalable consumer group
const consumer = kafka.consumer({
groupId: 'analytics-group',
sessionTimeout: 30000,
});
// Add more consumers to the same group:
// partitions are rebalanced automatically, and each
// consumer processes a subset of the partitions

Prevent cascading failures when downstream services fail.
// ✅ Circuit breaker pattern (generic breaker API for illustration)
const breaker = new CircuitBreaker(
async (event) => {
return await analyticsService.process(event);
},
{
threshold: 5,
timeout: 60000,
},
);
try {
await breaker.fire(event);
} catch (error) {
if (error.code === 'CIRCUIT_BREAKER_OPEN') {
// Downstream service is down - route to a retry topic
await kafka.publishEvent('retry-events', event);
}
}

Enforce schema compatibility across producers and consumers.
// ✅ Schema validation
const schema = {
type: 'record',
name: 'UserEvent',
fields: [
{ name: 'userId', type: 'string' },
{ name: 'eventType', type: 'string' },
{ name: 'timestamp', type: 'long' },
],
};
// Validate before publishing
validateSchema(event, schema);
await kafka.publishEvent('user-events', event);

Properly close connections on shutdown.
// ✅ Graceful shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully');
await consumer.disconnect();
await producer.disconnect();
process.exit(0);
});

| Feature | Kafka | RabbitMQ | Redis |
|---|---|---|---|
| Throughput | Millions/sec | 50K-100K/sec | 1M+/sec |
| Persistence | Permanent | Optional | Optional |
| Replay | Yes | No | No |
| Ordering | Per partition | Per queue | N/A |
| Consumer Groups | Yes | No | No |
| Latency | Low | Very low | Ultra-low |
| Use Case | Event streaming | Task queues | Caching |
Choose Kafka when:
Choose RabbitMQ when:
Choose Redis when:
Kafka is a powerful event streaming platform designed for modern data-driven applications. Understanding the core concepts (topics, partitions, consumer groups, and exactly-once semantics) enables you to build scalable, reliable systems.
The example analytics platform demonstrates production patterns:
Key takeaways:
Start with simple use cases such as event ingestion. As complexity grows, explore advanced patterns such as exactly-once semantics, transactions, and stream processing. Kafka's flexibility makes it suitable for anything from simple event logs to complex real-time analytics platforms.
Next steps:
Kafka changes how you think about data pipelines: from batch processing to real-time event streaming. Master it, and you will build systems that scale to billions of events.