diff --git a/src/common/event-bus/event-bus.service.ts b/src/common/event-bus/event-bus.service.ts index 183667b..c336809 100644 --- a/src/common/event-bus/event-bus.service.ts +++ b/src/common/event-bus/event-bus.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, Optional, Inject, forwardRef } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { QueueService } from '../../infrastructure/queue/queue.service'; import { BaseDomainEvent } from '../events/base-domain.event'; @@ -12,7 +12,7 @@ export class EventBusService { constructor( private readonly eventEmitter: EventEmitter2, - private readonly queue: QueueService, + @Optional() @Inject(forwardRef(() => require('../../infrastructure/queue/queue.service').QueueService)) private readonly queue?: any, ) {} /** Sync: process-in-memory, low latency, fire-and-forget */ @@ -24,6 +24,7 @@ export class EventBusService { /** Async: persistent via BullMQ, retry + DLQ */ async publishAsync(event: BaseDomainEvent): Promise { + if (!this.queue) return; const job = await this.queue.add('domain-events', { eventType: event.eventType, eventId: event.eventId, diff --git a/src/infrastructure/queue/queue.service.ts b/src/infrastructure/queue/queue.service.ts index e85bfef..8b21432 100644 --- a/src/infrastructure/queue/queue.service.ts +++ b/src/infrastructure/queue/queue.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, Optional } from '@nestjs/common'; +import { Injectable, Logger, Optional, Inject, forwardRef } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; import { EventBusService } from '../../common/event-bus/event-bus.service'; import { BaseDomainEvent } from '../../common/events/base-domain.event'; @@ -16,7 +16,7 @@ export class QueueService { private readonly logger = new Logger(QueueService.name); constructor( - private readonly eventBus: EventBusService, + @Optional() private readonly eventBus?: any, private readonly prisma: PrismaService, @InjectQueue(QUEUE_AI_ANALYSIS) private readonly aiQueue: Queue, @InjectQueue(QUEUE_DOCUMENT_IMPORT) private readonly importQueue: Queue, @@ -30,7 +30,7 @@ export class QueueService { // Log to DB await this.prisma.taskLog.create({ data: { queueName, jobId: job.id || '', status: 'enqueued', payload: JSON.parse(JSON.stringify(data)) } }).catch(() => {}); - this.eventBus.publish(new (class extends BaseDomainEvent { eventType = 'task.enqueued'; queueName: string; jobId: string; constructor(q: string, j: string) { super(); this.queueName = q; this.jobId = j; } })(queueName, job.id || '')); + this.eventBus?.publish(new (class extends BaseDomainEvent { eventType = 'task.enqueued'; queueName: string; jobId: string; constructor(q: string, j: string) { super(); this.queueName = q; this.jobId = j; } })(queueName, job.id || '')); this.logger.log(`Job ${job.id} added to ${queueName}`); return job; }