Real-Time Database Architecture
Overview
This guide covers database design and architecture patterns for real-time enterprise systems. Building on the infrastructure setup covered in Part 1, this document focuses on data persistence, real-time event synchronization, and notification system architecture.
Key Topics:
- Notification system data model design
- Event-driven database synchronization
- Lookup table patterns for real-time queries
- Data lifecycle management and archival
- High-volume data handling strategies
- Real-time query optimization
Document Structure
This guide follows a theoretical-to-practical approach:
- Theoretical Foundations - Event-driven architecture, CQRS, eventual consistency
- Data Modeling - Database schema design for real-time systems
- Synchronization Patterns - Event publishing and consumption strategies
- Performance Optimization - Query optimization and caching strategies
- Operational Concerns - Data lifecycle, archival, and maintenance
Theoretical Foundations
Event-Driven Architecture Principles
Core Concepts
Event-driven architecture (EDA) is fundamental to building responsive, scalable real-time systems.
Key Principles:
Event-Driven Architecture:
1. Events as First-Class Citizens
   - Events represent "something happened"
   - Events are immutable facts
   - Events trigger downstream processing
2. Loose Coupling
   - Producer doesn't know about consumers
   - Consumers can be added/removed without affecting producers
   - Allows independent scaling
3. Asynchronous Communication
   - Producer doesn't wait for consumer response
   - Better performance and resilience
   - Requires handling eventual consistency
4. Event Sourcing (Optional)
   - Events are the source of truth
   - Current state derived from event history
   - Complete audit trail

Application to Enterprise Systems:

Event Flow: an Order Created event triggers, all asynchronously and independently:
- Update lookup table (for fast queries)
- Create notification (for real-time alerts)
- Update analytics (for reporting)
- Trigger billing process (for accounting)

CQRS Pattern (Command Query Responsibility Segregation)
CQRS is crucial for real-time systems handling high read and write loads.
Theory:
Traditional Architecture:

```
  ┌──────────────────────┐
  │     Application      │
  └──────────┬───────────┘
     Read ───┼─── Write
  ┌──────────▼───────────┐
  │   Single Database    │
  │ (Normalized Schema)  │
  └──────────────────────┘
```

Problem:
- Reads optimized for queries need denormalization
- Writes optimized for integrity need normalization
- Can't optimize for both simultaneously

CQRS Architecture:

```
  ┌──────────────────────────┐
  │       Application        │
  └─────┬──────────────┬─────┘
      Write           Read
  ┌─────▼────┐   ┌─────▼────────────┐
  │  Master  │   │   Read Models    │
  │ Database │──▶│  (Lookup Tables, │
  │  (Write) │   │   optimized for  │
  └─────┬────┘   │     queries)     │
        │        └─────▲────────────┘
        └── Event Bus ─┘
```

Our Implementation:
CQRS in Enterprise Systems:
Command Side (Write):
- Master Database: app_master_db
- Normalized schema
- Enforces referential integrity
- Source of truth
- Example: orders table

Query Side (Read):
- Lookup Tables: app_core_db
- Denormalized for performance
- Optimized indexes
- Eventually consistent with master
- Example: orders_lookup table

Synchronization:
- Event-driven updates via message bus

Benefits for Real-Time Systems:
| Aspect | Traditional | CQRS | Impact |
|---|---|---|---|
| Read Performance | Medium | High | Faster notification queries |
| Write Performance | Medium | High | No read locks during writes |
| Scalability | Limited | High | Read replicas scale separately |
| Data Consistency | Immediate | Eventual | Trade-off for performance |
| Query Complexity | High | Low | Simpler joins, faster execution |
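The command/query split above can be sketched in a few lines of TypeScript. This is a minimal in-memory illustration, not the document's actual services; the class and event names are invented for the example.

```typescript
// Minimal CQRS sketch: writes go to a normalized "master" store,
// reads are served from a denormalized lookup map kept in sync by events.
type OrderEvent = { type: 'order.created'; id: string; code: string; status: string };

class MasterStore {
  private rows = new Map<string, { id: string; code: string; status: string }>();
  private listeners: Array<(e: OrderEvent) => void> = [];

  onEvent(fn: (e: OrderEvent) => void) {
    this.listeners.push(fn);
  }

  // Command side: write, then publish an event (fire-and-forget).
  createOrder(id: string, code: string): void {
    const row = { id, code, status: 'PENDING' };
    this.rows.set(id, row);
    this.listeners.forEach((fn) => fn({ type: 'order.created', ...row }));
  }
}

// Query side: read model optimized for lookups by order code.
class OrderLookup {
  private byCode = new Map<string, { id: string; status: string }>();

  apply(e: OrderEvent) {
    this.byCode.set(e.code, { id: e.id, status: e.status });
  }

  findByCode(code: string) {
    return this.byCode.get(code);
  }
}

const master = new MasterStore();
const lookup = new OrderLookup();
master.onEvent((e) => lookup.apply(e)); // consumers can be added/removed freely

master.createOrder('id-1', 'ORD-2025-000001');
```

The producer never references the read model directly; removing the listener would not affect writes, which is the loose coupling the table attributes to CQRS.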
Eventual Consistency Theory
Understanding Consistency Models
Traditional ACID vs. BASE:
ACID (Strong Consistency):
- Atomicity: All or nothing
- Consistency: Valid state always
- Isolation: Transactions don't interfere
- Durability: Committed = permanent

Benefits: Data integrity guaranteed
Cost: Performance, availability, scalability

BASE (Eventual Consistency):
- Basically Available: System operational
- Soft state: State may change over time
- Eventually consistent: Will become consistent over time

Benefits: High performance, availability
Cost: Temporary inconsistencies

Consistency Levels in Our Architecture:

| Component | Consistency Level | Rationale |
|---|---|---|
| Master Database | ACID (Strong) | Source of truth |
| Lookup Tables | BASE (Eventual) | Performance |
| Notification Delivery | BASE (Eventual) | Low latency |
| Audit Logs | ACID (Strong) | Compliance |
| Cache (Redis) | Eventual (TTL expiry) | Performance |

Consistency Boundaries
Acceptable Inconsistency Window:

Critical Business Data:
- Max inconsistency: 0ms (ACID required)
- Example: Current order status
- Solution: Direct master database queries

Order Processing Status:
- Max inconsistency: 1-2 seconds
- Example: Order status changed from "pending" to "completed"
- Solution: Event-driven lookup table sync

Notifications:
- Max inconsistency: 2-5 seconds
- Example: "Order ready" notification
- Solution: Async notification creation + WebSocket broadcast

Analytics/Reporting:
- Max inconsistency: 5-60 seconds
- Example: Daily order count
- Solution: Batch processing, materialized views
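The inconsistency windows above come from asynchronous propagation: the read model only converges once pending events have been applied. A minimal sketch, with an explicit queue standing in for the event bus and sync worker:

```typescript
// Eventual consistency sketch: the lookup map converges only after the
// queue is drained, so reads in between may observe a stale status.
type StatusEvent = { orderId: string; status: string };

const masterStatus = new Map<string, string>();
const lookupStatus = new Map<string, string>();
const queue: StatusEvent[] = [];

function writeStatus(orderId: string, status: string): void {
  masterStatus.set(orderId, status); // strong consistency at the source
  queue.push({ orderId, status });   // async propagation to the read model
}

function drainQueue(): void {
  while (queue.length > 0) {
    const e = queue.shift()!;
    lookupStatus.set(e.orderId, e.status);
  }
}

writeStatus('o1', 'pending');
drainQueue();
writeStatus('o1', 'completed');

const staleRead = lookupStatus.get('o1'); // still 'pending' — inside the window
drainQueue();
const freshRead = lookupStatus.get('o1'); // 'completed' — converged
```

The width of the window is exactly how long events sit in the queue, which is why critical reads above bypass the read model and go straight to the master.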
Data Flow Analysis
End-to-End Data Flow
Understanding how data flows through the system:
sequenceDiagram
participant User as Staff
participant UI as Frontend UI
participant API as REST API
participant MasterDB as Master Database<br/>(Write Model)
participant EventBus as Event Bus<br/>(RabbitMQ/Redis)
participant Worker1 as Lookup Sync Worker
participant Worker2 as Notification Worker
participant LookupDB as Lookup Table<br/>(Read Model)
participant NotifDB as Notifications Table
participant Socket as Socket.io Gateway
participant Client as Connected Clients
User->>UI: Create Order
UI->>API: POST /api/orders
API->>MasterDB: INSERT INTO orders
MasterDB-->>API: Success (ID: 12345)
API-->>UI: Order Created
UI-->>User: Success Message
Note over API,EventBus: Asynchronous Processing Begins
API->>EventBus: Publish: order.created<br/>{id: 12345, ...}
par Parallel Event Processing
EventBus->>Worker1: Event: order.created
Worker1->>LookupDB: UPSERT orders_lookup
Note over LookupDB: Lookup table now<br/>eventually consistent
and
EventBus->>Worker2: Event: order.created
Worker2->>NotifDB: INSERT notification
Worker2->>Socket: Broadcast to department
Socket->>Client: Real-time notification
Note over Client: User sees notification<br/>within 1-2 seconds
end
Note over MasterDB,Client: Total latency: 50-200ms<br/>Event processing: 100-2000ms async
Latency Breakdown:
User Action to Database Write:
- Network (client → server): 10-50ms
- API processing: 5-20ms
- Database write: 5-30ms
- Response (server → client): 10-50ms
- Total: 30-150ms (user sees success)

Event Processing (asynchronous):
- Event publish: 1-5ms
- Event bus latency: 5-20ms
- Worker pickup: 10-50ms
- Lookup table update: 10-50ms
- Notification creation: 10-50ms
- WebSocket broadcast: 5-20ms
- Total: 41-195ms (clients notified)

End-to-end (user action → all clients notified): 71-345ms

Performance Optimization Theory
Database Index Strategy
Index Selection Criteria:
When to create an index:
1. High Selectivity Queries
   - SELECT ... WHERE column = value
   - Selectivity = distinct_values / total_rows
   - Index if selectivity > 0.1 (10%)
2. Frequently Used Filters
   - Track query frequency
   - Index the top 20% most common WHERE clauses
   - Cost of index < benefit from faster queries
3. Foreign Keys
   - Always index foreign keys
   - Prevents full table scans on JOINs
4. Composite Indexes
   - For queries with multiple WHERE conditions
   - Order columns by selectivity (high → low)
   - Example: (department_id, created_at) for department-based time queries

Index Cost-Benefit Analysis:

Index Benefits:
- Read speed: up to 1000x faster for indexed lookups
- Query time: O(log n) instead of O(n)
- Scalability: Performance doesn't degrade with data growth

Index Costs:
- Write overhead: ~10-30% slower INSERTs
- Storage: ~20-50% additional disk space
- Maintenance: VACUUM/ANALYZE overhead
- Memory: Index cache competes with data cache

Decision Rule:
Create index if: (Query frequency × Time saved) > (Write overhead + Storage cost)
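The selectivity check and decision rule can be written out directly. This is a sketch only: the 0.1 threshold is the one quoted in the text, and expressing storage cost in the same time-equivalent units as the benefit is an assumption made to keep the comparison one-dimensional.

```typescript
// Selectivity as defined above: distinct values over total rows.
function selectivity(distinctValues: number, totalRows: number): number {
  return totalRows === 0 ? 0 : distinctValues / totalRows;
}

// Cost-benefit decision rule from the text. All costs/benefits are
// expressed in ms/day so they can be compared directly (an assumption).
function shouldIndex(opts: {
  distinctValues: number;
  totalRows: number;
  queriesPerDay: number;
  msSavedPerQuery: number;
  writeOverheadMsPerDay: number;
  storageCostMsEquivalent: number;
}): boolean {
  // Low-selectivity columns rarely benefit from a b-tree index.
  if (selectivity(opts.distinctValues, opts.totalRows) <= 0.1) return false;
  const benefit = opts.queriesPerDay * opts.msSavedPerQuery;
  const cost = opts.writeOverheadMsPerDay + opts.storageCostMsEquivalent;
  return benefit > cost;
}
```

In practice the inputs would come from `pg_stat_statements`-style query statistics rather than guesses.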
Caching Strategy
Cache Hierarchy:
Multi-Level Caching:
Level 1: Application Memory (Fastest)
- In-process cache (Node.js)
- Latency: < 1ms
- Size: ~100MB per process
- Use for: Configuration, lookup maps

Level 2: Redis (Fast)
- Distributed cache
- Latency: 1-5ms
- Size: ~16GB shared
- Use for: User sessions, badge counts, recent queries

Level 3: Database (Slower but consistent)
- Lookup tables
- Latency: 10-50ms
- Size: Unlimited
- Use for: All persistent data

Cache Invalidation Strategies:

Time-To-Live (TTL):
- Set expiration time on cache entries
- Simple, predictable
- Trade-off: Stale data vs. cache miss rate
- Example: Badge count cached for 5 minutes

Event-Driven Invalidation:
- Invalidate cache when source data changes
- Most accurate
- Complex to implement
- Example: Invalidate department badge when a new notification arrives

Write-Through:
- Update cache AND database simultaneously
- Cache always consistent
- Higher write latency
- Example: User preferences

Write-Behind:
- Update cache immediately, database asynchronously
- Lowest latency
- Risk of data loss
- Example: High-frequency metrics (use with care)
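TTL expiry and event-driven invalidation can coexist on the same cache. A minimal in-memory sketch (Redis would play this role in production; the injected clock exists only to make expiry testable):

```typescript
// Cache supporting both strategies above: entries expire via TTL, and
// can also be invalidated eagerly when the source data changes.
class TtlCache<V> {
  private store = new Map<string, { value: V; expiresAt: number }>();

  constructor(private now: () => number = () => Date.now()) {}

  set(key: string, value: V, ttlMs: number): void {
    this.store.set(key, { value, expiresAt: this.now() + ttlMs });
  }

  get(key: string): V | undefined {
    const entry = this.store.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.store.delete(key); // TTL expiry
      return undefined;
    }
    return entry.value;
  }

  invalidate(key: string): void {
    this.store.delete(key); // event-driven invalidation
  }
}

let clock = 0;
const cache = new TtlCache<number>(() => clock);
cache.set('badge:dept-1', 7, 300_000); // badge count cached for 5 minutes

const hit = cache.get('badge:dept-1');     // within TTL: returns 7
clock += 300_001;                          // simulate 5 minutes passing
const expired = cache.get('badge:dept-1'); // after TTL: undefined
```

An event listener would call `invalidate` on `notification.created`, combining the accuracy of event-driven invalidation with TTL as a safety net.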
Part 1: Real-Time Notification System
Notification System Architecture Overview
A robust real-time system requires a notification system that:
- Persists notifications for later retrieval
- Delivers instantly via WebSocket
- Handles high volume (1000+ events/sec during peak)
- Manages lifecycle (archive old data, cleanup)
- Supports querying (find notifications by user, type, date)
Architecture Diagram:
graph TD
subgraph "Event Sources"
OrderPlaced["Order Placed"]
OrderCompleted["Order Completed"]
ResourceUpdated["Resource Updated"]
AlertTriggered["Alert Triggered"]
end
subgraph "Event Processing"
EventBus["Event Bus<br/>(RabbitMQ/Redis Streams)"]
NotificationService["Notification Service<br/>Create & Persist"]
end
subgraph "Storage"
NotificationTable["Notifications Table<br/>(Recent: last 30 days)"]
ArchivedNotifications["Archived Notifications<br/>(Object Storage)"]
end
subgraph "Real-Time Delivery"
RedisAdapter["Redis Adapter"]
Socket["Socket.io<br/>Broadcast to Departments"]
end
subgraph "User Access"
WebSocket["Connected Clients"]
API["REST API<br/>Query History"]
end
OrderPlaced --> EventBus
OrderCompleted --> EventBus
ResourceUpdated --> EventBus
AlertTriggered --> EventBus
EventBus --> NotificationService
NotificationService --> NotificationTable
NotificationService --> RedisAdapter
NotificationTable --> ArchivedNotifications
NotificationTable --> API
RedisAdapter --> Socket
Socket --> WebSocket
style EventBus fill:#fff4e1,stroke:#ff9800,stroke-width:2px
style NotificationTable fill:#e1f5ff,stroke:#0288d1,stroke-width:2px
style Socket fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
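The diagram's ordering matters: the notification is persisted before any real-time delivery is attempted. A minimal sketch of that "persist, then broadcast" flow, where `saveNotification` and `broadcast` are illustrative stand-ins for the repository and the Socket.io gateway:

```typescript
// Persist-then-broadcast: the notification is durably stored before
// real-time delivery, so a dropped WebSocket never loses data.
type Notif = { id: number; title: string };

const stored: Notif[] = [];    // stands in for the notifications table
const delivered: Notif[] = []; // stands in for connected clients

function saveNotification(title: string): Notif {
  const notif = { id: stored.length + 1, title };
  stored.push(notif); // 1) persist (source of truth)
  return notif;
}

function broadcast(notif: Notif): void {
  delivered.push(notif); // 2) best-effort real-time delivery
}

function publish(title: string): Notif {
  const saved = saveNotification(title);
  broadcast(saved); // a delivery failure would not lose the stored row
  return saved;
}

const n = publish('Order Ready');
```

Clients that miss the broadcast can recover the same notification later through the REST history API, because the table, not the socket, is authoritative.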
Notification Data Model
Entity Definition:
```typescript
import {
  Column,
  CreateDateColumn,
  DeleteDateColumn,
  Entity,
  Index,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from 'typeorm';

import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { ITimestamp } from '@lib/common/interfaces/timestamp.interface';

/**
 * Notification Entity - Stores all real-time notifications
 * Optimized for high-volume writes and department-based queries
 */
@Entity({ name: 'notifications', database: AppDatabases.APP_CORE })
@Index('idx_notifications_department_id', ['department_id'])
@Index('idx_notifications_user_id', ['user_id'])
@Index('idx_notifications_created_at', ['created_at'])
@Index('idx_notifications_department_type', ['department_id', 'notification_type'])
@Index('idx_notifications_read_status', ['is_read', 'department_id'])
export class Notification implements ITimestamp {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  /**
   * Department-level notifications are broadcast to all users in a department.
   * Can be NULL for user-specific notifications.
   */
  @Column({
    type: 'varchar',
    length: 20,
    nullable: true,
    comment: 'Department ID for department-level broadcasts',
  })
  department_id: string | null;

  /**
   * User-specific notification.
   * Can be NULL for department-level broadcasts.
   */
  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'Target user ID (NULL = department broadcast)',
  })
  user_id: string | null;

  /**
   * Notification type categorizes different event types.
   * Examples: 'ORDER_PLACED', 'ORDER_COMPLETED', 'ALERT', 'RESOURCE_UPDATED'
   */
  @Column({
    type: 'varchar',
    length: 50,
    comment: 'Notification type for filtering',
  })
  notification_type: string;

  /**
   * Human-readable title.
   * Examples: 'Order Ready', 'Critical Alert', 'Resource Updated'
   */
  @Column({
    type: 'varchar',
    length: 255,
    comment: 'Notification title for display',
  })
  title: string;

  /** Detailed message (supports markdown for formatting) */
  @Column({
    type: 'text',
    comment: 'Detailed message content',
  })
  message: string;

  /**
   * Priority level determines urgency and display style:
   * 'CRITICAL': Red, sound alert, requires action
   * 'HIGH':     Orange, prominent display
   * 'NORMAL':   Blue, standard display
   * 'LOW':      Gray, can be dismissed
   */
  @Column({
    type: 'varchar',
    length: 20,
    default: 'NORMAL',
    comment: 'Priority: CRITICAL | HIGH | NORMAL | LOW',
  })
  priority: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';

  /**
   * Reference ID for linking to domain objects.
   * Examples: order_id, result_id, resource_id, alert_id
   */
  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'Reference to related domain object',
  })
  reference_id: string | null;

  /**
   * Reference type indicates what domain object is referenced.
   * Examples: 'ORDER', 'RESULT', 'RESOURCE', 'ALERT'
   */
  @Column({
    type: 'varchar',
    length: 50,
    nullable: true,
    comment: 'Type of referenced object',
  })
  reference_type: string | null;

  /**
   * Metadata as JSON for flexible storage.
   * Example: {order_number: "ORD-2025-0001", status: "completed"}
   */
  @Column({
    type: 'jsonb',
    nullable: true,
    default: {},
    comment: 'Additional data as JSON',
  })
  metadata: Record<string, any> | null;

  /** Track read status per user */
  @Column({
    type: 'boolean',
    default: false,
    comment: 'Has user read this notification',
  })
  is_read: boolean;

  @Column({
    type: 'timestamptz',
    nullable: true,
    comment: 'When notification was read',
  })
  read_at: Date | null;

  @Column({
    type: 'boolean',
    default: false,
    comment: 'Has user acted on this notification',
  })
  is_actioned: boolean;

  @CreateDateColumn({
    type: 'timestamptz',
    comment: 'When notification was created',
  })
  created_at: Date;

  @UpdateDateColumn({
    type: 'timestamptz',
    comment: 'Last update timestamp',
  })
  updated_at: Date;

  @DeleteDateColumn({
    type: 'timestamptz',
    nullable: true,
    comment: 'When notification was deleted/archived',
  })
  deleted_at: Date | null;

  @Column({
    type: 'uuid',
    nullable: true,
    comment: 'System/user that created this notification',
  })
  created_by: string | null;
}
```

Notification Service Implementation
Creating and Broadcasting Notifications:
```typescript
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, Repository } from 'typeorm';

import { AppDatabases } from '@lib/common/enum/app-databases.enum';

import { Notification } from '../entities/notification.entity';

@Injectable()
export class NotificationService {
  private readonly logger = new Logger(NotificationService.name);

  constructor(
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Create a department-level notification (broadcast to all users in a department)
   */
  async createDepartmentNotification(
    departmentId: string,
    notificationType: string,
    title: string,
    message: string,
    options?: {
      priority?: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';
      referenceId?: string;
      referenceType?: string;
      metadata?: Record<string, any>;
    },
  ): Promise<Notification> {
    const notification = this.notificationRepository.create({
      department_id: departmentId,
      user_id: null, // Department-level, not user-specific
      notification_type: notificationType,
      title,
      message,
      priority: options?.priority || 'NORMAL',
      reference_id: options?.referenceId,
      reference_type: options?.referenceType,
      metadata: options?.metadata || {},
      created_by: 'system',
    });

    const saved = await this.notificationRepository.save(notification);

    // Emit event for real-time delivery
    this.eventEmitter.emit('notification.created', {
      notification: saved,
      departmentId,
    });

    this.logger.log(`Department notification created: ${saved.id} for department ${departmentId}`);

    return saved;
  }

  /**
   * Create a user-specific notification
   */
  async createUserNotification(
    userId: string,
    departmentId: string,
    notificationType: string,
    title: string,
    message: string,
    options?: {
      priority?: 'CRITICAL' | 'HIGH' | 'NORMAL' | 'LOW';
      referenceId?: string;
      referenceType?: string;
      metadata?: Record<string, any>;
    },
  ): Promise<Notification> {
    const notification = this.notificationRepository.create({
      department_id: departmentId,
      user_id: userId,
      notification_type: notificationType,
      title,
      message,
      priority: options?.priority || 'NORMAL',
      reference_id: options?.referenceId,
      reference_type: options?.referenceType,
      metadata: options?.metadata || {},
      created_by: 'system',
    });

    const saved = await this.notificationRepository.save(notification);

    // Emit event for real-time delivery
    this.eventEmitter.emit('notification.created', {
      notification: saved,
      userId,
    });

    this.logger.log(`User notification created: ${saved.id} for user ${userId}`);

    return saved;
  }

  async markAsRead(notificationId: string, userId: string): Promise<void> {
    await this.notificationRepository.update(
      { id: notificationId, user_id: userId },
      {
        is_read: true,
        read_at: new Date(),
      },
    );
  }

  async getUnreadDepartmentNotifications(
    departmentId: string,
    limit: number = 50,
  ): Promise<Notification[]> {
    return this.notificationRepository.find({
      where: {
        department_id: departmentId,
        is_read: false,
        deleted_at: IsNull(),
      },
      order: {
        created_at: 'DESC',
      },
      take: limit,
    });
  }

  async getNotificationHistory(
    departmentId: string,
    options?: {
      userId?: string;
      notificationType?: string;
      startDate?: Date;
      endDate?: Date;
      page?: number;
      limit?: number;
    },
  ): Promise<{ data: Notification[]; total: number }> {
    const query = this.notificationRepository
      .createQueryBuilder('n')
      .where('n.department_id = :departmentId', { departmentId })
      .andWhere('n.deleted_at IS NULL');

    if (options?.userId) {
      query.andWhere('(n.user_id IS NULL OR n.user_id = :userId)', {
        userId: options.userId,
      });
    }

    if (options?.notificationType) {
      query.andWhere('n.notification_type = :notificationType', {
        notificationType: options.notificationType,
      });
    }

    if (options?.startDate) {
      query.andWhere('n.created_at >= :startDate', { startDate: options.startDate });
    }

    if (options?.endDate) {
      query.andWhere('n.created_at <= :endDate', { endDate: options.endDate });
    }

    // Parenthesize carefully: `page || 1 - 1` would parse as `page || 0`
    const page = options?.page || 1;
    const limit = options?.limit || 20;
    const [data, total] = await query
      .orderBy('n.created_at', 'DESC')
      .skip((page - 1) * limit)
      .take(limit)
      .getManyAndCount();

    return { data, total };
  }
}
```
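Offset pagination like `getNotificationHistory`'s is a common source of precedence bugs: `options?.page || 1 - 1` evaluates as `options?.page || 0` because `-` binds tighter than `||`. A small helper makes the arithmetic explicit; the defaults here mirror the ones used in the service:

```typescript
// Convert 1-based page/limit into skip/take for offset pagination.
// Note the parentheses: (page - 1) * limit, never `page || 1 - 1`.
function pageToOffset(page?: number, limit?: number): { skip: number; take: number } {
  const p = page && page > 0 ? page : 1;        // default to first page
  const take = limit && limit > 0 ? limit : 20; // default page size
  return { skip: (p - 1) * take, take };
}

const first = pageToOffset();      // { skip: 0, take: 20 }
const third = pageToOffset(3, 50); // { skip: 100, take: 50 }
```

Keeping this in one tested function means every paginated endpoint shares the same, correct conversion.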
Real-Time Broadcasting with Socket.io
Event Listener for Real-Time Notification Delivery:
```typescript
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { NotificationGateway } from '../gateways/notification.gateway';

@Injectable()
export class NotificationCreatedListener {
  private readonly logger = new Logger(NotificationCreatedListener.name);

  constructor(private readonly notificationGateway: NotificationGateway) {}

  @OnEvent('notification.created')
  async handleNotificationCreated(event: {
    notification: any;
    departmentId?: string;
    userId?: string;
  }) {
    const { notification, departmentId, userId } = event;

    if (departmentId) {
      // Department-level broadcast
      await this.notificationGateway.notifyDepartment(departmentId, {
        id: notification.id,
        type: notification.notification_type,
        title: notification.title,
        message: notification.message,
        priority: notification.priority,
        reference_id: notification.reference_id,
        reference_type: notification.reference_type,
        metadata: notification.metadata,
        created_at: notification.created_at,
      });
    } else if (userId) {
      // User-specific notification
      await this.notificationGateway.notifyUser(userId, {
        id: notification.id,
        type: notification.notification_type,
        title: notification.title,
        message: notification.message,
        priority: notification.priority,
        reference_id: notification.reference_id,
        reference_type: notification.reference_type,
        created_at: notification.created_at,
      });
    }
  }
}
```

Part 2: Lookup/Read Model Pattern for Real-Time Systems
Lookup Table Integration
The Lookup/Read Model Pattern becomes critical for real-time systems. Here's how to integrate it with notifications.
Real-Time Query Problem:
```sql
-- ❌ SLOW: JOINs across databases.
-- A notification's reference_id points to an order in the master database,
-- and cross-database joins cannot be executed efficiently.

-- ✅ FAST: Lookup table in the same database, synchronized from master.
SELECT n.*, o.order_code, o.status
FROM notifications n
LEFT JOIN orders_lookup o ON n.reference_id = o.id
WHERE n.created_at > NOW() - INTERVAL '30 days';
```

Orders Lookup Table
Master Table (Source of Truth):
```typescript
import { Column, Entity, Index, PrimaryColumn, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';

import { AppDatabases } from '@lib/common/enum/app-databases.enum';
import { ITimestamp } from '@lib/common/interfaces/timestamp.interface';

@Entity({ name: 'orders', database: AppDatabases.APP_CORE_MASTER })
export class Order implements ITimestamp {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ type: 'varchar', length: 20, unique: true })
  order_code: string; // e.g., "ORD-2025-000001"

  @Column({ type: 'varchar', length: 100 })
  item_name: string;

  @Column({ type: 'varchar', length: 50 })
  status: string; // 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'CANCELLED'

  @Column({ type: 'uuid' })
  department_id: string;

  @Column({ type: 'timestamptz' })
  ordered_at: Date;

  @Column({ type: 'timestamptz', nullable: true })
  completed_at: Date | null;

  // ... audit fields
}
```

Lookup Table (Read Model):

```typescript
@Entity({ name: 'order_lookups', database: AppDatabases.APP_CORE })
@Index('idx_order_lookups_status', ['status'])
@Index('idx_order_lookups_ordered_at', ['ordered_at'])
export class OrderLookup {
  @PrimaryColumn({ type: 'varchar', length: 20 })
  order_code: string;

  @Column({ type: 'uuid' })
  id: string;

  @Column({ type: 'varchar', length: 100 })
  item_name: string;

  @Column({ type: 'varchar', length: 50 })
  status: string;

  @Column({ type: 'uuid' })
  department_id: string;

  @Column({ type: 'timestamptz' })
  ordered_at: Date;

  @Column({ type: 'timestamptz', nullable: true })
  completed_at: Date | null;

  @UpdateDateColumn({ type: 'timestamptz' })
  synced_at: Date;

  @Column({ type: 'uuid', nullable: true })
  master_id: string | null;
}
```

Event-Driven Synchronization
Event Publishing from Master Database:
```typescript
@Injectable()
export class OrderService {
  constructor(
    private readonly eventEmitter: EventEmitter2,
    private readonly orderRepository: Repository<Order>,
  ) {}

  async createOrder(dto: CreateOrderDTO): Promise<Order> {
    const order = this.orderRepository.create(dto);
    const saved = await this.orderRepository.save(order);

    // Emit event for other services to sync
    this.eventEmitter.emit('order.created', {
      id: saved.id,
      order_code: saved.order_code,
      item_name: saved.item_name,
      status: saved.status,
      department_id: saved.department_id,
      ordered_at: saved.ordered_at,
      synced_at: new Date(),
    });

    return saved;
  }

  async updateOrderStatus(orderId: string, newStatus: string): Promise<void> {
    // Only stamp completed_at when the order actually completes
    const completedAt = newStatus === 'COMPLETED' ? new Date() : null;

    await this.orderRepository.update(
      { id: orderId },
      { status: newStatus, completed_at: completedAt },
    );

    // Emit update event
    this.eventEmitter.emit('order.status-changed', {
      id: orderId,
      status: newStatus,
      completed_at: completedAt,
      synced_at: new Date(),
    });
  }
}
```

Event Listener in Data Owner BC (Consumer):

```typescript
@Injectable()
export class OrderLookupSyncListener {
  constructor(
    @InjectRepository(OrderLookup, AppDatabases.APP_CORE)
    private readonly orderLookupRepository: Repository<OrderLookup>,
  ) {}

  @OnEvent('order.created')
  async handleOrderCreated(payload: any) {
    await this.orderLookupRepository.upsert(
      [
        {
          order_code: payload.order_code,
          id: payload.id,
          item_name: payload.item_name,
          status: payload.status,
          department_id: payload.department_id,
          ordered_at: payload.ordered_at,
          completed_at: null,
          synced_at: new Date(),
          master_id: payload.id,
        },
      ],
      ['order_code'],
    );
  }

  @OnEvent('order.status-changed')
  async handleOrderStatusChanged(payload: any) {
    await this.orderLookupRepository.update(
      { id: payload.id },
      {
        status: payload.status,
        completed_at: payload.completed_at,
        synced_at: new Date(),
      },
    );
  }
}
```

Part 3: Data Lifecycle and Archival
Notification Retention Policy
Problem: The notification table grows rapidly:
- 1000 concurrent users
- 50 notifications/user/day
- 50,000 notifications/day
- 1.5M notifications/month
- 18M notifications/year
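The growth figures above follow from straight multiplication; making the assumptions explicit lets the sizing be re-derived for other loads:

```typescript
// Back-of-envelope notification volume from user count and per-user rate.
function notificationVolume(users: number, perUserPerDay: number) {
  const perDay = users * perUserPerDay;
  return {
    perDay,                // 50,000 for 1000 users x 50/day
    perMonth: perDay * 30, // ~1.5M
    perYear: perDay * 365, // ~18M
  };
}

const volume = notificationVolume(1000, 50);
```

At roughly 18M rows/year, the 30-day hot window below keeps the live table near 1.5M rows regardless of system age.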
Solution: Automatic Archival
```typescript
import { gzipSync } from 'zlib';

import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { In, IsNull, LessThan, Repository } from 'typeorm';

import { AppDatabases } from '@lib/common/enum/app-databases.enum';

import { Notification } from '../entities/notification.entity';

@Injectable()
export class NotificationArchivalService {
  private readonly logger = new Logger(NotificationArchivalService.name);

  /**
   * Retention policy:
   * - Keep in database: Last 30 days (hot storage)
   * - Archive to S3: 30-365 days (cold storage)
   * - Delete: Older than 1 year
   */
  private readonly RETENTION_DAYS = 30;
  private readonly ARCHIVE_CUTOFF_DAYS = 365;

  constructor(
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
    private readonly s3Service: S3Service, // project-specific object storage wrapper
  ) {}

  /**
   * Run archival job nightly at 2 AM.
   * Moves old notifications to object storage.
   */
  @Cron(CronExpression.EVERY_DAY_AT_2AM, {
    name: 'archive-old-notifications',
    timeZone: process.env.TZ || 'UTC',
  })
  async archiveOldNotifications(): Promise<void> {
    this.logger.log('Starting notification archival job...');

    const archivalDate = new Date();
    archivalDate.setDate(archivalDate.getDate() - this.RETENTION_DAYS);

    const notificationsToArchive = await this.notificationRepository.find({
      where: {
        created_at: LessThan(archivalDate),
        deleted_at: IsNull(),
      },
      take: 10000, // Process in batches
    });

    if (notificationsToArchive.length === 0) {
      this.logger.log('No notifications to archive');
      return;
    }

    const groupedByDeptAndDate = this.groupNotificationsByDeptAndDate(notificationsToArchive);

    let uploadedCount = 0;
    const archivedIds: string[] = [];
    for (const [deptKey, notifications] of Object.entries(groupedByDeptAndDate)) {
      const [deptId, dateStr] = deptKey.split('::');
      const s3Key = `archived-notifications/department/${deptId}/${dateStr}.jsonl`;

      try {
        const jsonlContent = notifications.map((n) => JSON.stringify(n)).join('\n');

        await this.s3Service.putObject(
          'app-archives',
          s3Key,
          gzipSync(Buffer.from(jsonlContent)), // compress to match the declared encoding
          {
            'Content-Type': 'application/x-ndjson',
            'Content-Encoding': 'gzip',
          },
        );

        uploadedCount += notifications.length;
        archivedIds.push(...notifications.map((n) => n.id));
      } catch (error) {
        this.logger.error(`Failed to archive ${s3Key}:`, error);
      }
    }

    // Soft-delete only what was successfully uploaded
    if (archivedIds.length > 0) {
      await this.notificationRepository.softDelete({
        id: In(archivedIds),
      });
    }

    this.logger.log(`Archival complete: ${uploadedCount} notifications archived`);
  }

  /**
   * Cleanup job: Permanently delete archived notifications older than 1 year.
   * Runs quarterly to free up database space.
   */
  @Cron('0 0 1 1,4,7,10 *', {
    name: 'cleanup-old-archived-notifications',
    timeZone: process.env.TZ || 'UTC',
  })
  async cleanupArchivedNotifications(): Promise<void> {
    this.logger.log('Starting notification cleanup job...');

    const cleanupDate = new Date();
    cleanupDate.setFullYear(cleanupDate.getFullYear() - 1);

    const result = await this.notificationRepository.delete({
      deleted_at: LessThan(cleanupDate),
    });

    this.logger.log(`Cleanup complete: ${result.affected} notifications deleted`);
  }

  private groupNotificationsByDeptAndDate(
    notifications: Notification[],
  ): Record<string, Notification[]> {
    const grouped: Record<string, Notification[]> = {};

    notifications.forEach((notification) => {
      const dept = notification.department_id || 'system';
      const dateStr = notification.created_at.toISOString().split('T')[0]; // YYYY-MM-DD

      const key = `${dept}::${dateStr}`;
      if (!grouped[key]) {
        grouped[key] = [];
      }
      grouped[key].push(notification);
    });

    return grouped;
  }
}
```
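The grouping step of the archival job is easiest to reason about as a pure function. This sketch mirrors `groupNotificationsByDeptAndDate` using only the two fields it touches, so the bucketing logic can be tested without a database:

```typescript
// Bucket archivable records by department and calendar day; each bucket
// becomes one JSONL object in cold storage.
type Archivable = { department_id: string | null; created_at: Date };

function groupByDeptAndDate<T extends Archivable>(items: T[]): Record<string, T[]> {
  const grouped: Record<string, T[]> = {};
  for (const item of items) {
    const dept = item.department_id ?? 'system';
    const dateStr = item.created_at.toISOString().split('T')[0]; // YYYY-MM-DD (UTC)
    const key = `${dept}::${dateStr}`;
    if (!grouped[key]) grouped[key] = [];
    grouped[key].push(item);
  }
  return grouped;
}

const groups = groupByDeptAndDate([
  { department_id: 'lab', created_at: new Date('2025-01-15T10:00:00Z') },
  { department_id: 'lab', created_at: new Date('2025-01-15T23:00:00Z') },
  { department_id: null, created_at: new Date('2025-01-16T01:00:00Z') },
]);
```

Note that `toISOString` buckets by UTC day; if archives should align with a local business day, the date would need to be derived in the configured time zone instead.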
Database Maintenance
Partitioning Strategy for High-Volume Tables:
```sql
-- Create partitioned table for notifications.
-- Partitioning by date range simplifies archival and improves performance.

CREATE TABLE notifications_v2 (
  id uuid,
  department_id varchar(20),
  user_id uuid,
  notification_type varchar(50),
  title varchar(255),
  message text,
  priority varchar(20),
  is_read boolean,
  created_at timestamptz,
  updated_at timestamptz,
  deleted_at timestamptz,
  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create monthly partitions
CREATE TABLE notifications_2025_01 PARTITION OF notifications_v2
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE notifications_2025_02 PARTITION OF notifications_v2
  FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Create indexes on the partitioned table (propagated to each partition)
CREATE INDEX idx_notifications_v2_department_id ON notifications_v2 (department_id);
CREATE INDEX idx_notifications_v2_created_at ON notifications_v2 (created_at DESC);
```

Benefits of Partitioning:
- Queries on recent dates are faster (smaller table scans)
- Old partitions can be easily archived or deleted
- Maintenance (VACUUM, ANALYZE) can run per partition
- Scales linearly with data growth
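Range partitions must exist before rows for that month arrive. A hypothetical helper that emits DDL in the same shape as the statements above, which a scheduled job could run ahead of each month boundary:

```typescript
// Generate the CREATE TABLE ... PARTITION OF statement for one month.
// Table and naming convention follow the DDL shown earlier.
function monthlyPartitionDdl(table: string, year: number, month: number): string {
  const pad = (n: number) => String(n).padStart(2, '0');
  const from = `${year}-${pad(month)}-01`;
  // December rolls over into January of the next year.
  const next = month === 12 ? { y: year + 1, m: 1 } : { y: year, m: month + 1 };
  const to = `${next.y}-${pad(next.m)}-01`;
  return (
    `CREATE TABLE ${table}_${year}_${pad(month)} PARTITION OF ${table}\n` +
    `  FOR VALUES FROM ('${from}') TO ('${to}');`
  );
}

const ddl = monthlyPartitionDdl('notifications_v2', 2025, 12);
```

Generating DDL from a single function keeps partition names and range boundaries consistent, which matters because PostgreSQL rejects inserts that fall outside every defined partition.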
Part 4: Real-Time Query Optimization
Query Patterns for Common UI Scenarios
1. Unread Badge Count (Cache this!)
```typescript
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, Repository } from 'typeorm';

import { AppDatabases } from '@lib/common/enum/app-databases.enum';

import { Notification } from '../entities/notification.entity';

@Injectable()
export class NotificationBadgeService {
  constructor(
    private readonly cacheService: CacheService, // Redis-backed cache wrapper
    @InjectRepository(Notification, AppDatabases.APP_CORE)
    private readonly notificationRepository: Repository<Notification>,
  ) {}

  /**
   * Get unread count for a user's department view.
   * Cached for 5 minutes to reduce database load.
   */
  async getUnreadCount(userId: string, departmentId: string): Promise<number> {
    const cacheKey = `unread-badge:${userId}:${departmentId}`;

    // Check cache first
    const cached = await this.cacheService.get(cacheKey);
    if (cached !== null) {
      return parseInt(cached, 10);
    }

    // Query database if not cached
    // (counts department-level unread; per-user read state for broadcasts
    // would require a separate read-receipt table)
    const count = await this.notificationRepository.count({
      where: {
        department_id: departmentId,
        is_read: false,
        deleted_at: IsNull(),
      },
    });

    // Cache for 5 minutes
    await this.cacheService.set(cacheKey, count.toString(), 300);

    return count;
  }

  /**
   * Invalidate cache when a new notification arrives
   */
  async invalidateBadgeCache(departmentId: string, userId?: string): Promise<void> {
    if (userId) {
      await this.cacheService.del(`unread-badge:${userId}:${departmentId}`);
    } else {
      // Invalidate all users in the department (using a Redis SCAN pattern)
      const keys = await this.cacheService.keys(`unread-badge:*:${departmentId}`);
      if (keys.length > 0) {
        await this.cacheService.del(...keys);
      }
    }
  }
}
```

2. Recent Notifications Feed (Paginated)
```sql
-- Optimized query with proper indexes
SELECT
  n.id,
  n.notification_type,
  n.title,
  n.message,
  n.priority,
  n.is_read,
  n.created_at,
  n.reference_id,
  n.reference_type,
  -- Join with lookup table for readable data
  COALESCE(ol.order_code, '') AS reference_code,
  COALESCE(ol.status, '') AS reference_status
FROM notifications n
LEFT JOIN order_lookups ol
  ON n.reference_type = 'ORDER' AND n.reference_id = ol.id
WHERE n.department_id = $1                        -- Indexed
  AND n.created_at > NOW() - INTERVAL '30 days'   -- Partition pruning
  AND n.deleted_at IS NULL
ORDER BY n.created_at DESC
LIMIT 50;

-- The query plan should use:
-- Index: idx_notifications_department_id
-- Index: idx_notifications_created_at
```