Microservice Data Replication With Events

How I use event-carried state transfer plus a bootstrap snapshot to keep materialized views in sync across services, and the lag metrics I actually page on.

The combat-sports tournament platform I CTO’d in London had hundreds of microservices and a rankings service that could not call the athletes service on the read path. Saturday tournaments, public leaderboards, federations watching live. So we did the obvious thing. Replicated the slice of athlete data rankings cared about into rankings’ own Postgres, off Kafka events. Worked. Until it didn’t.

This pattern has a name now, event-carried state transfer. Back then we called it “stop calling each other on the hot path.” If your services need to read another service’s data fast, this is the article.

Why replicate at all

Textbook answer is “each service owns its data.” Great. Now checkout needs the customer’s tier, shipping needs the address, recommendations need the locale, and all of them hit customer-service on every request. Distributed monolith with extra network hops.

Fix isn’t shared databases. Each consumer keeps its own materialized view of the bits it needs, refreshed via events. Reads stay local. Source of truth lives in one place. Eventual consistency is the price.

I went back and forth on this for years. At a real-time trading platform I architected, everything stayed synchronous because milliseconds mattered. At the creator-economy platform later, replication was the default. For most product backends, replication wins.

The shape of an event

The event has to carry enough state for the consumer to update its view without calling back. If a consumer has to call the producer to enrich the event, you’ve rebuilt synchronous fan-out with extra latency.

// shared/events/athlete.ts
export type AthleteUpdatedV1 = {
  schemaVersion: 1;
  eventId: string;             // uuid v4, for idempotency
  occurredAt: string;          // ISO 8601 in UTC
  aggregateId: string;         // athleteId
  aggregateVersion: number;    // monotonic per athlete
  payload: {
    athleteId: string;
    displayName: string;
    countryCode: string;
    weightClass: 'flyweight' | 'bantamweight' | 'featherweight' | 'lightweight' | 'welterweight';
    status: 'active' | 'retired' | 'suspended';
    updatedAt: string;
  };
};

aggregateVersion is the part people skip and regret. It’s how the consumer figures out if the event is newer than what it has. Kafka gives you ordering per partition, but you still get duplicates from retries, and during backfill you can process an older snapshot after a newer live event. Version check is your guardrail.
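To make the guard concrete, here is a minimal in-memory sketch of the version check. The ReplicaRow and IncomingEvent shapes and the applyEvent helper are illustrative names, not code from any of the services described here; the real replica lives in Postgres.

```typescript
// Illustrative in-memory model of the version guard (names are assumptions).
type ReplicaRow = { athleteId: string; status: string; sourceVersion: number };
type IncomingEvent = { aggregateId: string; aggregateVersion: number; status: string };

// Returns true if the event was applied, false if it was stale or a duplicate.
function applyEvent(replica: Map<string, ReplicaRow>, evt: IncomingEvent): boolean {
  const current = replica.get(evt.aggregateId);
  if (current !== undefined && current.sourceVersion >= evt.aggregateVersion) {
    return false; // older or identical version: treat as a no-op
  }
  replica.set(evt.aggregateId, {
    athleteId: evt.aggregateId,
    status: evt.status,
    sourceVersion: evt.aggregateVersion,
  });
  return true;
}

const replica = new Map<string, ReplicaRow>();
const v1: IncomingEvent = { aggregateId: 'a1', aggregateVersion: 1, status: 'active' };
const v2: IncomingEvent = { aggregateId: 'a1', aggregateVersion: 2, status: 'suspended' };

// v2 arrives first, then a late v1, then a retried duplicate of v2.
const outcomes = [applyEvent(replica, v2), applyEvent(replica, v1), applyEvent(replica, v2)];
console.log(outcomes, replica.get('a1')?.status);
```

Only the first call changes the replica. The late v1 and the duplicate v2 both fall through without touching the stored row.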

Producer side, transactional outbox

Do not publish to Kafka right after a database write from app code. The docs make it look fine. It is not. Database commits, publish fails, replica misses the update. Or publish succeeds and the database rolls back, and consumers materialize a customer that doesn’t exist. Outbox table, same transaction as the write, separate worker drains it to Kafka.

// athletes-service/src/athletes/athletes.service.ts
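import * as crypto from 'node:crypto'; // explicit import; the global is not available on older Node versions
import { Injectable } from '@nestjs/common';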
import { DataSource } from 'typeorm';
import { Athlete } from './athlete.entity';
import { OutboxEvent } from '../outbox/outbox.entity';

@Injectable()
export class AthletesService {
  constructor(private readonly ds: DataSource) {}

  async updateStatus(id: string, status: Athlete['status']) {
    return this.ds.transaction(async (m) => {
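      // Lock the row so two concurrent updates cannot both emit the same version
      const athlete = await m.findOneOrFail(Athlete, {
        where: { id },
        lock: { mode: 'pessimistic_write' },
      });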
      athlete.status = status;
      athlete.version = athlete.version + 1;
      athlete.updatedAt = new Date();
      await m.save(athlete);

      await m.insert(OutboxEvent, {
        topic: 'athletes.athlete-updated.v1',
        key: athlete.id,
        payload: {
          schemaVersion: 1,
          eventId: crypto.randomUUID(),
          occurredAt: athlete.updatedAt.toISOString(),
          aggregateId: athlete.id,
          aggregateVersion: athlete.version,
          payload: {
            athleteId: athlete.id,
            displayName: athlete.displayName,
            countryCode: athlete.countryCode,
            weightClass: athlete.weightClass,
            status: athlete.status,
            updatedAt: athlete.updatedAt.toISOString(),
          },
        },
      });

      return athlete;
    });
  }
}

A worker tails the outbox (poll or logical replication, your call), publishes to Kafka, marks the row sent. If it crashes mid-publish, the next run picks it up. Consumer-side idempotency handles the duplicate. You sleep at night.
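A rough sketch of one drain pass, assuming a polling worker. OutboxRow, Publish, and drainOnce are hypothetical names, the rows are held in memory instead of a table, and the publisher is a stand-in for whatever Kafka producer the worker wraps.

```typescript
// Hypothetical shapes: stand-ins for an outbox table row and a Kafka producer call.
type OutboxRow = { id: number; topic: string; key: string; payload: unknown; sentAt: Date | null };
type Publish = (topic: string, key: string, value: string) => Promise<void>;

// Publishes unsent rows in insertion order and marks each one sent only after
// the publish resolves. Stops at the first failure so per-key ordering is kept.
async function drainOnce(rows: OutboxRow[], publish: Publish): Promise<number> {
  let sent = 0;
  const pending = rows.filter((r) => r.sentAt === null).sort((a, b) => a.id - b.id);
  for (const row of pending) {
    try {
      await publish(row.topic, row.key, JSON.stringify(row.payload));
    } catch {
      break; // leave this row and everything after it for the next run
    }
    row.sentAt = new Date(); // in a real worker this would be an UPDATE on the outbox table
    sent++;
  }
  return sent;
}

// Usage: a publisher that fails once, on the second message.
const outbox: OutboxRow[] = [1, 2, 3].map((id) => ({
  id,
  topic: 'athletes.athlete-updated.v1',
  key: 'a1',
  payload: { aggregateVersion: id },
  sentAt: null,
}));
const delivered: string[] = [];
let failNext = true;
const flakyPublish: Publish = async (_topic, _key, value) => {
  if (delivered.length === 1 && failNext) {
    failNext = false;
    throw new Error('broker unavailable');
  }
  delivered.push(value);
};

const drainResult = (async () => {
  const firstRun = await drainOnce(outbox, flakyPublish);  // sends row 1, fails on row 2
  const secondRun = await drainOnce(outbox, flakyPublish); // resumes at row 2
  return { firstRun, secondRun };
})();
```

If the worker dies after the publish but before the row is marked sent, the next pass publishes that row again. That is the duplicate the consumer-side version check absorbs.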

Consumer side, idempotent upsert

Pick a partition key matching the aggregate so updates for the same athlete land on the same partition in order. Upsert by aggregate id. Reject anything with a lower version than what’s already there.

// rankings-service/src/replicas/athletes-replica.consumer.ts
import { Injectable, Logger } from '@nestjs/common';
import { Kafka, EachMessagePayload } from 'kafkajs';
import { DataSource } from 'typeorm';
import { AthleteReplica } from './athlete-replica.entity';

@Injectable()
export class AthletesReplicaConsumer {
  private readonly log = new Logger(AthletesReplicaConsumer.name);

  constructor(
    private readonly kafka: Kafka,
    private readonly ds: DataSource,
  ) {}

  async start() {
    const consumer = this.kafka.consumer({
      groupId: 'rankings.athletes-replica.v1',
      sessionTimeout: 30_000,
      maxWaitTimeInMs: 500,
    });
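    await consumer.connect(); // kafkajs needs an explicit connect before subscribe
    // fromBeginning is the kafkajs equivalent of auto.offset.reset=earliest
    await consumer.subscribe({ topic: 'athletes.athlete-updated.v1', fromBeginning: true });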
    await consumer.run({ eachMessage: (m) => this.handle(m) });
  }

  private async handle({ message }: EachMessagePayload) {
    if (!message.value) return;
    const evt = JSON.parse(message.value.toString());
    const p = evt.payload;

    // TypeORM's InsertQueryBuilder does not expose .where(), so the version
    // guard is written as raw SQL: the update only fires for older rows.
    const applied: unknown[] = await this.ds.query(
      `INSERT INTO athlete_replica
         (athlete_id, display_name, country_code, weight_class, status,
          source_version, source_updated_at, replicated_at)
       VALUES ($1, $2, $3, $4, $5, $6, $7, now())
       ON CONFLICT (athlete_id) DO UPDATE SET
         display_name = EXCLUDED.display_name,
         country_code = EXCLUDED.country_code,
         weight_class = EXCLUDED.weight_class,
         status = EXCLUDED.status,
         source_version = EXCLUDED.source_version,
         source_updated_at = EXCLUDED.source_updated_at,
         replicated_at = EXCLUDED.replicated_at
       WHERE athlete_replica.source_version < EXCLUDED.source_version
       RETURNING athlete_id`,
      [p.athleteId, p.displayName, p.countryCode, p.weightClass, p.status,
       evt.aggregateVersion, p.updatedAt],
    );

    // RETURNING yields no row when the guard rejected the update.
    if (applied.length === 0) {
      this.log.debug(`stale or duplicate event ${evt.eventId} ignored`);
    }
  }
}

Note the WHERE source_version < clause. That’s the entire idempotency contract. Duplicates and out-of-order events become no-ops. No dedupe table, no Redis lock.

Bootstrap, the part that gets forgotten

The bit that trips up every team trying this for the first time. When a new consumer comes online, or you reset a group, the events on the topic only go back as far as retention. You can’t replay the entire history of athletes from Kafka. Snapshot endpoint on the producer. Bootstrap once, then switch to streaming.

// athletes-service/src/snapshot/snapshot.controller.ts
import { Controller, Get, Query, Res } from '@nestjs/common';
import { Response } from 'express';
import { DataSource } from 'typeorm';
import { Athlete } from '../athletes/athlete.entity';

@Controller('internal/snapshot/athletes')
export class AthletesSnapshotController {
  constructor(private readonly ds: DataSource) {}

  @Get()
  async stream(
    @Query('after') after: string | undefined,
    @Res() res: Response,
  ) {
    res.setHeader('Content-Type', 'application/x-ndjson');

    const q = this.ds
      .createQueryBuilder(Athlete, 'a')
      .where(after ? 'a.id > :after' : '1=1', { after })
      .orderBy('a.id', 'ASC')
      .limit(1000);

    const rows = await q.getMany();
    for (const a of rows) {
      res.write(JSON.stringify({
        aggregateId: a.id,
        aggregateVersion: a.version,
        payload: {
          athleteId: a.id,
          displayName: a.displayName,
          countryCode: a.countryCode,
          weightClass: a.weightClass,
          status: a.status,
          updatedAt: a.updatedAt.toISOString(),
        },
      }) + '\n');
    }

    res.end();
  }
}

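On first boot the consumer pages through the snapshot, upserts each row with the same source_version < guard, then starts the Kafka consumer with auto.offset.reset=earliest (in kafkajs, fromBeginning: true on subscribe) and catches up. That setting only applies to a group with no committed offsets, which is exactly the new-consumer or reset-group case. Live events that arrived during the snapshot get re-applied. Version check makes that safe.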
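The paging loop can be sketched like this. FetchPage stands in for an HTTP call to the snapshot endpoint with ?after=<lastId>, and Upsert for the version-guarded write; both names, and the tiny page size in the usage, are assumptions for illustration.

```typescript
// Hypothetical shapes: FetchPage wraps the snapshot endpoint, Upsert wraps the guarded write.
type SnapshotRow = { aggregateId: string; aggregateVersion: number };
type FetchPage = (after: string | undefined) => Promise<SnapshotRow[]>;
type Upsert = (row: SnapshotRow) => Promise<void>;

// Pages through the snapshot using the last aggregateId seen as the cursor.
// An empty page means the snapshot is exhausted.
async function bootstrap(fetchPage: FetchPage, upsert: Upsert): Promise<number> {
  let after: string | undefined;
  let total = 0;
  while (true) {
    const page: SnapshotRow[] = await fetchPage(after);
    if (page.length === 0) return total;
    for (const row of page) {
      await upsert(row); // same version-guarded upsert the live consumer uses
      after = row.aggregateId;
      total++;
    }
  }
}

// Usage with a fake source of five rows and a page size of two.
const source: SnapshotRow[] = ['a1', 'a2', 'a3', 'a4', 'a5'].map((id, i) => ({
  aggregateId: id,
  aggregateVersion: i + 1,
}));
const fakeFetch: FetchPage = async (after) =>
  source.filter((r) => after === undefined || r.aggregateId > after).slice(0, 2);
const seen: string[] = [];
const bootstrapResult = bootstrap(fakeFetch, async (row) => {
  seen.push(row.aggregateId);
});
```

Only after bootstrap resolves would the Kafka consumer be started. Anything the stream replays that the snapshot already covered is rejected by the version guard.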

Monitoring replication lag without lying

This is where most teams pay tuition. You need three metrics, not one. Consumer offset lag is necessary but not sufficient. A consumer can be “caught up” on offsets while still serving stale data, because the producer hasn’t published.

// rankings-service/src/replicas/replica-health.service.ts
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';

@Injectable()
export class ReplicaHealthService {
  constructor(private readonly ds: DataSource) {}

  // Emitted every 10s to Datadog
  async metrics() {
    const row = await this.ds.query(`
      SELECT
        EXTRACT(EPOCH FROM (now() - max(source_updated_at))) AS source_age_seconds,
        EXTRACT(EPOCH FROM (now() - max(replicated_at)))   AS replicated_age_seconds,
        EXTRACT(EPOCH FROM (max(replicated_at) - max(source_updated_at))) AS apply_lag_seconds
      FROM athlete_replica
    `);
    return row[0];
  }
}

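source_age_seconds is end-to-end freshness. replicated_age_seconds is how long since the consumer last applied anything. apply_lag_seconds is source-update to applied. All three come from max() over the whole table, so treat them as coarse signals, not per-row lag. Apply lag climbing while offset lag is flat means blame the producer. Both climbing means blame the consumer. Everything flat but source age huge means no events at all, usually a misconfigured topic or a dead outbox drainer.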
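Those triage rules fit in a small function. This is a sketch, not the alerting we ran: the thresholds are invented, "climbing" is approximated by "above a limit", and treating high offset lag on its own as a consumer problem is my assumption.

```typescript
// Field names mirror the SQL columns; offsetLag comes from Kafka, not Postgres.
type ReplicaMetrics = {
  sourceAgeSeconds: number; // now - max(source_updated_at)
  applyLagSeconds: number;  // max(replicated_at) - max(source_updated_at)
  offsetLag: number;        // consumer group lag in messages
};
type Diagnosis = 'healthy' | 'producer-slow' | 'consumer-behind' | 'no-events';

// Hypothetical limits; tune them to your own traffic.
const defaultLimits = { applyLagSeconds: 60, offsetLag: 1000, sourceAgeSeconds: 900 };

function diagnose(m: ReplicaMetrics, limits = defaultLimits): Diagnosis {
  const applyHigh = m.applyLagSeconds > limits.applyLagSeconds;
  const offsetHigh = m.offsetLag > limits.offsetLag;
  if (offsetHigh) return 'consumer-behind'; // consumer is not keeping up with the topic
  if (applyHigh) return 'producer-slow';    // events arrive late but are consumed promptly
  if (m.sourceAgeSeconds > limits.sourceAgeSeconds) return 'no-events'; // nothing is flowing
  return 'healthy';
}

const producerCase = diagnose({ sourceAgeSeconds: 400, applyLagSeconds: 300, offsetLag: 0 });
const consumerCase = diagnose({ sourceAgeSeconds: 400, applyLagSeconds: 300, offsetLag: 5000 });
const silentCase = diagnose({ sourceAgeSeconds: 8 * 3600, applyLagSeconds: 1, offsetLag: 0 });
const healthyCase = diagnose({ sourceAgeSeconds: 20, applyLagSeconds: 1, offsetLag: 3 });
console.log({ producerCase, consumerCase, silentCase, healthyCase });
```

The silent case is the one offset lag alone never catches: everything looks idle and green while the data ages.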

When the replica went stale for eight hours

The combat-sports platform, early on. Rankings index served from Elasticsearch, fed by a consumer reading off Kafka. Postgres was system of record, ES the query layer. Federation tournament finished on a Saturday night. New champion. Eight hours later the page still showed the previous number one. The athlete saw it before we did and tweeted a screenshot tagging the federation.

Indexer pod was still consuming from Kafka. That was the trap. Consumer lag was green. Nothing paged. The bulk-write client to ES had entered a circuit-open state after a transient blip overnight, breaker had no automated retry path. Reading messages, acking them, dropping the writes on the floor. We restarted the indexer and new events resumed correctly, but the eight hours of dropped writes were gone. Triggered a full reindex from Postgres into a new index, atomic-aliased the read over, about 25 minutes.
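One common guard against this failure mode, sketched under assumptions: only advance the committed offset past messages whose write actually succeeded. Msg, Sink, and processBatch are hypothetical names, and this is not a description of how our indexer was changed.

```typescript
// Hypothetical shapes: Sink stands in for the bulk-write client to the index.
type Msg = { offset: number; value: string };
type Sink = (doc: string) => Promise<void>;

// Returns the next offset that is safe to commit: one past the last message
// whose write succeeded. A failing sink stops progress instead of being swallowed.
async function processBatch(messages: Msg[], sink: Sink, committed: number): Promise<number> {
  let next = committed;
  for (const msg of messages) {
    try {
      await sink(msg.value);
    } catch {
      break; // do not advance past a failed write
    }
    next = msg.offset + 1;
  }
  return next;
}

const messages: Msg[] = [
  { offset: 10, value: 'champion-v1' },
  { offset: 11, value: 'champion-v2' },
];
const openBreaker: Sink = async () => {
  throw new Error('circuit open');
};
const written: string[] = [];
const healthySink: Sink = async (doc) => {
  written.push(doc);
};

const ackResult = (async () => ({
  whileBroken: await processBatch(messages, openBreaker, 10), // offset does not move
  whenHealthy: await processBatch(messages, healthySink, 10), // offset moves past both
}))();
```

With this shape a stuck sink shows up as growing consumer lag, so even a throughput-only alert would have fired.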

The fix that mattered wasn’t the rebuild. It was adding source_age_seconds as a first-class alert. We’d been measuring throughput, not freshness. A derived index can be perfectly busy and perfectly wrong at the same time.

When a refactor broke ordering

Different platform, years later. The creator-economy product I worked at had a users event topic feeding several consumers. Someone shipped a refactor that changed the partition key from user_id to account_id. Test envs looked fine. Rolled it out on a Tuesday. By Wednesday morning support had tickets about user profile changes “reverting” in one downstream product.

Two updates to the same user had landed on different partitions because the account had changed in between. Consumer processed them out of order. The version-check guard would have caught it, but this consumer predated our aggregateVersion standard and did a blind upsert by user_id. The older update won.
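A toy illustration of why the key change mattered. The hash below is a stand-in I made up for the example (Kafka's default partitioner uses murmur2), and the ids are invented, but the effect is the same: a key that changes between updates can move the aggregate to a different partition.

```typescript
// Toy hash for illustration only; not Kafka's actual partitioner.
function partitionFor(key: string, partitions: number): number {
  let h = 0;
  for (let i = 0; i < key.length; i++) {
    h = (h * 31 + key.charCodeAt(i)) >>> 0; // keep it an unsigned 32-bit value
  }
  return h % partitions;
}

type UserUpdate = { userId: string; accountId: string; version: number };

// Two updates to the same user; the account changed in between.
const updates: UserUpdate[] = [
  { userId: 'user-7', accountId: 'acct-1', version: 1 },
  { userId: 'user-7', accountId: 'acct-2', version: 2 },
];

const byUserId = updates.map((u) => partitionFor(u.userId, 4));
const byAccountId = updates.map((u) => partitionFor(u.accountId, 4));
console.log({ byUserId, byAccountId });
```

Keyed by userId, both updates share a partition and keep their order. Keyed by accountId, they land on different partitions, and nothing guarantees which one a consumer sees first.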

First instinct was to roll back the producer change. We did. Problem stayed, because the bad rows were already in the replica and wouldn’t be overwritten until each user got another update. Real fix was a one-shot reconciliation that paged through the source users table and re-emitted current state for every affected aggregate with a fresh aggregateVersion. Replica caught up in about 40 minutes. We added a CI check that fails any PR touching partition keys without an explicit topic version bump.

Local setup for this pattern

A docker-compose.yml like this gets you running:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
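    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for processes on the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports: ['9092:9092']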
  postgres-source:
    image: postgres:16
    environment:
      POSTGRES_DB: athletes
      POSTGRES_PASSWORD: dev
    ports: ['5433:5432']
  postgres-replica:
    image: postgres:16
    environment:
      POSTGRES_DB: rankings
      POSTGRES_PASSWORD: dev
    ports: ['5434:5432']

Two Postgres instances on purpose. They don’t share. Event stream is the only path.

Takeaways

  • Replicate via events when consumer reads need to be fast, local, and resilient to producer outages.
  • Always include aggregateVersion. The version check is what makes upserts idempotent and reordering safe.
  • Use the transactional outbox pattern. Application-level dual writes will burn you.
  • Bootstrap new consumers from a snapshot endpoint, then switch to streaming. Don’t rely on infinite Kafka retention.
  • Measure freshness, not just throughput. source_age_seconds is the alert that catches silent breakage.
  • Partition keys are part of your event schema. Treat changes to them like schema migrations.

Thanks for reading. If you’ve got thoughts, send them my way.

© 2026 Akin Gundogdu. All Rights Reserved.