Skip to content

BaseSyncLookupService β€” Abstract Sync Architecture

When master data changes (e.g., departments, regions, catalog items), those changes must be propagated to lookup tables in other databases so that reports and cross-database queries remain consistent. Doing this naively β€” looping through records one at a time, or loading everything into memory in a single query β€” creates performance bottlenecks at scale.

BaseSyncLookupService is an abstract base class that encodes a battle-tested sync workflow into a reusable template. Concrete service classes simply extend it and implement a single method (fetchData) to connect the workflow to their specific data source.


The class is designed around two well-known principles:

The base class defines the algorithm skeleton (fetch β†’ queue β†’ batch), while subclasses fill in only the part that differs: how to fetch the data. This prevents duplicating queue-management logic across every sync service.

BaseSyncLookupService SyncLookupDepartmentsService
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ syncAll() β”‚ β”‚ β”‚
β”‚ └─ fetchData() ◄──┼────────── fetchData(page, limit) β”‚
β”‚ └─ enqueueBatches β”‚ β”‚ └─ departmentsService β”‚
β”‚ └─ Promise.all β”‚ β”‚ .findPaginated(...) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
PrincipleHow it applies
S β€” Single ResponsibilityBase class owns the queueing workflow. Subclass owns data fetching.
O β€” Open/ClosedAdd new sync types by extending the base class without modifying it.
D β€” Dependency InversionThe base class depends on the abstract fetchData contract, not on any concrete service like DepartmentsService.

Every call to syncAll() runs through three distinct phases designed to balance speed, memory efficiency, and throughput.

graph TD
    A[Start syncAll] --> B[Fetch Page 1]
    B --> C{Total Records > 0?}
    C -- No --> D[Return 0 Results]
    C -- Yes --> E[Enqueue Page 1 Immediately]
    E --> F[Generate Remaining Page Promises]
    F --> G[Wait for all pages via Promise.all]
    G --> H[Flatten all data into single array]
    H --> I[Split into Batches of 1,000]
    I --> J[Enqueue Batches in Parallel β€” Non-blocking]
    J --> K[Return Sync Status Result]

The first page is fetched to discover the total record count and total pages. The first page’s data is immediately added to the BullMQ queue so processing starts without waiting for all remaining pages to be fetched. This keeps the system responsive even with very large datasets.

All remaining pages are fetched simultaneously using Promise.all. Instead of awaiting each page sequentially (which would be slow), all outbound queries are issued at once and resolved together. This dramatically reduces total fetch time for large datasets.

All fetched records are flattened into a single array, then split into chunks of 1,000 records. Each chunk is enqueued as a separate BullMQ job in parallel. This keeps individual job payloads manageable and avoids Redis memory spikes.

sequenceDiagram
    participant Client as Client/Trigger
    participant Sub as SubClass (SyncDept)
    participant Base as BaseSyncLookup (Abstract)
    participant DB as Database/Service
    participant MQ as BullMQ (Redis)

    Client->>Sub: syncAll()
    Sub->>Base: Execute template logic
    Base->>Sub: fetchData(Page 1)
    Sub->>DB: Query offset 0 limit 2,000
    DB-->>Sub: Return Data + Pagination
    Sub-->>Base: Data Results

    Base->>MQ: Add Page 1 Data (Immediate)

    Note over Base,DB: Parallel Fetching Phase
    Base->>Sub: fetchData(Remaining Pages 2...N)
    Sub->>DB: Multiple Parallel Queries
    DB-->>Base: All remaining data

    Note over Base,MQ: Batching Phase
    Base->>Base: Flatten & Split into Batches (1,000/each)
    Base->>MQ: Enqueue Batches in Parallel
    Base-->>Client: Return Summary {total, synced, skipped}

ConstantValuePurpose
BATCH_SIZE1,000Maximum records per BullMQ job payload
FETCH_LIMIT2,000Records fetched per database query page

FETCH_LIMIT (2,000) is intentionally larger than BATCH_SIZE (1,000) so that each database round-trip is efficient, while individual Redis payloads remain small enough to avoid memory pressure.


This class lives in the shared library (libs/common) so every Bounded Context can use it.

import { Job, Queue } from 'bullmq';
import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';
export abstract class BaseSyncLookupService<TEntity, TPayload = TEntity> {
protected constructor(
protected readonly logger: LogsService,
protected readonly queue: Queue,
protected readonly jobName: string,
) {}
private static readonly BATCH_SIZE = 1000;
private static readonly FETCH_LIMIT = 2000;
/**
* Subclasses implement this to connect to their specific data source.
* Returns the SOURCE entity type (e.g., Clinic, Department, Product).
*/
protected abstract fetchData(
page: number,
limit: number,
): Promise<IResponsePaginatedService<TEntity[]>>;
/**
* Optional: Override to transform source entities into a different payload.
* If not overridden, TPayload defaults to TEntity (no transformation).
*
* Use this when:
* - You need to extract specific fields from the source entity
* - You need to flatten nested relations into JSON objects
* - You need to restructure data for the lookup table format
*
* @example
* protected transformData(clinics: Clinic[]): ILookupClinic[] {
* return clinics.map(clinic => ({
* id: clinic.id,
* name: clinic.name,
* stations: clinic.stations?.map(s => ({ id: s.id, name: s.name })) ?? [],
* }));
* }
*/
protected transformData(entities: TEntity[]): TPayload[] {
return entities as unknown as TPayload[];
}
async syncAll(): Promise<ISyncLookupResult> {
const response = await this.fetchData(1, BaseSyncLookupService.FETCH_LIMIT);
const total = response.pagination?.total_records ?? 0;
const totalPages = response.pagination?.total_pages ?? 0;
if (total === 0) return { total: 0, synced: 0, skipped: 0 };
// Phase 1: Transform and enqueue first page immediately
const firstPagePayload = this.transformData(response.data ?? []);
await this.queue.add(this.jobName, firstPagePayload, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
});
// Phase 2: Fetch remaining pages in parallel
const remainingPagePromises: Promise<IResponsePaginatedService<TEntity[]>>[] = [];
for (let page = 2; page <= totalPages; page++) {
remainingPagePromises.push(this.fetchData(page, BaseSyncLookupService.FETCH_LIMIT));
}
const remainingPagesResults = await Promise.all(remainingPagePromises);
const remainingEntities = remainingPagesResults.flatMap((res) => res.data ?? []);
// Transform and Phase 3: Enqueue remaining data as batches (non-blocking)
if (remainingEntities.length > 0) {
const transformedPayloads = this.transformData(remainingEntities);
this.enqueueBatches(transformedPayloads).catch((err) => {
this.logger.error(`Failed to enqueue batches: ${err.message}`);
});
}
return { total, synced: total, skipped: 0 };
}
private async enqueueBatches(payloads: TPayload[]): Promise<void> {
const queuePromises: Promise<Job>[] = [];
for (let i = 0; i < payloads.length; i += BaseSyncLookupService.BATCH_SIZE) {
const batch = payloads.slice(i, i + BaseSyncLookupService.BATCH_SIZE);
queuePromises.push(this.queue.add(this.jobName, batch));
}
await Promise.all(queuePromises);
}
}

BaseSyncLookupService supports two generic parameters:

ParameterPurposeDefaultExample
TEntityThe source entity type fetched from the databaseRequiredClinic, Department, Product
TPayloadThe transformed payload type enqueued to RedisTEntityILookupClinic, ILookupDepartment

If source and destination are the same, use a single generic:

// Source entity = Destination payload
export class SyncLookupDepartmentsService extends BaseSyncLookupService<Department> {
// Implicitly: TPayload = Department
// No transformData() override needed
}

If you need to extract fields or reshape data, provide both generics and override transformData:

// Source entity β‰  Destination payload
export class SyncLookupClinicsService extends BaseSyncLookupService<Clinic, ILookupClinic> {
protected transformData(clinics: Clinic[]): ILookupClinic[] {
return clinics.map((clinic) => ({
id: clinic.id,
name: clinic.name,
department: (clinic.lookup_department ?? {}) as Record<string, unknown>,
stations: Array.isArray(clinic.stations)
? clinic.stations.map((station) => ({
id: station.id,
name: station.name,
is_active: station.is_active,
sort_order: station.sort_order,
}))
: [],
}));
}
}

Creating a new sync service requires three steps:

  1. Extend BaseSyncLookupService<TEntity> (or BaseSyncLookupService<TEntity, TPayload> if transforming)
  2. Call super() with logger, queue, and job name
  3. Implement fetchData() to call your domain service
  4. Optionally override transformData() if you need to reshape the data

When source entity and lookup entity have the same structure:

apps/system-admin-bc/src/modules/department/services/sync-lookup-department.service.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { BaseSyncLookupService } from '@lib/common/abstracts/base-sync-lookup.service';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';
import { Department } from '../entities/department.entity';
import { DepartmentsService } from './departments.service';
@Injectable()
export class SyncLookupDepartmentsService extends BaseSyncLookupService<Department> {
// TPayload defaults to Department (no transformation)
constructor(
logger: LogsService,
private readonly departmentsService: DepartmentsService,
@InjectQueue(AppQueue.SystemAdmin.cmd.Department.name)
departmentQueue: Queue,
) {
super(logger, departmentQueue, AppQueue.SystemAdmin.cmd.Department.jobs.SyncDepartment);
}
protected async fetchData(
page: number,
limit: number,
): Promise<IResponsePaginatedService<Department[]>> {
return this.departmentsService.findPaginated({ page, limit });
}
// transformData() is inherited from base; no override needed
}

When you need to extract specific fields and reshape nested relations:

apps/scheduling-bc/src/modules/clinic/services/sync-lookup-clinic.service.ts
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';
import { BaseSyncLookupService } from '@lib/common/abstracts/base-sync-lookup.service';
import { QueryParamsDTO } from '@lib/common/dto/query-params.dto';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { ILookupClinic } from '@lib/common/interfaces/clinic.interface';
import { IResponsePaginatedService } from '@lib/common/interfaces/response/response-service.interface';
import { LogsService } from '@lib/common/modules/log/logs.service';
import { Clinic } from '../entities/clinic.entity';
import { Station } from '../entities/station.entity';
import { ClinicsService } from '../services/clinics.service';
@Injectable()
export class SyncLookupClinicsService extends BaseSyncLookupService<Clinic, ILookupClinic> {
// TEntity = Clinic (source), TPayload = ILookupClinic (transformed)
constructor(
logger: LogsService,
private readonly clinicService: ClinicsService,
@InjectQueue(AppQueue.Scheduling.cmd.Clinic.name)
clinicQueue: Queue,
) {
super(logger, clinicQueue, AppQueue.Scheduling.cmd.Clinic.jobs.SyncClinic);
}
protected async fetchData(
page: number,
limit: number,
): Promise<IResponsePaginatedService<Clinic[]>> {
const query = new QueryParamsDTO();
query.page = page;
query.limit = limit;
query.relations = 'stations,lookup_department'; // Eager-load relations
return this.clinicService.findPaginated(query);
}
/**
* Transform Clinic entities into ILookupClinic payloads.
* Extracts specific fields and reshapes nested relations into JSON objects.
*/
protected transformData(clinics: Clinic[]): ILookupClinic[] {
return clinics.map((clinic: Clinic) => ({
id: clinic.id,
// Flatten department relation to JSONB object
department: (clinic.lookup_department ?? {}) as Record<string, unknown>,
// Transform stations array: extract only id, name, is_active, sort_order
stations: Array.isArray(clinic.stations)
? clinic.stations.map((station: Station) => ({
id: station.id,
name: station.name,
is_active: station.is_active,
sort_order: station.sort_order,
}))
: [],
}));
}
}

Key Differences:

AspectSimpleComplex
Generics<Department><Clinic, ILookupClinic>
Payload TransformNone; sync raw entityExtract fields, reshape relations
fetchData RelationsLoad what’s neededUse relations to eager-load nested data
transformData OverrideNot neededRequired to reshape payload

That’s it. The entire sync workflow β€” pagination, parallelism, batching, error logging, and queueing β€” is inherited from BaseSyncLookupService.


Register the sync service as a provider in your module and inject the BullMQ queue:

apps/system-admin-bc/src/modules/department/departments.module.ts
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CommonModule } from '@lib/common';
import { AppQueue } from '@lib/common/enum/app-queue.enum';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { DatabaseModule } from '@lib/database';
import { DepartmentsController } from './controllers/departments.controller';
import { SyncLookupDepartmentsController } from './controllers/sync-lookup-department.controller';
import { Department } from './entities/department.entity';
import { DepartmentsService } from './services/departments.service';
import { SyncLookupDepartmentsService } from './services/sync-lookup-department.service';
@Module({
imports: [
CommonModule,
DatabaseModule.registerAsync(AppDatabases.APP_CORE),
TypeOrmModule.forFeature([Department], AppDatabases.APP_CORE),
BullModule.registerQueue({
name: AppQueue.SystemAdmin.cmd.Department.name,
}),
],
controllers: [DepartmentsController, SyncLookupDepartmentsController],
providers: [DepartmentsService, SyncLookupDepartmentsService],
exports: [DepartmentsService],
})
export class DepartmentsModule {}
apps/system-admin-bc/src/modules/department/controllers/sync-lookup-department.controller.ts
import { Controller, HttpCode, HttpStatus, Post } from '@nestjs/common';
import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';
import { RequirePermission } from '@lib/common';
import { ResourceType } from '@lib/common/decorators/resource-type.decorator';
import { SyncLookupDepartmentsService } from '../services/sync-lookup-department.service';
@ResourceType('sync-lookup-departments')
@ApiTags('Sync Lookup Departments')
@Controller('sync-lookup-departments')
export class SyncLookupDepartmentsController {
constructor(private readonly syncLookupDepartmentsService: SyncLookupDepartmentsService) {}
@Post()
@RequirePermission('department:sync')
@HttpCode(HttpStatus.OK)
@ApiOperation({ summary: 'Sync all departments to lookup table' })
@ApiResponse({
status: HttpStatus.OK,
description: 'Returns the number of departments synced, skipped, and total.',
})
syncAll(): Promise<ISyncLookupResult> {
return this.syncLookupDepartmentsService.syncAll();
}
}

syncAll() returns a summary object that can be used for monitoring:

{
total: 1540, // Total records found in the source database
synced: 1540, // Records enqueued for processing
skipped: 0 // Records skipped (always 0 in current implementation)
}

ScenarioBehavior
Source database is unreachablefetchData() throws; syncAll() propagates the error to the caller
First-page enqueue failsError propagates synchronously β€” sync is aborted
Remaining-batch enqueue failsError is caught silently (.catch()), logged via logger.error, sync summary is still returned
Worker fails to process a jobBullMQ retries up to 3 times with exponential backoff (configured in Phase 1 enqueue)