How to make domain events first-class on the aggregate: dispatch timing, payload design, idempotent consumers, and asserting events in tests.
It was a Monday morning at the creator economy platform I worked at, and a creator opened a support ticket that started with the worst sentence in our triage rotation. Her customers had been charged twice this month, and the app showed each of them with two active subscriptions. We pulled the logs. Apple’s SubscriptionRenewal server-to-server notification had been retried after our endpoint returned a 200 OK a hair too late. Our renewal handler had no idempotency check. Every retry wrote a fresh row into creator_subscriptions. A few thousand customers across dozens of branded apps got billed twice, and the cards had already settled.
The real fix that week wasn’t a frontend patch. It was admitting that our aggregate did not actually own the truth about renewals. It just shoved a row in a table and hoped nothing else looked funny. That’s the failure mode I want to talk about. Domain events on aggregates, done properly, would have killed this bug before it landed.
If you’ve done DDD seriously you already know aggregates are the consistency boundary. They protect invariants. They’re the place a write goes through. What’s less obvious, and what I’ve watched a lot of teams get wrong, is that the same boundary should also announce what changed. Not the application service. Not some after-save hook. The aggregate itself.
I led a portfolio-wide DDD migration at a digital product agency a few years back, several dozen legacy projects pulled into bounded contexts and tactical patterns over many months. The single biggest behavior shift wasn’t the repositories or the value objects. It was treating events as a return value of command methods. The aggregate decides what happened. The infra layer just listens.
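import { randomUUID } from 'node:crypto'
// Import path assumed; the DomainEvent union is defined later in this post.
import { DomainEvent } from './events'

export abstract class AggregateRoot<TId> {
  readonly id: TId
  protected version = 0
  private pendingEvents: DomainEvent[] = []

  protected constructor(id: TId) {
    this.id = id
  }

  // Buffers the event in memory and bumps the version. Nothing is published here.
  protected record(event: DomainEvent): void {
    this.pendingEvents.push(event)
    this.version += 1
  }

  // Hands the buffered events to the caller and clears the buffer.
  pullEvents(): DomainEvent[] {
    const events = this.pendingEvents
    this.pendingEvents = []
    return events
  }
}

export class Order extends AggregateRoot<string> {
  private status: 'pending' | 'paid' | 'cancelled' = 'pending'

  // Constructor and the place() factory are omitted here.

  cancel(reason: string, now: Date): void {
    // Cancelling twice is a no-op, so no second event is recorded.
    if (this.status === 'cancelled') return
    if (this.status === 'paid') {
      throw new Error('cannot cancel a paid order, refund instead')
    }
    this.status = 'cancelled'
    this.record({
      type: 'OrderCancelled',
      eventId: randomUUID(),
      aggregateId: this.id,
      // record() increments version after pushing, so this is the post-command version.
      aggregateVersion: this.version + 1,
      occurredAt: now.toISOString(),
      schemaVersion: 1, // required by DomainEventBase
      payload: { reason }
    })
  }
}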
Notice what’s not there. No eventBus.emit(). No kafkaProducer.publish(). The aggregate doesn’t know about transport, and it shouldn’t. It records that something happened. Something else is responsible for getting that fact out into the world.
OK so here’s where most teams blow it. You call orderRepo.save(order), then eventBus.publish(events) on the next line. Looks fine. Works in tests. Breaks in production the day your broker hiccups and you’ve already committed the write. Now your database says the order is cancelled and your downstream services have no idea.
The fix is a transactional outbox. Same transaction that writes the aggregate writes a row to outbox_events. A separate worker drains the outbox into Kafka (or RabbitMQ, or whatever you’ve got). If the publish fails, the row stays. The worker retries.
import { DataSource, EntityManager } from 'typeorm'
import { OrderEntity } from './order.entity'
import { OutboxEvent } from './outbox.entity'
import { Order } from '../domain/order'

export class OrderRepository {
  constructor(private readonly ds: DataSource) {}

  async save(order: Order): Promise<void> {
    // One transaction covers both the aggregate row and its outbox rows.
    await this.ds.transaction('READ COMMITTED', async (em: EntityManager) => {
      await em.upsert(OrderEntity, OrderEntity.fromDomain(order), ['id'])

      // pullEvents() clears the aggregate's buffer. If this transaction rolls back,
      // reload the aggregate and re-run the command instead of calling save() again.
      const events = order.pullEvents()
      if (events.length === 0) return

      await em.insert(
        OutboxEvent,
        events.map((e) => ({
          eventId: e.eventId,
          aggregateId: e.aggregateId,
          aggregateType: 'Order',
          aggregateVersion: e.aggregateVersion,
          type: e.type,
          payload: e.payload,
          occurredAt: e.occurredAt,
          publishedAt: null // set by the publisher worker once the event is sent
        }))
      )
    })
  }
}
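The point is that the events and the state change live or die together. The publisher is a boring background worker that polls the outbox, marks rows as published, and moves on. It can crash. It can lag. None of that corrupts the aggregate. The worst it can do is publish a row twice, if it dies between sending and marking, and that is a consumer-side problem with a known fix: idempotent handlers.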
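To make the worker less abstract, here is a minimal sketch of a single polling pass. It runs against an in-memory array, not a real table or broker, and `OutboxRow`, `Publish`, and `drainOutbox` are names I made up for the example. Treat it as an illustration of the ordering of steps, not as the worker we actually ran.

```typescript
// Hypothetical shapes for illustration; a real outbox row and broker client will differ.
interface OutboxRow {
  eventId: string
  type: string
  payload: unknown
  publishedAt: Date | null
}

// Throws when the broker is unavailable.
type Publish = (row: OutboxRow) => void

// One polling pass: publish unpublished rows in insertion order and mark each
// row only after its publish succeeded. Stops at the first failure so ordering
// is preserved and the failed row is retried on the next pass.
function drainOutbox(rows: OutboxRow[], publish: Publish, now: Date): number {
  let published = 0
  for (const row of rows) {
    if (row.publishedAt !== null) continue
    try {
      publish(row)
    } catch {
      break
    }
    row.publishedAt = now
    published += 1
  }
  return published
}

// Usage: a broker that is down for the first pass, then recovers.
const outbox: OutboxRow[] = [
  { eventId: 'evt_1', type: 'OrderCancelled', payload: { reason: 'customer_request' }, publishedAt: null },
  { eventId: 'evt_2', type: 'OrderPaid', payload: { paymentId: 'pay_1' }, publishedAt: null }
]
const delivered: string[] = []
let brokerUp = false
const publish: Publish = (row) => {
  if (!brokerUp) throw new Error('broker unavailable')
  delivered.push(row.eventId)
}

const firstPass = drainOutbox(outbox, publish, new Date()) // broker down, rows stay unpublished
brokerUp = true
const secondPass = drainOutbox(outbox, publish, new Date()) // both rows go out, in order
```

Marking the row after the publish, not before, is the deliberate choice here. It means a crash can cause a duplicate send but never a lost event.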
Now what goes in the event? This is the part teams overthink. Two failure modes show up.
The first is the fat payload, where the event re-encodes the entire aggregate. Looks helpful. Becomes a versioning nightmare the moment your aggregate changes shape. Downstream code has to handle three versions of the same OrderCancelled event in production at once.
The second is the thin payload, the one with just an id. Consumers have to call back to the writer to figure out what actually changed. That defeats the point of events, and on a hot path it’ll smoke your write side.
What I want is the minimum a typical downstream needs, with identifiers and a version. Nothing more.
export type DomainEvent =
  | OrderPlaced
  | OrderCancelled
  | OrderPaid

export interface DomainEventBase {
  eventId: string
  aggregateId: string
  aggregateVersion: number
  occurredAt: string
  schemaVersion: 1
}

export interface OrderPlaced extends DomainEventBase {
  type: 'OrderPlaced'
  payload: {
    customerId: string
    totalCents: number
    currency: 'USD' | 'EUR' | 'TRY'
    lineItemCount: number
  }
}

export interface OrderCancelled extends DomainEventBase {
  type: 'OrderCancelled'
  payload: {
    reason: string
  }
}

export interface OrderPaid extends DomainEventBase {
  type: 'OrderPaid'
  payload: {
    paymentId: string
    paidAt: string
  }
}
The schemaVersion field looks unnecessary. It isn’t. The day you need to add a field with non-trivial semantics, you’ll bump to schemaVersion: 2 and consumers will branch cleanly. Without it you’re guessing.
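Here is roughly what "branch cleanly" looks like on the consumer side. Version 1 matches the OrderCancelled payload above. Version 2 is entirely hypothetical: the `reasonCode` and `note` fields, and the `cancellationReason` helper, are invented for this sketch.

```typescript
// Version 1 matches the OrderCancelled payload in this post (base fields trimmed).
interface OrderCancelledV1 {
  type: 'OrderCancelled'
  schemaVersion: 1
  payload: { reason: string }
}

// Hypothetical version 2: reasonCode and note are invented for this example.
interface OrderCancelledV2 {
  type: 'OrderCancelled'
  schemaVersion: 2
  payload: { reasonCode: 'customer_request' | 'fraud' | 'other'; note: string }
}

type AnyOrderCancelled = OrderCancelledV1 | OrderCancelledV2

// The consumer normalizes both versions into one internal value.
// schemaVersion is the discriminant, so the compiler narrows the payload type.
function cancellationReason(event: AnyOrderCancelled): string {
  if (event.schemaVersion === 1) return event.payload.reason
  const { reasonCode, note } = event.payload
  return note === '' ? reasonCode : `${reasonCode}: ${note}`
}

const v1Event: AnyOrderCancelled = {
  type: 'OrderCancelled',
  schemaVersion: 1,
  payload: { reason: 'customer_request' }
}
const v2Event: AnyOrderCancelled = {
  type: 'OrderCancelled',
  schemaVersion: 2,
  payload: { reasonCode: 'fraud', note: 'chargeback' }
}

const fromV1 = cancellationReason(v1Event) // 'customer_request'
const fromV2 = cancellationReason(v2Event) // 'fraud: chargeback'
```

Because the version is a literal type, adding a third version later turns every unhandled branch into a compile error instead of a production surprise.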
Here’s where the Apple incident actually got fixed. The structural change was a processed_events table with a unique constraint on (consumer_name, event_id), and every consumer wraps its handler in a transaction that inserts that row first.
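import { DataSource } from 'typeorm'
import { ProcessedEvent } from './processed-event.entity'
// The two import paths below are assumed; adjust them to your layout.
import { SubscriptionEntity } from './subscription.entity'
import { AppleRenewalEvent } from './apple-renewal-event'

export class SubscriptionRenewalHandler {
  constructor(
    private readonly ds: DataSource,
    private readonly consumerName = 'subscription_renewal'
  ) {}

  async handle(event: AppleRenewalEvent): Promise<void> {
    await this.ds.transaction('READ COMMITTED', async (em) => {
      // Claim the event first. The unique constraint on (consumer_name, event_id)
      // rejects a second delivery of the same event.
      try {
        await em.insert(ProcessedEvent, {
          consumerName: this.consumerName,
          eventId: event.eventId,
          processedAt: new Date()
        })
      } catch (err: unknown) {
        // Already processed. Nothing else has been written in this transaction,
        // so returning here is safe.
        if (isUniqueViolation(err)) return
        throw err
      }

      const subscription = await em.findOne(SubscriptionEntity, {
        where: { appleOriginalTransactionId: event.payload.originalTransactionId }
      })
      if (!subscription) {
        // Throwing rolls back the dedupe row too, so the retry can be processed.
        throw new Error('subscription not found, requeue')
      }
      subscription.extendUntil(event.payload.expiresAt)
      await em.save(subscription)
    })
  }
}

// 23505 is PostgreSQL's SQLSTATE for unique_violation.
function isUniqueViolation(err: unknown): boolean {
  return typeof err === 'object' && err !== null && 'code' in err && (err as { code: string }).code === '23505'
}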
A few things to clock. The dedupe insert is the first thing inside the transaction. If it throws on the unique constraint, the event was already processed and we exit clean. We don’t check in-memory, we don’t keep a Redis set with a TTL. The database is the source of truth because the database is the only thing that survives a pod restart with consistent guarantees.
For the Apple case specifically, the idempotency key was apple_original_transaction_id combined with the notification UUID. The mechanics were the same: a unique constraint on the key, insert first, exit on conflict. Apple retries became no-ops at the queue level, and the duplicate-billing class of bug went away.
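If you want to see the retry behavior without a database, here is a small sketch. The `ProcessedEvents` class is an in-memory stand-in for the unique constraint, there purely to show the flow. It is exactly the thing I said not to rely on in production. The notification field names and the key format are illustrative assumptions, not Apple's schema or our real one.

```typescript
// Hypothetical notification shape; field names are assumptions for this sketch.
interface RenewalNotification {
  notificationUuid: string
  originalTransactionId: string
  expiresAt: string
}

// Combine both identifiers into one dedupe key. The separator is arbitrary.
function idempotencyKey(n: RenewalNotification): string {
  return `${n.originalTransactionId}:${n.notificationUuid}`
}

// In-memory stand-in for the processed_events unique constraint.
// Demonstration only: real dedupe belongs in the database.
class ProcessedEvents {
  private readonly seen = new Set<string>()

  // Returns true if this (consumer, key) pair is new, false on a duplicate.
  tryInsert(consumerName: string, key: string): boolean {
    const row = `${consumerName}|${key}`
    if (this.seen.has(row)) return false
    this.seen.add(row)
    return true
  }
}

const processed = new ProcessedEvents()
const renewalsApplied: string[] = []

function handleRenewal(n: RenewalNotification): void {
  // Claim the key first; a retry fails the claim and exits without side effects.
  if (!processed.tryInsert('subscription_renewal', idempotencyKey(n))) return
  renewalsApplied.push(n.expiresAt)
}

const notification: RenewalNotification = {
  notificationUuid: 'notif_1',
  originalTransactionId: 'txn_1',
  expiresAt: '2030-01-01T00:00:00.000Z'
}

handleRenewal(notification)
handleRenewal(notification) // simulated retry of the same notification
```

After both calls, `renewalsApplied` holds a single entry. The second delivery never reaches the code that extends the subscription.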
The other thing aggregates as event sources buy you is testability that doesn’t make you sad. You drive the command. You pull the events. You assert.
import { Order } from '../src/domain/order'

describe('Order.cancel', () => {
  it('records OrderCancelled with reason', () => {
    const order = Order.place({ id: 'ord_1', customerId: 'cus_1', items: [/* ... */] })
    order.pullEvents()

    order.cancel('customer_request', new Date())

    const events = order.pullEvents()
    expect(events).toHaveLength(1)
    expect(events[0]).toMatchObject({
      type: 'OrderCancelled',
      aggregateId: 'ord_1',
      aggregateVersion: 2,
      payload: { reason: 'customer_request' }
    })
  })

  it('is a no-op on a cancelled order', () => {
    const order = Order.place({ id: 'ord_2', customerId: 'cus_1', items: [/* ... */] })
    order.cancel('first', new Date())
    order.pullEvents()

    order.cancel('second', new Date())

    expect(order.pullEvents()).toHaveLength(0)
  })
})
No mocks. No infrastructure. The aggregate is just a class that takes input and produces output, and the output happens to be events. That’s the bit teams underrate. At the creator economy platform I worked at the testing culture was strict, BDD with high coverage and mandatory tests before ship, and this style of unit test is the cheapest, fastest piece of the pyramid. Pair it with an integration test that asserts the outbox row exists after OrderRepository.save() and you’ve covered both the domain rule and the infra contract.
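The outbox half of that pairing needs a real database to be worth anything, so I can't show a runnable version here. What I can sketch is the contract it should assert. `FakeDb` and `saveOrder` below are hypothetical in-memory stand-ins that mimic commit and rollback. A real test would call OrderRepository.save() and then query the outbox table.

```typescript
// Hypothetical in-memory stand-in for the database; names are invented for this sketch.
interface OutboxRow {
  eventId: string
  type: string
}

interface Tx {
  orders: Map<string, string> // order id -> status
  outbox: OutboxRow[]
}

class FakeDb {
  orders = new Map<string, string>()
  outbox: OutboxRow[] = []

  // Runs `work` against copies and swaps them in only if `work` does not throw,
  // which is enough to mimic commit and rollback.
  transaction(work: (tx: Tx) => void): void {
    const tx: Tx = { orders: new Map(this.orders), outbox: [...this.outbox] }
    work(tx)
    this.orders = tx.orders
    this.outbox = tx.outbox
  }
}

// Mirrors the shape of OrderRepository.save(): state first, then outbox rows.
function saveOrder(db: FakeDb, id: string, status: string, events: OutboxRow[], failOutbox = false): void {
  db.transaction((tx) => {
    tx.orders.set(id, status)
    if (failOutbox) throw new Error('outbox insert failed')
    tx.outbox.push(...events)
  })
}

const db = new FakeDb()

// Happy path: the order row and the outbox row land together.
saveOrder(db, 'ord_1', 'cancelled', [{ eventId: 'evt_1', type: 'OrderCancelled' }])

// Failure path: the outbox write throws, so the order write must not stick.
let rolledBack = false
try {
  saveOrder(db, 'ord_2', 'cancelled', [{ eventId: 'evt_2', type: 'OrderCancelled' }], true)
} catch {
  rolledBack = true
}
```

The two assertions that matter are that `db.outbox` has exactly one row after the first save, and that `ord_2` does not exist after the second. If either fails against your real repository, the state change and the events are not in the same transaction.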
I want to be honest about the trade. If you’ve got a small service that’s basically a CRUD wrapper around a single table, you don’t need any of this. No aggregate, no events, no outbox. Just a clean controller and a repository. I’ve shipped plenty of those, and I’m not going to pretend they’re a sin.
The pattern earns its complexity when invariants matter, when downstreams care about state changes, and when the business language has real verbs in it. Place, cancel, refund, ship, dispute. If your domain is mostly “create, read, update, soft-delete”, you’re probably better off without the ceremony. In the agency portfolio migration I led, we kept a handful of bounded contexts thin on purpose because forcing aggregates onto them would have added cost with no return.
Thanks for reading. If you’ve got thoughts, send them my way.