
Real-Time Database Architecture

This guide covers database design and architecture patterns for real-time enterprise systems. Building on the infrastructure setup covered in Part 1, this document focuses on data persistence, real-time event synchronization, and notification system architecture.

Key Topics:

  • Notification system data model design
  • Event-driven database synchronization
  • Lookup table patterns for real-time queries
  • Data lifecycle management and archival
  • High-volume data handling strategies
  • Real-time query optimization

This guide follows a theoretical-to-practical approach:

  1. Theoretical Foundations - Event-driven architecture, CQRS, eventual consistency
  2. Data Modeling - Database schema design for real-time systems
  3. Synchronization Patterns - Event publishing and consumption strategies
  4. Performance Optimization - Query optimization and caching strategies
  5. Operational Concerns - Data lifecycle, archival, and maintenance

Event-driven architecture (EDA) is fundamental to building responsive, scalable real-time systems.

Key Principles:

Event-Driven Architecture:
1. Events as First-Class Citizens
β”œβ”€ Events represent "something happened"
β”œβ”€ Events are immutable facts
└─ Events trigger downstream processing
2. Loose Coupling
β”œβ”€ Producer doesn't know about consumers
β”œβ”€ Consumers can be added/removed without affecting producers
└─ Allows independent scaling
3. Asynchronous Communication
β”œβ”€ Producer doesn't wait for consumer response
β”œβ”€ Better performance and resilience
└─ Requires handling eventual consistency
4. Event Sourcing (Optional)
β”œβ”€ Events are the source of truth
β”œβ”€ Current state derived from event history
└─ Complete audit trail
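
These principles can be sketched in a few lines of TypeScript. The `EventBus` below is a hypothetical in-memory stand-in for a real broker (RabbitMQ, Redis Streams): events are immutable facts, the producer never references its consumers, and handlers can be added or removed independently.

```typescript
// Minimal in-memory event bus illustrating the principles above.
// In production this role is played by a broker such as RabbitMQ.
type OrderCreatedEvent = Readonly<{
  type: 'order.created';
  orderId: string;
  occurredAt: string; // ISO timestamp: an event records "something happened"
}>;

type Handler = (event: OrderCreatedEvent) => void;

class EventBus {
  private handlers: Handler[] = [];

  // Consumers register themselves; the producer never sees this list.
  subscribe(handler: Handler): void {
    this.handlers.push(handler);
  }

  publish(event: OrderCreatedEvent): void {
    // Fire-and-forget: the producer does not wait for consumer results.
    for (const handler of this.handlers) handler(event);
  }
}

const bus = new EventBus();
const received: string[] = [];

// Two independent consumers (e.g. lookup sync and notifications).
bus.subscribe((e) => received.push(`lookup:${e.orderId}`));
bus.subscribe((e) => received.push(`notify:${e.orderId}`));

bus.publish({
  type: 'order.created',
  orderId: '12345',
  occurredAt: new Date().toISOString(),
});
```

Adding a third consumer (say, analytics) would be one more `subscribe` call, with no change to the producer.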

Application to Enterprise Systems:

Event Flow:

Order Created (Event)
└─ Triggers:
   ├─ Update lookup table (for fast queries)
   ├─ Create notification (for real-time alerts)
   ├─ Update analytics (for reporting)
   └─ Trigger billing process (for accounting)

All triggered asynchronously and independently.

CQRS Pattern (Command Query Responsibility Segregation)


CQRS is crucial for real-time systems handling high read and write loads.

Theory:

Traditional Architecture:

        ┌───────────────────┐
        │    Application    │
        └─────────┬─────────┘
                  │
           Read ──┼── Write
                  │
        ┌─────────▼─────────┐
        │  Single Database  │
        │(Normalized Schema)│
        └───────────────────┘

Problem:
- Reads optimized for queries need denormalization
- Writes optimized for integrity need normalization
- Can't optimize for both simultaneously

CQRS Architecture:

        ┌───────────────────┐
        │    Application    │
        └────┬─────────┬────┘
             │         │
           Write      Read
             │         │
      ┌──────▼───┐  ┌──▼──────────────┐
      │  Master  │  │   Read Models   │
      │ Database │─▶│ (Lookup Tables) │
      │ (Write)  │  │ (Optimized for  │
      │          │  │     queries)    │
      └────┬─────┘  └──▲──────────────┘
           │           │
           └─Event Bus─┘

Our Implementation:

CQRS in Enterprise Systems:
Command Side (Write):
β”œβ”€ Master Database: app_master_db
β”œβ”€ Normalized schema
β”œβ”€ Enforces referential integrity
β”œβ”€ Source of truth
└─ Example: orders table
Query Side (Read):
β”œβ”€ Lookup Tables: app_core_db
β”œβ”€ Denormalized for performance
β”œβ”€ Optimized indexes
β”œβ”€ Eventually consistent with master
└─ Example: order_lookups table
Synchronization:
└─ Event-driven updates via message bus
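
A compressed sketch of this split (names hypothetical; a real system uses two databases and a broker rather than two in-memory maps and an array):

```typescript
// Command side: normalized "master" store; query side: denormalized read model.
type Order = { id: string; status: string; departmentId: string };

const master = new Map<string, Order>();       // write model (source of truth)
const ordersLookup = new Map<string, Order>(); // read model (eventually consistent)
const pendingEvents: Order[] = [];             // stand-in for the event bus

function createOrder(order: Order): void {
  master.set(order.id, order); // ACID write to the master
  pendingEvents.push(order);   // publish event; read model is updated later
}

// Worker that drains the bus and updates the read model asynchronously.
function syncReadModel(): void {
  while (pendingEvents.length > 0) {
    const event = pendingEvents.shift()!;
    ordersLookup.set(event.id, event); // UPSERT into the lookup table
  }
}

createOrder({ id: '12345', status: 'PENDING', departmentId: 'D1' });
const beforeSync = ordersLookup.has('12345'); // not yet consistent
syncReadModel();
const afterSync = ordersLookup.has('12345');  // eventually consistent
```

The gap between `beforeSync` and `afterSync` is exactly the inconsistency window discussed below.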

Benefits for Real-Time Systems:

Aspect              Traditional   CQRS         Impact
──────────────────────────────────────────────────────────────────────
Read Performance    Medium        High         Faster notification queries
Write Performance   Medium        High         No read locks during writes
Scalability         Limited       Horizontal   Read replicas scale separately
Data Consistency    Immediate     Eventual     Trade-off for performance
Query Complexity    High          Low          Simpler joins, faster execution

Traditional ACID vs. BASE:

ACID (Strong Consistency):
β”œβ”€ Atomicity: All or nothing
β”œβ”€ Consistency: Valid state always
β”œβ”€ Isolation: Transactions don't interfere
└─ Durability: Committed = permanent
Benefits: Data integrity guaranteed
Cost: Performance, availability, scalability
BASE (Eventual Consistency):
β”œβ”€ Basically Available: System operational
β”œβ”€ Soft state: State may change over time
└─ Eventually consistent: Will be consistent
Benefits: High performance, availability
Cost: Temporary inconsistencies

Consistency Levels in Our Architecture:

Component               Consistency Level     Rationale
─────────────────────────────────────────────────────────────────
Master Database         ACID (Strong)         Source of truth
Lookup Tables           BASE (Eventual)       Performance
Notification Delivery   BASE (Eventual)       Low latency
Audit Logs              ACID (Strong)         Compliance
Cache (Redis)           BASE (Eventual, TTL)  Performance

Acceptable Inconsistency Window:

Critical Business Data:
β”œβ”€ Max inconsistency: 0ms (ACID required)
β”œβ”€ Example: Current order status
└─ Solution: Direct master database queries
Order Processing Status:
β”œβ”€ Max inconsistency: 1-2 seconds
β”œβ”€ Example: Order status changed from "pending" to "completed"
└─ Solution: Event-driven lookup table sync
Notifications:
β”œβ”€ Max inconsistency: 2-5 seconds
β”œβ”€ Example: "Order ready" notification
└─ Solution: Async notification creation + WebSocket broadcast
Analytics/Reporting:
β”œβ”€ Max inconsistency: 5-60 seconds
β”œβ”€ Example: Daily order count
└─ Solution: Batch processing, materialized views
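
One way to make these windows explicit in code is to route each read by the staleness it can tolerate. The function below is a hypothetical sketch of that routing, not part of the implementation shown later:

```typescript
// Route a read to the appropriate store based on tolerated staleness.
type ReadSource = 'master' | 'lookup';

function chooseReadSource(maxStalenessMs: number): ReadSource {
  // Zero tolerance (e.g. current order status for a business decision)
  // must hit the ACID master database directly.
  if (maxStalenessMs === 0) return 'master';
  // Anything that tolerates seconds of lag can use the lookup tables.
  return 'lookup';
}

const statusCheck = chooseReadSource(0);         // critical business data
const notificationFeed = chooseReadSource(5000); // notifications: 2-5s window
```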

Understanding how data flows through the system:

sequenceDiagram
    participant User as Staff
    participant UI as Frontend UI
    participant API as REST API
    participant MasterDB as Master Database<br/>(Write Model)
    participant EventBus as Event Bus<br/>(RabbitMQ/Redis)
    participant Worker1 as Lookup Sync Worker
    participant Worker2 as Notification Worker
    participant LookupDB as Lookup Table<br/>(Read Model)
    participant NotifDB as Notifications Table
    participant Socket as Socket.io Gateway
    participant Client as Connected Clients

    User->>UI: Create Order
    UI->>API: POST /api/orders
    API->>MasterDB: INSERT INTO orders
    MasterDB-->>API: Success (ID: 12345)
    API-->>UI: Order Created
    UI-->>User: Success Message

    Note over API,EventBus: Asynchronous Processing Begins

    API->>EventBus: Publish: order.created<br/>{id: 12345, ...}

    par Parallel Event Processing
        EventBus->>Worker1: Event: order.created
        Worker1->>LookupDB: UPSERT orders_lookup
        Note over LookupDB: Lookup table now<br/>eventually consistent
    and
        EventBus->>Worker2: Event: order.created
        Worker2->>NotifDB: INSERT notification
        Worker2->>Socket: Broadcast to department
        Socket->>Client: Real-time notification
        Note over Client: User sees notification<br/>within 1-2 seconds
    end

    Note over MasterDB,Client: Total latency: 50-200ms<br/>Event processing: 100-2000ms async

Latency Breakdown:

User Action to Database Write:
β”œβ”€ Network (client β†’ server): 10-50ms
β”œβ”€ API processing: 5-20ms
β”œβ”€ Database write: 5-30ms
β”œβ”€ Response (server β†’ client): 10-50ms
└─ Total: 30-150ms (user sees success)
Event Processing (asynchronous):
β”œβ”€ Event publish: 1-5ms
β”œβ”€ Event bus latency: 5-20ms
β”œβ”€ Worker pickup: 10-50ms
β”œβ”€ Lookup table update: 10-50ms
β”œβ”€ Notification creation: 10-50ms
β”œβ”€ WebSocket broadcast: 5-20ms
└─ Total: 41-195ms (clients notified)
End-to-end (user action β†’ all clients notified): 71-345ms

Index Selection Criteria:

When to create an index:
1. High Selectivity Queries
β”œβ”€ SELECT WHERE column = value
β”œβ”€ Selectivity = distinct_values / total_rows
└─ Index if selectivity > 0.1 (10%)
2. Frequently Used Filters
β”œβ”€ Track query frequency
β”œβ”€ Index top 20% most common WHERE clauses
└─ Cost of index < benefit from faster queries
3. Foreign Keys
β”œβ”€ Always index foreign keys
└─ Prevents full table scans on JOINs
4. Composite Indexes
β”œβ”€ For queries with multiple WHERE conditions
β”œβ”€ Order columns by selectivity (high β†’ low)
└─ Example: (department_id, created_at) for department-based time queries

Index Cost-Benefit Analysis:

Index Benefits:
β”œβ”€ Read speed: 1000x faster for indexed lookups
β”œβ”€ Query time: O(log n) instead of O(n)
└─ Scalability: Performance doesn't degrade with data growth
Index Costs:
β”œβ”€ Write overhead: ~10-30% slower INSERTs
β”œβ”€ Storage: ~20-50% additional disk space
β”œβ”€ Maintenance: VACUUM/ANALYZE overhead
└─ Memory: Index cache competes with data cache
Decision Rule:
Create index if:
(Query frequency Γ— Time saved) > (Write overhead + Storage cost)
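
The two rules above (selectivity threshold and cost-benefit) can be expressed directly. The thresholds and sample numbers below mirror the text; they are illustrative, not tuned values:

```typescript
// Selectivity = distinct_values / total_rows; index when > 0.1 (10%).
function isSelectiveEnough(distinctValues: number, totalRows: number): boolean {
  return distinctValues / totalRows > 0.1;
}

// Decision rule: (query frequency x time saved) must outweigh the
// ongoing write overhead plus storage cost, all in comparable units.
function shouldCreateIndex(
  queriesPerDay: number,
  msSavedPerQuery: number,
  writeOverheadMsPerDay: number,
  storageCostMsPerDay: number, // storage expressed in the same cost units
): boolean {
  return queriesPerDay * msSavedPerQuery > writeOverheadMsPerDay + storageCostMsPerDay;
}

// A status column with 4 distinct values over 1M rows is a poor B-tree candidate:
const statusSelective = isSelectiveEnough(4, 1_000_000); // false
// A hot filter queried 100k times/day saving 40ms each easily pays for itself:
const worthIt = shouldCreateIndex(100_000, 40, 50_000, 10_000); // true
```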

Cache Hierarchy:

Multi-Level Caching:
Level 1: Application Memory (Fastest)
β”œβ”€ In-process cache (Node.js)
β”œβ”€ Latency: < 1ms
β”œβ”€ Size: ~100MB per process
└─ Use for: Configuration, lookup maps
Level 2: Redis (Fast)
β”œβ”€ Distributed cache
β”œβ”€ Latency: 1-5ms
β”œβ”€ Size: ~16GB shared
└─ Use for: User sessions, badge counts, recent queries
Level 3: Database (Slower but consistent)
β”œβ”€ Lookup tables
β”œβ”€ Latency: 10-50ms
β”œβ”€ Size: Unlimited
└─ Use for: All persistent data
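
The read path through this hierarchy can be sketched as follows (in-memory maps stand in for Redis and the database; key names are hypothetical):

```typescript
// L1: in-process map, L2: "Redis", L3: "database" - checked in order.
const l1 = new Map<string, string>();
const l2 = new Map<string, string>();
const db = new Map<string, string>([['badge:dept-1', '7']]);

const hits: string[] = []; // records which level served each read

function cachedGet(key: string): string | undefined {
  if (l1.has(key)) { hits.push('l1'); return l1.get(key); }
  if (l2.has(key)) {
    hits.push('l2');
    l1.set(key, l2.get(key)!); // promote to the faster level
    return l2.get(key);
  }
  const value = db.get(key);
  if (value !== undefined) {
    hits.push('db');
    l2.set(key, value); // populate the faster levels on the way back
    l1.set(key, value);
  }
  return value;
}

const first = cachedGet('badge:dept-1');  // served by the database
const second = cachedGet('badge:dept-1'); // now served from L1
```

Each level trades consistency for latency, which is why invalidation (next section) matters.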

Cache Invalidation Strategies:

Time-To-Live (TTL):
β”œβ”€ Set expiration time on cache entries
β”œβ”€ Simple, predictable
β”œβ”€ Trade-off: Stale data vs. cache miss rate
└─ Example: Badge count cached for 5 minutes
Event-Driven Invalidation:
β”œβ”€ Invalidate cache when source data changes
β”œβ”€ Most accurate
β”œβ”€ Complex to implement
└─ Example: Invalidate department badge when new notification arrives
Write-Through:
β”œβ”€ Update cache AND database simultaneously
β”œβ”€ Cache always consistent
β”œβ”€ Higher write latency
└─ Example: User preferences
Write-Behind:
β”œβ”€ Update cache immediately, database asynchronously
β”œβ”€ Lowest latency
β”œβ”€ Risk of data loss
└─ Example: High-frequency metrics (careful!)
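
Write-through and write-behind differ only in when the database write happens. A minimal write-through sketch (maps stand in for Redis and the database; the key name is illustrative):

```typescript
const cache = new Map<string, string>();
const database = new Map<string, string>();

// Write-through: cache and database are updated in the same operation,
// so the cache can never serve a value the database does not have.
function writeThrough(key: string, value: string): void {
  database.set(key, value); // persist first (this is the extra write latency)
  cache.set(key, value);    // cache stays consistent with the database
}

writeThrough('user:42:theme', 'dark');
```

A write-behind variant would set the cache first and queue the database write, trading the consistency guarantee for latency.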

A robust real-time system requires a notification system that:

  1. Persists notifications for later retrieval
  2. Delivers instantly via WebSocket
  3. Handles high volume (1000+ events/sec during peak)
  4. Manages lifecycle (archive old data, cleanup)
  5. Supports querying (find notifications by user, type, date)

Architecture Diagram:

graph TD
    subgraph "Event Sources"
        OrderPlaced["Order Placed"]
        OrderCompleted["Order Completed"]
        ResourceUpdated["Resource Updated"]
        AlertTriggered["Alert Triggered"]
    end

    subgraph "Event Processing"
        EventBus["Event Bus<br/>(RabbitMQ/Redis Streams)"]
        NotificationService["Notification Service<br/>Create & Persist"]
    end

    subgraph "Storage"
        NotificationTable["Notifications Table<br/>(Recent: last 30 days)"]
        ArchivedNotifications["Archived Notifications<br/>(Object Storage)"]
    end

    subgraph "Real-Time Delivery"
        RedisAdapter["Redis Adapter"]
        Socket["Socket.io<br/>Broadcast to Departments"]
    end

    subgraph "User Access"
        WebSocket["Connected Clients"]
        API["REST API<br/>Query History"]
    end

    OrderPlaced --> EventBus
    OrderCompleted --> EventBus
    ResourceUpdated --> EventBus
    AlertTriggered --> EventBus

    EventBus --> NotificationService
    NotificationService --> NotificationTable
    NotificationService --> RedisAdapter

    NotificationTable --> ArchivedNotifications
    NotificationTable --> API

    RedisAdapter --> Socket
    Socket --> WebSocket

    style EventBus fill:#fff4e1,stroke:#ff9800,stroke-width:2px
    style NotificationTable fill:#e1f5ff,stroke:#0288d1,stroke-width:2px
    style Socket fill:#e8f5e9,stroke:#4caf50,stroke-width:2px

Entity Definition:

apps/data-owner-bc/src/modules/notification/entities/notification.entity.ts
import {
  Column,
  CreateDateColumn,
  DeleteDateColumn,
  Entity,
  Index,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { ITimestamp } from '@lib/common/interfaces/timestamp.interface';

/**
 * Notification Entity - Stores all real-time notifications
 * Optimized for high-volume writes and department-based queries
 */
@Entity({ name: 'notifications', database: AppDatabases.APP_CORE })
@Index('idx_notifications_department_id', ['department_id'])
@Index('idx_notifications_user_id', ['user_id'])
@Index('idx_notifications_created_at', ['created_at'])
@Index('idx_notifications_department_type', ['department_id', 'notification_type'])
@Index('idx_notifications_read_status', ['is_read', 'department_id'])
export class Notification implements ITimestamp {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  /**
   * Department-level notifications are broadcast to all users in a department
   * Can be NULL for user-specific notifications
   */
  @Column({
    type: 'varchar',
    length: 20,
    nullable: true,
    comment: 'Department ID for department-level broadcasts',
  })
  department_id: string | null;

  /**
   * User-specific notification
   * Can be NULL for department-level broadcasts
   */
  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'Target user ID (NULL = department broadcast)',
  })
  user_id: string | null;

  /**
   * Notification type categorizes different event types
   * Examples: 'ORDER_PLACED', 'ORDER_COMPLETED', 'ALERT', 'RESOURCE_UPDATED'
   */
  @Column({
    type: 'varchar',
    length: 50,
    comment: 'Notification type for filtering',
  })
  notification_type: string;

  /**
   * Human-readable title
   * Examples: 'Order Ready', 'Critical Alert', 'Resource Updated'
   */
  @Column({
    type: 'varchar',
    length: 255,
    comment: 'Notification title for display',
  })
  title: string;

  /**
   * Detailed message (supports markdown for formatting)
   */
  @Column({
    type: 'text',
    comment: 'Detailed message content',
  })
  message: string;

  /**
   * Priority level determines urgency and display style
   * 'CRITICAL': Red, sound alert, requires action
   * 'HIGH': Orange, prominent display
   * 'NORMAL': Blue, standard display
   * 'LOW': Gray, can be dismissed
   */
  @Column({
    type: 'varchar',
    length: 20,
    default: 'NORMAL',
    comment: 'Priority: CRITICAL | HIGH | NORMAL | LOW',
  })
  priority: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';

  /**
   * Reference ID for linking to domain objects
   * Examples: order_id, result_id, resource_id, alert_id
   */
  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'Reference to related domain object',
  })
  reference_id: string | null;

  /**
   * Reference type indicates what domain object is referenced
   * Examples: 'ORDER', 'RESULT', 'RESOURCE', 'ALERT'
   */
  @Column({
    type: 'varchar',
    length: 50,
    nullable: true,
    comment: 'Type of referenced object',
  })
  reference_type: string | null;

  /**
   * Metadata as JSON for flexible storage
   * Examples: {order_number: "ORD-2025-0001", status: "completed"}
   */
  @Column({
    type: 'jsonb',
    nullable: true,
    default: {},
    comment: 'Additional data as JSON',
  })
  metadata: Record<string, any> | null;

  /**
   * Track read status per user
   */
  @Column({
    type: 'boolean',
    default: false,
    comment: 'Has user read this notification',
  })
  is_read: boolean;

  @Column({
    type: 'timestamptz',
    nullable: true,
    comment: 'When notification was read',
  })
  read_at: Date | null;

  @Column({
    type: 'boolean',
    default: false,
    comment: 'Has user acted on this notification',
  })
  is_actioned: boolean;

  @CreateDateColumn({
    type: 'timestamptz',
    comment: 'When notification was created',
  })
  created_at: Date;

  @UpdateDateColumn({
    type: 'timestamptz',
    comment: 'Last update timestamp',
  })
  updated_at: Date;

  @DeleteDateColumn({
    type: 'timestamptz',
    nullable: true,
    comment: 'When notification was deleted/archived',
  })
  deleted_at: Date | null;

  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'System/user that created this notification',
  })
  created_by: string | null;
}

Creating and Broadcasting Notifications:

apps/data-owner-bc/src/modules/notification/services/notification.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, Repository } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { Notification } from '../entities/notification.entity';

@Injectable()
export class NotificationService {
  private readonly logger = new Logger(NotificationService.name);

  constructor(
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Create a department-level notification (broadcast to all users in a department)
   */
  async createDepartmentNotification(
    departmentId: string,
    notificationType: string,
    title: string,
    message: string,
    options?: {
      priority?: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';
      referenceId?: string;
      referenceType?: string;
      metadata?: Record<string, any>;
    },
  ): Promise<Notification> {
    const notification = this.notificationRepository.create({
      department_id: departmentId,
      user_id: null, // Department-level, not user-specific
      notification_type: notificationType,
      title,
      message,
      priority: options?.priority || 'NORMAL',
      reference_id: options?.referenceId,
      reference_type: options?.referenceType,
      metadata: options?.metadata || {},
      created_by: 'system',
    });

    const saved = await this.notificationRepository.save(notification);

    // Emit event for real-time delivery
    this.eventEmitter.emit('notification.created', {
      notification: saved,
      departmentId,
    });

    this.logger.log(`Department notification created: ${saved.id} for department ${departmentId}`);
    return saved;
  }

  /**
   * Create a user-specific notification
   */
  async createUserNotification(
    userId: string,
    departmentId: string,
    notificationType: string,
    title: string,
    message: string,
    options?: {
      priority?: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';
      referenceId?: string;
      referenceType?: string;
      metadata?: Record<string, any>;
    },
  ): Promise<Notification> {
    const notification = this.notificationRepository.create({
      department_id: departmentId,
      user_id: userId,
      notification_type: notificationType,
      title,
      message,
      priority: options?.priority || 'NORMAL',
      reference_id: options?.referenceId,
      reference_type: options?.referenceType,
      metadata: options?.metadata || {},
      created_by: 'system',
    });

    const saved = await this.notificationRepository.save(notification);

    // Emit event for real-time delivery
    this.eventEmitter.emit('notification.created', {
      notification: saved,
      userId,
    });

    this.logger.log(`User notification created: ${saved.id} for user ${userId}`);
    return saved;
  }

  async markAsRead(notificationId: string, userId: string): Promise<void> {
    await this.notificationRepository.update(
      { id: notificationId, user_id: userId },
      {
        is_read: true,
        read_at: new Date(),
      },
    );
  }

  async getUnreadDepartmentNotifications(departmentId: string, limit: number = 50): Promise<Notification[]> {
    return this.notificationRepository.find({
      where: {
        department_id: departmentId,
        is_read: false,
        deleted_at: IsNull(), // explicit; TypeORM also excludes soft-deleted rows by default
      },
      order: {
        created_at: 'DESC',
      },
      take: limit,
    });
  }

  async getNotificationHistory(
    departmentId: string,
    options?: {
      userId?: string;
      notificationType?: string;
      startDate?: Date;
      endDate?: Date;
      page?: number;
      limit?: number;
    },
  ): Promise<{ data: Notification[]; total: number }> {
    const query = this.notificationRepository
      .createQueryBuilder('n')
      .where('n.department_id = :departmentId', { departmentId })
      .andWhere('n.deleted_at IS NULL');

    if (options?.userId) {
      query.andWhere('(n.user_id IS NULL OR n.user_id = :userId)', {
        userId: options.userId,
      });
    }
    if (options?.notificationType) {
      query.andWhere('n.notification_type = :notificationType', {
        notificationType: options.notificationType,
      });
    }
    if (options?.startDate) {
      query.andWhere('n.created_at >= :startDate', { startDate: options.startDate });
    }
    if (options?.endDate) {
      query.andWhere('n.created_at <= :endDate', { endDate: options.endDate });
    }

    // Parenthesize carefully: `page || 1 - 1` would evaluate as `page || 0`
    const page = options?.page || 1;
    const limit = options?.limit || 20;
    const [data, total] = await query
      .orderBy('n.created_at', 'DESC')
      .skip((page - 1) * limit)
      .take(limit)
      .getManyAndCount();

    return { data, total };
  }
}

Event Listener for Real-Time Notification Delivery:

apps/data-owner-bc/src/modules/notification/listeners/notification-created.listener.ts
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { NotificationGateway } from '../gateways/notification.gateway';

@Injectable()
export class NotificationCreatedListener {
  private readonly logger = new Logger(NotificationCreatedListener.name);

  constructor(private readonly notificationGateway: NotificationGateway) {}

  @OnEvent('notification.created')
  async handleNotificationCreated(event: {
    notification: any;
    departmentId?: string;
    userId?: string;
  }) {
    const { notification, departmentId, userId } = event;

    if (departmentId) {
      // Department-level broadcast
      await this.notificationGateway.notifyDepartment(departmentId, {
        id: notification.id,
        type: notification.notification_type,
        title: notification.title,
        message: notification.message,
        priority: notification.priority,
        reference_id: notification.reference_id,
        reference_type: notification.reference_type,
        metadata: notification.metadata,
        created_at: notification.created_at,
      });
    } else if (userId) {
      // User-specific notification
      await this.notificationGateway.notifyUser(userId, {
        id: notification.id,
        type: notification.notification_type,
        title: notification.title,
        message: notification.message,
        priority: notification.priority,
        reference_id: notification.reference_id,
        reference_type: notification.reference_type,
        created_at: notification.created_at,
      });
    }
  }
}

Part 2: Lookup/Read Model Pattern for Real-Time Systems


The Lookup/Read Model Pattern becomes critical for real-time systems. Here’s how to integrate it with notifications.

Real-Time Query Problem:

-- ❌ SLOW: JOINs across databases
-- A notification's reference_id points at an ORDER in the master database,
-- and cross-database joins can't be executed efficiently

-- ✅ FAST: lookup table in the same database
-- Join against a lookup table synchronized from the master
SELECT n.*, o.order_code, o.status
FROM notifications n
LEFT JOIN order_lookups o ON n.reference_id = o.id
WHERE n.created_at > NOW() - INTERVAL '30 days';

Master Table (Source of Truth):

apps/masterdata/src/modules/order/entities/order.entity.ts
import { Column, Entity, PrimaryGeneratedColumn } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { ITimestamp } from '@lib/common/interfaces/timestamp.interface';

@Entity({ name: 'orders', database: AppDatabases.APP_CORE_MASTER })
export class Order implements ITimestamp {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ type: 'varchar', length: 20, unique: true })
  order_code: string; // e.g., "ORD-2025-000001"

  @Column({ type: 'varchar', length: 100 })
  item_name: string;

  @Column({ type: 'varchar', length: 50 })
  status: string; // 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'CANCELLED'

  @Column({ type: 'uuid' })
  department_id: string;

  @Column({ type: 'timestamptz' })
  ordered_at: Date;

  @Column({ type: 'timestamptz', nullable: true })
  completed_at: Date | null;

  // ... audit fields
}

Lookup Table (Read Model):

apps/data-owner-bc/src/modules/lookup/entities/order-lookup.entity.ts
import { Column, Entity, Index, PrimaryColumn, UpdateDateColumn } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';

@Entity({ name: 'order_lookups', database: AppDatabases.APP_CORE })
@Index('idx_order_lookups_status', ['status'])
@Index('idx_order_lookups_ordered_at', ['ordered_at'])
export class OrderLookup {
  @PrimaryColumn({ type: 'varchar', length: 20 })
  order_code: string;

  @Column({ type: 'uuid' })
  id: string;

  @Column({ type: 'varchar', length: 100 })
  item_name: string;

  @Column({ type: 'varchar', length: 50 })
  status: string;

  @Column({ type: 'uuid' })
  department_id: string;

  @Column({ type: 'timestamptz' })
  ordered_at: Date;

  @Column({ type: 'timestamptz', nullable: true })
  completed_at: Date | null;

  @UpdateDateColumn({ type: 'timestamptz' })
  synced_at: Date;

  @Column({ type: 'uuid', nullable: true })
  master_id: string | null;
}

Event Publishing from Master Database:

apps/masterdata/src/modules/order/services/order.service.ts
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { Order } from '../entities/order.entity';
// CreateOrderDTO is defined elsewhere in this module

@Injectable()
export class OrderService {
  constructor(
    private readonly eventEmitter: EventEmitter2,
    @InjectRepository(Order, AppDatabases.APP_CORE_MASTER)
    private readonly orderRepository: Repository<Order>,
  ) {}

  async createOrder(dto: CreateOrderDTO): Promise<Order> {
    const order = this.orderRepository.create(dto);
    const saved = await this.orderRepository.save(order);

    // Emit event for other services to sync
    this.eventEmitter.emit('order.created', {
      id: saved.id,
      order_code: saved.order_code,
      item_name: saved.item_name,
      status: saved.status,
      department_id: saved.department_id,
      ordered_at: saved.ordered_at,
      synced_at: new Date(),
    });

    return saved;
  }

  async updateOrderStatus(orderId: string, newStatus: string): Promise<void> {
    // Only stamp completed_at when the order actually completes
    const completedAt = newStatus === 'COMPLETED' ? new Date() : null;

    await this.orderRepository.update(
      { id: orderId },
      { status: newStatus, completed_at: completedAt },
    );

    // Emit update event
    this.eventEmitter.emit('order.status-changed', {
      id: orderId,
      status: newStatus,
      completed_at: completedAt,
      synced_at: new Date(),
    });
  }
}

Event Listener in Data Owner BC (Consumer):

apps/data-owner-bc/src/modules/lookup/listeners/order-lookup-sync.listener.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { OrderLookup } from '../entities/order-lookup.entity';

@Injectable()
export class OrderLookupSyncListener {
  constructor(
    @InjectRepository(OrderLookup, AppDatabases.APP_CORE)
    private readonly orderLookupRepository: Repository<OrderLookup>,
  ) {}

  @OnEvent('order.created')
  async handleOrderCreated(payload: any) {
    await this.orderLookupRepository.upsert(
      [
        {
          order_code: payload.order_code,
          id: payload.id,
          item_name: payload.item_name,
          status: payload.status,
          department_id: payload.department_id,
          ordered_at: payload.ordered_at,
          completed_at: null,
          synced_at: new Date(),
          master_id: payload.id,
        },
      ],
      ['order_code'],
    );
  }

  @OnEvent('order.status-changed')
  async handleOrderStatusChanged(payload: any) {
    await this.orderLookupRepository.update(
      { id: payload.id },
      {
        status: payload.status,
        completed_at: payload.completed_at,
        synced_at: new Date(),
      },
    );
  }
}

Problem: Notification table grows rapidly:

  • 1000 concurrent users
  • 50 notifications/user/day
  • 50,000 notifications/day
  • 1.5M notifications/month
  • 18M notifications/year
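
The figures above follow from straightforward multiplication; making the arithmetic explicit also gives a formula to re-run for different user counts:

```typescript
// Notification volume projection from the assumptions above.
const users = 1_000;
const notificationsPerUserPerDay = 50;

const perDay = users * notificationsPerUserPerDay; // 50,000
const perMonth = perDay * 30;                      // 1,500,000
const perYear = perDay * 365;                      // 18,250,000 (~18M)
```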

Solution: Automatic Archival

apps/data-owner-bc/src/modules/notification/services/notification-archival.service.ts
import { gzipSync } from 'zlib';
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { In, IsNull, LessThan, Repository } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { Notification } from '../entities/notification.entity';
// S3Service is the project's object-storage wrapper; its import path is project-specific

@Injectable()
export class NotificationArchivalService {
  private readonly logger = new Logger(NotificationArchivalService.name);

  /**
   * Retention policy:
   * - Keep in database: Last 30 days (hot storage)
   * - Archive to S3: 30-365 days (cold storage)
   * - Delete: Older than 1 year
   */
  private readonly RETENTION_DAYS = 30;
  private readonly ARCHIVE_CUTOFF_DAYS = 365;

  constructor(
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
    private readonly s3Service: S3Service,
  ) {}

  /**
   * Run archival job nightly at 2 AM
   * Move old notifications to object storage
   */
  @Cron(CronExpression.EVERY_DAY_AT_2AM, {
    name: 'archive-old-notifications',
    timeZone: process.env.TZ || 'UTC',
  })
  async archiveOldNotifications(): Promise<void> {
    this.logger.log('Starting notification archival job...');

    const archivalDate = new Date();
    archivalDate.setDate(archivalDate.getDate() - this.RETENTION_DAYS);

    const notificationsToArchive = await this.notificationRepository.find({
      where: {
        created_at: LessThan(archivalDate),
        deleted_at: IsNull(),
      },
      take: 10000, // Process in batches
    });

    if (notificationsToArchive.length === 0) {
      this.logger.log('No notifications to archive');
      return;
    }

    const groupedByDeptAndDate = this.groupNotificationsByDeptAndDate(notificationsToArchive);

    let uploadedCount = 0;
    const archivedIds: string[] = [];

    for (const [deptKey, notifications] of Object.entries(groupedByDeptAndDate)) {
      const [deptId, dateStr] = deptKey.split('::');
      const s3Key = `archived-notifications/department/${deptId}/${dateStr}.jsonl`;

      try {
        const jsonlContent = notifications.map((n) => JSON.stringify(n)).join('\n');
        await this.s3Service.putObject(
          'app-archives',
          s3Key,
          gzipSync(Buffer.from(jsonlContent)), // compress to match the Content-Encoding header
          {
            'Content-Type': 'application/x-ndjson',
            'Content-Encoding': 'gzip',
          },
        );
        uploadedCount += notifications.length;
        archivedIds.push(...notifications.map((n) => n.id));
      } catch (error) {
        this.logger.error(`Failed to archive ${s3Key}:`, error);
      }
    }

    // Soft-delete only the notifications that were archived successfully;
    // failed batches remain in the database and are retried on the next run
    if (archivedIds.length > 0) {
      await this.notificationRepository.softDelete({
        id: In(archivedIds),
      });
    }

    this.logger.log(`Archival complete: ${uploadedCount} notifications archived`);
  }

  /**
   * Cleanup job: Delete permanently archived notifications older than 1 year
   * Run quarterly to free up database space
   */
  @Cron('0 0 1 1,4,7,10 *', {
    name: 'cleanup-old-archived-notifications',
    timeZone: process.env.TZ || 'UTC',
  })
  async cleanupArchivedNotifications(): Promise<void> {
    this.logger.log('Starting notification cleanup job...');

    const cleanupDate = new Date();
    cleanupDate.setDate(cleanupDate.getDate() - this.ARCHIVE_CUTOFF_DAYS);

    const result = await this.notificationRepository.delete({
      deleted_at: LessThan(cleanupDate),
    });

    this.logger.log(`Cleanup complete: ${result.affected} notifications deleted`);
  }

  private groupNotificationsByDeptAndDate(
    notifications: Notification[],
  ): Record<string, Notification[]> {
    const grouped: Record<string, Notification[]> = {};

    notifications.forEach((notification) => {
      const dept = notification.department_id || 'system';
      const dateStr = notification.created_at.toISOString().split('T')[0]; // YYYY-MM-DD
      const key = `${dept}::${dateStr}`;
      if (!grouped[key]) {
        grouped[key] = [];
      }
      grouped[key].push(notification);
    });

    return grouped;
  }
}

Partitioning Strategy for High-Volume Tables:

-- Create partitioned table for notifications
-- Partitioned by date range for easier archival and better performance
CREATE TABLE notifications_v2 (
  id uuid,
  department_id varchar(20),
  user_id uuid,
  notification_type varchar(50),
  title varchar(255),
  message text,
  priority varchar(20),
  is_read boolean,
  created_at timestamptz,
  updated_at timestamptz,
  deleted_at timestamptz,
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create monthly partitions
CREATE TABLE notifications_2025_01 PARTITION OF notifications_v2
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE notifications_2025_02 PARTITION OF notifications_v2
  FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Create indexes on the partitioned table (applied to each partition)
CREATE INDEX idx_notifications_v2_department_id ON notifications_v2 (department_id);
CREATE INDEX idx_notifications_v2_created_at ON notifications_v2 (created_at DESC);

Benefit of Partitioning:

  • Queries on recent dates are faster (smaller table scans)
  • Old partitions can be easily archived or deleted
  • Maintenance (VACUUM, ANALYZE) can run per partition
  • Scales linearly with data growth
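
Monthly partitions must exist before rows arrive for that month; forgetting one makes inserts fail. A small generator for the next partition's DDL (table names follow the SQL above; wiring it to a scheduler such as @Cron is left out):

```typescript
// Generate the CREATE TABLE statement for the partition covering `month`.
function monthlyPartitionDDL(year: number, month: number): string {
  const pad = (n: number) => String(n).padStart(2, '0');
  const from = `${year}-${pad(month)}-01`;
  // Roll over to January of the next year after December.
  const [nextYear, nextMonth] = month === 12 ? [year + 1, 1] : [year, month + 1];
  const to = `${nextYear}-${pad(nextMonth)}-01`;
  return (
    `CREATE TABLE notifications_${year}_${pad(month)} PARTITION OF notifications_v2\n` +
    `  FOR VALUES FROM ('${from}') TO ('${to}');`
  );
}

const ddl = monthlyPartitionDDL(2025, 12);
// Covers 2025-12-01 up to (but not including) 2026-01-01.
```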

1. Unread Badge Count (Cache this!)

apps/data-owner-bc/src/modules/notification/services/notification-badge.service.ts
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, Repository } from 'typeorm';
import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { Notification } from '../entities/notification.entity';
// CacheService is the project's Redis wrapper; its import path is project-specific

@Injectable()
export class NotificationBadgeService {
  constructor(
    private readonly cacheService: CacheService, // Redis
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
  ) {}

  /**
   * Get unread count for user
   * Cached for 5 minutes to reduce database load
   * (counts department-level unread; userId keys the cache per user)
   */
  async getUnreadCount(userId: string, departmentId: string): Promise<number> {
    const cacheKey = `unread-badge:${userId}:${departmentId}`;

    // Check cache first
    const cached = await this.cacheService.get(cacheKey);
    if (cached !== null) {
      return parseInt(cached, 10);
    }

    // Query database if not cached
    const count = await this.notificationRepository.count({
      where: {
        department_id: departmentId,
        is_read: false,
        deleted_at: IsNull(),
      },
    });

    // Cache for 5 minutes
    await this.cacheService.set(cacheKey, count.toString(), 300);
    return count;
  }

  /**
   * Invalidate cache when new notification arrives
   */
  async invalidateBadgeCache(departmentId: string, userId?: string): Promise<void> {
    if (userId) {
      await this.cacheService.del(`unread-badge:${userId}:${departmentId}`);
    } else {
      // Invalidate all users in department (using Redis SCAN pattern)
      const keys = await this.cacheService.keys(`unread-badge:*:${departmentId}`);
      if (keys.length > 0) {
        await this.cacheService.del(...keys);
      }
    }
  }
}

2. Recent Notifications Feed (Paginated)

-- Optimized query with proper indexes
SELECT
  n.id,
  n.notification_type,
  n.title,
  n.message,
  n.priority,
  n.is_read,
  n.created_at,
  n.reference_id,
  n.reference_type,
  -- Join with lookup table for readable data
  COALESCE(ol.order_code, '') AS reference_code,
  COALESCE(ol.status, '') AS reference_status
FROM notifications n
LEFT JOIN order_lookups ol
  ON n.reference_type = 'ORDER'
  AND n.reference_id = ol.id
WHERE
  n.department_id = $1                            -- Indexed
  AND n.created_at > NOW() - INTERVAL '30 days'   -- Partition pruning
  AND n.deleted_at IS NULL
ORDER BY n.created_at DESC
LIMIT 50;

-- Query plan should use:
--   Index: idx_notifications_department_id
--   Index: idx_notifications_created_at