Event-Driven User Data Synchronization
Overview
Section titled “Overview”In a Domain-Driven Design (DDD) microservices architecture, each bounded context owns its data. However, there are common scenarios where multiple services need access to shared reference data, such as user information for audit trails (created_by, updated_by, deleted_by columns).
This guide explains the event-driven synchronization pattern using BullMQ to replicate user data from the IAM service (data owner) to lookup_users tables in consumer services (Data Owner BC, Data Processing BC, System Admin BC, etc.).
Problem Statement
Section titled “Problem Statement”The Challenge
Section titled “The Challenge”Consider this common scenario in a multi-service system:
// In Data Owner Bounded Context (app_core_db database)@Entity({ name: 'resources', database: AppDatabases.APP_CORE })export class Resource { @PrimaryGeneratedColumn('uuid') id: string;
@Column() reference_number: string;
@Column() first_name: string;
// Audit fields - WHO created/modified this record? @Column({ type: 'uuid' }) created_by: string; // ❌ Problem: Just a UUID, no user details
@Column({ type: 'uuid' }) updated_by: string; // ❌ Problem: Just a UUID, no user details}Business Requirement: When displaying resource records, we need to show:
- Reference number, name, date
- Created by: “John Smith (john.smith@company.com)” — NOT just a UUID
- Last updated by: “Jane Doe (jane.d@company.com)“
The Anti-Pattern (What NOT to Do)
Section titled “The Anti-Pattern (What NOT to Do)”❌ Direct Database Access Across Services:
// WRONG: Data Owner service directly accessing IAM database@Entity({ name: 'resources', database: AppDatabases.APP_CORE })export class Resource { @ManyToOne(() => User) // ❌ User entity is in IAM database (app_iam_db) @JoinColumn({ name: 'created_by' }) creator: User; // ❌ Violates bounded context boundaries}Why This Fails:
- ❌ Violates Bounded Context: Data Owner service should not access IAM’s database directly
- ❌ Tight Coupling: Schema changes in IAM break the Data Owner service
- ❌ Cross-Database JOINs: PostgreSQL cannot JOIN across separate databases efficiently
- ❌ Scalability Issues: When services move to different servers/regions, direct database access becomes impossible
Solution Architecture: Event-Driven Synchronization
Section titled “Solution Architecture: Event-Driven Synchronization”High-Level Architecture
Section titled “High-Level Architecture”graph TB
subgraph "IAM Service (Data Owner)"
A[UsersService<br/>app_iam_db.users]
B[BullMQ Producer]
end
subgraph "Message Queue"
C[(Redis + BullMQ<br/>Queue: USER_SYNC)]
end
subgraph "Data Owner BC (Consumer)"
D[LookupUsersProcessor]
E[lookup_users table<br/>app_core_db]
F[Resource Entity]
end
subgraph "Data Processing BC (Consumer)"
G[LookupUsersProcessor]
H[lookup_users table<br/>app_processing_db]
end
subgraph "System Admin BC (Consumer)"
I[LookupUsersProcessor]
J[lookup_users table<br/>app_admin_db]
end
A -->|1. User Created/Updated/Deleted| B
B -->|2. Publish Event| C
C -->|3. Deliver Event| D
C -->|3. Deliver Event| G
C -->|3. Deliver Event| I
D -->|4. Upsert| E
G -->|4. Upsert| H
I -->|4. Upsert| J
E -->|5. JOIN| F
style A fill:#e1f5ff
style C fill:#fff4e1
style E fill:#e8f5e9
style H fill:#e8f5e9
style J fill:#e8f5e9
Flow Description
Section titled “Flow Description”- Event Trigger: When a user is created/updated/deleted in IAM service
- Event Publishing: IAM service publishes event to BullMQ queue
- Event Distribution: BullMQ delivers event to all consumer services
- Local Replication: Each consumer upserts the user data into its own
lookup_userstable - JOIN Operations: Consumer services can now perform local JOINs for audit trails
Core Principles
Section titled “Core Principles”1. Event-Driven Architecture
Section titled “1. Event-Driven Architecture”Pattern: Publish-Subscribe with BullMQ
- Publisher: IAM Service (single source of truth)
- Subscribers: All consumer services (Data Owner BC, Processing BC, etc.)
- Message Broker: Redis + BullMQ (reliable, persistent, retryable)
2. Data Ownership
Section titled “2. Data Ownership”IAM Service is the Data Owner:
- ✅ Manages the authoritative
userstable inapp_iam_db - ✅ Enforces business rules (password hashing, validation)
- ✅ Publishes change events to other services
- ✅ No other service can modify user data
Consumer Services are Data Consumers:
- ✅ Maintain read-only replicas in
lookup_userstables - ✅ Use replicated data for JOINs and queries
- ❌ Never modify user data locally
- ❌ Never trust local data as authoritative
3. Eventual Consistency
Section titled “3. Eventual Consistency”Consistency Model: Eventual consistency with bounded staleness
- Synchronization Delay: Typically < 100ms (Redis in same datacenter)
- Acceptable Trade-off: Slight delay is acceptable for audit trails
- Guaranteed Delivery: BullMQ ensures events are not lost (retry with exponential backoff)
Implementation Guide
Section titled “Implementation Guide”Step 1: Define Shared Base Entity
Section titled “Step 1: Define Shared Base Entity”Create a shared base entity that all services can extend:
File: libs/common/src/entities/base-lookup-user.entity.ts
import { Column } from 'typeorm';
import { BaseEntity } from '@lib/common/abstracts/base-entity.abstract';import { EmployeeType } from '@lib/common/enum/auth/employee-type.enum';import { Gender } from '@lib/common/enum/auth/gender.enum';import { PreferredLanguage } from '@lib/common/enum/auth/preferred-language.enum';import { IUser } from '@lib/common/interfaces/user.interface';
/** * Base entity for lookup_users table * Contains essential user information for audit trails and display * Synchronized from IAM service via BullMQ events */export abstract class BaseLookupUser extends BaseEntity implements IUser { // --- Unique Identifiers --- @Column({ type: 'uuid', nullable: true }) employee_id: string | null;
@Column({ type: 'varchar', length: 20, nullable: false }) employee_code: string;
@Column({ type: 'varchar', length: 13, nullable: false }) national_id: string;
@Column({ type: 'varchar', length: 50, nullable: false }) username: string;
// --- Personal Information --- @Column({ type: 'varchar', length: 10, nullable: true }) title: string | null;
@Column({ type: 'varchar', length: 50, nullable: false }) first_name: string;
@Column({ type: 'varchar', length: 50, nullable: false }) last_name: string;
// --- Contact Information --- @Column({ type: 'varchar', length: 100, nullable: true }) email: string | null;
@Column({ type: 'varchar', length: 15, nullable: true }) phone: string | null;
// --- Organization Information --- @Column({ type: 'uuid', nullable: true }) department_id: string | null;
@Column({ type: 'varchar', length: 200, nullable: true }) position: string | null;
@Column({ type: 'varchar', length: 20, enum: EmployeeType, nullable: true }) employee_type: EmployeeType | null;
// --- Profile Information --- @Column({ type: 'varchar', length: 500, nullable: true }) profile_image_url: string | null;
@Column({ type: 'varchar', length: 10, enum: Gender, nullable: true }) gender: Gender | null;
// --- Status --- @Column({ type: 'boolean', default: true }) is_active: boolean;
// Note: Passwords, MFA secrets, and other sensitive data are NOT replicated}Design Decisions:
- ✅ Extends BaseEntity: Includes
id,created_at,updated_at,is_deleted - ✅ Implements IUser: Type-safe interface for user data
- ✅ Essential Fields Only: Only includes data needed for display/audit (no passwords, MFA secrets)
- ✅ Nullable Fields: Most fields nullable to handle partial updates gracefully
- ✅ Abstract Class: Forces each service to create its own concrete entity
Step 2: Create Service-Specific Lookup Entity
Section titled “Step 2: Create Service-Specific Lookup Entity”Each consumer service extends the base entity and specifies its database:
Example — Data Owner BC:
File: apps/data-owner-bc/src/entities/lookup-user.entity.ts
import { Entity } from 'typeorm';
import { BaseLookupUser } from '@lib/common/entities/base-lookup-user.entity';import { AppDatabases } from '@lib/common/enum/app-databases.enum';
/** * Data Owner BC's local copy of user data for audit trails * Synchronized from IAM service via BullMQ events * Database: app_core_db */@Entity({ name: 'lookup_users', database: AppDatabases.APP_CORE })export class LookupUser extends BaseLookupUser {}Example — Data Processing BC:
@Entity({ name: 'lookup_users', database: AppDatabases.APP_PROCESSING })export class LookupUser extends BaseLookupUser {}Example — System Admin BC:
@Entity({ name: 'lookup_users', database: AppDatabases.APP_ADMIN })export class LookupUser extends BaseLookupUser {}Key Points:
- ✅ Same table name (
lookup_users) across all services for consistency - ✅ Different databases per service to maintain bounded context isolation
- ✅ Minimal code duplication (inherit everything from
BaseLookupUser)
Step 3: Configure BullMQ Queue
Section titled “Step 3: Configure BullMQ Queue”Define the queue name and job types in a shared enum:
File: libs/common/src/enum/app-queue.enum.ts
export const AppQueue = { Iam: { cmd: { User: { name: 'users', // Queue name jobs: { CreateUser: 'user.created', UpdateUser: 'user.updated', DeleteUser: 'user.deleted', }, }, }, },};Why This Structure:
- ✅ Centralized: Single source of truth for queue/job names
- ✅ Type-Safe: Autocomplete and refactoring support
- ✅ Namespaced: Clear ownership (Iam → User)
- ✅ Event Naming: Past-tense verbs (
created,updated,deleted)
Step 4: Implement Publisher (IAM Service)
Section titled “Step 4: Implement Publisher (IAM Service)”The IAM service publishes events whenever user data changes:
File: apps/iam/src/modules/user/services/users.service.ts
import { InjectQueue } from '@nestjs/bullmq';import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { AppQueue } from '@lib/common/enum/app-queue.enum';import { BaseServiceOperations } from '@lib/common/utils/base-operations/base-service-operations.util';
@Injectable()export class UsersService extends BaseServiceOperations<User, CreateUserDTO, UpdateUserDTO> { constructor( @InjectQueue(AppQueue.Iam.cmd.User.name) private readonly usersQueue: Queue, // ... other dependencies ) { super(); }
/** * Create a new user and publish 'user.created' event */ async create(data: CreateUserDTO, session: IUserSession): Promise<User> { // 1. Create user in IAM database (source of truth) const created = await super.create( { ...data, password_hash: await this.hashPassword(data.password), employee_code: await this.generateEmpCode(), }, session, );
// 2. Publish event to BullMQ (fire-and-forget, non-blocking) await this.usersQueue.add( AppQueue.Iam.cmd.User.jobs.CreateUser, // Job name created, // Payload: Full user object { attempts: 3, // Retry up to 3 times backoff: { type: 'exponential', delay: 1000 }, // 1s, 2s, 4s }, );
return created; }
/** * Update existing user and publish 'user.updated' event */ async update(id: string, data: UpdateUserDTO, session: IUserSession): Promise<User> { // 1. Update user in IAM database const updated = await super.update(id, data, session);
// 2. Publish update event await this.usersQueue.add(AppQueue.Iam.cmd.User.jobs.UpdateUser, updated, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, });
return updated; }
/** * Soft delete user and publish 'user.deleted' event */ async softDelete(id: string, session: IUserSession): Promise<void> { // 1. Soft delete in IAM database await super.delete(id, true, session);
// 2. Publish delete event (only ID needed) await this.usersQueue.add( AppQueue.Iam.cmd.User.jobs.DeleteUser, { id }, // Minimal payload for deletion { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, }, ); }}Key Implementation Details:
- Non-Blocking: Event publishing does not block the main operation
- Retry Logic: Automatic retry with exponential backoff on failure
- Full Payload: Send complete user object for create/update (consumers need all fields)
- Minimal Payload: Send only ID for delete (consumers only need to know which record to remove)
- Error Handling: BullMQ handles failures, retries, and dead-letter queue automatically
Step 5: Implement Processor (Consumer Services)
Section titled “Step 5: Implement Processor (Consumer Services)”Each consumer service implements a processor to handle incoming events:
File: apps/data-owner-bc/src/modules/lookup/lookup-user.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';import { InjectRepository } from '@nestjs/typeorm';
import { Job } from 'bullmq';import { plainToInstance } from 'class-transformer';import { Repository } from 'typeorm';
import { AppQueue } from '@lib/common/enum/app-queue.enum';import { AppDatabases } from '@lib/common/enum/app-databases.enum';import { IUser } from '@lib/common/interfaces/user.interface';
import { LookupUser } from '../entities/lookup-user.entity';
/** * Processes user synchronization events from IAM service * Maintains a local read-only copy of user data in lookup_users table */@Processor(AppQueue.Iam.cmd.User.name)export class LookupUsersProcessor extends WorkerHost { constructor( @InjectRepository(LookupUser, AppDatabases.APP_CORE) private readonly userRepository: Repository<LookupUser>, ) { super(); }
/** * Process incoming job from BullMQ * @param job - Job containing user data and operation type */ async process(job: Job<IUser, unknown, string>): Promise<void> { switch (job.name) { case AppQueue.Iam.cmd.User.jobs.CreateUser: case AppQueue.Iam.cmd.User.jobs.UpdateUser: { // Transform plain object to LookupUser entity instance const lookupUserInstance = plainToInstance(LookupUser, job.data, { excludeExtraneousValues: false, });
// Upsert: Insert if new, update if exists (based on primary key) await this.userRepository.save(lookupUserInstance); break; }
case AppQueue.Iam.cmd.User.jobs.DeleteUser: { // Hard delete from lookup table (no soft delete needed for replicas) await this.userRepository.delete(job.data.id); break; }
default: console.warn(`Unknown job name: ${job.name}`); } }}Implementation Details:
- @Processor Decorator: Registers this class as a BullMQ worker for the queue
- Switch Statement: Handles different job types (create, update, delete)
- plainToInstance: Safely transforms JSON payload to typed entity instance
- Upsert Pattern:
.save()inserts or updates based on primary key existence - Hard Delete: Lookup tables use hard delete (replicas don’t need soft delete history)
- Error Handling: If processor throws error, BullMQ automatically retries
Step 6: Register Lookup Module in Service
Section titled “Step 6: Register Lookup Module in Service”Create a global module to register the processor:
File: apps/data-owner-bc/src/modules/lookup/lookup.module.ts
import { BullModule } from '@nestjs/bullmq';import { Global, Module } from '@nestjs/common';import { TypeOrmModule } from '@nestjs/typeorm';
import { AppQueue } from '@lib/common/enum/app-queue.enum';import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { LookupUser } from '../entities/lookup-user.entity';import { LookupUsersProcessor } from './lookup-user.processor';
/** * Global module for lookup table management * Registers BullMQ processors to sync data from other services */@Global()@Module({ imports: [ // Register entity for database access TypeOrmModule.forFeature([LookupUser], AppDatabases.APP_CORE),
// Register BullMQ queue for receiving events BullModule.registerQueue({ name: AppQueue.Iam.cmd.User.name, }), ], providers: [LookupUsersProcessor], exports: [],})export class LookupModule {}Import in Root Module:
@Module({ imports: [ // ... other imports LookupModule, // ✅ Import globally ],})export class DataOwnerBCModule {}Repeat for all consumer services (Data Processing BC, System Admin BC, etc.)
Step 7: Use Lookup Data in Entities
Section titled “Step 7: Use Lookup Data in Entities”Now you can create relations to the lookup_users table:
File: apps/data-owner-bc/src/modules/resource/entities/resource.entity.ts
import { Entity, JoinColumn, ManyToOne } from 'typeorm';
import { LookupUser } from '../../lookup/entities/lookup-user.entity';
@Entity({ name: 'resources', database: AppDatabases.APP_CORE })export class Resource { @PrimaryGeneratedColumn('uuid') id: string;
@Column() reference_number: string;
@Column() first_name: string;
// --- Audit Trail with Relations ---
/** * UUID of user who created this record */ @Column({ type: 'uuid', nullable: true }) created_by: string | null;
/** * Relation to lookup_users for display purposes * Used to JOIN user details (name, email, position) for audit display */ @ManyToOne(() => LookupUser, { nullable: true, eager: false }) @JoinColumn({ name: 'created_by' }) creator?: LookupUser;
@Column({ type: 'uuid', nullable: true }) updated_by: string | null;
@ManyToOne(() => LookupUser, { nullable: true, eager: false }) @JoinColumn({ name: 'updated_by' }) updater?: LookupUser;}Result: Now you can load user details alongside any resource:
const resource = await resourceRepository.findOne({ where: { id: resourceId }, relations: { creator: true, updater: true },});
// Display full audit trailconsole.log(`Created by: ${resource.creator?.first_name} ${resource.creator?.last_name}`);console.log(`Email: ${resource.creator?.email}`);Operational Considerations
Section titled “Operational Considerations”Full Sync on Service Startup
Section titled “Full Sync on Service Startup”When a new service is deployed (or after extended downtime), the lookup_users table may be empty or stale. Trigger a full sync via the IAM service’s bulk-sync endpoint:
// In IAM service: bulk sync all users to all consumers@Post('sync-all')async syncAllUsers(): Promise<void> { const allUsers = await this.usersService.findAll(); for (const user of allUsers) { await this.usersQueue.add(AppQueue.Iam.cmd.User.jobs.UpdateUser, user, { attempts: 3, }); }}Monitoring Queue Health
Section titled “Monitoring Queue Health”// Check queue status in Bull Board or programmaticallyconst waiting = await usersQueue.getWaiting();const failed = await usersQueue.getFailed();const completed = await usersQueue.getCompleted();
console.log({ waiting: waiting.length, failed: failed.length, completed: completed.length });Sync Delay Expectations
Section titled “Sync Delay Expectations”| Scenario | Expected Delay |
|---|---|
| Same datacenter Redis | < 100ms |
| Cross-datacenter Redis | < 500ms |
| Service under heavy load | < 2 seconds |
| Redis temporarily unavailable | Queued on retry |
Related Documentation
Section titled “Related Documentation”- BaseSyncLookupService — Abstract base class for batch sync patterns
- BullMQ Scaling Architecture — Queue architecture and worker scaling
- Database Architecture Overview — Three-database architecture and lookup patterns