api-server/src/common/event-bus/event-bus.service.ts

38 lines
1.4 KiB
TypeScript
Raw Normal View History

import { Injectable, Logger, Optional, Inject, forwardRef } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { QueueService } from '../../infrastructure/queue/queue.service';
import { BaseDomainEvent } from '../events/base-domain.event';
import { safeLog } from '../../infrastructure/logger/sensitive-logger';
export const DOMAIN_EVENT = 'domain.event';
@Injectable()
export class EventBusService {
private readonly logger = new Logger(EventBusService.name);
constructor(
private readonly eventEmitter: EventEmitter2,
@Optional() @Inject(forwardRef(() => require('../../infrastructure/queue/queue.service').QueueService)) private readonly queue?: any,
) {}
/** Sync: process-in-memory, low latency, fire-and-forget */
publish(event: BaseDomainEvent): void {
this.logger.log(`[sync] ${event.eventType} id=${event.eventId}`);
this.eventEmitter.emit(DOMAIN_EVENT, event);
this.eventEmitter.emit(event.eventType, event);
}
/** Async: persistent via BullMQ, retry + DLQ */
async publishAsync(event: BaseDomainEvent): Promise<string> {
if (!this.queue) return;
const job = await this.queue.add('domain-events', {
eventType: event.eventType,
eventId: event.eventId,
payload: safeLog(event),
occurredAt: event.occurredAt.toISOString(),
});
this.logger.log(`[async] ${event.eventType} id=${event.eventId} job=${job.id}`);
return job.id || '';
}
}