Messaging Patterns — Deep Dive

Level: Intermediate
Pre-reading: 04 · Event-Driven Architecture · 04.02 · Kafka Deep Dive


Message Ordering Guarantees

In distributed systems, message order matters. But networks don't guarantee it.

Producer sends: [Event1, Event2, Event3]

Network chaos:
  Event2 arrives first
  Event3 arrives second
  Event1 arrives last ← Out of order!

Consumer sees: [Event2, Event3, Event1] ← Wrong order

Kafka Ordering Guarantees

Scope                | Guarantee                              | How
Within one partition | Messages are ordered                   | Kafka appends to a single log; offsets increase monotonically
Across partitions    | No guarantee                           | Partitions live on different brokers; there is no global ordering
Consumer group       | Each partition → exactly one consumer  | A single consumer per partition preserves order

Ensuring Message Order

Solution: Use partition key to route related messages to same partition.

// Spring Kafka
@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @PostMapping("/orders")
    public Order createOrder(@RequestBody OrderRequest request) {
        Order order = saveOrder(request);

        // Send events; use orderId as the partition key
        // All events for a given orderId hash to the same partition → order preserved
        kafkaTemplate.send(
            new ProducerRecord<>(
                "order-events",
                order.getId(),           // Partition key
                new OrderCreatedEvent(order)
            )
        );

        return order;
    }
}

Effect:

orderId=ORD-123 → Partition 0
orderId=ORD-124 → Partition 1
orderId=ORD-125 → Partition 0

Partition 0 events:
  [ORD-123 created, ORD-123 paid, ORD-123 shipped]  ← Ordered within partition
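
Kafka's default partitioner picks the partition by hashing the key bytes (murmur2) modulo the partition count. A minimal sketch of the idea (illustration only — Java's hashCode stands in for Kafka's murmur2):

// Illustration of key-based routing; not Kafka's actual implementation
int partitionFor(String key, int numPartitions) {
    // Mask the sign bit so the result is non-negative (mirrors Kafka's Utils.toPositive)
    return (key.hashCode() & 0x7fffffff) % numPartitions;
}

Caveat: the key→partition mapping is only stable while the partition count stays fixed; adding partitions remaps keys and breaks per-key ordering across the change.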


Idempotency (Exactly-Once Semantics)

Problem: At-least-once delivery means events might arrive twice.

Timeline:
  Producer sends OrderConfirmed event
  Consumer processes: updates database, sends email
  Network hiccup; the consumer's offset commit never reaches Kafka
  After a restart or rebalance, the consumer re-reads from the last committed offset: same event arrives again
  Consumer processes AGAIN: duplicate email sent ✗

Solution: Idempotency Key

Every event has a unique ID. Consumer deduplicates using it.

public class OrderConfirmedEvent {
    public String eventId;        // Unique per event
    public String orderId;
    public LocalDateTime timestamp;
}

@Component
public class OrderConsumer {

    @Autowired
    private ProcessedEventRepository processedEvents;

    @Autowired
    private OrderRepository orderRepository;

    @KafkaListener(topics = "order-confirmed")
    public void handleOrderConfirmed(OrderConfirmedEvent event) {
        // Idempotency check
        if (processedEvents.existsById(event.eventId)) {
            log.info("Already processed event {}", event.eventId);
            return;  // Idempotent; safe to skip
        }

        // Process
        Order order = orderRepository.findById(event.orderId).orElseThrow();
        sendConfirmationEmail(order);

        // Record that we processed this event
        processedEvents.save(new ProcessedEvent(event.eventId));
    }
}
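
The dedup table behind ProcessedEventRepository can be as small as a single-column entity. A sketch with JPA (names match the listener above; the exact shape is an assumption):

@Entity
public class ProcessedEvent {
    @Id
    private String eventId;   // Primary key doubles as the uniqueness constraint

    protected ProcessedEvent() { }   // Required by JPA
    public ProcessedEvent(String eventId) { this.eventId = eventId; }
}

public interface ProcessedEventRepository extends JpaRepository<ProcessedEvent, String> { }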

Idempotency Strategies

Strategy                     | Pros                     | Cons
Event ID + dedup table       | Guaranteed dedup         | Database lookup per event
Event ID + cache (Redis)     | Fast dedup; TTL cleanup  | Risk of eviction; memory overhead
Natural idempotency (below)  | No dedup needed          | Not all operations are naturally idempotent
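
For the Redis strategy, an atomic "set if absent with TTL" does the dedup in one round trip. A minimal sketch, assuming Spring Data Redis (StringRedisTemplate) is available:

@Component
public class RedisDeduplicator {

    @Autowired
    private StringRedisTemplate redis;

    /** Returns true if eventId was not seen within the TTL window. */
    public boolean markIfNew(String eventId) {
        // SET key value NX EX — atomic insert-if-absent with expiry
        Boolean isNew = redis.opsForValue()
            .setIfAbsent("dedup:" + eventId, "1", Duration.ofHours(24));
        return Boolean.TRUE.equals(isNew);
    }
}

The TTL is the trade-off named in the table: it caps memory, but a duplicate arriving after expiry (or after eviction) slips through.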

Natural Idempotency

Some operations are naturally idempotent: calling them multiple times has the same effect as calling them once.

// IDEMPOTENT: "Confirm order 123"
updateOrderStatus(order123, CONFIRMED);  // Idempotent; status stays CONFIRMED

// NOT IDEMPOTENT: "Increment order count"
order.count++;                             // First call: count=5; second call: count=6 ✗

// IDEMPOTENT: "Set order count to 5"
order.count = 5;                           // First call: count=5; second call: count=5 ✓

Dead Letter Queue (DLQ)

Problem: Some messages fail processing forever (e.g., malformed JSON, missing dependency).

Timeline:
  Consumer receives event with malformed data
  Tries to deserialize → error
  Retries 3x → still fails
  Without special handling, the message either blocks the partition through endless retries or is skipped and lost ✗

Solution: Route Failures to DLQ

@Component
public class OrderConsumer {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    private OrderRepository repository;

    @KafkaListener(topics = "orders")
    public void handleOrder(OrderEvent event) {
        try {
            // Process order
            Order order = buildOrder(event);
            repository.save(order);
        } catch (Exception e) {
            log.error("Failed to process order event {}", event, e);

            // Send to DLQ for manual inspection
            kafkaTemplate.send("orders.dlq", event);
        }
    }
}
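
Instead of hand-rolling the try/catch, Spring Kafka (2.8+) can publish exhausted records for you: a DeadLetterPublishingRecoverer sends failures to "<topic>.DLT" after retries. A sketch:

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    // Retry 3 times, 1 second apart, then publish to the dead-letter topic
    return new DefaultErrorHandler(
        new DeadLetterPublishingRecoverer(template),
        new FixedBackOff(1000L, 3));
}

The recoverer also stamps the failure cause into record headers, which the monitor below can read.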

DLQ Monitoring

@Component
public class DLQMonitor {

    @Autowired
    private AlertingService alerting;      // hypothetical alerting client

    @Autowired
    private DlqRepository dlqRepository;   // hypothetical quarantine store

    @KafkaListener(topics = "orders.dlq")
    public void monitorDLQ(OrderEvent event) {
        // Alert ops team
        alerting.send(
            "Order DLQ received message: " + event.orderId +
            "\nReason: " + extractErrorFromHeader()
        );

        // Quarantine in separate database for manual review
        dlqRepository.save(new QuarantinedMessage(event));
    }
}
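
If the DLQ is fed by the DeadLetterPublishingRecoverer sketched above, the failure cause arrives in record headers, so extractErrorFromHeader() can become an @Header parameter. A sketch (header values arrive as raw bytes):

@KafkaListener(topics = "orders.dlq")
public void monitorDLQ(OrderEvent event,
        @Header(name = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) byte[] error) {
    String reason = (error != null) ? new String(error, StandardCharsets.UTF_8) : "unknown";
    alerting.send("Order DLQ received message: " + event.orderId + "\nReason: " + reason);
}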

Message Ordering + Idempotency Combined

Real-world scenario: need both ordering AND at-least-once delivery.

@Component
public class OrderEventProcessor {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Autowired
    private ProcessedEventRepository processedEvents;

    @Autowired
    private OrderRepository orderRepository;

    @KafkaListener(topics = "order-events")
    @Transactional  // dedup record commits (or rolls back) together with the business data
    public void handleOrderEvent(OrderEvent event) {
        // 1. Idempotency check
        if (processedEvents.existsById(event.eventId)) {
            log.info("Skipping duplicate event {}", event.eventId);
            return;
        }

        try {
            // 2. Process
            processEvent(event);

            // 3. Record as processed (inside transaction with business logic)
            processedEvents.save(new ProcessedEvent(event.eventId));

        } catch (RetryableException e) {
            // Transient failure; Kafka will retry
            throw e;
        } catch (Exception e) {
            // Non-retryable; send to DLQ
            kafkaTemplate.send("order-events.dlq", event);
            processedEvents.save(new ProcessedEvent(event.eventId));
        }
    }

    private void processEvent(OrderEvent event) {
        if (event instanceof OrderCreatedEvent) {
            Order order = new Order(event.orderId);
            orderRepository.save(order);
        } else if (event instanceof OrderPaidEvent) {
            Order order = orderRepository.findById(event.orderId)
                .orElseThrow(() -> new OrderNotFoundException());
            order.status = OrderStatus.PAID;
            orderRepository.save(order);
        }
    }
}

Key design:

  • Partition key = orderId → preserves order per order
  • Idempotency key (eventId) + dedup table → prevents duplicates
  • DLQ for poison messages
  • Transactional processing → dedup record saved with business data
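
One subtlety: the exists-then-save check leaves a small race window if two deliveries of the same event are processed concurrently. The dedup table's primary key on eventId closes it; a defensive variant saves first and catches the constraint violation (assuming Spring Data's DataIntegrityViolationException, with saveAndFlush forcing the constraint check immediately):

try {
    processedEvents.saveAndFlush(new ProcessedEvent(event.eventId));  // PK on eventId
    processEvent(event);
} catch (DataIntegrityViolationException duplicate) {
    return;  // A concurrent delivery won the race; safe to skip
}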

Batch Processing vs Stream Processing

Aspect     | Batch                                           | Stream
Trigger    | Time-based (hourly) or size-based (1000 msgs)   | Every message, immediately
Latency    | High (up to the batch interval, e.g., 1 hour)   | Low (milliseconds)
Throughput | High (efficient batching)                       | Lower per message
Example    | Daily reconciliation                            | Real-time fraud detection

Batch Example

@Component
public class OrderBatchProcessor {

    @Autowired
    private KafkaConsumer<String, OrderEvent> kafkaConsumer;  // max.poll.records=1000 set in consumer config

    @Scheduled(cron = "0 0 * * * *")  // Every hour
    public void processOrdersBatch() {
        // Fetch up to max.poll.records events in one poll
        // (batch size is a consumer config, not a poll() argument)
        ConsumerRecords<String, OrderEvent> batch = kafkaConsumer.poll(Duration.ofSeconds(10));

        // Process in bulk
        List<Order> orders = new ArrayList<>();
        for (ConsumerRecord<String, OrderEvent> record : batch) {
            orders.add(buildOrder(record.value()));
        }

        orderRepository.saveAll(orders);   // Bulk insert
        kafkaConsumer.commitSync();        // Commit offsets only after a successful save
    }
}

Stream Example

@Component
public class OrderStreamProcessor {

    @KafkaListener(topics = "orders")
    public void processOrderStream(OrderEvent event) {
        // Process immediately
        Order order = buildOrder(event);
        orderRepository.save(order);

        // Publish downstream
        eventPublisher.publish(new OrderConfirmedEvent(order));
    }
}

Consumer Group Coordination

Multiple consumers processing same topic; need to distribute partitions.

Topic: orders (4 partitions)

Scenario 1: 1 consumer
  Consumer 1 → Partition 0, 1, 2, 3 (handles all)

Scenario 2: 2 consumers (joining triggers a rebalance)
  Consumer 1 → Partition 0, 1
  Consumer 2 → Partition 2, 3

Scenario 3: Consumer 1 crashes
  Rebalancing: partitions redistribute to healthy consumers
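
Within one JVM, Spring Kafka can run several consumers of the same group as listener container threads. A sketch (bean and type names follow Spring Kafka conventions):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(2);  // 2 consumer threads → 2 partitions each on a 4-partition topic
    return factory;
}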

Rebalancing Tuning

@Bean
public ConsumerFactory<String, OrderEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    // Rebalancing behavior
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);     // 30s
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // 1/3 of session timeout
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);  // 5min

    return new DefaultKafkaConsumerFactory<>(props);
}

Config                | Default                    | Tuning
session.timeout.ms    | 45s (10s before Kafka 3.0) | Increase if healthy consumers get kicked out during brief pauses; must fit within the broker's group.min/max.session.timeout.ms bounds
heartbeat.interval.ms | 3s                         | Set to roughly 1/3 of session.timeout.ms
max.poll.interval.ms  | 5min                       | Increase if processing one poll batch takes longer than 5 minutes

How do I choose between Kafka topics vs RabbitMQ queues?

Kafka: event streaming, replay, multiple independent consumer groups. RabbitMQ: work queues, task distribution, simpler setup. For microservice event streams, use Kafka; for job/task processing, RabbitMQ.

What happens if a consumer can't keep up with message rate?

Consumer lag increases (the committed offset falls further behind the latest offset). Monitor it via the consumer's records-lag-max metric or the kafka-consumer-groups CLI. If lag is persistent, scale up consumer instances (up to the partition count) or optimize the processing logic.

Is exactly-once processing possible in Kafka?

Kafka guarantees at-least-once delivery out of the box. Within Kafka, exactly-once semantics are available via the idempotent producer and transactions (e.g., Kafka Streams with processing.guarantee=exactly_once). For side effects outside Kafka — database writes, emails — exactly-once still has to be built in the application: idempotent processing (dedup key) plus transactional writes.
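
For the Kafka-internal part, the producer-side switches look like this (a sketch; requires broker 0.11+):

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);        // broker dedups producer retries
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");  // enables atomic multi-topic writes
// Serializers omitted. Note: this protects writes *into* Kafka only —
// side effects like emails or DB updates still need app-level idempotency.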