diff --git a/package-lock.json b/package-lock.json index ace372d..a1d8afb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,6 +14,7 @@ "@nestjs/common": "^11.0.1", "@nestjs/config": "^4.0.4", "@nestjs/core": "^11.0.1", + "@nestjs/event-emitter": "^3.1.0", "@nestjs/jwt": "^11.0.2", "@nestjs/passport": "^11.0.5", "@nestjs/platform-express": "^11.0.1", @@ -2555,6 +2556,19 @@ } } }, + "node_modules/@nestjs/event-emitter": { + "version": "3.1.0", + "resolved": "https://registry.npmmirror.com/@nestjs/event-emitter/-/event-emitter-3.1.0.tgz", + "integrity": "sha512-DOY/4XBGyIjYyOJKkO6jl1kzFE0ZfX0wV+M2HR5NWymPT9Z0zdCEcZGxTXXkoMRwPtglnvCGJALSjOpXPIcM3g==", + "license": "MIT", + "dependencies": { + "eventemitter2": "6.4.9" + }, + "peerDependencies": { + "@nestjs/common": "^10.0.0 || ^11.0.0", + "@nestjs/core": "^10.0.0 || ^11.0.0" + } + }, "node_modules/@nestjs/jwt": { "version": "11.0.2", "resolved": "https://registry.npmmirror.com/@nestjs/jwt/-/jwt-11.0.2.tgz", @@ -5972,6 +5986,12 @@ "node": ">= 0.6" } }, + "node_modules/eventemitter2": { + "version": "6.4.9", + "resolved": "https://registry.npmmirror.com/eventemitter2/-/eventemitter2-6.4.9.tgz", + "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==", + "license": "MIT" + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmmirror.com/events/-/events-3.3.0.tgz", diff --git a/package.json b/package.json index 5a21324..f7e765e 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "@nestjs/common": "^11.0.1", "@nestjs/config": "^4.0.4", "@nestjs/core": "^11.0.1", + "@nestjs/event-emitter": "^3.1.0", "@nestjs/jwt": "^11.0.2", "@nestjs/passport": "^11.0.5", "@nestjs/platform-express": "^11.0.1", diff --git a/src/app.module.ts b/src/app.module.ts index 9d5f77d..523012c 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -6,6 +6,7 @@ import { JwtModule } from '@nestjs/jwt'; import { PrismaModule } from './infrastructure/database/prisma.module'; import { RedisModule } from './infrastructure/redis/redis.module'; import { QueueModule } from './infrastructure/queue/queue.module'; +import { EventBusModule } from './common/event-bus/event-bus.module'; import { AiModule } from './modules/ai/ai.module'; import { StorageModule } from './infrastructure/storage/storage.module'; import { LoggerModule } from './infrastructure/logger/logger.module'; @@ -85,6 +86,7 @@ import appleConfig from './config/apple.config'; }), PrismaModule, RedisModule, + EventBusModule, QueueModule, AiModule, StorageModule, diff --git a/src/common/event-bus/event-bus.module.ts b/src/common/event-bus/event-bus.module.ts new file mode 100644 index 0000000..111527d --- /dev/null +++ b/src/common/event-bus/event-bus.module.ts @@ -0,0 +1,17 @@ +import { Global, Module } from '@nestjs/common'; +import { EventEmitterModule } from '@nestjs/event-emitter'; +import { BullModule } from '@nestjs/bullmq'; +import { EventBusService } from './event-bus.service'; + +export const QUEUE_DOMAIN_EVENTS = 'domain-events'; + +@Global() +@Module({ + imports: [ + EventEmitterModule.forRoot({ wildcard: true, delimiter: '.' }), + BullModule.registerQueue({ name: QUEUE_DOMAIN_EVENTS }), + ], + providers: [EventBusService], + exports: [EventBusService, EventEmitterModule], +}) +export class EventBusModule {} diff --git a/src/common/event-bus/event-bus.service.ts b/src/common/event-bus/event-bus.service.ts new file mode 100644 index 0000000..183667b --- /dev/null +++ b/src/common/event-bus/event-bus.service.ts @@ -0,0 +1,36 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { QueueService } from '../../infrastructure/queue/queue.service'; +import { BaseDomainEvent } from '../events/base-domain.event'; +import { safeLog } from '../../infrastructure/logger/sensitive-logger'; + +export const DOMAIN_EVENT = 'domain.event'; + +@Injectable() +export class EventBusService { + private readonly logger = new Logger(EventBusService.name); + + constructor( + private readonly eventEmitter: EventEmitter2, + private readonly queue: QueueService, + ) {} + + /** Sync: process-in-memory, low latency, fire-and-forget */ + publish(event: BaseDomainEvent): void { + this.logger.log(`[sync] ${event.eventType} id=${event.eventId}`); + this.eventEmitter.emit(DOMAIN_EVENT, event); + this.eventEmitter.emit(event.eventType, event); + } + + /** Async: persistent via BullMQ, retry + DLQ */ + async publishAsync(event: BaseDomainEvent): Promise { + const job = await this.queue.add('domain-events', { + eventType: event.eventType, + eventId: event.eventId, + payload: safeLog(event), + occurredAt: event.occurredAt.toISOString(), + }); + this.logger.log(`[async] ${event.eventType} id=${event.eventId} job=${job.id}`); + return job.id || ''; + } +}