Event Sourcing — Deep Dive

Level: Advanced
Pre-reading: 04 · Event-Driven Architecture · 04.03 · CQRS


What is Event Sourcing?

Event Sourcing stores all changes to application state as a sequence of events. The current state is derived by replaying events.

graph LR
    E1[OrderCreated] --> E2[ItemAdded]
    E2 --> E3[ItemAdded]
    E3 --> E4[ItemRemoved]
    E4 --> E5[OrderPlaced]
    E5 --> S[Current State: Order with 1 item, status=PLACED]

| Traditional | Event Sourcing |
|---|---|
| Store current state only | Store all events |
| UPDATE overwrites history | Events are immutable |
| "What is the state?" | "How did we get here?" |
| Audit trail separate | Audit built-in |

Event Store

The event store is an append-only log of events, partitioned by aggregate.

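CREATE TABLE event_store (
    id BIGSERIAL PRIMARY KEY,               -- Global position, read by projections
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',   -- correlationId, causationId
    version INT NOT NULL,                   -- Position within the aggregate's stream
    occurred_at TIMESTAMPTZ NOT NULL,

    UNIQUE (aggregate_id, version)  -- Optimistic concurrency: two writers cannot claim the same version
);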

Event Structure

public record StoredEvent(
    long id,                 // Global position (event_store.id)
    UUID aggregateId,
    String aggregateType,
    String eventType,
    String eventData,        // Serialized event
    int version,             // Aggregate version
    Instant occurredAt,
    Map<String, String> metadata  // correlationId, causationId
) {}

Append-Only Semantics

public interface EventStore {
    // Append events (expects version for optimistic concurrency)
    void append(UUID aggregateId, List<DomainEvent> events, int expectedVersion);

    // Load events for an aggregate
    List<StoredEvent> load(UUID aggregateId);

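    // Load an aggregate's events starting at a version (for replay after a snapshot)
    List<StoredEvent> loadFrom(UUID aggregateId, int fromVersion);

    // Load events across all aggregates after a global position (for projections)
    List<StoredEvent> loadFrom(long position, int batchSize);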
}
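
To make the append and load contract concrete, here is a minimal in-memory sketch. It is not a production store: the names `InMemoryEventStore`, `Row` and `NewEvent` are hypothetical, `loadFrom` is assumed to be exclusive of the given position, and a `synchronized` method stands in for the database's unique constraint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory store, for illustration only.
final class InMemoryEventStore {

    // An event to be appended: its type name and serialized payload.
    record NewEvent(String eventType, String eventData) {}

    // One stored row: global position, owning aggregate, per-aggregate version, payload.
    record Row(long position, UUID aggregateId, int version, String eventType, String eventData) {}

    static final class ConcurrencyException extends RuntimeException {
        ConcurrencyException(String message) { super(message); }
    }

    private final List<Row> log = new ArrayList<>();             // global, append-only
    private final Map<UUID, Integer> versions = new HashMap<>(); // latest version per aggregate

    // Version check and writes happen under one lock, so they are atomic.
    synchronized void append(UUID aggregateId, List<NewEvent> events, int expectedVersion) {
        int actual = versions.getOrDefault(aggregateId, 0);
        if (actual != expectedVersion) {
            throw new ConcurrencyException(
                "Expected version " + expectedVersion + " but was " + actual);
        }
        for (NewEvent event : events) {
            actual++;
            log.add(new Row(log.size() + 1, aggregateId, actual,
                            event.eventType(), event.eventData()));
        }
        versions.put(aggregateId, actual);
    }

    // All events of one aggregate, in version order.
    synchronized List<Row> load(UUID aggregateId) {
        return log.stream().filter(r -> r.aggregateId().equals(aggregateId)).toList();
    }

    // Events with a position strictly greater than the given one (for projections).
    synchronized List<Row> loadFrom(long position, int batchSize) {
        return log.stream().filter(r -> r.position() > position).limit(batchSize).toList();
    }
}
```

Appending twice with the same `expectedVersion` fails the second time, which is the behaviour the `UNIQUE (aggregate_id, version)` constraint provides in the SQL schema.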

Reconstructing State

To get the current state, replay all events for the aggregate in version order.

public class Order {
    private OrderId id;
    private CustomerId customerId;
    private List<OrderLine> lines = new ArrayList<>();
    private OrderStatus status;
    private int version;

    // Apply events to build state
    public void apply(OrderCreated event) {
        this.id = event.orderId();
        this.customerId = event.customerId();
        this.status = OrderStatus.DRAFT;
    }

    public void apply(ItemAdded event) {
        this.lines.add(new OrderLine(event.productId(), event.quantity(), event.price()));
    }

    public void apply(ItemRemoved event) {
        this.lines.removeIf(l -> l.productId().equals(event.productId()));
    }

    public void apply(OrderPlaced event) {
        this.status = OrderStatus.PLACED;
    }

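    // Overloads are resolved at compile time, so a DomainEvent reference
    // must be dispatched to the matching handler explicitly.
    public void apply(DomainEvent event) {
        if (event instanceof OrderCreated e) apply(e);
        else if (event instanceof ItemAdded e) apply(e);
        else if (event instanceof ItemRemoved e) apply(e);
        else if (event instanceof OrderPlaced e) apply(e);
        else throw new IllegalArgumentException("Unhandled event: " + event);
    }

    // Reconstruct from events
    public static Order fromEvents(List<DomainEvent> events) {
        Order order = new Order();
        for (DomainEvent event : events) {
            order.apply(event);
            order.version++;
        }
        return order;
    }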
}
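
The replay loop can be exercised end to end with a cut-down model. The sketch below uses hypothetical, simplified types (`CartEvent`, `ReplayedOrder`, string IDs instead of `OrderId`) and dispatches on the runtime event type with `instanceof` patterns, which assumes Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified event types (plain strings instead of typed IDs).
interface CartEvent {}
record Created(String orderId) implements CartEvent {}
record Added(String productId, int quantity) implements CartEvent {}
record Removed(String productId) implements CartEvent {}
record Placed() implements CartEvent {}

final class ReplayedOrder {
    String id;
    String status;
    final List<String> productIds = new ArrayList<>();
    int version;

    // Route each event to its state change based on its runtime type.
    void apply(CartEvent event) {
        if (event instanceof Created e) {
            id = e.orderId();
            status = "DRAFT";
        } else if (event instanceof Added e) {
            productIds.add(e.productId());
        } else if (event instanceof Removed e) {
            productIds.removeIf(p -> p.equals(e.productId()));
        } else if (event instanceof Placed) {
            status = "PLACED";
        } else {
            throw new IllegalArgumentException("Unhandled event: " + event);
        }
    }

    // Fold the event list into state; version counts the events applied.
    static ReplayedOrder fromEvents(List<CartEvent> events) {
        ReplayedOrder order = new ReplayedOrder();
        for (CartEvent event : events) {
            order.apply(event);
            order.version++;
        }
        return order;
    }
}
```

Replaying `Created`, `Added("A")`, `Added("B")`, `Removed("A")`, `Placed` leaves one product (`B`), status `PLACED` and version 5.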

Repository Pattern

public class EventSourcedOrderRepository {
    private final EventStore eventStore;

    public Order load(OrderId id) {
        List<StoredEvent> events = eventStore.load(id.value());
        if (events.isEmpty()) {
            throw new OrderNotFoundException(id);
        }
        return Order.fromEvents(deserializeEvents(events));
    }

    public void save(Order order) {
        List<DomainEvent> newEvents = order.getUncommittedEvents();
        eventStore.append(order.getId().value(), newEvents, order.getVersion());
        order.markEventsCommitted();
    }
}
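
The repository relies on `getUncommittedEvents()`, `getVersion()` and `markEventsCommitted()`, which the `Order` class does not show. One common way to provide them is sketched below with a hypothetical `Basket` aggregate: a command validates its input, records an event, applies it to the in-memory state, and keeps the version at its last-committed value until the save succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event and aggregate names, for illustration only.
interface BasketEvent {}
record BasketItemAdded(String productId, int quantity) implements BasketEvent {}

final class Basket {
    private final List<String> productIds = new ArrayList<>();
    private final List<BasketEvent> uncommitted = new ArrayList<>();
    private int version; // version as of the last load or commit

    // Command: validate first, then record the resulting event.
    void addItem(String productId, int quantity) {
        if (quantity <= 0) {
            throw new IllegalArgumentException("quantity must be positive");
        }
        raise(new BasketItemAdded(productId, quantity));
    }

    // Apply the event to in-memory state and remember it for the next save.
    private void raise(BasketEvent event) {
        apply(event);
        uncommitted.add(event);
    }

    private void apply(BasketEvent event) {
        if (event instanceof BasketItemAdded e) {
            productIds.add(e.productId());
        }
    }

    List<BasketEvent> getUncommittedEvents() { return List.copyOf(uncommitted); }

    // Stays at the loaded version until commit, so it can serve as expectedVersion.
    int getVersion() { return version; }

    void markEventsCommitted() {
        version += uncommitted.size();
        uncommitted.clear();
    }

    int itemCount() { return productIds.size(); }
}
```

Keeping `version` unchanged while events are pending is what lets the repository pass it to `append` as the expected version.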

Snapshots

Replaying thousands of events on every load adds latency. A snapshot caches the aggregate's state at a known version, so only the events after that version need to be replayed.

graph LR
    subgraph Event Stream
        E1[Event 1]
        E2[Event 2]
        E3[...]
        E100[Event 100]
        S[Snapshot @ 100]
        E101[Event 101]
        E102[Event 102]
    end
    S --> R[Replay from 101]

Snapshot Implementation

public interface SnapshotStore {
    void save(UUID aggregateId, int version, String serializedState);
    Optional<Snapshot> load(UUID aggregateId);
}

public Order load(OrderId id) {
    Optional<Snapshot> snapshot = snapshotStore.load(id.value());

    int fromVersion;
    Order order;

    if (snapshot.isPresent()) {
        order = deserialize(snapshot.get().state());
        fromVersion = snapshot.get().version() + 1;
    } else {
        order = new Order();
        fromVersion = 0;
    }

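    // Assumes a per-aggregate loadFrom(aggregateId, fromVersion) on the event store,
    // distinct from the position-based loadFrom used by projections.
    List<StoredEvent> events = eventStore.loadFrom(id.value(), fromVersion);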
    for (StoredEvent event : events) {
        order.apply(deserialize(event));
    }

    return order;
}

Snapshot Strategies

| Strategy | When to Snapshot |
|---|---|
| Every N events | After 100 events |
| Time-based | Every hour |
| On read | If replay takes > X ms |
| On write | When version % N == 0 |
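
As a rough illustration of the "on write" strategy and of replaying from a snapshot, the sketch below uses a deliberately trivial aggregate whose whole state is a running total. `SnapshotDemo`, its `Snapshot` record and the list-of-deltas event model are assumptions made for the example, not part of the code above.

```java
import java.util.List;
import java.util.Optional;

final class SnapshotDemo {
    // Hypothetical snapshot: the aggregate's state (a total) at a given version.
    record Snapshot(int version, long total) {}

    // "On write" strategy: snapshot whenever the version is a multiple of N.
    static boolean shouldSnapshot(int version, int everyN) {
        return version > 0 && version % everyN == 0;
    }

    // Rebuild state from an optional snapshot plus the events after it.
    // deltas.get(i) is the event with version i + 1.
    static Snapshot load(Optional<Snapshot> snapshot, List<Long> deltas) {
        int version = snapshot.map(Snapshot::version).orElse(0);
        long total = snapshot.map(Snapshot::total).orElse(0L);
        for (int i = version; i < deltas.size(); i++) {
            total += deltas.get(i); // apply only events newer than the snapshot
            version++;
        }
        return new Snapshot(version, total);
    }
}
```

Loading with and without a snapshot must produce the same state; a check like that is a useful test for any snapshot implementation.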

Projections

Projections are read models built from events. They're independent views of the event stream.

graph LR
    ES[(Event Store)] --> P1[Order Summary Projection]
    ES --> P2[Customer Activity Projection]
    ES --> P3[Sales Dashboard Projection]
    P1 --> DB1[(PostgreSQL)]
    P2 --> DB2[(Elasticsearch)]
    P3 --> DB3[(Redis)]

Projection Handler

@Component
public class OrderSummaryProjection {
    private final OrderSummaryRepository repository;
    private final EventStore eventStore;
    private long lastProcessedPosition;  // Restore from the saved checkpoint on startup

    @Scheduled(fixedDelay = 100)
    public void process() {
        List<StoredEvent> events = eventStore.loadFrom(lastProcessedPosition, 100);

        for (StoredEvent event : events) {
            switch (event.eventType()) {
                case "OrderCreated" -> createSummary(event);
                case "OrderPlaced" -> updateStatus(event, "PLACED");
                case "OrderShipped" -> updateStatus(event, "SHIPPED");
            }
            lastProcessedPosition = event.id();
        }

        saveCheckpoint(lastProcessedPosition);
    }
}

Projection Characteristics

| Characteristic | Implication |
|---|---|
| Async | Eventually consistent |
| Rebuildable | Can recreate from events |
| Independent | Each projection has own schema |
| Additive | Add new projections anytime |
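
The checkpoint and rebuild behaviour can be sketched without Spring or a database. In the example below, `StatusProjection` and `LoggedEvent` are hypothetical names, the read model is a plain map, and the event log is passed in as a list. Skipping positions at or below the checkpoint makes redelivered events harmless.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StatusProjection {
    // Hypothetical minimal event: global position, aggregate ID, type name.
    record LoggedEvent(long position, String orderId, String eventType) {}

    private final Map<String, String> statusByOrder = new HashMap<>(); // the read model
    private long checkpoint;                                           // last processed position

    // Handles up to batchSize events after the checkpoint; returns how many were handled.
    int process(List<LoggedEvent> log, int batchSize) {
        int handled = 0;
        for (LoggedEvent event : log) {
            if (event.position() <= checkpoint) continue; // already processed
            if (handled == batchSize) break;
            switch (event.eventType()) {
                case "OrderCreated" -> statusByOrder.put(event.orderId(), "DRAFT");
                case "OrderPlaced" -> statusByOrder.put(event.orderId(), "PLACED");
                case "OrderShipped" -> statusByOrder.put(event.orderId(), "SHIPPED");
                default -> { /* event types this view ignores */ }
            }
            checkpoint = event.position();
            handled++;
        }
        return handled;
    }

    // Rebuild: drop the read model and start again from position 0.
    void reset() {
        statusByOrder.clear();
        checkpoint = 0;
    }

    String statusOf(String orderId) { return statusByOrder.get(orderId); }

    long checkpoint() { return checkpoint; }
}
```

In a real projection the read-model update and the checkpoint write would ideally share a transaction. Where they cannot, the handlers need to be idempotent.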

Event Versioning

Events are immutable, but requirements change, so stored events outlive the schema they were written with. Plan for schema evolution from the start.

Upcasting

Transform old events to new schema at read time.

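public class EventUpcaster {
    public DomainEvent upcast(StoredEvent stored) {
        // StoredEvent.version() is the aggregate version, not the schema version.
        // Assumes writers record a "schemaVersion" metadata entry; events without one are treated as V1.
        String schemaVersion = stored.metadata().getOrDefault("schemaVersion", "1");
        if (stored.eventType().equals("OrderCreated") && schemaVersion.equals("1")) {
            // V1 didn't have currency, default to USD
            OrderCreated v1 = deserialize(stored, OrderCreated.class);
            // Assumes Money has an (amount, currency) constructor
            return new OrderCreatedV2(v1.orderId(), v1.customerId(), new Money(v1.amount(), "USD"));
        }
        return deserialize(stored);
    }
}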

Event Type Versioning

// V1 of the event
public record OrderCreated(OrderId orderId, CustomerId customerId, BigDecimal amount) {}

// V2 adds currency
public record OrderCreatedV2(OrderId orderId, CustomerId customerId, Money amount) {}

Migration Strategies

| Strategy | Description |
|---|---|
| Upcasting | Transform at read time; lazy |
| Copy-transform | Migrate to new event stream |
| Dual write | Write both versions temporarily |
| Version field | Include version in event |
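
Upcasting combined with a version field might look like the following sketch. It works on an already-parsed payload so that it runs without a JSON library. The class name `OrderCreatedUpcaster` and the `schemaVersion` field name are assumptions, and a missing version is treated as 1.

```java
import java.util.HashMap;
import java.util.Map;

final class OrderCreatedUpcaster {
    static final int CURRENT_SCHEMA_VERSION = 2;

    // Returns a copy of the payload upgraded to the current schema.
    // The stored payload itself is never modified.
    static Map<String, Object> upcast(Map<String, Object> payload) {
        Map<String, Object> result = new HashMap<>(payload);
        int version = ((Number) result.getOrDefault("schemaVersion", 1)).intValue();
        if (version == 1) {
            result.put("currency", "USD"); // V1 had no currency; default to USD
            version = 2;
        }
        // Later schema changes would add further steps here (2 -> 3, and so on),
        // so an old event passes through every step in order.
        result.put("schemaVersion", version);
        return result;
    }
}
```

Chaining one step per version means each schema change needs only one new step. Old events are upgraded step by step each time they are read.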

Concurrency Control

Prevent concurrent modifications to the same aggregate.

Optimistic Locking

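public void append(UUID aggregateId, List<DomainEvent> events, int expectedVersion) {
    int actualVersion = getLatestVersion(aggregateId);
    if (actualVersion != expectedVersion) {
        throw new ConcurrencyException("Expected version " + expectedVersion
            + " but was " + actualVersion);
    }
    // Proceed with append, in the same transaction as the check. Two writers can
    // both pass the check above; the UNIQUE (aggregate_id, version) constraint
    // rejects the second insert, which should also surface as a ConcurrencyException.
}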

Handling Conflicts

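public void save(Order order) {
    try {
        eventStore.append(order.getId().value(), order.getUncommittedEvents(), order.getVersion());
        order.markEventsCommitted();
    } catch (ConcurrencyException e) {
        // Another writer appended first. Do not re-append the stale events:
        // the caller reloads the aggregate, re-applies the command on the
        // fresh state, and saves again (or fails after a few attempts).
        throw e;
    }
}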
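
The reload-and-retry step can be written as a small generic helper. This is a sketch under assumptions: `OptimisticRetry` and its `ConflictException` are hypothetical stand-ins for the repository and `ConcurrencyException`, and the load, command and save steps are passed in as functions.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class OptimisticRetry {
    // Stand-in for the store's concurrency exception.
    static final class ConflictException extends RuntimeException {}

    // Runs load -> command -> save, starting over from a fresh load on conflict.
    static <A> A execute(Supplier<A> load, Consumer<A> command, Consumer<A> save,
                         int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            A aggregate = load.get();   // fresh state and version on every attempt
            command.accept(aggregate);  // business rules re-checked against fresh state
            try {
                save.accept(aggregate);
                return aggregate;
            } catch (ConflictException e) {
                last = e;               // another writer appended first; try again
            }
        }
        throw last;                     // attempts exhausted: surface the conflict
    }
}
```

Re-running the command matters because the fresh state may make it invalid, for example when the order has been placed in the meantime. Blindly re-appending the old events would skip that check.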

When to Use Event Sourcing

Good Fit

| Scenario | Why Event Sourcing Helps |
|---|---|
| Audit requirements | Full history built-in |
| Regulatory compliance | Immutable, verifiable log |
| Complex state machines | Easier to reason about transitions |
| Debugging | "Time travel" to any point |
| Multiple projections | Build any view from events |
| Event-driven architecture | Events are the data model |

Poor Fit

| Scenario | Why Not |
|---|---|
| Simple CRUD | Overkill; just use a database |
| Team unfamiliar | Steep learning curve |
| High-frequency updates | Event volume explosion |
| No audit needs | Complexity not justified |

Event Sourcing Challenges

| Challenge | Mitigation |
|---|---|
| Replay time | Snapshots |
| Event schema evolution | Upcasting, versioning |
| Eventual consistency | CQRS read models |
| Complexity | Start simple; evolve |
| Debugging | Good tooling; event browser |
| Storage growth | Archiving; compaction (careful!) |

Event Sourcing Frameworks

| Framework | Language | Notes |
|---|---|---|
| Axon Framework | Java | Full CQRS + ES; saga support |
| EventStoreDB | Any | Purpose-built event database |
| Marten | .NET | PostgreSQL-based |
| Eventuous | .NET | Lightweight; EventStoreDB |
| eventstore | Node.js | EventStoreDB client |

What's the difference between Event Sourcing and CQRS?

Event Sourcing is about storing events as the source of truth and deriving state by replay. CQRS is about separating read and write models. They're often used together (events feed projections) but are independent patterns. You can use CQRS without Event Sourcing and vice versa.

How do you handle event schema changes in Event Sourcing?

(1) Upcasting: Transform old events to new schema at read time. (2) Versioned events: Include version number; handle each version. (3) New event types: Introduce OrderCreatedV2 instead of changing OrderCreated. (4) Copy-transform: Migrate entire stream (expensive; rarely needed). Always design events with evolution in mind.

How do you prevent replay from taking too long?

(1) Snapshots: Store state periodically; replay only from last snapshot. (2) Smaller aggregates: Fewer events per aggregate. (3) Projection checkpoints: Read models track their position; don't replay fully. (4) Parallel replay: Multiple projections process in parallel. (5) In-memory caching: Keep hot aggregates loaded.