
Event-Driven User Data Synchronization

In a Domain-Driven Design (DDD) microservices architecture, each bounded context owns its data. However, there are common scenarios where multiple services need access to shared reference data, such as user information for audit trails (created_by, updated_by, deleted_by columns).

This guide explains the event-driven synchronization pattern using BullMQ to replicate user data from the IAM service (data owner) to lookup_users tables in consumer services (Data Owner BC, Data Processing BC, System Admin BC, etc.).


Consider this common scenario in a multi-service system:

// In Data Owner Bounded Context (app_core_db database)
@Entity({ name: 'resources', database: AppDatabases.APP_CORE })
export class Resource {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
reference_number: string;
@Column()
first_name: string;
// Audit fields - WHO created/modified this record?
@Column({ type: 'uuid' })
created_by: string; // ❌ Problem: Just a UUID, no user details
@Column({ type: 'uuid' })
updated_by: string; // ❌ Problem: Just a UUID, no user details
}

Business Requirement: When displaying resource records, we need to show who created and last updated each record (name, email, position), not just a UUID.

Direct Database Access Across Services:

// WRONG: Data Owner service directly accessing IAM database
@Entity({ name: 'resources', database: AppDatabases.APP_CORE })
export class Resource {
@ManyToOne(() => User) // ❌ User entity is in IAM database (app_iam_db)
@JoinColumn({ name: 'created_by' })
creator: User; // ❌ Violates bounded context boundaries
}

Why This Fails:

  1. Violates Bounded Context: Data Owner service should not access IAM’s database directly
  2. Tight Coupling: Schema changes in IAM break the Data Owner service
  3. Cross-Database JOINs: PostgreSQL cannot JOIN across separate databases natively; it needs an extension such as postgres_fdw or dblink, which adds latency and coupling
  4. Scalability Issues: When services move to different servers/regions, direct database access becomes impossible

Solution Architecture: Event-Driven Synchronization

graph TB
    subgraph "IAM Service (Data Owner)"
        A[UsersService<br/>app_iam_db.users]
        B[BullMQ Producer]
    end

    subgraph "Message Queue"
        C[(Redis + BullMQ<br/>Queue: USER_SYNC)]
    end

    subgraph "Data Owner BC (Consumer)"
        D[LookupUsersProcessor]
        E[lookup_users table<br/>app_core_db]
        F[Resource Entity]
    end

    subgraph "Data Processing BC (Consumer)"
        G[LookupUsersProcessor]
        H[lookup_users table<br/>app_processing_db]
    end

    subgraph "System Admin BC (Consumer)"
        I[LookupUsersProcessor]
        J[lookup_users table<br/>app_admin_db]
    end

    A -->|1. User Created/Updated/Deleted| B
    B -->|2. Publish Event| C
    C -->|3. Deliver Event| D
    C -->|3. Deliver Event| G
    C -->|3. Deliver Event| I
    D -->|4. Upsert| E
    G -->|4. Upsert| H
    I -->|4. Upsert| J
    E -->|5. JOIN| F

    style A fill:#e1f5ff
    style C fill:#fff4e1
    style E fill:#e8f5e9
    style H fill:#e8f5e9
    style J fill:#e8f5e9

  1. Event Trigger: A user is created, updated, or deleted in the IAM service
  2. Event Publishing: The IAM service publishes the event to a BullMQ queue
  3. Event Distribution: Each consumer service receives the event. BullMQ hands a job to exactly one worker per queue, so every consumer needs its own queue; services sharing one queue would compete for jobs instead of each getting a copy
  4. Local Replication: Each consumer upserts the user data into its own lookup_users table
  5. JOIN Operations: Consumer services can now perform local JOINs for audit trails

Pattern: Publish-Subscribe with BullMQ

  • Publisher: IAM Service (single source of truth)
  • Subscribers: All consumer services (Data Owner BC, Processing BC, etc.)
  • Message Broker: Redis + BullMQ (reliable, persistent, retryable)
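
One caveat with this pattern: BullMQ is a job queue, so each job is processed by a single worker, and workers from different services listening on the same queue name would split the events between them. A common way to approximate publish-subscribe is one queue per consumer, with the IAM service adding the event to each. The sketch below shows that fan-out under stated assumptions: the consumer list, the `queueNameFor` naming scheme, and the `Enqueue` callback (a stand-in for `queue.add`) are hypothetical and not part of the original setup.

```typescript
// Hypothetical consumer identifiers; a real list would come from configuration.
const CONSUMERS = ['data-owner', 'data-processing', 'system-admin'] as const;
type Consumer = (typeof CONSUMERS)[number];

// Stand-in for BullMQ's queue.add(jobName, payload) on the queue called queueName.
type Enqueue = (queueName: string, jobName: string, payload: unknown) => Promise<void>;

// One queue per consumer, e.g. "users-data-owner" (the naming scheme is an assumption).
function queueNameFor(baseName: string, consumer: Consumer): string {
  return `${baseName}-${consumer}`;
}

// Publish the same event once per consumer queue so every service receives a copy.
async function fanOut(
  enqueue: Enqueue,
  baseName: string,
  jobName: string,
  payload: unknown,
): Promise<string[]> {
  const targets = CONSUMERS.map((consumer) => queueNameFor(baseName, consumer));
  await Promise.all(targets.map((queueName) => enqueue(queueName, jobName, payload)));
  return targets;
}
```

Each consumer would then register its processor on its own queue name instead of the shared one.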

IAM Service is the Data Owner:

  • ✅ Manages the authoritative users table in app_iam_db
  • ✅ Enforces business rules (password hashing, validation)
  • ✅ Publishes change events to other services
  • ✅ No other service can modify user data

Consumer Services are Data Consumers:

  • ✅ Maintain read-only replicas in lookup_users tables
  • ✅ Use replicated data for JOINs and queries
  • ❌ Never modify user data locally
  • ❌ Never trust local data as authoritative

Consistency Model: Eventual consistency with bounded staleness

  • Synchronization Delay: Typically < 100ms (Redis in same datacenter)
  • Acceptable Trade-off: Slight delay is acceptable for audit trails
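  • Reliable Delivery: BullMQ persists jobs in Redis and retries failed ones with exponential backoff; a job that exhausts its attempts stays in the failed set until it is retried manually or removed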
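
To make the retry behaviour concrete, the sketch below computes the waits between attempts. It assumes BullMQ's built-in exponential strategy without jitter, where the wait before retry n is `delay * 2^(n - 1)`; `retryDelays` is a hypothetical helper, not a BullMQ function.

```typescript
// Delays (ms) before each retry, assuming delay * 2^(retry - 1) with no jitter.
// `attempts` counts the first try as well, so there are attempts - 1 retries.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

console.log(retryDelays(3, 1000)); // [ 1000, 2000 ]
```

With `attempts: 3` and `delay: 1000`, a job that keeps failing is retried after 1 second and again after 2 more seconds, then marked as failed.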

Create a shared base entity that all services can extend:

File: libs/common/src/entities/base-lookup-user.entity.ts

import { Column } from 'typeorm';
import { BaseEntity } from '@lib/common/abstracts/base-entity.abstract';
import { EmployeeType } from '@lib/common/enum/auth/employee-type.enum';
import { Gender } from '@lib/common/enum/auth/gender.enum';
import { PreferredLanguage } from '@lib/common/enum/auth/preferred-language.enum';
import { IUser } from '@lib/common/interfaces/user.interface';
/**
* Base entity for lookup_users table
* Contains essential user information for audit trails and display
* Synchronized from IAM service via BullMQ events
*/
export abstract class BaseLookupUser extends BaseEntity implements IUser {
// --- Unique Identifiers ---
@Column({ type: 'uuid', nullable: true })
employee_id: string | null;
@Column({ type: 'varchar', length: 20, nullable: false })
employee_code: string;
@Column({ type: 'varchar', length: 13, nullable: false })
national_id: string;
@Column({ type: 'varchar', length: 50, nullable: false })
username: string;
// --- Personal Information ---
@Column({ type: 'varchar', length: 10, nullable: true })
title: string | null;
@Column({ type: 'varchar', length: 50, nullable: false })
first_name: string;
@Column({ type: 'varchar', length: 50, nullable: false })
last_name: string;
// --- Contact Information ---
@Column({ type: 'varchar', length: 100, nullable: true })
email: string | null;
@Column({ type: 'varchar', length: 15, nullable: true })
phone: string | null;
// --- Organization Information ---
@Column({ type: 'uuid', nullable: true })
department_id: string | null;
@Column({ type: 'varchar', length: 200, nullable: true })
position: string | null;
@Column({ type: 'varchar', length: 20, enum: EmployeeType, nullable: true })
employee_type: EmployeeType | null;
// --- Profile Information ---
@Column({ type: 'varchar', length: 500, nullable: true })
profile_image_url: string | null;
@Column({ type: 'varchar', length: 10, enum: Gender, nullable: true })
gender: Gender | null;
// --- Status ---
@Column({ type: 'boolean', default: true })
is_active: boolean;
// Note: Passwords, MFA secrets, and other sensitive data are NOT replicated
}

Design Decisions:

  1. Extends BaseEntity: Includes id, created_at, updated_at, is_deleted
  2. Implements IUser: Type-safe interface for user data
  3. Essential Fields Only: Only includes data needed for display/audit (no passwords, MFA secrets)
  4. Nullable Fields: Most fields nullable to handle partial updates gracefully
  5. Abstract Class: Forces each service to create its own concrete entity

Step 2: Create Service-Specific Lookup Entity


Each consumer service extends the base entity and specifies its database:

Example — Data Owner BC:

File: apps/data-owner-bc/src/entities/lookup-user.entity.ts

import { Entity } from 'typeorm';
import { BaseLookupUser } from '@lib/common/entities/base-lookup-user.entity';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
/**
* Data Owner BC's local copy of user data for audit trails
* Synchronized from IAM service via BullMQ events
* Database: app_core_db
*/
@Entity({ name: 'lookup_users', database: AppDatabases.APP_CORE })
export class LookupUser extends BaseLookupUser {}

Example — Data Processing BC:

@Entity({ name: 'lookup_users', database: AppDatabases.APP_PROCESSING })
export class LookupUser extends BaseLookupUser {}

Example — System Admin BC:

@Entity({ name: 'lookup_users', database: AppDatabases.APP_ADMIN })
export class LookupUser extends BaseLookupUser {}

Key Points:

  • ✅ Same table name (lookup_users) across all services for consistency
  • ✅ Different databases per service to maintain bounded context isolation
  • ✅ Minimal code duplication (inherit everything from BaseLookupUser)

Define the queue name and job types in a shared enum:

File: libs/common/src/enum/app-queue.enum.ts

export const AppQueue = {
Iam: {
cmd: {
User: {
name: 'users', // Queue name
jobs: {
CreateUser: 'user.created',
UpdateUser: 'user.updated',
DeleteUser: 'user.deleted',
},
},
},
},
};

Why This Structure:

  • Centralized: Single source of truth for queue/job names
  • Type-Safe: Autocomplete and refactoring support
  • Namespaced: Clear ownership (Iam → User)
  • Event Naming: Past-tense verbs (created, updated, deleted)
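
As written, the object's job names are typed as plain `string`. A possible refinement, sketched below, is to add `as const` so the compiler sees the literal values and a union type can be derived from them. `UserJobName` and `isUserJobName` are hypothetical names introduced for illustration.

```typescript
// Same shape as the AppQueue object above, narrowed to literal types with `as const`.
const AppQueue = {
  Iam: {
    cmd: {
      User: {
        name: 'users',
        jobs: {
          CreateUser: 'user.created',
          UpdateUser: 'user.updated',
          DeleteUser: 'user.deleted',
        },
      },
    },
  },
} as const;

// Union of the literal job names: 'user.created' | 'user.updated' | 'user.deleted'.
type UserJobs = typeof AppQueue.Iam.cmd.User.jobs;
type UserJobName = UserJobs[keyof UserJobs];

// Runtime guard for job names that arrive from the queue as plain strings.
function isUserJobName(name: string): name is UserJobName {
  const jobs = AppQueue.Iam.cmd.User.jobs;
  const known: string[] = [jobs.CreateUser, jobs.UpdateUser, jobs.DeleteUser];
  return known.indexOf(name) !== -1;
}
```

A processor could call `isUserJobName(job.name)` before its switch statement so that unknown names are rejected in one place.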

The IAM service publishes events whenever user data changes:

File: apps/iam/src/modules/user/services/users.service.ts

import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { BaseServiceOperations } from '@lib/common/utils/base-operations/base-service-operations.util';
@Injectable()
export class UsersService extends BaseServiceOperations<User, CreateUserDTO, UpdateUserDTO> {
constructor(
@InjectQueue(AppQueue.Iam.cmd.User.name) private readonly usersQueue: Queue,
// ... other dependencies
) {
super();
}
/**
* Create a new user and publish 'user.created' event
*/
async create(data: CreateUserDTO, session: IUserSession): Promise<User> {
// 1. Create user in IAM database (source of truth)
const created = await super.create(
{
...data,
password_hash: await this.hashPassword(data.password),
employee_code: await this.generateEmpCode(),
},
session,
);
// 2. Publish event to BullMQ (awaits only the enqueue in Redis, not consumer processing)
await this.usersQueue.add(
AppQueue.Iam.cmd.User.jobs.CreateUser, // Job name
created, // Payload: Full user object
{
attempts: 3, // Up to 3 attempts in total (1 initial + 2 retries)
backoff: { type: 'exponential', delay: 1000 }, // Retries wait 1s, then 2s
},
);
return created;
}
/**
* Update existing user and publish 'user.updated' event
*/
async update(id: string, data: UpdateUserDTO, session: IUserSession): Promise<User> {
// 1. Update user in IAM database
const updated = await super.update(id, data, session);
// 2. Publish update event
await this.usersQueue.add(AppQueue.Iam.cmd.User.jobs.UpdateUser, updated, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
return updated;
}
/**
* Soft delete user and publish 'user.deleted' event
*/
async softDelete(id: string, session: IUserSession): Promise<void> {
// 1. Soft delete in IAM database
await super.delete(id, true, session);
// 2. Publish delete event (only ID needed)
await this.usersQueue.add(
AppQueue.Iam.cmd.User.jobs.DeleteUser,
{ id }, // Minimal payload for deletion
{
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
);
}
}

Key Implementation Details:

  1. Enqueue Only: Awaiting queue.add() waits for Redis to accept the job, not for consumers to process it
  2. Retry Logic: Automatic retry with exponential backoff on failure
  3. Full Payload: Send the user fields consumers need for create/update, leaving out password_hash and other secrets
  4. Minimal Payload: Send only ID for delete (consumers only need to know which record to remove)
  5. Error Handling: BullMQ retries failed jobs and keeps jobs that exhaust their attempts in the failed set; it has no built-in dead-letter queue
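
The create and update jobs above pass the object returned by the base service as the payload, which presumably still carries `password_hash` and would place it in Redis. A minimal sketch of a whitelist projection follows; the field list mirrors BaseLookupUser plus `id` and `updated_at` from BaseEntity, and `toLookupPayload` is a hypothetical helper.

```typescript
// Fields the lookup tables store (BaseLookupUser columns plus id and updated_at).
const LOOKUP_FIELDS = [
  'id', 'updated_at', 'employee_id', 'employee_code', 'national_id', 'username',
  'title', 'first_name', 'last_name', 'email', 'phone',
  'department_id', 'position', 'employee_type', 'profile_image_url',
  'gender', 'is_active',
] as const;

type LookupField = (typeof LOOKUP_FIELDS)[number];
type LookupPayload = Partial<Record<LookupField, unknown>>;

// Copy only whitelisted fields; anything else (password_hash, MFA secrets) is dropped.
function toLookupPayload(user: Record<string, unknown>): LookupPayload {
  const payload: LookupPayload = {};
  for (const field of LOOKUP_FIELDS) {
    if (field in user) {
      payload[field] = user[field];
    }
  }
  return payload;
}
```

The producer could then enqueue `toLookupPayload(created)` instead of `created`.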

Step 5: Implement Processor (Consumer Services)


Each consumer service implements a processor to handle incoming events:

File: apps/data-owner-bc/src/modules/lookup/lookup-user.processor.ts

import { Processor, WorkerHost } from '@nestjs/bullmq';
import { InjectRepository } from '@nestjs/typeorm';
import { Job } from 'bullmq';
import { plainToInstance } from 'class-transformer';
import { Repository } from 'typeorm';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { IUser } from '@lib/common/interfaces/user.interface';
import { LookupUser } from '../entities/lookup-user.entity';
/**
* Processes user synchronization events from IAM service
* Maintains a local read-only copy of user data in lookup_users table
*/
@Processor(AppQueue.Iam.cmd.User.name)
export class LookupUsersProcessor extends WorkerHost {
constructor(
@InjectRepository(LookupUser, AppDatabases.APP_CORE)
private readonly userRepository: Repository<LookupUser>,
) {
super();
}
/**
* Process incoming job from BullMQ
* @param job - Job containing user data and operation type
*/
async process(job: Job<IUser, unknown, string>): Promise<void> {
switch (job.name) {
case AppQueue.Iam.cmd.User.jobs.CreateUser:
case AppQueue.Iam.cmd.User.jobs.UpdateUser: {
// Transform plain object to LookupUser entity instance
const lookupUserInstance = plainToInstance(LookupUser, job.data, {
excludeExtraneousValues: false,
});
// Upsert: Insert if new, update if exists (based on primary key)
await this.userRepository.save(lookupUserInstance);
break;
}
case AppQueue.Iam.cmd.User.jobs.DeleteUser: {
// Hard delete from lookup table (no soft delete needed for replicas)
await this.userRepository.delete(job.data.id);
break;
}
default:
console.warn(`Unknown job name: ${job.name}`);
}
}
}

Implementation Details:

  1. @Processor Decorator: Registers this class as a BullMQ worker for the queue
  2. Switch Statement: Handles different job types (create, update, delete)
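  3. plainToInstance: Converts the JSON payload into a LookupUser instance; it performs no validation
  4. Upsert Pattern: .save() looks up the primary key, then inserts the row if it is missing or updates it if it exists
  5. Hard Delete: Lookup tables use hard delete (replicas don’t need soft delete history). If a foreign key constraint such as resources.created_by still references the user, the delete is rejected unless the relation disables the constraint or the row is soft-deleted instead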
  6. Error Handling: If processor throws error, BullMQ automatically retries
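
Retries and concurrent workers can deliver events out of order, so an older update could overwrite a newer row. One possible guard, sketched below, compares `updated_at` before writing. An in-memory `Map` stands in for the repository, and `applyUserEvent` along with the event shapes are hypothetical.

```typescript
interface LookupRow {
  id: string;
  first_name: string;
  updated_at: string; // ISO-8601 timestamp set by the IAM service
}

type UserEvent =
  | { name: 'user.created' | 'user.updated'; data: LookupRow }
  | { name: 'user.deleted'; data: { id: string } };

// Apply one event to the local replica; returns false when nothing changed.
function applyUserEvent(store: Map<string, LookupRow>, event: UserEvent): boolean {
  if (event.name === 'user.deleted') {
    return store.delete(event.data.id);
  }
  const current = store.get(event.data.id);
  // Skip events that are older than (or as old as) what is already stored.
  if (current && Date.parse(current.updated_at) >= Date.parse(event.data.updated_at)) {
    return false;
  }
  store.set(event.data.id, event.data);
  return true;
}
```

This keeps replays harmless for creates and updates. A late update arriving after a delete would still recreate the row, so a tombstone or soft delete would be needed to cover that case.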

Create a global module to register the processor:

File: apps/data-owner-bc/src/modules/lookup/lookup.module.ts

import { BullModule } from '@nestjs/bullmq';
import { Global, Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { LookupUser } from '../entities/lookup-user.entity';
import { LookupUsersProcessor } from './lookup-user.processor';
/**
* Global module for lookup table management
* Registers BullMQ processors to sync data from other services
*/
@Global()
@Module({
imports: [
// Register entity for database access
TypeOrmModule.forFeature([LookupUser], AppDatabases.APP_CORE),
// Register BullMQ queue for receiving events
BullModule.registerQueue({
name: AppQueue.Iam.cmd.User.name,
}),
],
providers: [LookupUsersProcessor],
exports: [],
})
export class LookupModule {}

Import in Root Module:

@Module({
imports: [
// ... other imports
LookupModule, // ✅ Import globally
],
})
export class DataOwnerBCModule {}

Repeat for all consumer services (Data Processing BC, System Admin BC, etc.)


Now you can create relations to the lookup_users table:

File: apps/data-owner-bc/src/modules/resource/entities/resource.entity.ts

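import { Column, Entity, JoinColumn, ManyToOne, PrimaryGeneratedColumn } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { LookupUser } from '../../lookup/entities/lookup-user.entity';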
@Entity({ name: 'resources', database: AppDatabases.APP_CORE })
export class Resource {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
reference_number: string;
@Column()
first_name: string;
// --- Audit Trail with Relations ---
/**
* UUID of user who created this record
*/
@Column({ type: 'uuid', nullable: true })
created_by: string | null;
/**
* Relation to lookup_users for display purposes
* Used to JOIN user details (name, email, position) for audit display
*/
@ManyToOne(() => LookupUser, { nullable: true, eager: false })
@JoinColumn({ name: 'created_by' })
creator?: LookupUser;
@Column({ type: 'uuid', nullable: true })
updated_by: string | null;
@ManyToOne(() => LookupUser, { nullable: true, eager: false })
@JoinColumn({ name: 'updated_by' })
updater?: LookupUser;
}

Result: Now you can load user details alongside any resource:

const resource = await resourceRepository.findOne({
where: { id: resourceId },
relations: { creator: true, updater: true },
});
// Display full audit trail
console.log(`Created by: ${resource.creator?.first_name} ${resource.creator?.last_name}`);
console.log(`Email: ${resource.creator?.email}`);
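
Because replication is eventually consistent, `creator` can be undefined for a short time after a user is created, or permanently if an event was lost. A small formatter with a fallback, sketched below, avoids printing "undefined undefined". `formatAuditUser` and the "System" label for records without a user ID are assumptions made for illustration.

```typescript
interface AuditUser {
  title: string | null;
  first_name: string;
  last_name: string;
}

// Build a display name, falling back to the raw UUID when the lookup row is missing.
function formatAuditUser(user: AuditUser | null | undefined, userId: string | null): string {
  if (user) {
    // Drop a null or empty title before joining the name parts.
    return [user.title, user.first_name, user.last_name].filter(Boolean).join(' ');
  }
  return userId ? `Unknown user (${userId})` : 'System';
}
```

For example, `formatAuditUser(resource.creator, resource.created_by)` would yield the full name when the lookup row exists and the UUID otherwise.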

When a new service is deployed (or after extended downtime), the lookup_users table may be empty or stale. Trigger a full sync via the IAM service’s bulk-sync endpoint:

// In IAM service: bulk sync all users to all consumers
@Post('sync-all')
async syncAllUsers(): Promise<void> {
const allUsers = await this.usersService.findAll();
for (const user of allUsers) {
await this.usersQueue.add(AppQueue.Iam.cmd.User.jobs.UpdateUser, user, {
attempts: 3,
});
}
}
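
The loop above makes one Redis round trip per user. For large user tables, a possible alternative is to enqueue in batches with BullMQ's `addBulk`, which accepts an array of `{ name, data, opts }` descriptors. The sketch below only prepares those batches; `chunk`, `toBulkJobs`, and the batch size are hypothetical choices.

```typescript
interface BulkJob<T> {
  name: string;
  data: T;
  opts: { attempts: number; backoff: { type: 'exponential'; delay: number } };
}

// Split a list into batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Turn users into job descriptors in the { name, data, opts } shape addBulk accepts.
function toBulkJobs<T>(users: T[], jobName: string): BulkJob<T>[] {
  return users.map((user) => ({
    name: jobName,
    data: user,
    opts: { attempts: 3, backoff: { type: 'exponential' as const, delay: 1000 } },
  }));
}
```

Each batch from `chunk(toBulkJobs(allUsers, jobName), 500)` could then be passed to `usersQueue.addBulk(batch)`.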

To monitor sync progress, check the queue in Bull Board or programmatically:

// Check queue status in Bull Board or programmatically
const waiting = await usersQueue.getWaiting();
const failed = await usersQueue.getFailed();
const completed = await usersQueue.getCompleted();
console.log({ waiting: waiting.length, failed: failed.length, completed: completed.length });

| Scenario | Expected Delay |
|---|---|
| Same datacenter Redis | < 100ms |
| Cross-datacenter Redis | < 500ms |
| Service under heavy load | < 2 seconds |
| Redis temporarily unavailable | Queued on retry |