A practical guide to running an event-sourced aggregate inside a DDD monolith on Postgres: append-only log, replay, snapshots, projections.
It was a Wednesday afternoon at the agency I led engineering at in London. A client lawyer was on a call asking why our system couldn’t tell her what the price of a specific subscription was at 11:42 a.m. on a Tuesday three months ago. We had an updated_at. We had a Sentry breadcrumb. Nothing that could rebuild the row. The bug wasn’t a bug, it was a missing capability. That call is the reason I started taking event sourcing seriously.
Event sourcing is a pattern you should mostly avoid, until you can’t. Audit trails that can’t be argued with. Temporal queries. Replayable projections. This piece is how I run an event-sourced aggregate inside a DDD monolith on Postgres. No Kafka. Just a table, a transaction, and discipline.
In the portfolio of legacy projects I migrated to DDD at the agency, two services genuinely needed event sourcing. Subscription billing was one. The other was a healthcare portal where “who changed the dosage at 3 a.m.” was a question that could end someone’s career. CRUD with an audit table didn’t cut it. Audit tables drift, people disable triggers, migrations break them.
The shape I keep coming back to is boring. One table. Append-only. One row per fact. The aggregate owns invariants, you load it by replaying events, you call a method, the method produces new events, you write them back in the same transaction. Everything else is plumbing.
Here’s the table I’ve shipped twice now. Postgres, nothing exotic.
create table domain_events (
  id bigserial primary key,
  aggregate_id uuid not null,
  aggregate_type text not null,
  version int not null,
  event_type text not null,
  payload jsonb not null,
  metadata jsonb not null default '{}'::jsonb,
  occurred_at timestamptz not null default now(),
  unique (aggregate_id, version)
);
-- The unique constraint above already creates an index on
-- (aggregate_id, version), so per-aggregate replay needs no extra index.
create index domain_events_type_idx
  on domain_events (aggregate_type, id);
The unique constraint on (aggregate_id, version) is the whole optimistic-concurrency story in one line. If two writers try to append version 17, one wins; the other gets a unique-violation error (SQLSTATE 23505) and retries. No distributed lock needed.
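The "loser retries" half is worth spelling out. What follows is a minimal sketch, assuming the repository signals a lost race by throwing an Error whose message is "ConcurrencyConflict" (as the save method later in this piece does). The names retryOnConflict and maxAttempts are hypothetical, not part of any library.

```typescript
// Hypothetical helper: re-run a load -> command -> save cycle when another
// writer claimed the same version first.
export async function retryOnConflict<T>(
  attempt: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let i = 1; ; i++) {
    try {
      // The closure is expected to reload the aggregate on every call,
      // so each attempt sees the events the winning writer appended.
      return await attempt();
    } catch (err) {
      const isConflict =
        err instanceof Error && err.message === "ConcurrencyConflict";
      // Anything that is not a conflict, or a conflict on the final
      // attempt, is surfaced to the caller unchanged.
      if (!isConflict || i >= maxAttempts) throw err;
    }
  }
}
```

The part that matters is that the load happens inside the closure. Retrying only the save would resend the same stale version and lose again.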
I always put causation_id, correlation_id, and actor in metadata. Six months later when someone asks who triggered the refund cascade on order 882349, you’ll thank past-you.
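One possible shape for that metadata, as a sketch rather than a prescription. The message_id field and the newMetadata helper are assumptions added for illustration; only causation_id, correlation_id, and actor come from the text above.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical metadata shape; message_id is an assumed extra field so
// that later messages have something to point their causation_id at.
export interface EventMetadata {
  message_id: string;          // id of this command or event
  correlation_id: string;      // shared by everything in one business flow
  causation_id: string | null; // the message that directly caused this one
  actor: string;               // user or system principal
}

// Builds metadata for a new message. When `cause` is given, the new message
// joins the same flow and records `cause` as its direct trigger.
export function newMetadata(actor: string, cause?: EventMetadata): EventMetadata {
  const message_id = randomUUID();
  return {
    message_id,
    correlation_id: cause ? cause.correlation_id : message_id,
    causation_id: cause ? cause.message_id : null,
    actor,
  };
}
```

With this shape, following correlation_id returns the whole cascade, and walking causation_id backwards returns the chain that led to one specific event.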
TypeScript example, lifted from the subscription billing context. Trimmed for the article, but the shape is real.
import { randomUUID } from "node:crypto";

type DomainEvent =
  | { type: "SubscriptionStarted"; planId: string; priceCents: number; currency: string; startedAt: string }
  | { type: "SubscriptionPriceChanged"; newPriceCents: number; effectiveAt: string }
  | { type: "SubscriptionCancelled"; reason: string; cancelledAt: string };

export class Subscription {
  private constructor(
    public readonly id: string,
    public version: number,
    private state: { status: "active" | "cancelled"; priceCents: number; planId: string } | null,
    private pending: DomainEvent[] = []
  ) {}

  static rehydrate(id: string, events: { version: number; event: DomainEvent }[]): Subscription {
    const sub = new Subscription(id, 0, null, []);
    for (const { version, event } of events) {
      sub.apply(event);
      sub.version = version;
    }
    return sub;
  }

  static start(planId: string, priceCents: number, currency: string): Subscription {
    if (priceCents <= 0) throw new Error("price must be positive");
    const sub = new Subscription(randomUUID(), 0, null, []);
    sub.record({
      type: "SubscriptionStarted",
      planId,
      priceCents,
      currency,
      startedAt: new Date().toISOString(),
    });
    return sub;
  }

  changePrice(newPriceCents: number, effectiveAt: Date) {
    if (!this.state || this.state.status !== "active") {
      throw new Error("cannot reprice an inactive subscription");
    }
    if (newPriceCents === this.state.priceCents) return;
    this.record({
      type: "SubscriptionPriceChanged",
      newPriceCents,
      effectiveAt: effectiveAt.toISOString(),
    });
  }

  cancel(reason: string) {
    if (!this.state || this.state.status === "cancelled") return;
    this.record({ type: "SubscriptionCancelled", reason, cancelledAt: new Date().toISOString() });
  }

  pendingEvents(): DomainEvent[] {
    return this.pending;
  }

  private record(event: DomainEvent) {
    this.apply(event);
    this.pending.push(event);
  }

  private apply(event: DomainEvent) {
    switch (event.type) {
      case "SubscriptionStarted":
        this.state = { status: "active", priceCents: event.priceCents, planId: event.planId };
        break;
      case "SubscriptionPriceChanged":
        if (this.state) this.state.priceCents = event.newPriceCents;
        break;
      case "SubscriptionCancelled":
        if (this.state) this.state.status = "cancelled";
        break;
    }
  }
}
Notice the split. Invariant checks live in command methods. State transitions live in apply. On rehydrate, only call apply. Never record. Cross those wires and you’ll re-emit events during replay and corrupt the log. I did exactly that on my second event-sourced service. Took embarrassingly long to figure out.
import type { Pool } from "pg";
// Subscription is the aggregate class defined above.

export class SubscriptionRepository {
  constructor(private readonly db: Pool) {}

  async load(id: string): Promise<Subscription | null> {
    // Alias payload as "event" so each row matches what rehydrate expects.
    const { rows } = await this.db.query(
      `select version, payload as event
       from domain_events
       where aggregate_id = $1 and aggregate_type = 'Subscription'
       order by version asc`,
      [id]
    );
    if (rows.length === 0) return null;
    return Subscription.rehydrate(id, rows);
  }

  async save(sub: Subscription, metadata: Record<string, unknown>) {
    const events = sub.pendingEvents();
    if (events.length === 0) return;
    const client = await this.db.connect();
    try {
      await client.query("begin");
      let v = sub.version;
      for (const event of events) {
        v += 1;
        await client.query(
          `insert into domain_events
           (aggregate_id, aggregate_type, version, event_type, payload, metadata)
           values ($1, 'Subscription', $2, $3, $4, $5)`,
          [sub.id, v, event.type, event, metadata]
        );
      }
      await client.query("commit");
      sub.version = v;
      // pendingEvents() returns the aggregate's own array, so emptying it
      // here stops a second save() from appending the same events again.
      events.length = 0;
    } catch (err: any) {
      await client.query("rollback");
      // 23505 is unique_violation: another writer took this version first.
      if (err.code === "23505") throw new Error("ConcurrencyConflict");
      throw err;
    } finally {
      client.release();
    }
  }
}
The save runs inside a single Postgres transaction. If you’re also writing to a projection table in the same domain, do it in the same transaction. That’s what lets you skip a separate broker for a long time. Outbox pattern if and only if you actually need to cross the process boundary.
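One way to keep the event append and the projection write atomic is to hand the same client to both. A minimal sketch follows. Queryable and inTransaction are hypothetical names, and Queryable is a deliberately small structural type that a real database client would be adapted to, not any driver's actual interface.

```typescript
// Hypothetical minimal interface so the sketch does not depend on a driver.
interface Queryable {
  query(text: string, params?: unknown[]): Promise<unknown>;
}

// Runs `work` between begin and commit on one connection. If `work` throws,
// everything it wrote (events and projection rows alike) is rolled back.
export async function inTransaction<T>(
  client: Queryable,
  work: (tx: Queryable) => Promise<T>
): Promise<T> {
  await client.query("begin");
  try {
    const result = await work(client);
    await client.query("commit");
    return result;
  } catch (err) {
    await client.query("rollback");
    throw err;
  }
}
```

Inside work, the append to domain_events and the update to the read table go through the same tx, so a reader never sees one without the other.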
For most aggregates, replay is fine. Subscription rarely had more than a few dozen events per lifetime. Pain shows up on long-lived aggregates. At the trading platform I architected, a portfolio aggregate crossed a hundred thousand events. Loading one started taking 600 ms. That’s when you snapshot.
create table aggregate_snapshots (
  aggregate_id uuid primary key,
  aggregate_type text not null,
  version int not null,
  state jsonb not null,
  created_at timestamptz not null default now()
);
Load path becomes: read latest snapshot, replay events with version > snapshot.version. Write a snapshot every N events, or on a schedule. I keep N around 200 for active aggregates, never below 50. Below 50 the maintenance cost dominates. Don’t put snapshotting inline in the command path. Background job. Always.
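The load path can be sketched as a pure fold, independent of the database. The names loadFromSnapshot and snapshotDue are hypothetical, and the state and event types are generic placeholders. The default of 200 mirrors the N mentioned above.

```typescript
interface Snapshot<S> { version: number; state: S }
interface StoredEvent<E> { version: number; event: E }

// Start from the snapshot (or the initial state when none exists) and fold
// in only the events recorded after the snapshot's version.
export function loadFromSnapshot<S, E>(
  snapshot: Snapshot<S> | null,
  events: StoredEvent<E>[],
  initial: S,
  apply: (state: S, event: E) => S
): Snapshot<S> {
  let state = snapshot ? snapshot.state : initial;
  let version = snapshot ? snapshot.version : 0;
  for (const { version: v, event } of events) {
    if (v <= version) continue; // already folded into the snapshot
    state = apply(state, event);
    version = v;
  }
  return { version, state };
}

// Decision a background job could use: snapshot once the aggregate has
// moved `every` or more versions past its last snapshot.
export function snapshotDue(currentVersion: number, snapshotVersion: number, every = 200): boolean {
  return currentVersion - snapshotVersion >= every;
}
```

In practice the query would already filter on version > snapshot.version. The guard inside the loop only makes the function safe when it is handed more events than it needs.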
Projections turn the log into something the UI can query in 5 ms. A denormalized table built by a consumer reading domain_events in order. Rule I repeat to engineers I mentor: projections are derived. If one is wrong, nuke it and rebuild from the log.
import type { Pool } from "pg";
// DomainEvent is the union defined alongside the Subscription aggregate.

export async function projectSubscriptionEvent(
  db: Pool,
  ev: { version: number; payload: DomainEvent; aggregate_id: string }
) {
  const e = ev.payload;
  switch (e.type) {
    case "SubscriptionStarted":
      await db.query(
        `insert into subscription_read
         (id, status, plan_id, price_cents, currency, version)
         values ($1, 'active', $2, $3, $4, $5)
         on conflict (id) do nothing`,
        [ev.aggregate_id, e.planId, e.priceCents, e.currency, ev.version]
      );
      break;
    case "SubscriptionPriceChanged":
      await db.query(
        `update subscription_read
         set price_cents = $1, version = $2
         where id = $3 and version < $2`,
        [e.newPriceCents, ev.version, ev.aggregate_id]
      );
      break;
    case "SubscriptionCancelled":
      await db.query(
        `update subscription_read
         set status = 'cancelled', version = $1
         where id = $2 and version < $1`,
        [ev.version, ev.aggregate_id]
      );
      break;
  }
}
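The version guard on each update (version < $2 for the price change, version < $1 for the cancellation) is the idempotency safety net. If a consumer retries an event it already projected, the update is a no-op, and on conflict (id) do nothing gives the insert the same property. Cheap, effective, no distributed coordination.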
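The same guards can be mirrored in memory to show why redelivery and full rebuilds are safe. This is an illustrative model, not the production projector: the Map stands in for subscription_read, and project and rebuild are hypothetical names.

```typescript
type Ev =
  | { type: "SubscriptionStarted"; planId: string; priceCents: number; currency: string }
  | { type: "SubscriptionPriceChanged"; newPriceCents: number }
  | { type: "SubscriptionCancelled" };

interface Row {
  status: "active" | "cancelled";
  planId: string;
  priceCents: number;
  currency: string;
  version: number;
}

interface Stored { aggregateId: string; version: number; payload: Ev }

// Applies one event to the in-memory read model using the same rules as
// the SQL: insert-if-absent for the first event, version guard for the rest.
export function project(table: Map<string, Row>, ev: Stored): void {
  const row = table.get(ev.aggregateId);
  const e = ev.payload;
  if (e.type === "SubscriptionStarted") {
    // Mirrors: insert ... on conflict (id) do nothing
    if (!row) {
      table.set(ev.aggregateId, {
        status: "active",
        planId: e.planId,
        priceCents: e.priceCents,
        currency: e.currency,
        version: ev.version,
      });
    }
    return;
  }
  // Mirrors: update ... where id = ... and version < (event version)
  if (!row || row.version >= ev.version) return;
  if (e.type === "SubscriptionPriceChanged") row.priceCents = e.newPriceCents;
  else row.status = "cancelled";
  row.version = ev.version;
}

// "Nuke it and rebuild": start from an empty table and replay the log in order.
export function rebuild(log: Stored[]): Map<string, Row> {
  const table = new Map<string, Row>();
  for (const ev of log) project(table, ev);
  return table;
}
```

Replaying the whole log a second time over an already-built table leaves every row unchanged, which is the property that makes at-least-once delivery tolerable.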
A lesson I bring up every time I see someone building a projection: derived indexes need a freshness metric, not a throughput metric. “Consumer is consuming” is not the same as “projection is correct”. Add a heartbeat comparing max(domain_events.id) against max(projection.last_event_id) and page when the projection stays behind the log for more than 5 minutes in business hours.
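An id gap is a count, not a duration, so one way to express the alert in minutes is to look at the timestamp of the oldest event the projection has not reached yet. A sketch under stated assumptions: projection_checkpoint is a hypothetical one-row table holding last_event_id, and the function names are illustrative. The business-hours condition is left to the alerting rule.

```typescript
// Assumed query: projection_checkpoint(last_event_id) is hypothetical and
// stands for wherever the projector records the last event it applied.
export const OLDEST_UNPROJECTED_SQL = `
  select min(occurred_at) as oldest_unprojected_at
  from domain_events
  where id > (select coalesce(max(last_event_id), 0) from projection_checkpoint)`;

// Lag is zero when nothing is waiting; otherwise it is the age of the
// oldest event the projection has not applied yet.
export function projectionLagMs(oldestUnprojectedAt: Date | null, now: Date): number {
  if (oldestUnprojectedAt === null) return 0;
  return Math.max(0, now.getTime() - oldestUnprojectedAt.getTime());
}

// Page when the lag exceeds the threshold (5 minutes by default).
export function shouldPage(
  oldestUnprojectedAt: Date | null,
  now: Date,
  thresholdMs = 5 * 60 * 1000
): boolean {
  return projectionLagMs(oldestUnprojectedAt, now) > thresholdMs;
}
```

Measured this way, a consumer that is busy but falling behind still trips the alert, while an idle system with nothing to project stays quiet.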
On aggregates with high write volume, the schema is the projection, not the truth. When you need to add a field to a read model, the right move is to introduce it as a derived projection change: build a fresh projection table next to the old one, populate it from the event log, switch reads when caught up, then drop the old one. Instead of a single blocking migration, you get a zero-downtime cutover. The event log is your source of truth; the table is just a view over it.
Most aggregates don’t need event sourcing. If your domain is “users have settings they update”, an audit table and updated_at are fine. Cost is real: more code per aggregate, harder debugging, projections to keep healthy, schema versioning for events. Across the portfolio I migrated into DDD, maybe one in ten aggregates earned the treatment.
Signal I use. If someone in product or legal could plausibly ask “what did this look like at this moment in the past”, you want events. Otherwise you don’t.
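With a log in place, that question becomes a filtered replay. A minimal sketch, assuming each stored row carries its occurred_at timestamp. The stateAt function and the trimmed event shapes are illustrative only. It answers “what did the system have recorded at that moment”. Honouring a business-effective date such as effectiveAt would be a separate rule.

```typescript
type Ev =
  | { type: "SubscriptionStarted"; priceCents: number }
  | { type: "SubscriptionPriceChanged"; newPriceCents: number }
  | { type: "SubscriptionCancelled" };

interface Recorded { version: number; occurredAt: string; payload: Ev }
interface State { status: "active" | "cancelled"; priceCents: number }

// Replays, in version order, only the events recorded at or before `at`.
// Returns null when the subscription did not exist yet at that moment.
export function stateAt(log: Recorded[], at: Date): State | null {
  let state: State | null = null;
  const ordered = [...log].sort((a, b) => a.version - b.version);
  for (const r of ordered) {
    if (new Date(r.occurredAt).getTime() > at.getTime()) break;
    const e = r.payload;
    switch (e.type) {
      case "SubscriptionStarted":
        state = { status: "active", priceCents: e.priceCents };
        break;
      case "SubscriptionPriceChanged":
        if (state) state = { ...state, priceCents: e.newPriceCents };
        break;
      case "SubscriptionCancelled":
        if (state) state = { ...state, status: "cancelled" };
        break;
    }
  }
  return state;
}
```

The price at 11:42 on a given Tuesday is then stateAt(log, thatMoment)?.priceCents, with no audit table involved.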
A unique constraint on (aggregate_id, version) replaces distributed locks.
Thanks for reading. If you’ve got thoughts, send them my way.