52 lines
2.2 KiB
TypeScript
52 lines
2.2 KiB
TypeScript
import { Injectable, Logger, Optional, Inject, forwardRef } from '@nestjs/common';
|
|
import { InjectQueue } from '@nestjs/bullmq';
|
|
import { EventBusService } from '../../common/event-bus/event-bus.service';
|
|
import { BaseDomainEvent } from '../../common/events/base-domain.event';
|
|
import { PrismaService } from '../database/prisma.service';
|
|
import { Queue } from 'bullmq';
|
|
|
|
/** Queue name used for AI analysis jobs (injected into QueueService via `@InjectQueue`). */
export const QUEUE_AI_ANALYSIS = 'ai-analysis';

/** Queue name used for document import jobs (injected into QueueService via `@InjectQueue`). */
export const QUEUE_DOCUMENT_IMPORT = 'document-import';

/** Queue name used for notification jobs (injected into QueueService via `@InjectQueue`). */
export const QUEUE_NOTIFICATION = 'notification';

/**
 * Queue name for audit logs.
 * NOTE(review): exported here but not wired into QueueService in this file — confirm who consumes it.
 */
export const QUEUE_AUDIT_LOG = 'audit-logs';

/**
 * Queue name for file cleanup.
 * NOTE(review): exported here but not wired into QueueService in this file — confirm who consumes it.
 */
export const QUEUE_FILE_CLEANUP = 'file-cleanup';
|
|
|
|
/**
 * Domain event published after a job has been accepted by a queue.
 * Declared once at module level so every published event shares one class
 * instead of a fresh anonymous class per call.
 */
class TaskEnqueuedEvent extends BaseDomainEvent {
  eventType = 'task.enqueued';
  readonly queueName: string;
  readonly jobId: string;

  constructor(queueName: string, jobId: string) {
    super();
    this.queueName = queueName;
    this.jobId = jobId;
  }
}

/**
 * Facade over the injected BullMQ queues: enqueues jobs with default retry
 * settings, records a `taskLog` row for each enqueued job and publishes a
 * `task.enqueued` domain event when an event bus is available.
 *
 * Only QUEUE_AI_ANALYSIS, QUEUE_DOCUMENT_IMPORT and QUEUE_NOTIFICATION are
 * wired in; any other queue name (including QUEUE_AUDIT_LOG and
 * QUEUE_FILE_CLEANUP) makes `add` / `getJob` throw `Unknown queue`.
 */
@Injectable()
export class QueueService {
  private readonly logger = new Logger(QueueService.name);

  /**
   * @param prisma Database client used to write the `taskLog` audit row.
   * @param eventBus Optional event bus; stays `undefined` when no provider is
   *   registered. An explicit token is given because an untyped parameter
   *   carries no usable injection token; `forwardRef` defers resolving the
   *   token until injection time.
   * @param aiQueue Queue registered under QUEUE_AI_ANALYSIS.
   * @param importQueue Queue registered under QUEUE_DOCUMENT_IMPORT.
   * @param notifyQueue Queue registered under QUEUE_NOTIFICATION.
   */
  constructor(
    private readonly prisma: PrismaService,
    // Typed `| undefined` rather than `?` because required parameters follow it.
    @Optional()
    @Inject(forwardRef(() => EventBusService))
    private readonly eventBus: EventBusService | undefined,
    @InjectQueue(QUEUE_AI_ANALYSIS) private readonly aiQueue: Queue,
    @InjectQueue(QUEUE_DOCUMENT_IMPORT) private readonly importQueue: Queue,
    @InjectQueue(QUEUE_NOTIFICATION) private readonly notifyQueue: Queue,
  ) {}

  /**
   * Enqueues a job on the named queue. The job name is the queue name.
   *
   * Defaults to 3 attempts with exponential backoff starting at 1000 ms;
   * `opts.attempts` / `opts.backoff` override those when they are defined.
   * Writing the task log and publishing the event are best-effort: once the
   * job is enqueued, their failures are logged as warnings and never reject.
   *
   * @param queueName One of the queue names wired into this service.
   * @param data Job payload handed to the queue as-is.
   * @param opts Optional job id and retry overrides.
   * @returns The job created by the queue.
   * @throws Error when `queueName` is not a wired queue, or when the queue
   *   itself rejects the job.
   */
  async add(
    queueName: string,
    data: unknown,
    opts?: { jobId?: string; attempts?: number; backoff?: number },
  ) {
    const queue = this.getQueue(queueName);
    const job = await queue.add(queueName, data, {
      ...opts,
      // Applied after the spread so an explicit `undefined` in opts cannot
      // erase the retry defaults.
      attempts: opts?.attempts ?? 3,
      backoff: opts?.backoff ?? { type: 'exponential', delay: 1000 },
    });

    const jobId = job.id ?? '';
    await this.recordEnqueued(queueName, jobId, data);
    this.publishEnqueued(queueName, jobId);

    this.logger.log(`Job ${job.id} added to ${queueName}`);
    return job;
  }

  /**
   * Looks up a job by id on the named queue.
   *
   * @param queueName One of the queue names wired into this service.
   * @param jobId Identifier of the job to fetch.
   * @returns Whatever the queue's `getJob` resolves to for that id.
   * @throws Error when `queueName` is not a wired queue.
   */
  async getJob(queueName: string, jobId: string) {
    const queue = this.getQueue(queueName);
    return queue.getJob(jobId);
  }

  /** Writes the `enqueued` task-log row; failures are logged, never thrown. */
  private async recordEnqueued(queueName: string, jobId: string, data: unknown): Promise<void> {
    try {
      // JSON round trip yields a plain JSON value for the payload column. It
      // runs inside the try because it throws for `undefined` or circular
      // data, and the job is already enqueued at this point.
      const payload = JSON.parse(JSON.stringify(data));
      await this.prisma.taskLog.create({
        data: { queueName, jobId, status: 'enqueued', payload },
      });
    } catch (err: unknown) {
      this.logger.warn(
        `Failed to write task log for job ${jobId} on ${queueName}: ${this.describeError(err)}`,
      );
    }
  }

  /** Publishes the `task.enqueued` event without awaiting it; failures are logged, never thrown. */
  private publishEnqueued(queueName: string, jobId: string): void {
    if (!this.eventBus) {
      return;
    }
    const warn = (err: unknown): void => {
      this.logger.warn(
        `Failed to publish task.enqueued for job ${jobId} on ${queueName}: ${this.describeError(err)}`,
      );
    };
    try {
      const outcome: unknown = this.eventBus.publish(new TaskEnqueuedEvent(queueName, jobId));
      // Publishing stays fire-and-forget, but a rejected promise must not
      // surface as an unhandled rejection.
      if (outcome instanceof Promise) {
        outcome.catch(warn);
      }
    } catch (err: unknown) {
      warn(err);
    }
  }

  /** Renders an unknown thrown value as a log-friendly message. */
  private describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /**
   * Maps a queue name to its injected queue instance.
   *
   * @throws Error when the name does not match a wired queue.
   */
  private getQueue(name: string): Queue {
    switch (name) {
      case QUEUE_AI_ANALYSIS:
        return this.aiQueue;
      case QUEUE_DOCUMENT_IMPORT:
        return this.importQueue;
      case QUEUE_NOTIFICATION:
        return this.notifyQueue;
      default:
        throw new Error(`Unknown queue: ${name}`);
    }
  }
}
|