api-server/src/infrastructure/queue/queue.service.ts

52 lines
2.2 KiB
TypeScript
Raw Normal View History

import { Injectable, Logger, Optional } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { EventBusService } from '../../common/event-bus/event-bus.service';
import { BaseDomainEvent } from '../../common/events/base-domain.event';
import { PrismaService } from '../database/prisma.service';
import { Queue } from 'bullmq';
// Queue name constants. They are used as @InjectQueue tokens and as the
// `queueName` argument accepted by QueueService below.
//
// NOTE(review): QueueService in this file only injects and resolves the first
// three (ai-analysis, document-import, notification). Passing QUEUE_AUDIT_LOG or
// QUEUE_FILE_CLEANUP to QueueService.add/getJob throws "Unknown queue".
// Presumably those two are consumed elsewhere; confirm before relying on them here.
export const QUEUE_AI_ANALYSIS = 'ai-analysis';
export const QUEUE_DOCUMENT_IMPORT = 'document-import';
export const QUEUE_NOTIFICATION = 'notification';
export const QUEUE_AUDIT_LOG = 'audit-logs';
export const QUEUE_FILE_CLEANUP = 'file-cleanup';
/**
 * Thin facade over the BullMQ queues injected into this service.
 *
 * Besides enqueueing, `add` writes a `taskLog` row through Prisma and publishes
 * a `task.enqueued` event on the event bus.
 *
 * Only `QUEUE_AI_ANALYSIS`, `QUEUE_DOCUMENT_IMPORT` and `QUEUE_NOTIFICATION` are
 * wired in; any other queue name (including `QUEUE_AUDIT_LOG` and
 * `QUEUE_FILE_CLEANUP`) makes `add` / `getJob` reject with "Unknown queue".
 */
@Injectable()
export class QueueService {
  private readonly logger = new Logger(QueueService.name);

  constructor(
    private readonly eventBus: EventBusService,
    private readonly prisma: PrismaService,
    @InjectQueue(QUEUE_AI_ANALYSIS) private readonly aiQueue: Queue,
    @InjectQueue(QUEUE_DOCUMENT_IMPORT) private readonly importQueue: Queue,
    @InjectQueue(QUEUE_NOTIFICATION) private readonly notifyQueue: Queue,
  ) {}

  /**
   * Enqueues a job on the named queue, then records and announces it.
   *
   * @param queueName One of the queue names this service resolves; also used as the job name.
   * @param data Job payload handed to BullMQ as-is.
   * @param opts Optional overrides, spread over the defaults
   *   (3 attempts, exponential backoff starting at 1000 ms).
   * @returns The job returned by `queue.add`.
   * @throws Error if `queueName` is not a queue this service knows about
   *   (surfaced as a rejected promise), or whatever `queue.add` rejects with.
   */
  async add(queueName: string, data: any, opts?: { jobId?: string; attempts?: number; backoff?: number }) {
    const queue = this.getQueue(queueName);
    const job = await queue.add(queueName, data, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
      ...opts,
    });
    const jobId = job.id || '';

    // From here on the job is already in the queue: the audit row is best-effort
    // and must not turn a successful enqueue into a rejected promise.
    await this.recordEnqueued(queueName, jobId, data);

    this.eventBus.publish(this.buildEnqueuedEvent(queueName, jobId));
    this.logger.log(`Job ${job.id} added to ${queueName}`);
    return job;
  }

  /**
   * Looks up a job by id on the named queue.
   *
   * @throws Error if `queueName` is not a queue this service knows about
   *   (surfaced as a rejected promise).
   */
  async getJob(queueName: string, jobId: string) {
    const queue = this.getQueue(queueName);
    return queue.getJob(jobId);
  }

  /**
   * Writes the "enqueued" task-log row. Never throws: any failure, including a
   * payload that cannot be JSON round-tripped, is logged as a warning.
   */
  private async recordEnqueued(queueName: string, jobId: string, data: any): Promise<void> {
    try {
      // The JSON round-trip strips anything that is not plain JSON before it is
      // stored. It lives inside the try because it can throw synchronously:
      // JSON.stringify(undefined) returns undefined, which JSON.parse rejects.
      const payload = JSON.parse(JSON.stringify(data));
      await this.prisma.taskLog.create({
        data: { queueName, jobId, status: 'enqueued', payload },
      });
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`Failed to write task log for job ${jobId} on ${queueName}: ${reason}`);
    }
  }

  /**
   * Builds the `task.enqueued` domain event. The class is deliberately left as
   * an anonymous class expression so its runtime constructor name is unchanged.
   */
  private buildEnqueuedEvent(queueName: string, jobId: string): BaseDomainEvent {
    return new (class extends BaseDomainEvent {
      eventType = 'task.enqueued';
      queueName: string;
      jobId: string;
      constructor(q: string, j: string) {
        super();
        this.queueName = q;
        this.jobId = j;
      }
    })(queueName, jobId);
  }

  /**
   * Maps a queue name to its injected queue instance.
   *
   * @throws Error for any name other than the three injected queues.
   */
  private getQueue(name: string): Queue {
    switch (name) {
      case QUEUE_AI_ANALYSIS:
        return this.aiQueue;
      case QUEUE_DOCUMENT_IMPORT:
        return this.importQueue;
      case QUEUE_NOTIFICATION:
        return this.notifyQueue;
      default:
        throw new Error(`Unknown queue: ${name}`);
    }
  }
}