How I use process managers and sagas to keep aggregates consistent across long-running workflows, with compensation, watchdogs, and tests for partial failure.
A creator opened a support ticket on a Wednesday. “All my customers got charged twice this month and the app shows them as having two active subscriptions.” This was on the branded mobile app pipeline at the creator economy platform I worked at. Apple had retried a renewal notification because our endpoint took a hair too long to return 200. We had no idempotency check on the renewal handler. Every retry inserted a new subscription row, hit a separate Billing aggregate path, and skipped the notification aggregate entirely. Across dozens of branded apps, customers had two active subscriptions and a real charge against both.
The fix wasn’t “add a unique constraint.” Well, that was part of it. The deeper fix was admitting that we didn’t have a process manager for this workflow. We had a controller, three aggregates, and a prayer.
That’s what this post is about. When you have a workflow that touches more than one aggregate, you either write a process manager (or a saga, depending on the shape) or you write a future incident.
Aggregates protect their own invariants inside a transaction boundary. That’s the whole point of an aggregate. The downside is that any business workflow spanning two or more aggregates can’t be transactional, at least not without distributed two-phase commits, which I have not seen work cleanly at any meaningful scale.
So the workflow needs somewhere to live. Not the aggregate (it doesn’t know about the others). Not the controller (it’s request-scoped and dies after the response). The domain layer needs an object whose job is to react to events from one aggregate and issue commands to others. That’s a process manager.
Sagas are the special case where the workflow is a linear N-step transaction with compensations. A process manager is the general case. I default to a process manager unless the steps are obviously a chain.
I’ll take a position here. A saga is the right pattern when you have an Order, a Payment, an Inventory reservation, and a Shipment, and they need to happen in order with a clean compensation for each step backward. That’s e-commerce. Linear, well-understood, you’ve seen the diagram.
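To make the linear shape concrete, here is a minimal sketch of a saga runner. It is not taken from any framework: `SagaStep`, `runSaga`, and the step names are hypothetical, and the steps are synchronous only to keep the example short.

```typescript
// Hypothetical step shape for illustration; not a framework API.
interface SagaStep {
  name: string;
  run: () => void;        // forward action, throws on failure
  compensate: () => void; // compensating action for a step that already completed
}

// Runs the steps in order. On the first failure, compensates the steps
// that completed, newest first, and stops.
function runSaga(steps: SagaStep[]): { ok: boolean; log: string[] } {
  const log: string[] = [];
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      step.run();
      completed.push(step);
      log.push(`run:${step.name}`);
    } catch {
      log.push(`fail:${step.name}`);
      for (const done of completed.reverse()) {
        done.compensate();
        log.push(`compensate:${done.name}`);
      }
      return { ok: false, log };
    }
  }
  return { ok: true, log };
}

const noop = (): void => {};
const sagaResult = runSaga([
  { name: 'order', run: noop, compensate: noop },
  { name: 'payment', run: noop, compensate: noop },
  { name: 'inventory', run: () => { throw new Error('out of stock'); }, compensate: noop },
  { name: 'shipment', run: noop, compensate: noop },
]);
```

In this run the inventory step fails, so the payment and then the order are compensated, and the shipment step never starts. That fixed, reversible ordering is what makes it a saga.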
A process manager is what you want when the workflow has branches, retries, timeouts, and reacts to events that may not arrive in order. Subscription billing is process manager work, not saga work. The renewal might succeed, fail with a billing retry, fail permanently, or arrive twice. There’s no clean “step 3 of 5.” There’s a state machine.
People conflate them because Java frameworks did. Don’t.
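The state-machine framing can be sketched as a transition table. The state names match the ones used in this post. The event names other than ChargeFailed, and the specific edges, are my assumptions for illustration and not the production workflow.

```typescript
type State = 'STARTED' | 'CHARGED' | 'NOTIFIED' | 'FAILED' | 'COMPENSATING' | 'DONE';

// Hypothetical event names, chosen only to illustrate the branches.
type RenewalEvent =
  | 'ChargeSucceeded'
  | 'ChargeFailed'
  | 'CreatorNotified'
  | 'NotificationFailed'
  | 'RenewalFinalized'
  | 'RefundSettled'
  | 'RefundFailed';

// Every legal edge is listed explicitly. Anything not listed is not a transition.
const transitions: Record<State, Partial<Record<RenewalEvent, State>>> = {
  STARTED: { ChargeSucceeded: 'CHARGED', ChargeFailed: 'FAILED' },
  CHARGED: { CreatorNotified: 'NOTIFIED', NotificationFailed: 'COMPENSATING' },
  NOTIFIED: { RenewalFinalized: 'DONE' },
  COMPENSATING: { RefundSettled: 'DONE', RefundFailed: 'FAILED' },
  FAILED: {},
  DONE: {},
};

// Duplicate or out-of-order events have no edge from the current state,
// so they leave the process where it is instead of corrupting it.
function nextState(state: State, event: RenewalEvent): State {
  return transitions[state][event] ?? state;
}
```

Feeding a second ChargeSucceeded into a process that is already CHARGED leaves it in CHARGED, which is the behavior you want when the same event arrives twice.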
Here’s the shape I land on, in NestJS. State is persisted, transitions are explicit, and idempotency keys come from the upstream system, not from us.
import { Injectable, Logger } from '@nestjs/common';
import { EventBus, ofType, Saga, ICommand } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { concatMap, filter } from 'rxjs/operators';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AppleRenewalReceived } from '../events/apple-renewal.received';
import { ChargeSubscriptionCommand } from '../commands/charge-subscription.command';
import { NotifyCreatorCommand } from '../commands/notify-creator.command';
import { CompensateChargeCommand } from '../commands/compensate-charge.command';
import { SubscriptionRenewalProcess } from './subscription-renewal.process';

type State = 'STARTED' | 'CHARGED' | 'NOTIFIED' | 'FAILED' | 'COMPENSATING' | 'DONE';

@Injectable()
export class SubscriptionRenewalManager {
  private readonly logger = new Logger(SubscriptionRenewalManager.name);

  constructor(
    @InjectRepository(SubscriptionRenewalProcess)
    private readonly repo: Repository<SubscriptionRenewalProcess>,
    private readonly bus: EventBus,
  ) {}

  @Saga()
  onRenewalReceived = (events$: Observable<any>): Observable<ICommand> =>
    events$.pipe(
      ofType(AppleRenewalReceived),
      filter((e) => !!e.originalTransactionId && !!e.notificationUUID),
      // concatMap awaits each async handler in arrival order;
      // a plain map() would emit Promises instead of commands
      concatMap(async (e): Promise<ICommand | null> => {
        // idempotency key comes from Apple, never from us
        const idempotencyKey = `${e.originalTransactionId}:${e.notificationUUID}`;
        const existing = await this.repo.findOne({ where: { idempotencyKey } });
        if (existing && existing.state !== 'FAILED') {
          this.logger.log(`skipping duplicate renewal ${idempotencyKey}`);
          return null;
        }
        const process = existing ?? this.repo.create({
          idempotencyKey,
          subscriptionId: e.subscriptionId,
          state: 'STARTED' as State,
        });
        // retrying a FAILED process: move it back to STARTED so that
        // later duplicates are skipped instead of charging again
        process.state = 'STARTED' as State;
        await this.repo.save(process);
        return new ChargeSubscriptionCommand({
          subscriptionId: e.subscriptionId,
          processId: process.id,
          amountCents: e.amountCents,
        });
      }),
      // drop the nulls returned for duplicates
      filter((c): c is ICommand => c !== null),
    );
}
Three things matter here. First, the idempotency key is originalTransactionId:notificationUUID, and both halves come from Apple. If Apple retries, we hit the same row. If we generated our own UUIDs, every retry would look new and we’d be back to square one. Second, the state is persisted in a real table, not in memory, so a pod can die mid-workflow and another pod can pick it up. Third, NestJS calls this a @Saga(), but conceptually it’s a process manager: the decorator only wires up the entry edge of the workflow, and the branches live in the persisted state machine.
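The first point is easier to see in isolation. Below is a minimal sketch in which an in-memory map stands in for a table with a unique index on the idempotency key. `RenewalStore`, `keyFor`, and `handleRenewal` are hypothetical names; a real table would enforce the uniqueness with a database constraint.

```typescript
interface RenewalNotification {
  originalTransactionId: string;
  notificationUUID: string;
  subscriptionId: string;
}

interface RenewalRow {
  idempotencyKey: string;
  subscriptionId: string;
}

// In-memory stand-in for a table with a UNIQUE index on idempotencyKey.
class RenewalStore {
  private readonly rows = new Map<string, RenewalRow>();

  // Returns true if the row was inserted, false if the key already existed.
  insertIfAbsent(row: RenewalRow): boolean {
    if (this.rows.has(row.idempotencyKey)) return false;
    this.rows.set(row.idempotencyKey, row);
    return true;
  }

  count(): number {
    return this.rows.size;
  }
}

// The key is built only from values the upstream sent, so a retry maps to the same row.
function keyFor(n: RenewalNotification): string {
  return `${n.originalTransactionId}:${n.notificationUUID}`;
}

function handleRenewal(store: RenewalStore, n: RenewalNotification): 'started' | 'duplicate' {
  const inserted = store.insertIfAbsent({
    idempotencyKey: keyFor(n),
    subscriptionId: n.subscriptionId,
  });
  return inserted ? 'started' : 'duplicate';
}

const renewalStore = new RenewalStore();
const delivery: RenewalNotification = {
  originalTransactionId: 'apple_123',
  notificationUUID: 'n_abc',
  subscriptionId: 'sub_42',
};
const firstOutcome = handleRenewal(renewalStore, delivery);
const retryOutcome = handleRenewal(renewalStore, { ...delivery }); // redelivery of the same notification
```

The second call returns 'duplicate' and the store still holds one row. Had the key been a freshly generated UUID, both calls would have inserted.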
The most common DDD-tutorial mistake I see is treating compensations as transactional rollbacks. They are not. You can’t roll back a charge to Apple from your side. You can only request a refund, which is itself a forward operation that may succeed, fail, or queue.
import { Injectable } from '@nestjs/common';
import { CommandHandler, ICommandHandler, EventBus } from '@nestjs/cqrs';
import { CompensateChargeCommand } from '../commands/compensate-charge.command';
import { RefundRequested } from '../events/refund-requested.event';

@CommandHandler(CompensateChargeCommand)
export class CompensateChargeHandler
  implements ICommandHandler<CompensateChargeCommand>
{
  constructor(private readonly bus: EventBus) {}

  async execute(cmd: CompensateChargeCommand): Promise<void> {
    // forward recovery: we don't 'undo' the charge, we request a domain refund
    this.bus.publish(
      new RefundRequested({
        chargeId: cmd.chargeId,
        reason: cmd.reason ?? 'process_manager_compensation',
        requestedAt: new Date(),
      }),
    );
  }
}
Then the Refund aggregate has its own lifecycle. RefundRequested becomes RefundApproved becomes RefundSettled. The process manager listens for the terminal state and marks itself DONE or FAILED. If the refund itself fails, you don’t try to “uncompensate.” You page a human.
A compensation step that pretends to be a rollback will eventually leave you with a charge on one side and a phantom refund on the other. That’s how you get a creator escalating to legal.
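Here is a rough sketch of how the process manager might react to the refund lifecycle. RefundRequested, RefundApproved, and RefundSettled are the names from this post; `RefundFailed`, `RefundOutcome`, and `onRefundEvent` are assumptions for illustration.

```typescript
type RefundEvent = 'RefundRequested' | 'RefundApproved' | 'RefundSettled' | 'RefundFailed';
type ProcessState = 'COMPENSATING' | 'DONE' | 'FAILED';

interface RefundOutcome {
  state: ProcessState;
  pageHuman: boolean; // true when the compensation itself failed
}

// Only terminal refund events move the process. A failed refund is not
// retried or "uncompensated" here; it is escalated to a person.
function onRefundEvent(current: ProcessState, event: RefundEvent): RefundOutcome {
  if (current !== 'COMPENSATING') {
    // late or duplicate refund events for a finished process are ignored
    return { state: current, pageHuman: false };
  }
  switch (event) {
    case 'RefundSettled':
      return { state: 'DONE', pageHuman: false };
    case 'RefundFailed':
      return { state: 'FAILED', pageHuman: true };
    default:
      // RefundRequested and RefundApproved are intermediate steps
      return { state: 'COMPENSATING', pageHuman: false };
  }
}
```

Note that there is no edge that reverses a refund. Every path out of COMPENSATING moves forward, either to a terminal state or to a human.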
Here’s the thing every process manager tutorial skips. A process that’s failing loud is fine. A process that’s silently stuck is a slow-burning fire.
This isn’t unique to the renewal pipeline. I’d seen the same pattern at a different employer. Submissions came back as 200 OK from Apple’s App Store Connect API but were actually being silently throttled. We had auto-retry. The auto-retry made it worse because the upstream was treating duplicates as new submissions. Hundreds of customer apps ended up stuck with conflicting metadata. The lesson carries over directly: don’t trust the response of a write to a human-moderated upstream. Read after write against the upstream’s source of truth.
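A minimal sketch of read-after-write, under the assumption that the upstream exposes some way to read back what it actually stored. `UpstreamClient` and `writeAndVerify` are hypothetical names, not a real App Store Connect client, and a real upstream may need a delay or polling before the read is meaningful.

```typescript
// Hypothetical client shape: one call to write, one to read the source of truth.
interface UpstreamClient<T> {
  write(id: string, value: T): Promise<{ status: number }>;
  read(id: string): Promise<T | undefined>;
}

// The write's status code is deliberately not treated as proof of success.
// Only the value read back from the upstream decides the outcome.
async function writeAndVerify<T>(
  upstream: UpstreamClient<T>,
  id: string,
  value: T,
  same: (a: T, b: T) => boolean,
): Promise<'confirmed' | 'unconfirmed'> {
  await upstream.write(id, value);
  const seen = await upstream.read(id);
  return seen !== undefined && same(seen, value) ? 'confirmed' : 'unconfirmed';
}

// Fake upstream that answers 200 but silently drops the write.
const throttledUpstream: UpstreamClient<string> = {
  write: async () => ({ status: 200 }),
  read: async () => undefined,
};

// Fake upstream that actually stores what it is given.
const healthyBacking = new Map<string, string>();
const healthyUpstream: UpstreamClient<string> = {
  write: async (id, value) => {
    healthyBacking.set(id, value);
    return { status: 200 };
  },
  read: async (id) => healthyBacking.get(id),
};
```

An 'unconfirmed' result is a signal to stop and investigate, not to resend. Blind retries are what turned the throttling into conflicting metadata.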
For process managers, that translates to: every active process has a row in your DB whose last-updated timestamp acts as a heartbeat. A watchdog runs on a schedule and flags any process that isn’t in a terminal state and hasn’t transitioned in N minutes.
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
import { InjectRepository } from '@nestjs/typeorm';
import { LessThan, Not, In, Repository } from 'typeorm';
import { SubscriptionRenewalProcess } from './subscription-renewal.process';

@Injectable()
export class StuckProcessWatchdog {
  private readonly logger = new Logger(StuckProcessWatchdog.name);
  private readonly STUCK_AFTER_MS = 15 * 60 * 1000;

  constructor(
    @InjectRepository(SubscriptionRenewalProcess)
    private readonly repo: Repository<SubscriptionRenewalProcess>,
    @InjectQueue('alerts') private readonly alerts: Queue,
  ) {}

  @Cron(CronExpression.EVERY_MINUTE)
  async sweep(): Promise<void> {
    const threshold = new Date(Date.now() - this.STUCK_AFTER_MS);
    // anything not in a terminal state that hasn't moved since the threshold
    const stuck = await this.repo.find({
      where: {
        state: Not(In(['DONE', 'FAILED'])),
        updatedAt: LessThan(threshold),
      },
      // oldest first, so stuck[0] really is the oldest process
      order: { updatedAt: 'ASC' },
      take: 200,
    });
    if (stuck.length === 0) return;
    this.logger.warn(`found ${stuck.length} stuck processes`);
    await this.alerts.add('stuck-processes', {
      count: stuck.length,
      ids: stuck.map((p) => p.id),
      oldestUpdatedAt: stuck[0].updatedAt,
    });
  }
}
This is not glamorous code. It’s also the one piece that’s saved me hours of customer-pain triage every time I’ve shipped it. A frozen workflow is worse than a failed one because nobody’s paging on it.
Happy-path tests for a process manager are basically a smoke test. The whole point of a process manager is to handle partial failure. So that’s what the tests need to do.
import { Test } from '@nestjs/testing';
import { CommandBus, CqrsModule, EventBus, ICommand } from '@nestjs/cqrs';
import { SubscriptionRenewalManager } from './subscription-renewal.manager';
import { AppleRenewalReceived } from '../events/apple-renewal.received';
import { ChargeFailed } from '../events/charge-failed.event';

describe('SubscriptionRenewalManager - partial failure', () => {
  it('fires compensation when charge fails after subscription mark', async () => {
    const module = await Test.createTestingModule({
      imports: [CqrsModule],
      providers: [SubscriptionRenewalManager /* + mocked repo, queues */],
    }).compile();
    await module.init();

    const bus = module.get(EventBus);
    // sagas emit commands, so capture them on the CommandBus, not the EventBus
    const commandBus = module.get(CommandBus);
    const issued: ICommand[] = [];
    jest.spyOn(commandBus, 'execute').mockImplementation(async (command: ICommand) => {
      issued.push(command);
    });

    const renewal = new AppleRenewalReceived({
      originalTransactionId: 'apple_123',
      notificationUUID: 'n_abc',
      subscriptionId: 'sub_42',
      amountCents: 1999,
    });
    bus.publish(renewal);
    bus.publish(renewal); // duplicate delivery, must be idempotent
    bus.publish(
      new ChargeFailed({
        subscriptionId: 'sub_42',
        reason: 'card_declined',
        processId: 'proc_1',
      }),
    );

    // the saga is async: let pending handlers settle before asserting
    // assert: exactly one ChargeSubscriptionCommand, exactly one CompensateChargeCommand
    // assert: state machine reaches COMPENSATING then DONE
  });
});
I don’t write tests for “the renewal succeeded and everything was fine.” I write tests for “the renewal arrived twice, the charge failed on retry, the refund came back delayed.” Those tests catch the cases that page you at 3 a.m.
Thanks for reading. If you’ve got thoughts, send them my way.