Chapter 08: Event Sourcing & CQRS
Append-Only Event Architecture
This chapter details the event sourcing architecture for the POS Platform. Event sourcing provides complete audit trails and temporal queries, and it enables offline conflict resolution. All three are critical capabilities for a retail POS system.
Why Event Sourcing for POS?
Traditional CRUD systems store only the current state. Event sourcing stores every change as an immutable event, so the current state is derived from the full history:
Traditional CRUD vs Event Sourcing
==================================
CRUD Approach:
+------------------+
| inventory_items  |
|------------------|
| sku: NXP001      |
| quantity: 45     | <- Only current state
| updated_at: now  |
+------------------+
Event Sourcing Approach:
+--------------------------------------------+
| events                                     |
|--------------------------------------------|
| InventoryReceived: +100 @ 2025-01-01 09:00 |
| ItemSold: -2 @ 2025-01-01 10:15            |
| ItemSold: -1 @ 2025-01-01 11:30            |
| ItemSold: -3 @ 2025-01-01 14:22            |
| AdjustmentMade: -48 @ 2025-01-01 16:00     | <- Caught discrepancy!
| ItemSold: -1 @ 2025-01-02 09:15            |
| Current State: 45 (sum of all events)      |
+--------------------------------------------+
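Deriving state from an event stream is a fold over the recorded quantity changes. The following is a minimal sketch of that idea, not platform code: `StockEvent` and `StockMath` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for illustration; Delta is the signed quantity change.
public record StockEvent(string Type, int Delta, DateTime At);

public static class StockMath
{
    // Current state is the sum of every quantity change recorded so far.
    public static int CurrentQuantity(IEnumerable<StockEvent> events) =>
        events.Sum(e => e.Delta);

    // The same fold, cut off at a point in time, yields the historical state.
    public static int QuantityAsOf(IEnumerable<StockEvent> events, DateTime asOf) =>
        events.Where(e => e.At <= asOf).Sum(e => e.Delta);
}
```

Because nothing is overwritten, the same stream can answer both "what is the quantity now?" and "what was it at 10:30?" without any extra bookkeeping.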
Benefits for Retail POS
| Benefit | Description |
|---|---|
| Complete Audit Trail | Every sale, void, refund, adjustment is recorded forever |
| Temporal Queries | “What was our inventory on December 15th at 3pm?” |
| Offline Sync | Events queue locally, merge when online |
| Conflict Resolution | Compare event streams, not states |
| Debugging | Replay events to reproduce issues |
| Compliance | PCI-DSS, SOX require transaction logs |
Event Sourcing Architecture
Event Sourcing Architecture
===========================
+-------------------------------------------------------------------------+
|  POS CLIENT                                                             |
|                                                                         |
|  +------------------+    +-------------------+    +-----------------+   |
|  | Command Handler  |    | Event Store       |    | Projector       |   |
|  |                  |    | (Local SQLite)    |    | (Read Model)    |   |
|  | CreateSale       |--->|                   |--->|                 |   |
|  | VoidSale         |    | SaleCreated       |    | sale_summaries  |   |
|  | AddPayment       |    | ItemAdded         |    | inventory_view  |   |
|  +------------------+    | PaymentReceived   |    +-----------------+   |
|                          +-------------------+                          |
|                                    |                                    |
+-------------------------------------------------------------------------+
                                     | Sync
                                     v
+-------------------------------------------------------------------------+
|  CENTRAL API                                                            |
|                                                                         |
|  +------------------+    +-------------------+    +-----------------+   |
|  | Command Handler  |    | Event Store       |    | Projector       |   |
|  | (Validates)      |    | (PostgreSQL)      |    | (Read Model)    |   |
|  |                  |<---|                   |--->|                 |   |
|  | Deduplication    |    | All tenant events |    | sales           |   |
|  | Conflict Check   |    | Append-only       |    | inventory_items |   |
|  +------------------+    | Immutable         |    | customers       |   |
|                          +-------------------+    +-----------------+   |
+-------------------------------------------------------------------------+
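The "Deduplication" step on the central side matters because a client that loses its connection mid-sync will resend events. One plausible approach, sketched here under the assumption that each event carries a client-generated unique ID, is to skip any event whose ID has already been stored. `SyncedEvent` and `CentralInbox` are hypothetical names, and the in-memory collections stand in for the real store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal event envelope; EventId is assumed to be generated on the client.
public record SyncedEvent(Guid EventId, string EventType, string Payload);

// In-memory stand-in for the central event store's intake.
public class CentralInbox
{
    private readonly HashSet<Guid> _seen = new();
    private readonly List<SyncedEvent> _log = new();

    public IReadOnlyList<SyncedEvent> Log => _log;

    // Returns how many events were newly appended; resent events are skipped.
    public int Receive(IEnumerable<SyncedEvent> batch)
    {
        var appended = 0;
        foreach (var e in batch)
        {
            if (_seen.Add(e.EventId)) // Add returns false when the ID is already known
            {
                _log.Add(e);
                appended++;
            }
        }
        return appended;
    }
}
```

Resending the same batch is then harmless: the second call to `Receive` appends nothing.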
CQRS Pattern
Command Query Responsibility Segregation separates write and read models:
CQRS Pattern
============
                      +----------------------+
                      |     User Action      |
                      +----------+-----------+
                                 |
          +----------------------+----------------------+
          |                                             |
          v                                             v
+-------------------+                         +-------------------+
| COMMAND           |                         | QUERY             |
| (Write)           |                         | (Read)            |
+-------------------+                         +-------------------+
          |                                             |
          v                                             v
+-------------------+                         +-------------------+
| Command Handler   |                         | Query Handler     |
| - Validate        |                         | - No validation   |
| - Business rules  |                         | - Fast lookup     |
| - Generate events |                         | - Denormalized    |
+-------------------+                         +-------------------+
          |                                             ^
          v                                             |
+-------------------+                         +-------------------+
| Event Store       |------------------------>| Read Models       |
| (Append-only)     |       Projections       | (Optimized)       |
+-------------------+                         +-------------------+
Write Side (Commands)
// Commands - Express intent
public record CreateSaleCommand(
    Guid SaleId,
    Guid LocationId,
    Guid EmployeeId,
    Guid? CustomerId,
    List<SaleLineItemDto> LineItems
);

public record VoidSaleCommand(
    Guid SaleId,
    Guid EmployeeId,
    string Reason
);

public record AddPaymentCommand(
    Guid SaleId,
    string PaymentMethod,
    decimal Amount,
    string? Reference
);
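On the write side, a command handler checks the command against current state and, if it is acceptable, returns the events to append. The sketch below illustrates that shape for voiding a sale. The `SaleState` record, the `VoidSaleHandler` class, and the two validation rules are assumptions made for the example, not the platform's actual rules.

```csharp
using System;
using System.Collections.Generic;

public record VoidSaleCommand(Guid SaleId, Guid EmployeeId, string Reason);
public record SaleVoided(Guid SaleId, Guid VoidedBy, string Reason, DateTime VoidedAt);

// Hypothetical slice of sale state needed to make the decision.
public record SaleState(Guid SaleId, string Status);

public static class VoidSaleHandler
{
    // Validates the command and returns the events to append.
    // Nothing is mutated here; state changes only when events are applied.
    public static IReadOnlyList<object> Handle(SaleState state, VoidSaleCommand cmd, DateTime now)
    {
        if (string.IsNullOrWhiteSpace(cmd.Reason))
            throw new ArgumentException("A void reason is required.", nameof(cmd));
        if (state.Status == "voided")
            throw new InvalidOperationException("Sale is already voided.");

        return new object[] { new SaleVoided(cmd.SaleId, cmd.EmployeeId, cmd.Reason, now) };
    }
}
```

Keeping the handler free of side effects makes it easy to test: feed in a state and a command, then inspect the returned events.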
Read Side (Queries)
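// Queries - Request data
public record GetSaleByIdQuery(Guid SaleId);
public record GetDailySalesQuery(Guid LocationId, DateTime Date);
public record GetInventoryLevelQuery(string Sku, Guid LocationId);

// Read models - Optimized for queries
// Includes every property that the SaleProjector later in this chapter writes to.
public class SaleSummaryView
{
    public Guid Id { get; set; }
    public string SaleNumber { get; set; }
    public Guid LocationId { get; set; }
    public Guid EmployeeId { get; set; }
    public Guid? CustomerId { get; set; }
    public string CustomerName { get; set; }  // Denormalized
    public string EmployeeName { get; set; }  // Denormalized
    public int ItemCount { get; set; }
    public decimal Subtotal { get; set; }
    public decimal DiscountTotal { get; set; }
    public decimal TaxTotal { get; set; }
    public decimal Total { get; set; }
    public string Status { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
    public DateTime? VoidedAt { get; set; }
    public Guid? VoidedBy { get; set; }
    public string? VoidReason { get; set; }
}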
Event Store Schema
The append-only event store is the source of truth:
-- Event Store Schema
CREATE TABLE events (
    id              BIGSERIAL PRIMARY KEY,
    event_id        UUID UNIQUE NOT NULL DEFAULT gen_random_uuid(),
    aggregate_type  VARCHAR(100) NOT NULL,        -- 'Sale', 'Inventory', 'Customer'
    aggregate_id    UUID NOT NULL,                -- The entity this event belongs to
    event_type      VARCHAR(100) NOT NULL,        -- 'SaleCreated', 'ItemAdded'
    event_data      JSONB NOT NULL,               -- Full event payload
    metadata        JSONB NOT NULL DEFAULT '{}',  -- Correlation, causation IDs
    version         INTEGER NOT NULL,             -- Aggregate version (for optimistic concurrency)
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by      UUID,                         -- Employee who caused the event
    -- Optimistic concurrency: (aggregate_type, aggregate_id, version) must be unique
    UNIQUE (aggregate_type, aggregate_id, version)
);

-- Indexes for common queries
-- Note: the UNIQUE constraint above already creates a B-tree index whose leading
-- columns are (aggregate_type, aggregate_id), so idx_events_aggregate may be redundant.
CREATE INDEX idx_events_aggregate ON events (aggregate_type, aggregate_id);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_created_at ON events USING BRIN (created_at);
CREATE INDEX idx_events_metadata ON events USING GIN (metadata);

-- Snapshots table (for performance on long event streams)
CREATE TABLE snapshots (
    id              BIGSERIAL PRIMARY KEY,
    aggregate_type  VARCHAR(100) NOT NULL,
    aggregate_id    UUID NOT NULL,
    version         INTEGER NOT NULL,
    state           JSONB NOT NULL,               -- Serialized aggregate state
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (aggregate_type, aggregate_id)         -- One (latest) snapshot per aggregate
);

-- Outbox table (for reliable event publishing)
CREATE TABLE event_outbox (
    id              BIGSERIAL PRIMARY KEY,
    event_id        UUID NOT NULL REFERENCES events(event_id),
    destination     VARCHAR(100) NOT NULL,        -- 'signalr', 'webhook', 'sync'
    status          VARCHAR(20) DEFAULT 'pending',
    attempts        INTEGER DEFAULT 0,
    last_error      TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at    TIMESTAMPTZ
);
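The uniqueness rule on aggregate and version is what turns the table into an optimistic lock: two writers that both read version N and both try to write version N + 1 cannot both succeed. The sketch below shows the same check in memory. `InMemoryEventStore` and `ConcurrencyException` are hypothetical names, and events are reduced to strings to keep the example short.

```csharp
using System;
using System.Collections.Generic;

public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// In-memory stand-in for the events table, keyed by aggregate ID.
public class InMemoryEventStore
{
    private readonly Dictionary<Guid, List<string>> _streams = new();

    public int CurrentVersion(Guid aggregateId) =>
        _streams.TryGetValue(aggregateId, out var s) ? s.Count : 0;

    // Appends only if the caller saw the latest version, mirroring what the
    // unique constraint on (aggregate, version) enforces in the database.
    public int Append(Guid aggregateId, int expectedVersion, IReadOnlyList<string> events)
    {
        var actual = CurrentVersion(aggregateId);
        if (actual != expectedVersion)
            throw new ConcurrencyException(
                $"Expected version {expectedVersion}, but stream is at {actual}.");

        if (!_streams.TryGetValue(aggregateId, out var stream))
            _streams[aggregateId] = stream = new List<string>();
        stream.AddRange(events);
        return stream.Count; // new version of the aggregate
    }
}
```

A caller that receives `ConcurrencyException` would typically reload the aggregate, re-run the command against the fresh state, and retry.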
Domain Events Catalog
Sale Aggregate Events
Sale Events
===========
SaleCreated
+-----------------------+----------------------------------------+
| Field | Description |
+-----------------------+----------------------------------------+
| sale_id | UUID of the new sale |
| sale_number | Human-readable sale number |
| location_id | Where the sale occurred |
| register_id | Which register |
| employee_id | Who created the sale |
| customer_id | Customer (if any) |
| created_at | Timestamp |
+-----------------------+----------------------------------------+
SaleLineItemAdded
+-----------------------+----------------------------------------+
| sale_id | Parent sale |
| line_item_id | UUID of the line item |
| product_id | Product being sold |
| variant_id | Variant (if any) |
| sku | SKU at time of sale |
| name | Product name at time of sale |
| quantity | Quantity sold |
| unit_price | Price per unit |
| discount_amount | Line discount |
| tax_amount | Line tax |
+-----------------------+----------------------------------------+
SaleLineItemRemoved
+-----------------------+----------------------------------------+
| sale_id | Parent sale |
| line_item_id | UUID of removed item |
| reason | Why removed |
+-----------------------+----------------------------------------+
PaymentReceived
+-----------------------+----------------------------------------+
| sale_id | Parent sale |
| payment_id | UUID of payment |
| payment_method | cash, credit, debit, etc. |
| amount | Payment amount |
| reference | Card last 4, check #, etc. |
| auth_code | Authorization code |
+-----------------------+----------------------------------------+
SaleCompleted
+-----------------------+----------------------------------------+
| sale_id | The sale being completed |
| subtotal | Final subtotal |
| discount_total | Total discounts |
| tax_total | Total tax |
| total | Final total |
| completed_at | Timestamp |
+-----------------------+----------------------------------------+
SaleVoided
+-----------------------+----------------------------------------+
| sale_id | The voided sale |
| voided_by | Employee who voided |
| reason | Void reason |
| voided_at | Timestamp |
+-----------------------+----------------------------------------+
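Because line items can be removed as well as added, a sale's running subtotal is best rebuilt per line item rather than kept as a single counter. The sketch below is one way to fold the two line-item events into a subtotal. The record and class names are hypothetical and carry only the fields the calculation needs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed-down, hypothetical versions of the catalog events.
public record LineItemAdded(Guid LineItemId, int Quantity, decimal UnitPrice, decimal DiscountAmount);
public record LineItemRemoved(Guid LineItemId);

public class SaleTotals
{
    // Line total per line item, so a removal can subtract exactly what was added.
    private readonly Dictionary<Guid, decimal> _lines = new();

    public decimal Subtotal => _lines.Values.Sum();

    public void Apply(object e)
    {
        switch (e)
        {
            case LineItemAdded a:
                _lines[a.LineItemId] = a.Quantity * a.UnitPrice - a.DiscountAmount;
                break;
            case LineItemRemoved r:
                _lines.Remove(r.LineItemId);
                break;
        }
    }
}
```

Note that the removal event carries no price, so the fold has to remember what each line contributed.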
Inventory Aggregate Events
Inventory Events
================
InventoryReceived
+-----------------------+----------------------------------------+
| location_id | Where received |
| product_id | Product |
| variant_id | Variant (if any) |
| quantity | Amount received |
| cost | Unit cost |
| reference | PO number, transfer # |
| received_by | Employee |
+-----------------------+----------------------------------------+
InventoryAdjusted
+-----------------------+----------------------------------------+
| location_id | Location |
| product_id | Product |
| variant_id | Variant (if any) |
| quantity_change | +/- amount |
| new_quantity | New on-hand quantity |
| reason | count, damage, theft, return |
| adjusted_by | Employee |
| notes | Additional context |
+-----------------------+----------------------------------------+
InventorySold
+-----------------------+----------------------------------------+
| location_id | Where sold |
| product_id | Product |
| variant_id | Variant (if any) |
| quantity | Amount sold (positive) |
| sale_id | Related sale |
+-----------------------+----------------------------------------+
InventoryTransferred
+-----------------------+----------------------------------------+
| transfer_id | Transfer document |
| from_location_id | Source location |
| to_location_id | Destination location |
| product_id | Product |
| variant_id | Variant (if any) |
| quantity | Amount transferred |
| transferred_by | Employee |
+-----------------------+----------------------------------------+
InventoryCounted
+-----------------------+----------------------------------------+
| location_id | Location |
| product_id | Product |
| variant_id | Variant |
| expected_quantity | System quantity before count |
| actual_quantity | Physical count |
| variance | Difference |
| counted_by | Employee |
| count_session_id | Batch count session |
+-----------------------+----------------------------------------+
Customer Aggregate Events
Customer Events
===============
CustomerCreated
+-----------------------+----------------------------------------+
| customer_id | New customer UUID |
| customer_number | Human-readable ID |
| first_name | First name |
| last_name | Last name |
| email | Email address |
| phone | Phone number |
| created_by | Employee |
+-----------------------+----------------------------------------+
CustomerUpdated
+-----------------------+----------------------------------------+
| customer_id | Customer UUID |
| changes | Map of field -> {old, new} |
| updated_by | Employee |
+-----------------------+----------------------------------------+
LoyaltyPointsEarned
+-----------------------+----------------------------------------+
| customer_id | Customer |
| points | Points earned |
| sale_id | Related sale |
| new_balance | Updated balance |
+-----------------------+----------------------------------------+
LoyaltyPointsRedeemed
+-----------------------+----------------------------------------+
| customer_id | Customer |
| points | Points redeemed |
| sale_id | Related sale |
| new_balance | Updated balance |
+-----------------------+----------------------------------------+
StoreCreditIssued
+-----------------------+----------------------------------------+
| customer_id | Customer |
| credit_id | Credit UUID |
| amount | Credit amount |
| reason | Why issued |
| issued_by | Employee |
+-----------------------+----------------------------------------+
Employee Aggregate Events
Employee Events
===============
EmployeeClockIn
+-----------------------+----------------------------------------+
| employee_id | Employee UUID |
| location_id | Where clocking in |
| shift_id | New shift UUID |
| clocked_in_at | Timestamp |
+-----------------------+----------------------------------------+
EmployeeClockOut
+-----------------------+----------------------------------------+
| employee_id | Employee UUID |
| shift_id | Shift being closed |
| clocked_out_at | Timestamp |
| break_minutes | Total break time |
+-----------------------+----------------------------------------+
EmployeeBreakStarted
+-----------------------+----------------------------------------+
| employee_id | Employee UUID |
| shift_id | Current shift |
| started_at | Break start time |
+-----------------------+----------------------------------------+
EmployeeBreakEnded
+-----------------------+----------------------------------------+
| employee_id | Employee UUID |
| shift_id | Current shift |
| ended_at | Break end time |
| duration_minutes | Break duration |
+-----------------------+----------------------------------------+
CashDrawer Aggregate Events
Cash Drawer Events
==================
DrawerOpened
+-----------------------+----------------------------------------+
| drawer_id | Drawer UUID |
| register_id | Register UUID |
| employee_id | Who opened |
| opening_balance | Starting cash amount |
| opened_at | Timestamp |
+-----------------------+----------------------------------------+
DrawerCashDrop
+-----------------------+----------------------------------------+
| drawer_id | Drawer UUID |
| amount | Amount dropped to safe |
| employee_id | Who dropped |
| dropped_at | Timestamp |
+-----------------------+----------------------------------------+
DrawerPaidIn
+-----------------------+----------------------------------------+
| drawer_id | Drawer UUID |
| amount | Amount added |
| reason | Why (petty cash, etc.) |
| employee_id | Who added |
+-----------------------+----------------------------------------+
DrawerPaidOut
+-----------------------+----------------------------------------+
| drawer_id | Drawer UUID |
| amount | Amount removed |
| reason | Why (vendor payment, etc.) |
| employee_id | Who removed |
+-----------------------+----------------------------------------+
DrawerClosed
+-----------------------+----------------------------------------+
| drawer_id | Drawer UUID |
| employee_id | Who closed |
| closing_balance | Actual cash counted |
| expected_balance | System calculated |
| variance | Difference (over/short) |
| closed_at | Timestamp |
+-----------------------+----------------------------------------+
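The `expected_balance` and `variance` fields on DrawerClosed can be derived from the drawer's own events plus the cash taken in sales. The sketch below assumes that cash sale payments are supplied as a single total, since they are recorded on the Sale aggregate rather than the drawer. The type names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down drawer events; Amount is always positive.
public abstract record DrawerEvent(decimal Amount);
public record DrawerOpened(decimal Amount) : DrawerEvent(Amount);   // opening_balance
public record DrawerPaidIn(decimal Amount) : DrawerEvent(Amount);
public record DrawerPaidOut(decimal Amount) : DrawerEvent(Amount);
public record DrawerCashDrop(decimal Amount) : DrawerEvent(Amount);

public static class DrawerMath
{
    // Expected cash = opening + paid in - paid out - drops + cash sales.
    public static decimal ExpectedBalance(IEnumerable<DrawerEvent> events, decimal cashSales)
    {
        var balance = cashSales;
        foreach (var e in events)
        {
            balance += e switch
            {
                DrawerOpened or DrawerPaidIn => e.Amount,
                DrawerPaidOut or DrawerCashDrop => -e.Amount,
                _ => 0m
            };
        }
        return balance;
    }

    // Positive means the drawer is over, negative means it is short.
    public static decimal Variance(decimal closingBalance, decimal expectedBalance) =>
        closingBalance - expectedBalance;
}
```

For example, a drawer opened with 200.00, with 20.00 paid in, 15.00 paid out, a 100.00 drop, and 340.50 in cash sales should hold 445.50; counting 440.50 at close is a variance of -5.00.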
Event Projection Patterns
Projections transform events into read models:
Projection Architecture
=======================
+-------------------+
| Event Stream      |
|                   |
| SaleCreated       |
| ItemAdded         |
| ItemAdded         |
| PaymentReceived   |
| SaleCompleted     |
+--------+----------+
         |
         | Projector reads events
         v
+-------------------+    +-------------------+    +-------------------+
| Sale Projector    |    |Inventory Projector|    |Customer Projector |
|                   |    |                   |    |                   |
| - Build sale view |    | - Update stock    |    | - Update stats    |
| - Calculate totals|    | - Track movements |    | - Loyalty points  |
+--------+----------+    +--------+----------+    +--------+----------+
         |                        |                        |
         v                        v                        v
+-------------------+    +-------------------+    +-------------------+
| sale_summaries    |    | inventory_levels  |    | customer_stats    |
| (Read Model)      |    | (Read Model)      |    | (Read Model)      |
+-------------------+    +-------------------+    +-------------------+
Example Projector Implementation
// SaleProjector.cs
public class SaleProjector : IEventHandler
{
    private readonly IDbContextFactory<ReadModelDbContext> _dbFactory;

    public SaleProjector(IDbContextFactory<ReadModelDbContext> dbFactory)
    {
        _dbFactory = dbFactory;
    }

    public async Task HandleAsync(SaleCreated @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();
        var view = new SaleSummaryView
        {
            Id = @event.SaleId,
            SaleNumber = @event.SaleNumber,
            LocationId = @event.LocationId,
            EmployeeId = @event.EmployeeId,
            CustomerId = @event.CustomerId,
            Status = "draft",
            Subtotal = 0,
            Total = 0,
            CreatedAt = @event.CreatedAt
        };
        db.SaleSummaries.Add(view);
        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleLineItemAdded @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();
        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        // Running subtotal while the sale is still open
        var lineTotal = @event.Quantity * @event.UnitPrice - @event.DiscountAmount;
        sale.Subtotal += lineTotal;
        sale.ItemCount += @event.Quantity;
        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleCompleted @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();
        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        sale.Status = "completed";
        // Take the final subtotal from the event: the running value above
        // does not account for SaleLineItemRemoved events.
        sale.Subtotal = @event.Subtotal;
        sale.DiscountTotal = @event.DiscountTotal;
        sale.TaxTotal = @event.TaxTotal;
        sale.Total = @event.Total;
        sale.CompletedAt = @event.CompletedAt;
        await db.SaveChangesAsync();
    }

    public async Task HandleAsync(SaleVoided @event)
    {
        await using var db = await _dbFactory.CreateDbContextAsync();
        var sale = await db.SaleSummaries.FindAsync(@event.SaleId);
        if (sale == null) return;

        sale.Status = "voided";
        sale.VoidedAt = @event.VoidedAt;
        sale.VoidedBy = @event.VoidedBy;
        sale.VoidReason = @event.Reason;
        await db.SaveChangesAsync();
    }
}
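The handlers above are not safe to run twice for the same event: a repeated SaleCreated would try to insert a duplicate row, and a repeated SaleLineItemAdded would double-count the subtotal. One common remedy, sketched here as an assumption rather than as the platform's approach, is for each projection to remember the position of the last event it applied and to skip anything at or before that position. `StoredEvent` and `DailySalesProjection` are hypothetical names, and the sketch assumes events are delivered in ascending ID order.

```csharp
using System;

// Hypothetical envelope: Id plays the role of the event store's sequence number.
public record StoredEvent(long Id, string EventType, decimal Amount);

public class DailySalesProjection
{
    public long Checkpoint { get; private set; }  // Id of the last applied event
    public decimal Total { get; private set; }

    // Returns false when the event was already applied and is skipped.
    public bool Apply(StoredEvent e)
    {
        if (e.Id <= Checkpoint) return false;

        if (e.EventType == "SaleCompleted")
            Total += e.Amount;

        Checkpoint = e.Id;  // advance even for event types this projection ignores
        return true;
    }
}
```

In a database-backed projector, the checkpoint and the read-model change would need to be saved in the same transaction for the guarantee to hold.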
Temporal Queries
Event sourcing enables powerful temporal queries:
-- What was inventory on a specific date?
-- product_id lives inside the JSONB payload, so it is extracted from event_data.
-- Note: InventoryTransferred events carry from/to location IDs instead of
-- location_id and are therefore not counted by this query.
SELECT
    event_data->>'product_id' AS product_id,
    SUM(CASE
        WHEN event_type = 'InventoryReceived' THEN (event_data->>'quantity')::int
        WHEN event_type = 'InventorySold' THEN -(event_data->>'quantity')::int
        WHEN event_type = 'InventoryAdjusted' THEN (event_data->>'quantity_change')::int
        ELSE 0
    END) AS quantity
FROM events
WHERE aggregate_type = 'Inventory'
  AND (event_data->>'location_id')::uuid = '...'
  AND created_at <= '2025-12-15 15:00:00'
GROUP BY event_data->>'product_id';

-- Sales trend for specific product
SELECT
    date_trunc('day', created_at) AS date,
    SUM((event_data->>'quantity')::int) AS units_sold
FROM events
WHERE event_type = 'InventorySold'
  AND (event_data->>'product_id')::uuid = '...'
  AND created_at >= NOW() - INTERVAL '30 days'
GROUP BY date_trunc('day', created_at)
ORDER BY date;

-- Audit trail for specific sale
SELECT
    event_type,
    event_data,
    created_at,
    created_by
FROM events
WHERE aggregate_type = 'Sale'
  AND aggregate_id = '...'
ORDER BY version;
Snapshots for Performance
For aggregates with many events, snapshots avoid replaying the entire stream on every load:
Snapshot Strategy
=================
Without Snapshots:
Event 1 -> Event 2 -> ... -> Event 5000 -> Current State
(Slow for aggregates with many events)
With Snapshots:
Event 1 -> ... -> Event 1000 -> [Snapshot @ v1000]
|
-> Event 1001 -> ... -> Event 1050 -> Current State
(Load snapshot, then only replay 50 events)
Snapshot Implementation
// AggregateRepository.cs
public class AggregateRepository<T> where T : AggregateRoot
{
    private readonly IEventStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;
    private const int SNAPSHOT_THRESHOLD = 100;

    public AggregateRepository(IEventStore eventStore, ISnapshotStore snapshotStore)
    {
        _eventStore = eventStore;
        _snapshotStore = snapshotStore;
    }

    public async Task<T> LoadAsync(Guid id)
    {
        var aggregate = Activator.CreateInstance<T>();

        // 1. Try to load snapshot
        var snapshot = await _snapshotStore.GetAsync<T>(id);
        int fromVersion = 0;
        if (snapshot != null)
        {
            aggregate.RestoreFromSnapshot(snapshot.State);
            fromVersion = snapshot.Version;
        }

        // 2. Load events after snapshot
        var events = await _eventStore.GetEventsAsync(id, fromVersion);
        foreach (var @event in events)
        {
            aggregate.Apply(@event);
        }
        return aggregate;
    }

    public async Task SaveAsync(T aggregate)
    {
        // ToList() requires System.Linq
        var newEvents = aggregate.GetUncommittedEvents().ToList();
        if (newEvents.Count == 0) return;

        // 1. Append events
        await _eventStore.AppendAsync(aggregate.Id, newEvents, aggregate.Version);

        // 2. Create snapshot if this save crossed a threshold boundary.
        //    A plain `Version % SNAPSHOT_THRESHOLD == 0` check misses the boundary
        //    when one save appends several events (e.g. version 99 -> 102).
        //    Assumes aggregate.Version already counts the uncommitted events.
        var previousVersion = aggregate.Version - newEvents.Count;
        if (aggregate.Version / SNAPSHOT_THRESHOLD > previousVersion / SNAPSHOT_THRESHOLD)
        {
            var snapshot = aggregate.CreateSnapshot();
            await _snapshotStore.SaveAsync(aggregate.Id, aggregate.Version, snapshot);
        }
        aggregate.ClearUncommittedEvents();
    }
}
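The saving from a snapshot is easiest to see by counting how many events a load has to replay. The sketch below models an aggregate as a single integer and each event as a signed change. `Snapshot` and `SnapshotLoader` are hypothetical names used only for this illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical snapshot: the aggregate's state as of a given version.
public record Snapshot(int Version, int Quantity);

public static class SnapshotLoader
{
    // deltas[i] is the change carried by the event with version i + 1.
    // Returns the rebuilt state and how many events had to be replayed.
    public static (int Quantity, int Replayed) Load(IReadOnlyList<int> deltas, Snapshot? snapshot)
    {
        var quantity = snapshot?.Quantity ?? 0;
        var fromVersion = snapshot?.Version ?? 0;

        // Only events newer than the snapshot are applied.
        for (var i = fromVersion; i < deltas.Count; i++)
            quantity += deltas[i];

        return (quantity, deltas.Count - fromVersion);
    }
}
```

With 1,050 events and a snapshot at version 1,000, the load replays 50 events and reaches the same state as replaying all 1,050 from scratch.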
Benefits Summary
| Capability | How Event Sourcing Enables It |
|---|---|
| Audit Trail | Every change is an immutable event |
| Temporal Queries | Replay events to any point in time |
| Offline Support | Events queue locally, merge later |
| Debugging | Reproduce any state by replaying events |
| Analytics | Rich historical data for ML/reporting |
| Compliance | PCI-DSS, SOX audit requirements |
| Undo/Redo | Apply compensating events |
| Testing | Given-When-Then with events |
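The Given-When-Then style in the last row reads naturally with events: given a history of past events, when a command arrives, then a specific set of new events (or a rejection) is expected. The sketch below shows the shape using a clock-out rule. `ClockedIn`, `ClockedOut`, and `ShiftRules` are hypothetical, simplified stand-ins for the employee events in the catalog.

```csharp
using System;
using System.Collections.Generic;

public record ClockedIn(Guid EmployeeId);
public record ClockedOut(Guid EmployeeId);

public static class ShiftRules
{
    // Given: history. When: a clock-out is requested. Then: new events, or an exception.
    public static object[] ClockOut(IEnumerable<object> history, Guid employeeId)
    {
        var onShift = false;
        foreach (var e in history)
        {
            if (e is ClockedIn i && i.EmployeeId == employeeId) onShift = true;
            if (e is ClockedOut o && o.EmployeeId == employeeId) onShift = false;
        }

        if (!onShift)
            throw new InvalidOperationException("Employee is not clocked in.");

        return new object[] { new ClockedOut(employeeId) };
    }
}
```

A test then needs no database or mocks: it builds a list of events, calls the rule, and compares the returned events with the expected ones.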
Summary
Event sourcing with CQRS provides:
- Immutable audit log of every business event
- Temporal queries to answer “what was the state at time X?”
- Offline capability through local event queues
- Conflict resolution by comparing event streams
- Optimized reads through projected read models
- Performance via snapshots for long event streams
The event store schema and domain events catalog in this chapter form the foundation for the offline-first architecture described in Chapter 09.