Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Chapter 08: Event Sourcing & CQRS

Append-Only Event Architecture

This chapter details the event sourcing architecture for the POS Platform. Event sourcing provides complete audit trails, temporal queries, and enables offline conflict resolution - critical capabilities for a retail POS system.


Why Event Sourcing for POS?

Traditional CRUD systems store only current state. Event sourcing stores every change as an immutable event, enabling:

Traditional CRUD vs Event Sourcing
==================================

CRUD Approach:
+------------------+
| inventory_items  |
|------------------|
| sku: NXP001      |
| quantity: 45     |  <- Only current state
| updated_at: now  |
+------------------+

Event Sourcing Approach:
+------------------+
| events           |
|------------------|
| InventoryReceived: +100 @ 2025-01-01 09:00  |
| ItemSold: -2 @ 2025-01-01 10:15             |
| ItemSold: -1 @ 2025-01-01 11:30             |
| ItemSold: -3 @ 2025-01-01 14:22             |
| AdjustmentMade: -49 @ 2025-01-01 16:00      |  <- Caught discrepancy!
| ItemSold: -1 @ 2025-01-02 09:15             |
| Current State: 45 (sum of all events)       |
+------------------+

Benefits for Retail POS

BenefitDescription
Complete Audit TrailEvery sale, void, refund, adjustment is recorded forever
Temporal Queries“What was our inventory on December 15th at 3pm?”
Offline SyncEvents queue locally, merge when online
Conflict ResolutionCompare event streams, not states
DebuggingReplay events to reproduce issues
CompliancePCI-DSS, SOX require transaction logs

Event Sourcing Architecture

Event Sourcing Architecture
===========================

+-------------------------------------------------------------------------+
|                              POS CLIENT                                  |
|                                                                          |
|   +------------------+    +-------------------+    +-----------------+   |
|   |  Command Handler |    |   Event Store     |    |   Projector     |   |
|   |                  |    |   (Local SQLite)  |    |   (Read Model)  |   |
|   |  CreateSale      |--->|                   |--->|                 |   |
|   |  VoidSale        |    | SaleCreated       |    | sale_summaries  |   |
|   |  AddPayment      |    | ItemAdded         |    | inventory_view  |   |
|   +------------------+    | PaymentReceived   |    +-----------------+   |
|                           +-------------------+                          |
|                                    |                                     |
+-------------------------------------------------------------------------+
                                     | Sync
                                     v
+-------------------------------------------------------------------------+
|                            CENTRAL API                                   |
|                                                                          |
|   +------------------+    +-------------------+    +-----------------+   |
|   |  Command Handler |    |   Event Store     |    |   Projector     |   |
|   |  (Validates)     |    |   (PostgreSQL)    |    |   (Read Model)  |   |
|   |                  |<---|                   |--->|                 |   |
|   |  Deduplication   |    | All tenant events |    | sales           |   |
|   |  Conflict Check  |    | Append-only       |    | inventory_items |   |
|   +------------------+    | Immutable         |    | customers       |   |
|                           +-------------------+    +-----------------+   |
+-------------------------------------------------------------------------+

CQRS Pattern

Command Query Responsibility Segregation separates write and read models:

CQRS Pattern
============

                          +----------------------+
                          |     User Action      |
                          +----------+-----------+
                                     |
              +----------------------+----------------------+
              |                                             |
              v                                             v
    +-------------------+                        +-------------------+
    |     COMMAND       |                        |      QUERY        |
    |     (Write)       |                        |      (Read)       |
    +-------------------+                        +-------------------+
              |                                             |
              v                                             v
    +-------------------+                        +-------------------+
    | Command Handler   |                        | Query Handler     |
    | - Validate        |                        | - No validation   |
    | - Business rules  |                        | - Fast lookup     |
    | - Generate events |                        | - Denormalized    |
    +-------------------+                        +-------------------+
              |                                             ^
              v                                             |
    +-------------------+                        +-------------------+
    |   Event Store     |----------------------->|   Read Models     |
    |   (Append-only)   |     Projections       |   (Optimized)     |
    +-------------------+                        +-------------------+

Write Side (Commands)

// Commands - Express intent
public record CreateSaleCommand(
    Guid SaleId,
    Guid LocationId,
    Guid EmployeeId,
    Guid? CustomerId,
    List<SaleLineItemDto> LineItems
);

public record VoidSaleCommand(
    Guid SaleId,
    Guid EmployeeId,
    string Reason
);

public record AddPaymentCommand(
    Guid SaleId,
    string PaymentMethod,
    decimal Amount,
    string? Reference
);

Read Side (Queries)

// Queries - Request data
public record GetSaleByIdQuery(Guid SaleId);
public record GetDailySalesQuery(Guid LocationId, DateTime Date);
public record GetInventoryLevelQuery(string Sku, Guid LocationId);

// Read models - Optimized for queries
public class SaleSummaryView
{
    public Guid Id { get; set; }
    public string SaleNumber { get; set; }
    public string CustomerName { get; set; }  // Denormalized
    public string EmployeeName { get; set; }  // Denormalized
    public decimal Total { get; set; }
    public string Status { get; set; }
    public DateTime CreatedAt { get; set; }
}

Event Store Schema

The append-only event store is the source of truth:

-- Event Store Schema

CREATE TABLE events (
    id              BIGSERIAL PRIMARY KEY,
    event_id        UUID UNIQUE NOT NULL DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,     -- 'Sale', 'Inventory', 'Customer'
    aggregate_id    UUID NOT NULL,             -- The entity this event belongs to
    event_type      VARCHAR(100) NOT NULL,     -- 'SaleCreated', 'ItemAdded'
    event_data      JSONB NOT NULL,            -- Full event payload
    metadata        JSONB NOT NULL DEFAULT '{}', -- Correlation, causation IDs
    version         INTEGER NOT NULL,          -- Aggregate version (for optimistic concurrency)
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by      UUID,                      -- Employee who caused the event

    -- Optimistic concurrency: aggregate_id + version must be unique
    UNIQUE (aggregate_type, aggregate_id, version)
);

-- Indexes for common queries
CREATE INDEX idx_events_aggregate ON events (aggregate_type, aggregate_id);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_created_at ON events USING BRIN (created_at);
CREATE INDEX idx_events_metadata ON events USING GIN (metadata);

-- Snapshots table (for performance on long event streams)
CREATE TABLE snapshots (
    id              BIGSERIAL PRIMARY KEY,
    aggregate_type  VARCHAR(100) NOT NULL,
    aggregate_id    UUID NOT NULL,
    version         INTEGER NOT NULL,
    state           JSONB NOT NULL,            -- Serialized aggregate state
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_type, aggregate_id)
);

-- Outbox table (for reliable event publishing)
CREATE TABLE event_outbox (
    id              BIGSERIAL PRIMARY KEY,
    event_id        UUID NOT NULL REFERENCES events(event_id),
    destination     VARCHAR(100) NOT NULL,     -- 'signalr', 'webhook', 'sync'
    status          VARCHAR(20) DEFAULT 'pending',
    attempts        INTEGER DEFAULT 0,
    last_error      TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at    TIMESTAMPTZ
);

Domain Events Catalog

Sale Aggregate Events

Sale Events
===========

SaleCreated
+-----------------------+----------------------------------------+
| Field                 | Description                            |
+-----------------------+----------------------------------------+
| sale_id               | UUID of the new sale                   |
| sale_number           | Human-readable sale number             |
| location_id           | Where the sale occurred                |
| register_id           | Which register                         |
| employee_id           | Who created the sale                   |
| customer_id           | Customer (if any)                      |
| created_at            | Timestamp                              |
+-----------------------+----------------------------------------+

SaleLineItemAdded
+-----------------------+----------------------------------------+
| sale_id               | Parent sale                            |
| line_item_id          | UUID of the line item                  |
| product_id            | Product being sold                     |
| variant_id            | Variant (if any)                       |
| sku                   | SKU at time of sale                    |
| name                  | Product name at time of sale           |
| quantity              | Quantity sold                          |
| unit_price            | Price per unit                         |
| discount_amount       | Line discount                          |
| tax_amount            | Line tax                               |
+-----------------------+----------------------------------------+

SaleLineItemRemoved
+-----------------------+----------------------------------------+
| sale_id               | Parent sale                            |
| line_item_id          | UUID of removed item                   |
| reason                | Why removed                            |
+-----------------------+----------------------------------------+

PaymentReceived
+-----------------------+----------------------------------------+
| sale_id               | Parent sale                            |
| payment_id            | UUID of payment                        |
| payment_method        | cash, credit, debit, etc.              |
| amount                | Payment amount                         |
| reference             | Card last 4, check #, etc.             |
| auth_code             | Authorization code                     |
+-----------------------+----------------------------------------+

SaleCompleted
+-----------------------+----------------------------------------+
| sale_id               | The sale being completed               |
| subtotal              | Final subtotal                         |
| discount_total        | Total discounts                        |
| tax_total             | Total tax                              |
| total                 | Final total                            |
| completed_at          | Timestamp                              |
+-----------------------+----------------------------------------+

SaleVoided
+-----------------------+----------------------------------------+
| sale_id               | The voided sale                        |
| voided_by             | Employee who voided                    |
| reason                | Void reason                            |
| voided_at             | Timestamp                              |
+-----------------------+----------------------------------------+

Inventory Aggregate Events

Inventory Events
================

InventoryReceived
+-----------------------+----------------------------------------+
| location_id           | Where received                         |
| product_id            | Product                                |
| variant_id            | Variant (if any)                       |
| quantity              | Amount received                        |
| cost                  | Unit cost                              |
| reference             | PO number, transfer #                  |
| received_by           | Employee                               |
+-----------------------+----------------------------------------+

InventoryAdjusted
+-----------------------+----------------------------------------+
| location_id           | Location                               |
| product_id            | Product                                |
| variant_id            | Variant (if any)                       |
| quantity_change       | +/- amount                             |
| new_quantity          | New on-hand quantity                   |
| reason                | count, damage, theft, return           |
| adjusted_by           | Employee                               |
| notes                 | Additional context                     |
+-----------------------+----------------------------------------+

InventorySold
+-----------------------+----------------------------------------+
| location_id           | Where sold                             |
| product_id            | Product                                |
| variant_id            | Variant (if any)                       |
| quantity              | Amount sold (positive)                 |
| sale_id               | Related sale                           |
+-----------------------+----------------------------------------+

InventoryTransferred
+-----------------------+----------------------------------------+
| transfer_id           | Transfer document                      |
| from_location_id      | Source location                        |
| to_location_id        | Destination location                   |
| product_id            | Product                                |
| variant_id            | Variant (if any)                       |
| quantity              | Amount transferred                     |
| transferred_by        | Employee                               |
+-----------------------+----------------------------------------+

InventoryCounted
+-----------------------+----------------------------------------+
| location_id           | Location                               |
| product_id            | Product                                |
| variant_id            | Variant                                |
| expected_quantity     | System quantity before count           |
| actual_quantity       | Physical count                         |
| variance              | Difference                             |
| counted_by            | Employee                               |
| count_session_id      | Batch count session                    |
+-----------------------+----------------------------------------+

Customer Aggregate Events

Customer Events
===============

CustomerCreated
+-----------------------+----------------------------------------+
| customer_id           | New customer UUID                      |
| customer_number       | Human-readable ID                      |
| first_name            | First name                             |
| last_name             | Last name                              |
| email                 | Email address                          |
| phone                 | Phone number                           |
| created_by            | Employee                               |
+-----------------------+----------------------------------------+

CustomerUpdated
+-----------------------+----------------------------------------+
| customer_id           | Customer UUID                          |
| changes               | Map of field -> {old, new}             |
| updated_by            | Employee                               |
+-----------------------+----------------------------------------+

LoyaltyPointsEarned
+-----------------------+----------------------------------------+
| customer_id           | Customer                               |
| points                | Points earned                          |
| sale_id               | Related sale                           |
| new_balance           | Updated balance                        |
+-----------------------+----------------------------------------+

LoyaltyPointsRedeemed
+-----------------------+----------------------------------------+
| customer_id           | Customer                               |
| points                | Points redeemed                        |
| sale_id               | Related sale                           |
| new_balance           | Updated balance                        |
+-----------------------+----------------------------------------+

StoreCreditIssued
+-----------------------+----------------------------------------+
| customer_id           | Customer                               |
| credit_id             | Credit UUID                            |
| amount                | Credit amount                          |
| reason                | Why issued                             |
| issued_by             | Employee                               |
+-----------------------+----------------------------------------+

Employee Aggregate Events

Employee Events
===============

EmployeeClockIn
+-----------------------+----------------------------------------+
| employee_id           | Employee UUID                          |
| location_id           | Where clocking in                      |
| shift_id              | New shift UUID                         |
| clocked_in_at         | Timestamp                              |
+-----------------------+----------------------------------------+

EmployeeClockOut
+-----------------------+----------------------------------------+
| employee_id           | Employee UUID                          |
| shift_id              | Shift being closed                     |
| clocked_out_at        | Timestamp                              |
| break_minutes         | Total break time                       |
+-----------------------+----------------------------------------+

EmployeeBreakStarted
+-----------------------+----------------------------------------+
| employee_id           | Employee UUID                          |
| shift_id              | Current shift                          |
| started_at            | Break start time                       |
+-----------------------+----------------------------------------+

EmployeeBreakEnded
+-----------------------+----------------------------------------+
| employee_id           | Employee UUID                          |
| shift_id              | Current shift                          |
| ended_at              | Break end time                         |
| duration_minutes      | Break duration                         |
+-----------------------+----------------------------------------+

CashDrawer Aggregate Events

Cash Drawer Events
==================

DrawerOpened
+-----------------------+----------------------------------------+
| drawer_id             | Drawer UUID                            |
| register_id           | Register UUID                          |
| employee_id           | Who opened                             |
| opening_balance       | Starting cash amount                   |
| opened_at             | Timestamp                              |
+-----------------------+----------------------------------------+

DrawerCashDrop
+-----------------------+----------------------------------------+
| drawer_id             | Drawer UUID                            |
| amount                | Amount dropped to safe                 |
| employee_id           | Who dropped                            |
| dropped_at            | Timestamp                              |
+-----------------------+----------------------------------------+

DrawerPaidIn
+-----------------------+----------------------------------------+
| drawer_id             | Drawer UUID                            |
| amount                | Amount added                           |
| reason                | Why (petty cash, etc.)                 |
| employee_id           | Who added                              |
+-----------------------+----------------------------------------+

DrawerPaidOut
+-----------------------+----------------------------------------+
| drawer_id             | Drawer UUID                            |
| amount                | Amount removed                         |
| reason                | Why (vendor payment, etc.)             |
| employee_id           | Who removed                            |
+-----------------------+----------------------------------------+

DrawerClosed
+-----------------------+----------------------------------------+
| drawer_id             | Drawer UUID                            |
| employee_id           | Who closed                             |
| closing_balance       | Actual cash counted                    |
| expected_balance      | System calculated                      |
| variance              | Difference (over/short)                |
| closed_at             | Timestamp                              |
+-----------------------+----------------------------------------+

Event Projection Patterns

Projections transform events into read models:

Projection Architecture
=======================

+-------------------+
|   Event Stream    |
|                   |
| SaleCreated       |
| ItemAdded         |
| ItemAdded         |
| PaymentReceived   |
| SaleCompleted     |
+--------+----------+
         |
         | Projector reads events
         v
+-------------------+     +-------------------+     +-------------------+
| Sale Projector    |     |Inventory Projector|     |Customer Projector |
|                   |     |                   |     |                   |
| - Build sale view |     | - Update stock    |     | - Update stats    |
| - Calculate totals|     | - Track movements |     | - Loyalty points  |
+--------+----------+     +--------+----------+     +--------+----------+
         |                         |                         |
         v                         v                         v
+-------------------+     +-------------------+     +-------------------+
| sale_summaries    |     | inventory_levels  |     | customer_stats    |
| (Read Model)      |     | (Read Model)      |     | (Read Model)      |
+-------------------+     +-------------------+     +-------------------+

Example Projector Implementation

// SaleProjector.cs

public class SaleProjector : IEventHandler
{
    private readonly IDbContextFactory<ReadModelDbContext> _dbFactory;

    public SaleProjector(IDbContextFactory<ReadModelDbContext> dbFactory)
    {
        _dbFactory = dbFactory;
    }

    public async Task HandleAsync(SaleCreated @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();

        var view = new SaleSummaryView
        {
            Id = @event.SaleId,
            SaleNumber = @event.SaleNumber,
            LocationId = @event.LocationId,
            EmployeeId = @event.EmployeeId,
            CustomerId = @event.CustomerId,
            Status = "draft",
            Subtotal = 0,
            Total = 0,
            CreatedAt = @event.CreatedAt
        };

        db.SaleSummaries.Add(view);
        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleLineItemAdded @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();

        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        var lineTotal = @event.Quantity * @event.UnitPrice - @event.DiscountAmount;
        sale.Subtotal += lineTotal;
        sale.ItemCount += @event.Quantity;

        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleCompleted @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();

        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        sale.Status = "completed";
        sale.DiscountTotal = @event.DiscountTotal;
        sale.TaxTotal = @event.TaxTotal;
        sale.Total = @event.Total;
        sale.CompletedAt = @event.CompletedAt;

        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleVoided @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();

        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        sale.Status = "voided";
        sale.VoidedAt = @event.VoidedAt;
        sale.VoidedBy = @event.VoidedBy;
        sale.VoidReason = @event.Reason;

        await db.SaveChangesAsync();
    }
}

Temporal Queries

Event sourcing enables powerful temporal queries:

-- What was inventory on a specific date?
SELECT
    product_id,
    SUM(CASE
        WHEN event_type = 'InventoryReceived' THEN (event_data->>'quantity')::int
        WHEN event_type = 'InventorySold' THEN -(event_data->>'quantity')::int
        WHEN event_type = 'InventoryAdjusted' THEN (event_data->>'quantity_change')::int
        ELSE 0
    END) as quantity
FROM events
WHERE aggregate_type = 'Inventory'
  AND (event_data->>'location_id')::uuid = '...'
  AND created_at <= '2025-12-15 15:00:00'
GROUP BY product_id;

-- Sales trend for specific product
SELECT
    date_trunc('day', created_at) as date,
    SUM((event_data->>'quantity')::int) as units_sold
FROM events
WHERE event_type = 'InventorySold'
  AND (event_data->>'product_id')::uuid = '...'
  AND created_at >= NOW() - INTERVAL '30 days'
GROUP BY date_trunc('day', created_at)
ORDER BY date;

-- Audit trail for specific sale
SELECT
    event_type,
    event_data,
    created_at,
    created_by
FROM events
WHERE aggregate_type = 'Sale'
  AND aggregate_id = '...'
ORDER BY version;

Snapshots for Performance

For aggregates with many events, snapshots prevent replaying the entire stream:

Snapshot Strategy
=================

Without Snapshots:
Event 1 -> Event 2 -> ... -> Event 5000 -> Current State
(Slow for aggregates with many events)

With Snapshots:
Event 1 -> ... -> Event 1000 -> [Snapshot @ v1000]
                                      |
                                      -> Event 1001 -> ... -> Event 1050 -> Current State
(Load snapshot, then only replay 50 events)

Snapshot Implementation

// AggregateRepository.cs

public class AggregateRepository<T> where T : AggregateRoot
{
    private readonly IEventStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    private const int SNAPSHOT_THRESHOLD = 100;

    public async Task<T> LoadAsync(Guid id)
    {
        var aggregate = Activator.CreateInstance<T>();

        // 1. Try to load snapshot
        var snapshot = await _snapshotStore.GetAsync<T>(id);
        int fromVersion = 0;

        if (snapshot != null)
        {
            aggregate.RestoreFromSnapshot(snapshot.State);
            fromVersion = snapshot.Version;
        }

        // 2. Load events after snapshot
        var events = await _eventStore.GetEventsAsync(id, fromVersion);

        foreach (var @event in events)
        {
            aggregate.Apply(@event);
        }

        return aggregate;
    }

    public async Task SaveAsync(T aggregate)
    {
        var newEvents = aggregate.GetUncommittedEvents();

        // 1. Append events
        await _eventStore.AppendAsync(aggregate.Id, newEvents, aggregate.Version);

        // 2. Create snapshot if threshold reached
        if (aggregate.Version % SNAPSHOT_THRESHOLD == 0)
        {
            var snapshot = aggregate.CreateSnapshot();
            await _snapshotStore.SaveAsync(aggregate.Id, aggregate.Version, snapshot);
        }

        aggregate.ClearUncommittedEvents();
    }
}

Benefits Summary

CapabilityHow Event Sourcing Enables It
Audit TrailEvery change is an immutable event
Temporal QueriesReplay events to any point in time
Offline SupportEvents queue locally, merge later
DebuggingReproduce any state by replaying events
AnalyticsRich historical data for ML/reporting
CompliancePCI-DSS, SOX audit requirements
Undo/RedoApply compensating events
TestingGiven-When-Then with events

Summary

Event sourcing with CQRS provides:

  1. Immutable audit log of every business event
  2. Temporal queries to answer “what was the state at time X?”
  3. Offline capability through local event queues
  4. Conflict resolution by comparing event streams
  5. Optimized reads through projected read models
  6. Performance via snapshots for long event streams

The event store schema and domain events catalog in this chapter form the foundation for the offline-first architecture described in Chapter 09.


Next: Chapter 09: Offline-First Design