BaseSyncLookupService: Abstract Sync Architecture
Overview
When master data changes (e.g., departments, regions, catalog items), those changes must be propagated to lookup tables in other databases so that reports and cross-database queries remain consistent. Doing this naively, by looping through records one at a time or loading everything into memory in a single query, creates performance bottlenecks at scale.
BaseSyncLookupService is an abstract base class that encodes a battle-tested sync workflow into a reusable template. Concrete service classes simply extend it and implement a single method (fetchData) to connect the workflow to their specific data source.
Architectural Concepts
The class is designed around two well-known principles:
Template Method Pattern
The base class defines the algorithm skeleton (fetch → queue → batch), while subclasses fill in only the part that differs: how to fetch the data. This prevents duplicating queue-management logic across every sync service.
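A minimal sketch of the pattern, reduced to its essentials. `MiniSyncTemplate`, `InMemoryDepartmentsSync`, and the in-memory `enqueued` array are hypothetical stand-ins for illustration only; the actual base class appears later in this document and uses BullMQ and pagination.

```typescript
// Hypothetical, simplified stand-in for the real base class (no BullMQ, no pagination).
abstract class MiniSyncTemplate<T> {
  // Stand-in for a queue: each entry is one "job" payload.
  readonly enqueued: T[][] = [];

  // The only step subclasses must supply.
  protected abstract fetchData(page: number, limit: number): Promise<T[]>;

  // Template method: the skeleton (fetch, then split and enqueue) is fixed here.
  async syncAll(batchSize = 2): Promise<number> {
    const records = await this.fetchData(1, 100);
    for (let i = 0; i < records.length; i += batchSize) {
      this.enqueued.push(records.slice(i, i + batchSize));
    }
    return records.length;
  }
}

// A subclass only describes where the data comes from.
class InMemoryDepartmentsSync extends MiniSyncTemplate<{ id: number; name: string }> {
  protected async fetchData(page: number, limit: number) {
    const all = [
      { id: 1, name: 'Cardiology' },
      { id: 2, name: 'Radiology' },
      { id: 3, name: 'Oncology' },
    ];
    return all.slice((page - 1) * limit, page * limit);
  }
}
```

Calling `new InMemoryDepartmentsSync().syncAll(2)` in this sketch records two payloads (two records, then one) without the subclass containing any batching logic.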
```
BaseSyncLookupService            SyncLookupDepartmentsService
┌──────────────────────┐         ┌──────────────────────────────┐
│ syncAll()            │         │                              │
│  ├─ fetchData() ─────┼─────────┤ fetchData(page, limit)       │
│  ├─ enqueueBatches   │         │  └─ departmentsService       │
│  └─ Promise.all      │         │       .findPaginated(...)    │
└──────────────────────┘         └──────────────────────────────┘
```

SOLID Principles Applied
| Principle | How it applies |
|---|---|
| S: Single Responsibility | Base class owns the queueing workflow. Subclass owns data fetching. |
| O: Open/Closed | Add new sync types by extending the base class without modifying it. |
| D: Dependency Inversion | The base class depends on the abstract fetchData contract, not on any concrete service like DepartmentsService. |
Three-Phase Workflow
Every call to syncAll() runs through three distinct phases designed to balance speed, memory efficiency, and throughput.
graph TD
A[Start syncAll] --> B[Fetch Page 1]
B --> C{Total Records > 0?}
C -- No --> D[Return 0 Results]
C -- Yes --> E[Enqueue Page 1 Immediately]
E --> F[Generate Remaining Page Promises]
F --> G[Wait for all pages via Promise.all]
G --> H[Flatten all data into single array]
H --> I[Split into Batches of 1,000]
I --> J[Enqueue Batches in Parallel, Non-blocking]
J --> K[Return Sync Status Result]
Phase 1: Initial Phase
The first page is fetched to discover the total record count and the total number of pages. The first page's data is immediately added to the BullMQ queue so processing starts without waiting for the remaining pages to be fetched. This keeps the system responsive even with very large datasets.
Phase 2: Parallel Fetching
All remaining pages are fetched concurrently using Promise.all. Instead of awaiting each page sequentially, all outbound queries are issued at once and resolved together, so the total fetch time approaches that of the slowest single query rather than the sum of all queries, provided the database can serve them concurrently.
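The parallel fetch can be sketched in isolation as follows. The `Page` shape, the `FetchPage` type, and `fakeFetch` are hypothetical and exist only to make the example self-contained; the real code relies on `IResponsePaginatedService` and the subclass's `fetchData`.

```typescript
// Hypothetical page shape and fetcher type, for illustration only.
interface Page<T> {
  data: T[];
  totalPages: number;
}

type FetchPage<T> = (page: number, limit: number) => Promise<Page<T>>;

// Issue the requests for pages 2..totalPages up front, then await them together.
async function fetchRemainingPages<T>(
  fetchPage: FetchPage<T>,
  totalPages: number,
  limit: number,
): Promise<T[]> {
  const pending: Promise<Page<T>>[] = [];
  for (let page = 2; page <= totalPages; page++) {
    pending.push(fetchPage(page, limit)); // not awaited here, so requests overlap
  }
  const pages = await Promise.all(pending); // results keep the order of `pending`
  return pages.flatMap((p) => p.data);
}

// Fake data source: 5 records, served with a small artificial delay.
const fakeFetch: FetchPage<number> = async (page, limit) => {
  const all = [10, 20, 30, 40, 50];
  await new Promise((resolve) => setTimeout(resolve, 5));
  return {
    data: all.slice((page - 1) * limit, page * limit),
    totalPages: Math.ceil(all.length / limit),
  };
};
```

Two properties of `Promise.all` are worth keeping in mind with this approach: it rejects as soon as any single page request fails, and it places no limit on how many queries run at once.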
Phase 3: Batch Processing
Records from the remaining pages are flattened into a single array, then split into chunks of 1,000 records. Each chunk is enqueued as a separate BullMQ job, and the jobs are added in parallel without blocking the return of syncAll(). This keeps individual job payloads manageable and avoids Redis memory spikes.
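The splitting step on its own looks roughly like this. `chunk` is a hypothetical helper name used for illustration; the base class performs the same slicing inline in `enqueueBatches`.

```typescript
// Split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// 2,500 records with a batch size of 1,000 produce batches of 1,000, 1,000, and 500.
const records = Array.from({ length: 2500 }, (_, i) => i);
const batchSizes = chunk(records, 1000).map((batch) => batch.length);
console.log(batchSizes); // [ 1000, 1000, 500 ]
```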
sequenceDiagram
participant Client as Client/Trigger
participant Sub as SubClass (SyncDept)
participant Base as BaseSyncLookup (Abstract)
participant DB as Database/Service
participant MQ as BullMQ (Redis)
Client->>Sub: syncAll()
Sub->>Base: Execute template logic
Base->>Sub: fetchData(Page 1)
Sub->>DB: Query offset 0 limit 2,000
DB-->>Sub: Return Data + Pagination
Sub-->>Base: Data Results
Base->>MQ: Add Page 1 Data (Immediate)
Note over Base,DB: Parallel Fetching Phase
Base->>Sub: fetchData(Remaining Pages 2...N)
Sub->>DB: Multiple Parallel Queries
DB-->>Base: All remaining data
Note over Base,MQ: Batching Phase
Base->>Base: Flatten & Split into Batches (1,000/each)
Base->>MQ: Enqueue Batches in Parallel
Base-->>Client: Return Summary {total, synced, skipped}
Configuration Constants
| Constant | Value | Purpose |
|---|---|---|
| BATCH_SIZE | 1,000 | Maximum records per BullMQ job payload |
| FETCH_LIMIT | 2,000 | Records fetched per database query page |
FETCH_LIMIT (2,000) is intentionally larger than BATCH_SIZE (1,000) so that each database round-trip is efficient, while individual Redis payloads remain small enough to avoid memory pressure.
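To make the effect of these two constants concrete, the sketch below estimates how many database queries and queue jobs one run produces. `planSync` is a hypothetical helper, not part of the library. It follows the syncAll() implementation shown later in this document, in which the first page is enqueued as a single job (so that one job can carry up to FETCH_LIMIT records) and only the remaining records are split by BATCH_SIZE. It also assumes every page except the last is full.

```typescript
const BATCH_SIZE = 1000;
const FETCH_LIMIT = 2000;

// Hypothetical helper: estimates the work done by one syncAll() run.
function planSync(totalRecords: number): { queries: number; jobs: number } {
  // One query is always needed to learn the total, even when there is no data.
  if (totalRecords <= 0) return { queries: 1, jobs: 0 };

  const queries = Math.ceil(totalRecords / FETCH_LIMIT);
  const firstPageRecords = Math.min(totalRecords, FETCH_LIMIT);
  const remainingRecords = totalRecords - firstPageRecords;

  // One job for the first page, plus one job per batch of remaining records.
  const jobs = 1 + Math.ceil(remainingRecords / BATCH_SIZE);
  return { queries, jobs };
}

console.log(planSync(1540));  // { queries: 1, jobs: 1 }
console.log(planSync(10000)); // { queries: 5, jobs: 9 }
```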
Abstract Base Class
This class lives in the shared library (libs/common) so every Bounded Context can use it.
```typescript
import { Job, Queue } from 'bullmq';

import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';

export abstract class BaseSyncLookupService<TEntity, TPayload = TEntity> {
  protected constructor(
    protected readonly logger: LogsService,
    protected readonly queue: Queue,
    protected readonly jobName: string,
  ) {}

  private static readonly BATCH_SIZE = 1000;
  private static readonly FETCH_LIMIT = 2000;

  /**
   * Subclasses implement this to connect to their specific data source.
   * Returns the SOURCE entity type (e.g., Clinic, Department, Product).
   */
  protected abstract fetchData(
    page: number,
    limit: number,
  ): Promise<IResponsePaginatedService<TEntity[]>>;

  /**
   * Optional: Override to transform source entities into a different payload.
   * If not overridden, TPayload defaults to TEntity (no transformation).
   *
   * Use this when:
   * - You need to extract specific fields from the source entity
   * - You need to flatten nested relations into JSON objects
   * - You need to restructure data for the lookup table format
   *
   * @example
   * protected transformData(clinics: Clinic[]): ILookupClinic[] {
   *   return clinics.map(clinic => ({
   *     id: clinic.id,
   *     name: clinic.name,
   *     stations: clinic.stations?.map(s => ({ id: s.id, name: s.name })) ?? [],
   *   }));
   * }
   */
  protected transformData(entities: TEntity[]): TPayload[] {
    return entities as unknown as TPayload[];
  }

  async syncAll(): Promise<ISyncLookupResult> {
    const response = await this.fetchData(1, BaseSyncLookupService.FETCH_LIMIT);
    const total = response.pagination?.total_records ?? 0;
    const totalPages = response.pagination?.total_pages ?? 0;

    if (total === 0) return { total: 0, synced: 0, skipped: 0 };

    // Phase 1: Transform and enqueue first page immediately
    const firstPagePayload = this.transformData(response.data ?? []);
    await this.queue.add(this.jobName, firstPagePayload, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
    });

    // Phase 2: Fetch remaining pages in parallel
    const remainingPagePromises: Promise<IResponsePaginatedService<TEntity[]>>[] = [];
    for (let page = 2; page <= totalPages; page++) {
      remainingPagePromises.push(this.fetchData(page, BaseSyncLookupService.FETCH_LIMIT));
    }

    const remainingPagesResults = await Promise.all(remainingPagePromises);
    const remainingEntities = remainingPagesResults.flatMap((res) => res.data ?? []);

    // Phase 3: Transform and enqueue remaining data as batches (non-blocking)
    if (remainingEntities.length > 0) {
      const transformedPayloads = this.transformData(remainingEntities);
      this.enqueueBatches(transformedPayloads).catch((err) => {
        this.logger.error(`Failed to enqueue batches: ${err.message}`);
      });
    }

    return { total, synced: total, skipped: 0 };
  }

  private async enqueueBatches(payloads: TPayload[]): Promise<void> {
    const queuePromises: Promise<Job>[] = [];
    for (let i = 0; i < payloads.length; i += BaseSyncLookupService.BATCH_SIZE) {
      const batch = payloads.slice(i, i + BaseSyncLookupService.BATCH_SIZE);
      queuePromises.push(this.queue.add(this.jobName, batch));
    }
    await Promise.all(queuePromises);
  }
}
```

Generic Parameters Explained
BaseSyncLookupService supports two generic parameters:
| Parameter | Purpose | Default | Example |
|---|---|---|---|
| TEntity | The source entity type fetched from the database | Required | Clinic, Department, Product |
| TPayload | The transformed payload type enqueued to Redis | TEntity | ILookupClinic, ILookupDepartment |
No Transformation (Simple Case)
If source and destination are the same, use a single generic:

```typescript
// Source entity = Destination payload
export class SyncLookupDepartmentsService extends BaseSyncLookupService<Department> {
  // Implicitly: TPayload = Department
  // No transformData() override needed
}
```

With Transformation (Complex Case)
If you need to extract fields or reshape data, provide both generics and override transformData:

```typescript
// Source entity ≠ Destination payload
export class SyncLookupClinicsService extends BaseSyncLookupService<Clinic, ILookupClinic> {
  protected transformData(clinics: Clinic[]): ILookupClinic[] {
    return clinics.map((clinic) => ({
      id: clinic.id,
      name: clinic.name,
      department: (clinic.lookup_department ?? {}) as Record<string, unknown>,
      stations: Array.isArray(clinic.stations)
        ? clinic.stations.map((station) => ({
            id: station.id,
            name: station.name,
            is_active: station.is_active,
            sort_order: station.sort_order,
          }))
        : [],
    }));
  }
}
```

Implementing a Concrete Subclass
Creating a new sync service requires three steps, plus an optional fourth:
- Extend BaseSyncLookupService<TEntity> (or BaseSyncLookupService<TEntity, TPayload> if transforming)
- Call super() with logger, queue, and job name
- Implement fetchData() to call your domain service
- Optionally override transformData() if you need to reshape the data
Example 1: Simple Service (No Transformation)
When source entity and lookup entity have the same structure:

```typescript
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';

import { Queue } from 'bullmq';

import { BaseSyncLookupService } from '@lib/common/abstracts/base-sync-lookup.service';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';

import { Department } from '../entities/department.entity';
import { DepartmentsService } from './departments.service';

@Injectable()
export class SyncLookupDepartmentsService extends BaseSyncLookupService<Department> {
  // TPayload defaults to Department (no transformation)
  constructor(
    logger: LogsService,
    private readonly departmentsService: DepartmentsService,
    @InjectQueue(AppQueue.SystemAdmin.cmd.Department.name) departmentQueue: Queue,
  ) {
    super(logger, departmentQueue, AppQueue.SystemAdmin.cmd.Department.jobs.SyncDepartment);
  }

  protected async fetchData(
    page: number,
    limit: number,
  ): Promise<IResponsePaginatedService<Department[]>> {
    return this.departmentsService.findPaginated({ page, limit });
  }
  // transformData() is inherited from base; no override needed
}
```

Example 2: Complex Service (With Transformation)
When you need to extract specific fields and reshape nested relations:

```typescript
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';

import { Queue } from 'bullmq';

import { BaseSyncLookupService } from '@lib/common/abstracts/base-sync-lookup.service';
import { QueryParamsDTO } from '@lib/common/dto/query-params.dto';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { ILookupClinic } from '@lib/common/interfaces/clinic.interface';
import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';

import { Clinic } from '../entities/clinic.entity';
import { Station } from '../entities/station.entity';
import { ClinicsService } from '../services/clinics.service';

@Injectable()
export class SyncLookupClinicsService extends BaseSyncLookupService<Clinic, ILookupClinic> {
  // TEntity = Clinic (source), TPayload = ILookupClinic (transformed)
  constructor(
    logger: LogsService,
    private readonly clinicService: ClinicsService,
    @InjectQueue(AppQueue.Scheduling.cmd.Clinic.name) clinicQueue: Queue,
  ) {
    super(logger, clinicQueue, AppQueue.Scheduling.cmd.Clinic.jobs.SyncClinic);
  }

  protected async fetchData(
    page: number,
    limit: number,
  ): Promise<IResponsePaginatedService<Clinic[]>> {
    const query = new QueryParamsDTO();
    query.page = page;
    query.limit = limit;
    query.relations = 'stations,lookup_department'; // Eager-load relations
    return this.clinicService.findPaginated(query);
  }

  /**
   * Transform Clinic entities into ILookupClinic payloads.
   * Extracts specific fields and reshapes nested relations into JSON objects.
   */
  protected transformData(clinics: Clinic[]): ILookupClinic[] {
    return clinics.map((clinic: Clinic) => ({
      id: clinic.id,
      // Flatten department relation to JSONB object
      department: (clinic.lookup_department ?? {}) as Record<string, unknown>,
      // Transform stations array: extract only id, name, is_active, sort_order
      stations: Array.isArray(clinic.stations)
        ? clinic.stations.map((station: Station) => ({
            id: station.id,
            name: station.name,
            is_active: station.is_active,
            sort_order: station.sort_order,
          }))
        : [],
    }));
  }
}
```

Key Differences:
| Aspect | Simple | Complex |
|---|---|---|
| Generics | <Department> | <Clinic, ILookupClinic> |
| Payload Transform | None; sync raw entity | Extract fields, reshape relations |
| fetchData Relations | Load what's needed | Use relations to eager-load nested data |
| transformData Override | Not needed | Required to reshape payload |
That's it. The entire sync workflow (pagination, parallelism, batching, error logging, and queueing) is inherited from BaseSyncLookupService.
Module & Controller Registration
Module Setup
Register the sync service as a provider in your module and register the BullMQ queue it uses:
```typescript
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

import { CommonModule } from '@lib/common';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { DatabaseModule } from '@lib/database';

import { DepartmentsController } from './controllers/departments.controller';
import { SyncLookupDepartmentsController } from './controllers/sync-lookup-department.controller';
import { Department } from './entities/department.entity';
import { DepartmentsService } from './services/departments.service';
import { SyncLookupDepartmentsService } from './services/sync-lookup-department.service';

@Module({
  imports: [
    CommonModule,
    DatabaseModule.registerAsync(AppDatabases.APP_CORE),
    TypeOrmModule.forFeature([Department], AppDatabases.APP_CORE),
    BullModule.registerQueue({
      name: AppQueue.SystemAdmin.cmd.Department.name,
    }),
  ],
  controllers: [DepartmentsController, SyncLookupDepartmentsController],
  providers: [DepartmentsService, SyncLookupDepartmentsService],
  exports: [DepartmentsService],
})
export class DepartmentsModule {}
```

Controller Endpoint
```typescript
import { Controller, HttpCode, HttpStatus, Post } from '@nestjs/common';
import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';

import { RequirePermission } from '@lib/common';
import { ResourceType } from '@lib/common/decorators/resource-type.decorator';

import { SyncLookupDepartmentsService } from '../services/sync-lookup-department.service';

@ResourceType('sync-lookup-departments')
@ApiTags('Sync Lookup Departments')
@Controller('sync-lookup-departments')
export class SyncLookupDepartmentsController {
  constructor(private readonly syncLookupDepartmentsService: SyncLookupDepartmentsService) {}

  @Post()
  @RequirePermission('department:sync')
  @HttpCode(HttpStatus.OK)
  @ApiOperation({ summary: 'Sync all departments to lookup table' })
  @ApiResponse({
    status: HttpStatus.OK,
    description: 'Returns the number of departments synced, skipped, and total.',
  })
  syncAll(): Promise<ISyncLookupResult> {
    return this.syncLookupDepartmentsService.syncAll();
  }
}
```

Return Value
syncAll() returns a summary object that can be used for monitoring:

```typescript
{
  total: 1540,  // Total records found in the source database
  synced: 1540, // Records handed to the queue (always equal to total in current implementation)
  skipped: 0,   // Records skipped (always 0 in current implementation)
}
```

Error Handling
| Scenario | Behavior |
|---|---|
| Source database is unreachable | fetchData() throws; syncAll() propagates the error to the caller |
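| First-page enqueue fails | The awaited queue.add() call rejects; the error propagates to the caller and the sync is aborted |
| Remaining-batch enqueue fails | Error is caught by .catch() and logged via logger.error; it is not rethrown, so the sync summary is still returned |
| Worker fails to process a job | For the first-page job, BullMQ makes up to 3 attempts with exponential backoff (options set in the Phase 1 enqueue). Batch jobs enqueued in Phase 3 are added without these options, so they follow the queue's default job options |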
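The remaining-batch behavior can be reproduced in a small sketch. `syncAllSketch`, the stub `logger`, and the failing `enqueueBatches` below are hypothetical stand-ins for the real LogsService and queue calls; they only illustrate that an un-awaited promise with `.catch()` lets the summary return before the failure is known.

```typescript
// Hypothetical logger stub, standing in for LogsService.
const loggedErrors: string[] = [];
const logger = { error: (message: string) => loggedErrors.push(message) };

// Hypothetical enqueue step, standing in for the batched queue.add() calls.
async function enqueueBatches(shouldFail: boolean): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulated Redis round-trip
  if (shouldFail) throw new Error('Redis unavailable');
}

// Mirrors the Phase 3 call site: the promise is not awaited, so the summary is
// returned before enqueueing finishes, and a failure is only logged.
function syncAllSketch(total: number, shouldFail: boolean) {
  enqueueBatches(shouldFail).catch((err: unknown) => {
    const message = err instanceof Error ? err.message : String(err);
    logger.error(`Failed to enqueue batches: ${message}`);
  });
  return { total, synced: total, skipped: 0 };
}
```

In this sketch, `syncAllSketch(3000, true)` still reports `synced: 3000`, and the error message reaches the logger only a few milliseconds later. Callers that need a guarantee that every batch reached the queue would have to await the enqueue step instead, at the cost of a slower response.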
Related Documentation
- Event-Driven User Data Synchronization: BullMQ-based user sync pattern
- Data Replication for Reporting: Event-driven lookup table sync
- BullMQ Scaling Architecture: Queue architecture and worker scaling