Kafka Streams (Lab 08)
Learning Goal
Build an order enrichment + aggregation pipeline using Kafka Streams. You'll implement:
- Stream-Table join (enrich orders with customer data)
- Stateful aggregation (count orders per customer per minute)
- Interactive queries (REST endpoint to query order counts)
Estimated time: 40 minutes
Prerequisites:
- Docker & Kafka running (docker-compose up)
- Lab 01–07 completed (or at least familiar with topics/producers)
Architecture Overview
Orders Topic
↓
┌─────────────────────────────────┐
│ Kafka Streams Topology │
├─────────────────────────────────┤
│ 1. KStream (orders) │
│ 2. KTable (customers) │
│ 3. Join: enrich order │
│ 4. Aggregate: count per minute │
│ 5. Interactive query store │
└─────────────────────────────────┘
↓ ↓
Enriched Orders Order Counts
Topic Topic
Step 0: Verify Prerequisites
Ensure Kafka, Zookeeper, and Schema Registry are running:
cd /Users/sgovinda/Learn/Kafka-Learning
docker-compose up -d
# Check containers
docker ps | grep -E "kafka|zookeeper|registry"
Expected output: 3+ containers running.
Step 1: Create Required Topics
Create topics for orders, customers, enriched orders, and order counts:
# Orders topic
docker exec -it kafka kafka-topics.sh \
--create \
--topic orders-topic \
--bootstrap-server localhost:9092 \
--partitions 2 \
--replication-factor 1 \
--if-not-exists
# Customers topic (will be used as KTable)
docker exec -it kafka kafka-topics.sh \
--create \
--topic customers-topic \
--bootstrap-server localhost:9092 \
--partitions 2 \
--replication-factor 1 \
--if-not-exists
# Output topic — enriched orders
docker exec -it kafka kafka-topics.sh \
--create \
--topic enriched-orders-topic \
--bootstrap-server localhost:9092 \
--partitions 2 \
--replication-factor 1 \
--if-not-exists
# Output topic — order counts
docker exec -it kafka kafka-topics.sh \
--create \
--topic order-counts-topic \
--bootstrap-server localhost:9092 \
--partitions 2 \
--replication-factor 1 \
--if-not-exists
# Verify
docker exec -it kafka kafka-topics.sh \
--list \
--bootstrap-server localhost:9092 | grep -E "orders|customers|enriched|counts"
Expected output:
customers-topic
enriched-orders-topic
order-counts-topic
orders-topic
Step 2: Create Domain Classes
Add the Order and Customer classes to your project:
File: src/main/java/com/example/kafka/streams/domain/Order.java
package com.example.kafka.streams.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
@JsonProperty("order_id")
private String orderId;
@JsonProperty("customer_id")
private String customerId;
@JsonProperty("amount")
private BigDecimal amount;
@JsonProperty("timestamp")
private long timestamp;
}
File: src/main/java/com/example/kafka/streams/domain/Customer.java
package com.example.kafka.streams.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
@JsonProperty("customer_id")
private String customerId;
@JsonProperty("name")
private String name;
@JsonProperty("email")
private String email;
@JsonProperty("tier")
private String tier; // "GOLD", "SILVER", "BRONZE"
}
File: src/main/java/com/example/kafka/streams/domain/EnrichedOrder.java
package com.example.kafka.streams.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EnrichedOrder {
@JsonProperty("order_id")
private String orderId;
@JsonProperty("customer_id")
private String customerId;
@JsonProperty("customer_name")
private String customerName;
@JsonProperty("customer_tier")
private String customerTier;
@JsonProperty("amount")
private BigDecimal amount;
@JsonProperty("timestamp")
private long timestamp;
}
File: src/main/java/com/example/kafka/streams/domain/OrderStats.java
package com.example.kafka.streams.domain;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderStats {
@JsonProperty("customer_id")
private String customerId;
@JsonProperty("count")
private long count;
@JsonProperty("window_start")
private long windowStart;
@JsonProperty("window_end")
private long windowEnd;
}
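For reference, the @JsonProperty annotations map the Java fields to snake_case JSON, so an Order travels on the wire like this (illustrative values):
{
  "order_id": "ORD_001",
  "customer_id": "CUST_001",
  "amount": 250.50,
  "timestamp": 1700000000000
}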
Step 3: Configure Kafka Streams
File: src/main/java/com/example/kafka/streams/config/KafkaStreamsConfig.java
package com.example.kafka.streams.config;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.KafkaStreamsDefaultConfiguration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.application.name:streams-app}")
private String applicationId;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        // Use at-least-once for this lab (simpler for debugging)
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
                StreamsConfig.AT_LEAST_ONCE);
        // State store cleanup
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 10000L);
        // Default serdes (the topology overrides these per operation)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
Step 4: Build the Kafka Streams Topology
File: src/main/java/com/example/kafka/streams/topology/OrderEnrichmentTopology.java
package com.example.kafka.streams.topology;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;
import com.example.kafka.streams.domain.Customer;
import com.example.kafka.streams.domain.EnrichedOrder;
import com.example.kafka.streams.domain.Order;
import com.example.kafka.streams.domain.OrderStats;
import java.time.Duration;
@Component
public class OrderEnrichmentTopology {
@Autowired
public void buildTopology(StreamsBuilder builder) {
// Serdes
JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
JsonSerde<Customer> customerSerde = new JsonSerde<>(Customer.class);
JsonSerde<EnrichedOrder> enrichedOrderSerde = new JsonSerde<>(EnrichedOrder.class);
JsonSerde<OrderStats> statsSerde = new JsonSerde<>(OrderStats.class);
// 1. Build KStream from orders topic
        KStream<String, Order> orders = builder.stream(
            "orders-topic",
            Consumed.with(Serdes.String(), orderSerde)
        );
// 2. Build KTable from customers (for enrichment)
        KTable<String, Customer> customers = builder.table(
            "customers-topic",
            Consumed.with(Serdes.String(), customerSerde)
        );
// 3. Join: enrich orders with customer data
KStream<String, EnrichedOrder> enrichedOrders = orders
.leftJoin(
customers,
(order, customer) -> {
if (customer == null) {
// Customer not found
return new EnrichedOrder(
order.getOrderId(),
order.getCustomerId(),
"UNKNOWN",
"UNKNOWN",
order.getAmount(),
order.getTimestamp()
);
}
return new EnrichedOrder(
order.getOrderId(),
order.getCustomerId(),
customer.getName(),
customer.getTier(),
order.getAmount(),
order.getTimestamp()
);
},
            Joined.with(Serdes.String(), orderSerde, customerSerde)
);
// 4. Output enriched orders
        enrichedOrders.to(
            "enriched-orders-topic",
            Produced.with(Serdes.String(), enrichedOrderSerde)
        );
        // 5. Aggregate: count orders per customer per minute.
        // Grouped supplies explicit serdes; the default value serde is String,
        // which cannot serialize EnrichedOrder.
        enrichedOrders
            .groupByKey(Grouped.with(Serdes.String(), enrichedOrderSerde))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count(Materialized.as("order-counts-store"))
.toStream()
.map((windowedKey, count) -> {
String customerId = windowedKey.key();
long windowStart = windowedKey.window().startTime().toEpochMilli();
long windowEnd = windowedKey.window().endTime().toEpochMilli();
OrderStats stats = new OrderStats(
customerId,
count,
windowStart,
windowEnd
);
                return new KeyValue<>(windowedKey.key(), stats);
})
            .to(
                "order-counts-topic",
                Produced.with(Serdes.String(), statsSerde)
            );
// Log topology
System.out.println("\n=== Kafka Streams Topology ===");
System.out.println(builder.build().describe());
System.out.println("===============================\n");
}
}
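To sanity-check the join without a running broker, you can exercise the topology with Kafka's TopologyTestDriver. This is a minimal sketch, not part of the lab proper: it assumes the kafka-streams-test-utils artifact is on the classpath and uses a plain main method instead of a JUnit test for brevity.
package com.example.kafka.streams.topology;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.springframework.kafka.support.serializer.JsonSerde;
import com.example.kafka.streams.domain.Customer;
import com.example.kafka.streams.domain.EnrichedOrder;
import com.example.kafka.streams.domain.Order;
import java.math.BigDecimal;
import java.util.Properties;
public class OrderEnrichmentTopologyTest {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        new OrderEnrichmentTopology().buildTopology(builder);
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, Customer> customers = driver.createInputTopic(
                "customers-topic", new StringSerializer(),
                new JsonSerde<>(Customer.class).serializer());
            TestInputTopic<String, Order> orders = driver.createInputTopic(
                "orders-topic", new StringSerializer(),
                new JsonSerde<>(Order.class).serializer());
            TestOutputTopic<String, EnrichedOrder> enriched = driver.createOutputTopic(
                "enriched-orders-topic", new StringDeserializer(),
                new JsonSerde<>(EnrichedOrder.class).deserializer());
            // The KTable row must arrive before the order for the join to hit
            customers.pipeInput("CUST_001",
                new Customer("CUST_001", "Alice Johnson", "alice@example.com", "GOLD"));
            orders.pipeInput("CUST_001",
                new Order("ORD_001", "CUST_001", new BigDecimal("250.50"),
                    System.currentTimeMillis()));
            System.out.println(enriched.readKeyValue()); // expect GOLD tier enrichment
        }
    }
}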
Step 5: Create Interactive Query Service
File: src/main/java/com/example/kafka/streams/service/InteractiveQueryService.java
package com.example.kafka.streams.service;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Service;
import java.time.Instant;
@Service
public class InteractiveQueryService {
    @Autowired
    private StreamsBuilderFactoryBean factoryBean;

    public Long getOrderCount(String customerId) {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        // Only serve queries once the app has reached RUNNING
        if (streams == null || streams.state() != KafkaStreams.State.RUNNING) {
            throw new IllegalStateException("Kafka Streams is not running");
        }
        try {
            // The windowed count() in Step 4 materializes a WindowStore,
            // not a plain key-value store, so query it as a window store
            ReadOnlyWindowStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                    "order-counts-store",
                    QueryableStoreTypes.windowStore()
                )
            );
            // Sum the counts across all windows for this customer
            long total = 0L;
            try (WindowStoreIterator<Long> iterator =
                     store.fetch(customerId, Instant.EPOCH, Instant.now())) {
                while (iterator.hasNext()) {
                    total += iterator.next().value;
                }
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException("Failed to query store: " + e.getMessage(), e);
        }
    }
}
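If you only want the count for the current (open) window rather than a lifetime total, narrow the fetch range to the window containing "now". A minimal sketch, assuming the 1-minute window size from the topology; getCurrentWindowCount is a hypothetical helper, not part of the lab:
// Hypothetical variant: count for the current 1-minute window only.
// TimeWindows aligns window boundaries to the epoch, so we can compute
// the start of the window containing the current instant.
public Long getCurrentWindowCount(ReadOnlyWindowStore<String, Long> store,
                                  String customerId) {
    long windowSizeMs = 60_000L;
    long now = System.currentTimeMillis();
    long windowStart = now - (now % windowSizeMs); // align to epoch
    try (WindowStoreIterator<Long> it = store.fetch(
            customerId,
            Instant.ofEpochMilli(windowStart),
            Instant.ofEpochMilli(now))) {
        return it.hasNext() ? it.next().value : 0L;
    }
}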
Step 6: Create REST Controllers
File: src/main/java/com/example/kafka/streams/controller/OrderProducerController.java
package com.example.kafka.streams.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.kafka.streams.domain.Order;
@RestController
@RequestMapping("/api/streams/orders")
public class OrderProducerController {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
@PostMapping
public String produceOrder(@RequestBody Order order) {
kafkaTemplate.send("orders-topic", order.getCustomerId(), order);
return "Order produced: " + order.getOrderId();
}
}
File: src/main/java/com/example/kafka/streams/controller/CustomerProducerController.java
package com.example.kafka.streams.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.kafka.streams.domain.Customer;
@RestController
@RequestMapping("/api/streams/customers")
public class CustomerProducerController {
@Autowired
private KafkaTemplate<String, Customer> kafkaTemplate;
@PostMapping
public String produceCustomer(@RequestBody Customer customer) {
kafkaTemplate.send("customers-topic", customer.getCustomerId(), customer);
return "Customer produced: " + customer.getCustomerId();
}
}
File: src/main/java/com/example/kafka/streams/controller/InteractiveQueryController.java
package com.example.kafka.streams.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.kafka.streams.service.InteractiveQueryService;
@RestController
@RequestMapping("/api/streams/queries")
public class InteractiveQueryController {
@Autowired
private InteractiveQueryService queryService;
@GetMapping("/order-count/{customerId}")
public QueryResult getOrderCount(@PathVariable String customerId) {
Long count = queryService.getOrderCount(customerId);
return new QueryResult(customerId, count);
}
public static class QueryResult {
public String customerId;
public Long orderCount;
public QueryResult(String customerId, Long orderCount) {
this.customerId = customerId;
this.orderCount = orderCount;
}
}
}
Step 7: Configure Application Properties
File: src/main/resources/application.yml
spring:
application:
name: kafka-streams-demo
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.example.kafka.streams.domain
server:
port: 8080
logging:
level:
org.apache.kafka: INFO
org.springframework.kafka: INFO
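Note: spring.json.trusted.packages tells Spring's JsonDeserializer which packages it may instantiate classes from; without it, consuming your domain classes fails with a trusted-package error.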
Step 8: Run the Application
Build and start the Spring Boot application (see the command sketch below), then wait for Kafka Streams to log a state transition such as "State transition from REBALANCING to RUNNING". The app is then running and consuming from orders-topic and customers-topic.
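Assuming a Maven wrapper is present (for Gradle, ./gradlew bootRun works the same way):
./mvnw spring-boot:run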
Step 9: Test the Streams Application
Produce a Customer
curl -X POST http://localhost:8080/api/streams/customers \
-H "Content-Type: application/json" \
-d '{
"customer_id": "CUST_001",
"name": "Alice Johnson",
"email": "alice@example.com",
"tier": "GOLD"
}'
Produce Orders
curl -X POST http://localhost:8080/api/streams/orders \
-H "Content-Type: application/json" \
-d '{
"order_id": "ORD_001",
"customer_id": "CUST_001",
"amount": 250.50,
"timestamp": '$(date +%s)'000'
}'
curl -X POST http://localhost:8080/api/streams/orders \
-H "Content-Type: application/json" \
-d '{
"order_id": "ORD_002",
"customer_id": "CUST_001",
"amount": 120.75,
"timestamp": '$(date +%s)'000'
}'
Query Order Count (Interactive Query)
curl http://localhost:8080/api/streams/queries/order-count/CUST_001
# Expected response:
# {
# "customerId": "CUST_001",
# "orderCount": 2
# }
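With the window-store query from Step 5, orderCount is the total across all 1-minute windows for that customer, so it keeps growing as new orders arrive.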
Verify Output Topics
Enriched Orders:
docker exec -it kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic enriched-orders-topic \
--from-beginning \
--property print.key=true
Expected output:
CUST_001 {"order_id":"ORD_001","customer_id":"CUST_001","customer_name":"Alice Johnson","customer_tier":"GOLD","amount":250.50,"timestamp":...}
CUST_001 {"order_id":"ORD_002","customer_id":"CUST_001","customer_name":"Alice Johnson","customer_tier":"GOLD","amount":120.75,"timestamp":...}
Order Counts:
docker exec -it kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic order-counts-topic \
--from-beginning \
--property print.key=true
Expected output:
CUST_001 {"customer_id":"CUST_001","count":1,"window_start":...,"window_end":...}
CUST_001 {"customer_id":"CUST_001","count":2,"window_start":...,"window_end":...}
Step 10: Checkpoint
✅ You've successfully:
1. Created a Kafka Streams topology with a stream-table join
2. Enriched order data with customer information
3. Aggregated orders per customer per time window
4. Queried state stores via an interactive REST endpoint
Key observations:
- The join happens as each order arrives
- If a customer exists in the customers topic, the order is enriched; if not, the left join passes a null customer, which the topology maps to "UNKNOWN" fields
- Window-based aggregation counts orders within 1-minute windows
- Interactive queries read the local state store directly, bypassing the output topic for low-latency lookups
Bonus: Advanced Exercises
Exercise 1: Stream-Stream Join
Modify the topology to join orders with shipments (both KStreams) within a 1-hour window. A stream-stream join needs explicit serdes via StreamJoined; Shipment, OrderShipmentMatched, and their serdes are classes you would define:
KStream<String, Shipment> shipments = builder.stream(
    "shipments-topic",
    Consumed.with(Serdes.String(), shipmentSerde)
);
enrichedOrders.join(
    shipments,
    (order, shipment) -> new OrderShipmentMatched(order, shipment),
    JoinWindows.of(Duration.ofHours(1)),
    StreamJoined.with(Serdes.String(), enrichedOrderSerde, shipmentSerde)
)
.to("order-shipment-matched-topic", Produced.with(Serdes.String(), matchedSerde));
Exercise 2: GlobalKTable for Geo Codes
Create a GlobalKTable of country codes and join orders to add geo information. Note that the key selector for a GlobalKTable join receives both the record key and value; GeoData, GeoEnrichedOrder, and their serdes are classes you would define:
GlobalKTable<String, GeoData> geos = builder.globalTable(
    "geo-codes-topic",
    Consumed.with(Serdes.String(), geoSerde)
);
enrichedOrders.join(
    geos,
    (orderId, order) -> order.getCountryCode(),
    (order, geo) -> new GeoEnrichedOrder(order, geo)
)
.to("geo-enriched-orders", Produced.with(Serdes.String(), geoEnrichedSerde));
Exercise 3: Multiple Aggregations
In the same topology, compute:
- Count per customer per minute (done ✓)
- Sum of amounts per customer per minute (see the sketch below)
- Max order amount per customer per minute
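A minimal sketch of the sum aggregation, to be added inside buildTopology. It assumes imports for org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.WindowStore; the max variant replaces the adder with a comparison:
// Sum of order amounts per customer per 1-minute window (sketch)
enrichedOrders
    .groupByKey(Grouped.with(Serdes.String(), enrichedOrderSerde))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        () -> BigDecimal.ZERO,                                   // initializer
        (customerId, order, sum) -> sum.add(order.getAmount()),  // adder
        Materialized.<String, BigDecimal, WindowStore<Bytes, byte[]>>as("order-sums-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(new JsonSerde<>(BigDecimal.class))   // assumption: JSON-encoded BigDecimal
    );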
Key Takeaways
What You Learned
- Topology Building — Define processors, sources, sinks
- Stream-Table Joins — Enrich streams with stateful data
- Windowing — Group events by time
- Stateful Aggregations — Count, sum, reduce with state stores
- Interactive Queries — Query RocksDB stores without output topics
- Spring Integration — @EnableKafkaStreams, StreamsBuilder, InteractiveQueryService
Troubleshooting
| Issue | Solution |
|---|---|
| "Kafka Streams is not running" | Check application logs; ensure Kafka broker is up |
| Empty output topics | Check topic names in topology vs. property files |
| Interactive query returns 0 | Records may not be processed yet; confirm the app is in RUNNING state and the store name matches |
| Schema mismatch errors | Ensure JSON serdes match domain classes |
Next Steps
- Read Kafka Streams (09) for deeper concepts
- Explore Session Windows (dynamic based on inactivity)
- Implement Exactly-Once Semantics (EOS) and measure performance impact
- Add Prometheus metrics to monitor your topology
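For the EOS exercise, the switch is a single property in KafkaStreamsConfig (EXACTLY_ONCE_V2 is available from Kafka Streams 3.0 and requires brokers 2.5+; older clients use the deprecated EXACTLY_ONCE constant):
// Enable exactly-once processing
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);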