Messaging Patterns — Deep Dive
Level: Intermediate Pre-reading: 04 · Event-Driven Architecture · 04.02 · Kafka Deep Dive
Message Ordering Guarantees
In distributed systems, message order matters. But networks don't guarantee it.
Producer sends: [Event1, Event2, Event3]
Network chaos:
Event2 arrives first
Event3 arrives second
Event1 arrives last ← Out of order!
Consumer sees: [Event2, Event3, Event1] ← Wrong order
Kafka Ordering Guarantees
| Scope | Guarantee | How |
|---|---|---|
| Within one partition | Messages ordered | Kafka appends to single log; offset increases monotonically |
| Across partitions | No guarantee | Different partitions = different brokers; no global ordering |
| Consumer group | Each partition → exactly one consumer in group | One consumer per partition preserves order |
Ensuring Message Order
Solution: Use partition key to route related messages to same partition.
// Spring Kafka
@RestController
public class OrderController {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@PostMapping("/orders")
public Order createOrder(@RequestBody OrderRequest request) {
Order order = saveOrder(request);
// Send events; use orderId as partition key
// All events for a given orderId hash to the same partition → preserved order
kafkaTemplate.send(
new ProducerRecord<>(
"order-events",
order.getId(), // Partition key
new OrderCreatedEvent(order)
)
);
return order;
}
}
Effect:
orderId=ORD-123 → Partition 0
orderId=ORD-124 → Partition 1
orderId=ORD-125 → Partition 0
Partition 0 events:
[ORD-123 created, ORD-123 paid, ORD-123 shipped] ← Ordered within partition
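Under the hood, the default partitioner maps key → partition deterministically: a murmur2 hash of the serialized key, modulo the partition count. A simplified sketch of the idea — `String.hashCode()` is an illustrative stand-in here, not Kafka's actual hash function:

```java
// Simplified stand-in for Kafka's default partitioner.
// Real Kafka uses murmur2 over the serialized key bytes; hashCode is
// only illustrative. The point: the mapping is deterministic, so the
// same key always lands on the same partition → per-key ordering.
public class KeyPartitioner {
    public static int choosePartition(String key, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then wrap
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping depends only on the key and the partition count, adding partitions later changes where keys land — one reason partition counts are usually chosen up front.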
Idempotency (Exactly-Once Semantics)
Problem: At-least-once delivery means events might arrive twice.
Timeline:
Producer sends OrderConfirmed event
Consumer processes: updates database, sends email
Network hiccup; the consumer's offset commit never reaches Kafka
On restart or rebalance, the same event is redelivered
Consumer processes AGAIN: duplicate email sent ✗
Solution: Idempotency Key
Every event has a unique ID. Consumer deduplicates using it.
public class OrderConfirmedEvent {
public String eventId; // Unique per event
public String orderId;
public LocalDateTime timestamp;
}
@Component
public class OrderConsumer {
@Autowired
private ProcessedEventRepository processedEvents;
@KafkaListener(topics = "order-confirmed")
public void handleOrderConfirmed(OrderConfirmedEvent event) {
// Idempotency check
if (processedEvents.exists(event.eventId)) {
log.info("Already processed event {}", event.eventId);
return; // Idempotent; safe to skip
}
// Process
Order order = orderRepository.findById(event.orderId).orElseThrow();
sendConfirmationEmail(order);
// Record that we processed this event
processedEvents.save(new ProcessedEvent(event.eventId));
}
}
Idempotency Strategies
| Strategy | Pros | Cons |
|---|---|---|
| Event ID + dedup table | Guaranteed dedup | Requires database lookup per event |
| Event ID + cache (Redis) | Fast dedup; TTL cleanup | Risk of eviction; memory overhead |
| Natural idempotency (see below) | No dedup needed | Not all operations are naturally idempotent |
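The "event ID + cache" row can be sketched without infrastructure. In production the check would be a Redis `SET key NX EX <ttl>`; the class below is a hypothetical in-memory stand-in that shows the same check-and-mark semantics:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of "event ID + cache" dedup. Not production code:
// a real deployment would use Redis SET ... NX EX so the cache survives
// restarts and is shared across consumer instances.
public class DedupCache {
    private final ConcurrentHashMap<String, Instant> seen = new ConcurrentHashMap<>();
    private final Duration ttl;

    public DedupCache(Duration ttl) { this.ttl = ttl; }

    /** Returns true only the first time an eventId is seen within the TTL. */
    public boolean markIfFirst(String eventId) {
        Instant now = Instant.now();
        Instant expiry = now.plus(ttl);
        Instant prev = seen.putIfAbsent(eventId, expiry);
        if (prev == null) return true;               // first delivery
        if (prev.isBefore(now)) {                    // old entry expired; reclaim it
            return seen.replace(eventId, prev, expiry);
        }
        return false;                                // duplicate within TTL
    }
}
```

The TTL trade-off from the table is visible here: a duplicate arriving after the entry expires is processed again, which is why cache-based dedup is "fast but best-effort".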
Natural Idempotency
Some operations are naturally idempotent: calling them multiple times has the same effect as calling them once.
// IDEMPOTENT: "Confirm order 123"
updateOrderStatus(order123, CONFIRMED); // Idempotent; status stays CONFIRMED
// NOT IDEMPOTENT: "Increment order count"
order.count++; // First call: count=5; second call: count=6 ✗
// IDEMPOTENT: "Set order count to 5"
order.count = 5; // First call: count=5; second call: count=5 ✓
Dead Letter Queue (DLQ)
Problem: Some messages fail processing forever (e.g., malformed JSON, missing dependency).
Timeline:
Consumer receives event with malformed data
Tries to deserialize → error
Retries 3x → still fails
Retries exhausted; the offset is committed past the message and it is never processed ✗
Solution: Route Failures to DLQ
@Component
public class OrderConsumer {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@KafkaListener(topics = "orders")
public void handleOrder(OrderEvent event) {
try {
// Process order
Order order = buildOrder(event);
repository.save(order);
} catch (Exception e) {
log.error("Failed to process order event {}", event, e);
// Send to DLQ for manual inspection
kafkaTemplate.send("orders.dlq", event);
}
}
}
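Instead of hand-rolling the try/catch, Spring Kafka (2.8+) also ships built-in DLQ routing. A configuration sketch, assuming a KafkaTemplate bean is available; failed records go to "&lt;topic&gt;.DLT" after retries are exhausted:

```java
// Assumes spring-kafka 2.8+; relevant imports:
//   org.springframework.kafka.listener.DefaultErrorHandler
//   org.springframework.kafka.listener.DeadLetterPublishingRecoverer
//   org.springframework.util.backoff.FixedBackOff
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    // Publishes the failed record (plus exception headers) to "<topic>.DLT"
    var recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry 3 times, 1 second apart, before recovering to the DLT
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}
```

The recoverer also stamps exception details into record headers, which the DLQ monitor below can read instead of guessing why a message failed.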
DLQ Monitoring
@Component
public class DLQMonitor {
@KafkaListener(topics = "orders.dlq")
public void monitorDLQ(OrderEvent event) {
// Alert ops team
alerting.send(
"Order DLQ received message: " + event.orderId +
"\nReason: " + extractErrorFromHeader() // e.g., read the kafka_dlt-exception-message header
);
// Quarantine in separate database for manual review
dlqRepository.save(new QuarantinedMessage(event));
}
}
Message Ordering + Idempotency Combined
Real-world scenario: you need ordering AND must tolerate at-least-once delivery (i.e., duplicates).
@Component
public class OrderEventProcessor {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Autowired
private ProcessedEventRepository processedEvents;
@Autowired
private OrderRepository orderRepository;
@KafkaListener(topics = "order-events")
public void handleOrderEvent(OrderEvent event) {
// 1. Idempotency check
if (processedEvents.exists(event.eventId)) {
log.info("Skipping duplicate event {}", event.eventId);
return;
}
try {
// 2. Process
processEvent(event);
// 3. Record as processed (inside transaction with business logic)
processedEvents.save(new ProcessedEvent(event.eventId));
} catch (RetryableException e) {
// Transient failure; Kafka will retry
throw e;
} catch (Exception e) {
// Non-retryable; send to DLQ
kafkaTemplate.send("order-events.dlq", event);
processedEvents.save(new ProcessedEvent(event.eventId));
}
}
private void processEvent(OrderEvent event) {
if (event instanceof OrderCreatedEvent) {
Order order = new Order(event.orderId);
orderRepository.save(order);
} else if (event instanceof OrderPaidEvent) {
Order order = orderRepository.findById(event.orderId)
.orElseThrow(() -> new OrderNotFoundException());
order.status = OrderStatus.PAID;
orderRepository.save(order);
}
}
}
Key design:
- Partition key = orderId → preserves order per order
- Idempotency key (eventId) + dedup table → prevents duplicates
- DLQ for poison messages
- Transactional processing → dedup record saved with business data
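The transactional-processing point can be sketched with Spring's @Transactional, so the dedup record and the business write commit or roll back as one unit. A sketch, shown in isolation; repository names follow the example above:

```java
// If anything throws before commit, BOTH writes roll back together, and
// Kafka's redelivery reprocesses the event from a clean state. Assumes
// the repositories share one transactional datastore (e.g., JPA).
@Transactional
public void processWithDedup(OrderEvent event) {
    if (processedEvents.exists(event.eventId)) {
        return; // duplicate delivery; already handled
    }
    processEvent(event);                                      // business write
    processedEvents.save(new ProcessedEvent(event.eventId));  // dedup write
}
```

If the dedup record lived in a different store (e.g., Redis) than the business data, this atomicity would be lost; that is the design reason for keeping both in one database here.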
Batch Processing vs Stream Processing
| Aspect | Batch | Stream |
|---|---|---|
| Trigger | Time-based (hourly) or size-based (1000 msgs) | Every message immediately |
| Latency | High (1+ hour) | Low (milliseconds) |
| Throughput | High (efficient batching) | Lower per-message |
| Example | Daily reconciliation | Real-time fraud detection |
Batch Example
@Component
public class OrderBatchProcessor {
@Scheduled(cron = "0 0 * * * *") // Every hour
public void processOrdersBatch() {
// Fetch buffered order events (batch size is capped by max.poll.records=1000 in consumer config)
ConsumerRecords<String, OrderEvent> records = kafkaConsumer.poll(Duration.ofSeconds(10));
List<OrderEvent> batch = new ArrayList<>();
records.forEach(r -> batch.add(r.value()));
// Process in bulk
List<Order> orders = batch.stream()
.map(this::buildOrder)
.collect(Collectors.toList());
orderRepository.saveAll(orders); // Bulk insert
}
}
Stream Example
@Component
public class OrderStreamProcessor {
@KafkaListener(topics = "orders")
public void processOrderStream(OrderEvent event) {
// Process immediately
Order order = buildOrder(event);
orderRepository.save(order);
// Publish downstream
eventPublisher.publish(new OrderConfirmedEvent(order));
}
}
Consumer Group Coordination
Multiple consumers in one group share a topic; Kafka distributes the partitions among them.
Topic: orders (4 partitions)
Scenario 1: 1 consumer
Consumer 1 → Partition 0, 1, 2, 3 (handles all)
Scenario 2: 2 consumers
Consumer 1 → Partition 0, 1
Consumer 2 → Partition 2, 3 (rebalancing occurs)
Scenario 3: Consumer 1 crashes
Rebalancing: partitions redistribute to healthy consumers
Rebalancing Tuning
@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
// Rebalancing behavior
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
return new DefaultKafkaConsumerFactory<>(props);
}
| Config | Default | Tuning |
|---|---|---|
| session.timeout.ms | 45s (10s before Kafka 3.0) | Window without heartbeats before the broker declares a consumer dead; increase to tolerate brief pauses |
| heartbeat.interval.ms | 3s | Set to roughly 1/3 of session.timeout.ms |
| max.poll.interval.ms | 5min | Increase if processing one poll batch takes longer than 5 min (or lower max.poll.records) |
How do I choose between Kafka topics vs RabbitMQ queues?
Kafka: Event streaming, replay, multiple consumers. RabbitMQ: Work queues, task distribution, simpler setup. For microservices events, Kafka. For job processing, RabbitMQ.
What happens if a consumer can't keep up with message rate?
Consumer lag increases (the group's committed offset falls behind the log-end offset). Monitor it via the client's records-lag-max metric or by describing the group on the broker. If lag is persistent, scale out consumer instances (up to the partition count) or optimize processing logic.
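In practice, lag is usually inspected with the consumer-groups CLI that ships with the Kafka distribution (group name follows the config example above):

```shell
# Shows current offset, log-end offset, and LAG per partition for the group
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group order-service
```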
Is exactly-once processing possible in Kafka?
Kafka delivers at-least-once out of the box. Within Kafka, exactly-once IS supported: the idempotent producer plus transactions (Kafka Streams exposes this as processing.guarantee=exactly_once_v2). End-to-end exactly-once involving external systems (databases, email) still requires application-level idempotent processing (dedup key) and transactional writes.
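The within-Kafka half of that answer is the transactional producer API. A sketch using the plain kafka-clients library; broker address and topic follow the earlier examples, and the serializer choice is an assumption:

```java
// Imports from org.apache.kafka.clients.producer.*,
// org.apache.kafka.common.KafkaException,
// org.apache.kafka.common.serialization.StringSerializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);             // no duplicates on retry
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx"); // stable id per producer instance

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("order-events", "ORD-123", "{\"status\":\"PAID\"}"));
    producer.commitTransaction(); // read_committed consumers see all-or-nothing
} catch (KafkaException e) {
    producer.abortTransaction();
}
```

Note the scope: this makes the produce side atomic within Kafka. A consumer that also writes to a database or sends email is outside the transaction, which is why the dedup-key pattern above remains necessary.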