File Processing Pipelines in NestJS

How I run streaming multipart uploads to S3, MIME and magic-number validation, and BullMQ pipelines for image, PDF, and video processing in NestJS. With a storage abstraction that doesn't lie to me.

OK so. A Wednesday morning at the creator-economy platform I worked at. The branded-mobile-app pipeline had been shipping native iOS and Android builds for thousands of creator-owned apps for about six months, and honestly it felt boring in the good way. Then the pending_apple_review queue started backing up. By lunch, hundreds of customer builds were stuck somewhere between our worker pods and the App Store. Support had a wall of tickets by 2 p.m. Pacific.

That incident is the reason I now write file pipelines the way I do. The upload is the easy part. The worker stage, the retries, and the storage abstraction are what punish you the moment you stop paying attention.

Here’s how I run file processing in NestJS in production, and the places I’ve watched it break.

Streaming uploads, not buffered

Multer’s default behavior is to buffer the whole file into memory before your handler runs. That’s fine for a 200KB avatar. It will end your day on a 4GB lecture video.

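I run NestJS on the Fastify adapter with the @fastify/multipart plugin and stream directly into the S3 multipart uploader (Upload from @aws-sdk/lib-storage, AWS SDK v3). The body never lands on disk, and never sits whole in a Node buffer. Only the few parts currently in flight are held in memory.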

import { Controller, Post, Req, BadRequestException } from '@nestjs/common';
import { FastifyRequest } from 'fastify';
import { Upload } from '@aws-sdk/lib-storage';
import { S3Client } from '@aws-sdk/client-s3';
import { randomUUID } from 'node:crypto';

@Controller('uploads')
export class UploadsController {
  constructor(private readonly s3: S3Client) {}

  @Post()
  async upload(@Req() req: FastifyRequest) {
    const file = await req.file({ limits: { fileSize: 5 * 1024 * 1024 * 1024 } });
    if (!file) throw new BadRequestException('No file');

    const key = `raw/${randomUUID()}-${file.filename}`;

    const uploader = new Upload({
      client: this.s3,
      params: {
        Bucket: process.env.UPLOAD_BUCKET,
        Key: key,
        Body: file.file,
        ContentType: file.mimetype,
      },
      queueSize: 4,
      partSize: 8 * 1024 * 1024,
      leavePartsOnError: false,
    });

    uploader.on('httpUploadProgress', (p) => {
      if (p.loaded && p.total) {
        // backpressure visibility for slow clients
      }
    });

    const result = await uploader.done();
    return { key, etag: result.ETag };
  }
}

queueSize: 4 and partSize: 8MB aren’t arbitrary. Smaller parts mean more roundtrips. Larger parts pin more memory per concurrent upload. Eight megs hit the sweet spot for our worker shape. Yours will be different. Measure.
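To make that trade-off concrete, here is a small back-of-the-envelope sketch. The helper name describeUpload is mine, not part of the SDK, and the memory figure is an approximation that ignores stream and SDK overhead. The two constants are S3's documented multipart limits (5 MiB minimum part size, 10,000 parts per upload).

```typescript
const MiB = 1024 * 1024;
const GiB = 1024 * MiB;

// S3 multipart limits: parts must be at least 5 MiB (except the last one),
// and a single upload can have at most 10,000 parts.
const S3_MIN_PART_SIZE = 5 * MiB;
const S3_MAX_PARTS = 10_000;

interface UploadShape {
  partSize: number;  // bytes per part
  queueSize: number; // parts uploaded concurrently
}

// Hypothetical helper: estimates what a given Upload configuration costs.
function describeUpload(shape: UploadShape, maxFileSize: number) {
  if (shape.partSize < S3_MIN_PART_SIZE) {
    throw new Error('partSize is below the S3 minimum of 5 MiB');
  }
  const partsNeeded = Math.ceil(maxFileSize / shape.partSize);
  return {
    partsNeeded,
    fitsPartLimit: partsNeeded <= S3_MAX_PARTS,
    // Rough upper bound on part data buffered per in-flight upload.
    bufferedBytesPerUpload: shape.partSize * shape.queueSize,
    // Largest object this part size can ever carry.
    largestObjectBytes: shape.partSize * S3_MAX_PARTS,
  };
}

const shape = describeUpload({ partSize: 8 * MiB, queueSize: 4 }, 5 * GiB);
console.log(shape.partsNeeded, shape.bufferedBytesPerUpload / MiB);
```

With the numbers from the controller above, a 5 GiB file needs 640 parts and each in-flight upload holds roughly 32 MiB of part data. Multiply that by your expected concurrent uploads per pod before you raise either knob.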

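leavePartsOnError: false matters too. It is the library default, but I set it explicitly so nobody flips it by accident. With true, a failed upload leaves its parts behind and you pay S3 storage for orphaned parts forever. Even with false, the abort only happens if the process is still alive to send it, so set a lifecycle rule with AbortIncompleteMultipartUpload as well. Belt and braces.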
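A minimal sketch of what that lifecycle rule can look like as plain data. The field names follow the S3 lifecycle configuration shape. The helper name, the rule ID, the raw/ prefix (borrowed from the upload key above), and the one-day window are my assumptions, so pick values that fit your bucket.

```typescript
// Subset of the S3 lifecycle rule shape that this sketch needs.
interface AbortRule {
  ID: string;
  Status: 'Enabled' | 'Disabled';
  Filter: { Prefix: string };
  AbortIncompleteMultipartUpload: { DaysAfterInitiation: number };
}

// Hypothetical helper: builds a rule that aborts multipart uploads which
// have not completed within `days` days of being started.
function abortIncompleteUploadsRule(prefix: string, days: number): AbortRule {
  if (!Number.isInteger(days) || days < 1) {
    throw new Error('days must be a positive integer');
  }
  return {
    ID: `abort-incomplete-multipart-${days}d`,
    Status: 'Enabled',
    Filter: { Prefix: prefix },
    AbortIncompleteMultipartUpload: { DaysAfterInitiation: days },
  };
}

// Assumed values: uploads land under raw/, stale parts go after one day.
const lifecycleConfiguration = {
  Rules: [abortIncompleteUploadsRule('raw/', 1)],
};
console.log(JSON.stringify(lifecycleConfiguration, null, 2));
```

This object is what you would pass as LifecycleConfiguration to PutBucketLifecycleConfigurationCommand from @aws-sdk/client-s3, or translate into your Terraform or CloudFormation. Be careful: that call replaces the bucket's whole lifecycle configuration, so merge with any rules already there.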

Validate before you trust

Content-Type is a hint, not a fact. A curl one-liner can claim image/png while shipping you a PHP web shell. The only validation that holds is reading the actual bytes.

I run two checks. The MIME hint goes through class-validator at the controller. The magic-number check runs inside the worker, after the upload has landed in S3, and refuses the job if the file lies about itself.

import { Injectable, BadRequestException } from '@nestjs/common';
import { fileTypeFromStream } from 'file-type';
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';

const ALLOWED = new Set([
  'image/png', 'image/jpeg', 'image/webp',
  'application/pdf',
  'video/mp4', 'video/quicktime',
]);

@Injectable()
export class FileValidator {
  constructor(private readonly s3: S3Client) {}

  async assertSafe(bucket: string, key: string, claimedMime: string) {
    if (!ALLOWED.has(claimedMime)) {
      throw new BadRequestException(`Unsupported type: ${claimedMime}`);
    }

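    const obj = await this.s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
    if (!obj.Body) throw new BadRequestException('Object has no body');
    // In Node.js the SDK hands the body back as a readable stream.
    const detected = await fileTypeFromStream(obj.Body as NodeJS.ReadableStream);
    if (!detected) throw new BadRequestException('Cannot identify file');
    // The detected type must be on the allow-list in its own right.
    if (!ALLOWED.has(detected.mime)) {
      throw new BadRequestException(`Detected type not allowed: ${detected.mime}`);
    }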
    if (detected.mime !== claimedMime) {
      throw new BadRequestException(`Header lied. Claimed ${claimedMime}, got ${detected.mime}`);
    }
    return detected;
  }
}

The cost of being wrong here is not theoretical. I’ve seen “PDF” uploads that were Office macro droppers and “JPEG” thumbnails that were ZIP polyglots. Read the bytes.
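If "magic number" sounds abstract, this is roughly what the check boils down to. It is a toy sketch for illustration only, covering three well-known signatures. The function names are mine, and in the real pipeline I lean on file-type rather than a hand-rolled table like this.

```typescript
// Leading bytes ("magic numbers") of a few formats from the allow-list.
const SIGNATURES: ReadonlyArray<{ mime: string; bytes: number[] }> = [
  { mime: 'image/png', bytes: [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a] },
  { mime: 'image/jpeg', bytes: [0xff, 0xd8, 0xff] },
  { mime: 'application/pdf', bytes: [0x25, 0x50, 0x44, 0x46, 0x2d] }, // "%PDF-"
];

// Returns the MIME type whose signature matches the start of the file,
// or undefined when nothing matches.
function sniffMime(head: Uint8Array): string | undefined {
  for (const sig of SIGNATURES) {
    const matches =
      head.length >= sig.bytes.length &&
      sig.bytes.every((byte, i) => head[i] === byte);
    if (matches) return sig.mime;
  }
  return undefined;
}

// True only when the bytes back up what the header claimed.
function matchesClaim(head: Uint8Array, claimedMime: string): boolean {
  return sniffMime(head) === claimedMime;
}

const pngHead = Uint8Array.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00]);
const phpShell = new TextEncoder().encode('<?php system($_GET["c"]); ?>');

console.log(matchesClaim(pngHead, 'image/png'));  // the bytes agree with the claim
console.log(matchesClaim(phpShell, 'image/png')); // the header lied
```

A prefix check like this catches the lazy lies. It does not catch polyglots, which have a valid header and something nasty further in, so treat it as one layer and not the whole defence.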

A storage abstraction that holds up

Almost every NestJS service I’ve built ends up needing local disk for tests, S3 in production, and occasionally GCS for a client that lives in Google’s world. Reaching for the SDK directly inside business logic is how you end up with a permanent migration project.

I use a single interface and a module that picks the implementation at boot.

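import { Inject, Injectable } from '@nestjs/common';
import { Readable } from 'node:stream';

export interface StorageDriver {
  put(key: string, body: Readable | Buffer, contentType: string): Promise<{ etag: string }>;
  get(key: string): Promise<Readable>;
  remove(key: string): Promise<void>;
  presignGet(key: string, expiresInSeconds: number): Promise<string>;
}

// Interfaces are erased at compile time, so Nest cannot inject one by type.
// The token name is arbitrary; the storage module binds it to a driver.
export const STORAGE_DRIVER = Symbol('STORAGE_DRIVER');

@Injectable()
export class StorageService {
  constructor(@Inject(STORAGE_DRIVER) private readonly driver: StorageDriver) {}

  async put(key: string, body: Readable | Buffer, contentType: string) {
    return this.driver.put(key, body, contentType);
  }

  // Workers read their source objects through the same abstraction.
  async get(key: string) {
    return this.driver.get(key);
  }

  async remove(key: string) {
    return this.driver.remove(key);
  }

  async presignGet(key: string, ttlSeconds = 300) {
    return this.driver.presignGet(key, ttlSeconds);
  }
}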

S3 and GCS implementations live behind that. The local driver writes to a temp directory and is used in Jest tests only. The rule I enforce in code review: no business module imports @aws-sdk/* directly. Ever. That keeps the surface area for a future migration to a single file.

This sounds like overkill until the day a client demands data residency in europe-west1 and your nearest Google bucket is suddenly the path of least resistance.
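Picking the implementation at boot can be as dull as a lookup table. Below is a rough, framework-free sketch. The helper pickDriver, the driver names, and the idea of reading the name from an environment variable such as STORAGE_BACKEND are my placeholders, and the fake drivers are stand-ins for the real local, S3, and GCS classes.

```typescript
// Map of driver name to a factory that builds that driver.
type DriverFactories<D> = Readonly<Record<string, () => D>>;

// Hypothetical helper: resolves the configured driver once, at boot,
// and refuses to start on a missing or misspelled name.
function pickDriver<D>(name: string | undefined, factories: DriverFactories<D>): D {
  const known = Object.keys(factories);
  if (!name || !Object.prototype.hasOwnProperty.call(factories, name)) {
    throw new Error(`Unknown storage driver "${name}". Known: ${known.join(', ')}`);
  }
  return factories[name]();
}

// Stand-ins for the real driver classes, just enough to show the wiring.
interface NamedDriver {
  readonly name: string;
}

const factories: DriverFactories<NamedDriver> = {
  local: () => ({ name: 'local' }),
  s3: () => ({ name: 's3' }),
  gcs: () => ({ name: 'gcs' }),
};

// In a real service the first argument would come from configuration,
// for example process.env.STORAGE_BACKEND (an assumed variable name).
const driver = pickDriver('s3', factories);
console.log(driver.name);
```

In a Nest module, a call like this would typically sit inside a useFactory provider bound to whatever injection token the storage service expects. Failing loudly at boot beats discovering in production that a typo quietly sent uploads to the local disk driver.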

BullMQ pipelines for the heavy lifting

The upload returns fast. The processing happens elsewhere. I push every post-upload step into BullMQ workers with a strict step contract: input is an S3 key, output is one or more S3 keys, plus a row in PostgreSQL.

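import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Logger } from '@nestjs/common';
import sharp from 'sharp';
// Import paths below are placeholders; point them at your own modules.
import { StorageService } from './storage.service';
import { FileValidator } from './file-validator';
import { AssetRepository } from './asset.repository';

// claimedMime is the Content-Type recorded when the file was uploaded.
type ImageJob = { sourceKey: string; assetId: string; claimedMime: string };

@Processor('image-pipeline', { concurrency: 4 })
export class ImagePipelineProcessor extends WorkerHost {
  private readonly logger = new Logger(ImagePipelineProcessor.name);

  constructor(
    private readonly storage: StorageService,
    private readonly validator: FileValidator,
    private readonly assets: AssetRepository,
  ) { super(); }

  async process(job: Job<ImageJob>) {
    const { sourceKey, assetId, claimedMime } = job.data;

    // Check the bytes against the type claimed at upload, not a hard-coded one.
    await this.validator.assertSafe(process.env.UPLOAD_BUCKET!, sourceKey, claimedMime);

    const source = await this.storage.get(sourceKey);

    const variants = [
      { name: 'thumb', width: 320 },
      { name: 'medium', width: 1024 },
      { name: 'large', width: 2048 },
    ];

    // A stream can only be read once. Feed it into a single sharp instance
    // and clone that instance per variant, so every variant sees the full input.
    const decoder = sharp();
    const outputs = variants.map((v) => ({
      name: v.name,
      targetKey: `derived/${assetId}/${v.name}.webp`,
      stream: decoder.clone().resize({ width: v.width, withoutEnlargement: true }).webp({ quality: 82 }),
    }));
    source.pipe(decoder);

    // Clones must be consumed concurrently, so the uploads run in parallel.
    await Promise.all(outputs.map(async (o) => {
      await this.storage.put(o.targetKey, o.stream, 'image/webp');
      await this.assets.recordVariant(assetId, o.name, o.targetKey);
    }));

    return { assetId, variants: variants.map(v => v.name) };
  }
}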

PDF and video pipelines follow the same shape. PDF goes through pdf-lib for metadata and pdftoppm for page thumbnails. Video goes through ffmpeg, two-pass H.264 for archive plus a separate HLS ladder job. Same contract, different binaries.

A few rules I refuse to relax:

  • Every job is idempotent on (assetId, step). Re-running the worker must produce the same result, not a duplicate row.
  • attempts: 5 with exponential backoff. Anything that can’t be retried is not allowed on the pipeline.
  • The job removes its own temp files in a finally block, even on success. /tmp filling up is a stupid way to lose a Friday.
  • Video jobs run on a dedicated queue with concurrency: 1 per pod. ffmpeg will eat every core you give it. Two of them on the same pod fight each other.
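The first two rules can be sketched as plain job options. This is a sketch under assumptions, not my production config: the helper names are mine, the 2-second base delay is an illustrative number, and the delay table assumes BullMQ's built-in exponential strategy doubles the base delay on each retry.

```typescript
// Retry policy from the rules above. The 2000 ms base delay is an assumption.
const STEP_RETRY = {
  attempts: 5,
  backoff: { type: 'exponential' as const, delay: 2000 },
};

// Deterministic job id per (assetId, step). The separator avoids ':',
// which BullMQ uses inside its own Redis key names.
function jobIdFor(assetId: string, step: string): string {
  return `${assetId}__${step}`;
}

// Options object in the shape queue.add(name, data, opts) expects.
function jobOptionsFor(assetId: string, step: string) {
  return { jobId: jobIdFor(assetId, step), ...STEP_RETRY };
}

// Delay before each retry, assuming delay * 2^(retry - 1).
// attempts includes the first run, so there are attempts - 1 retries.
function retryDelaysMs(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

const opts = jobOptionsFor('asset-123', 'resize');
console.log(opts.jobId, retryDelaysMs(opts.attempts, opts.backoff.delay));
```

A stable jobId makes re-enqueueing the same step a no-op for as long as the earlier job still exists in the queue. It says nothing about the database row, so the worker still needs its own uniqueness on (assetId, step), for example a unique index plus an upsert.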

When the queue stops being boring

Back to that Wednesday morning. The pipeline I mentioned at the top was Rails plus Python plus Fastlane, not NestJS, but the lesson is identical for a BullMQ rig.

Setting. Hundreds of branded app submissions a week, each one a multi-stage pipeline ending at an upload to the App Store. The pipeline had been quiet for months. I’d built most of it with a couple of other engineers.

What went wrong. Around mid-morning the queue started backing up. A lot of customer builds were stuck in “Waiting for Review” on Apple’s side, while our pipeline thought it had submitted them successfully. Apple’s API was silently throttling our endpoint, returning 200 OK with a body that looked normal but dropping the submission server-side.

First wrong fix. We already had auto-retry on 5xx. Someone, reasonably, extended it to retry on the “stuck” state too. That made it worse. Apple started seeing what looked like duplicate submissions, and a chunk of customers ended up with two competing review records and conflicting metadata. The retry was treating 200 OK as truth.

Real fix. Pulled the auto-retry. Added a circuit breaker that verifies submission state via a separate GET against the upstream resource, not via the response body of the POST. Wrote a one-shot reconciliation job with an idempotency key derived from app_id + version + git_sha to dedupe pending reviews against Apple’s source of truth. The mobile CX team did manual cleanup on the worst-affected creators in parallel.

What it cost. Three days of slipped releases. A lot of unhappy creators. The general rule that stuck. When the upstream is human-moderated, never trust the response of a write. Always read-after-write against the upstream’s source of truth.
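Translated out of Rails and into TypeScript, the read-after-write part of that fix looks roughly like this. Everything here is a hypothetical sketch: the Upstream interface, the state names, and the poll counts are mine and not Apple's API. Only the idea (derive a key from app_id, version, and git_sha, then confirm with a separate read) comes from the real fix, and the circuit breaker itself is left out.

```typescript
interface Submission {
  appId: string;
  version: string;
  gitSha: string;
}

// Same logical submission always yields the same key. JSON keeps the
// three fields unambiguous even if one of them contains a separator.
function idempotencyKey(s: Submission): string {
  return JSON.stringify([s.appId, s.version, s.gitSha]);
}

type UpstreamState = 'missing' | 'present';

// Hypothetical upstream client: one write call, one independent read call.
interface Upstream {
  submit(s: Submission, key: string): Promise<void>;
  fetchState(key: string): Promise<UpstreamState>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Submits at most once, then trusts only what the read endpoint reports.
async function submitAndVerify(
  upstream: Upstream,
  s: Submission,
  polls = 3,
  waitMs = 1000,
): Promise<'confirmed' | 'unconfirmed'> {
  const key = idempotencyKey(s);

  // Read first: never re-submit something the upstream already has.
  if ((await upstream.fetchState(key)) === 'present') return 'confirmed';

  await upstream.submit(s, key); // the response is deliberately ignored

  for (let i = 0; i < polls; i++) {
    if ((await upstream.fetchState(key)) === 'present') return 'confirmed';
    await sleep(waitMs);
  }
  // Do not blindly retry the write here. Escalate or trip a breaker.
  return 'unconfirmed';
}

const key = idempotencyKey({ appId: 'app-1', version: '2.4.0', gitSha: 'abc123' });
console.log(key);
```

The design choice that matters is the return value. 'unconfirmed' is a state someone has to handle, by alerting, reconciling, or pausing the queue, rather than a trigger for another POST.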

There’s a cousin to this from another week at the same platform. Apple’s SubscriptionRenewal server-to-server notifications retried on us once because our renewal handler returned 200 OK just past the 30-second deadline. No idempotency check on the notification handler. Every retry created a new row in creator_subscriptions. A bunch of customers got billed twice. Fix was a database-level unique constraint on (apple_original_transaction_id, notification_uuid) plus a Sidekiq job that ack’d within five seconds and did the work async. Apple’s retries became idempotent at the queue level.
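Here is the shape of that fix as a toy TypeScript sketch. The in-memory Set stands in for the database unique constraint and the array stands in for the job queue, so the class and field names are illustrative only. The real thing was a Postgres constraint and a Sidekiq job.

```typescript
interface RenewalNotification {
  originalTransactionId: string;
  notificationUuid: string;
}

type ReceiveResult = 'queued' | 'duplicate';

class RenewalInbox {
  // Stand-in for the unique constraint on
  // (apple_original_transaction_id, notification_uuid).
  private readonly seen = new Set<string>();

  // Stand-in for the async job queue that does the slow work later.
  readonly queue: RenewalNotification[] = [];

  // Cheap and synchronous, so the HTTP handler can acknowledge quickly.
  // Both outcomes should be answered with a success status: a duplicate
  // means the work is already recorded, not that something failed.
  receive(n: RenewalNotification): ReceiveResult {
    const key = JSON.stringify([n.originalTransactionId, n.notificationUuid]);
    if (this.seen.has(key)) return 'duplicate';
    this.seen.add(key);
    this.queue.push(n);
    return 'queued';
  }
}

const inbox = new RenewalInbox();
const notification = { originalTransactionId: 'txn-1', notificationUuid: 'uuid-1' };

const first = inbox.receive(notification);
const retry = inbox.receive(notification); // the sender's retry of the same event
console.log(first, retry, inbox.queue.length);
```

In a real handler the check and the insert have to be one atomic step, which is exactly what the database constraint provides. A check-then-insert in application code will race under concurrent retries.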

Same lesson twice. Server-to-server notifications retry. Multipart uploads retry. BullMQ retries. Build for it.

Takeaways

  • Stream uploads. Never buffer multi-gigabyte files into Node memory. AWS SDK v3 Upload plus Fastify multipart is the boring correct path.
  • Validate by magic number, not by header. The Content-Type lies whenever it’s convenient.
  • Put a storage interface between your business logic and any SDK. Local, S3, GCS behind one signature.
  • Push heavy work into BullMQ. Strict step contract, idempotent on (assetId, step), retries with exponential backoff.
  • Set AbortIncompleteMultipartUpload lifecycle rules. Orphaned parts cost real money.
  • When upstreams are human-moderated or async, read-after-write against their source of truth. Never trust the response body of the write.

Thanks for reading. If you’ve got thoughts, send them my way.

© 2026 Akin Gundogdu. All Rights Reserved.