diff --git a/prisma/migrations/20260523192858_add_task_log/migration.sql b/prisma/migrations/20260523192858_add_task_log/migration.sql new file mode 100644 index 0000000..9a23ab0 --- /dev/null +++ b/prisma/migrations/20260523192858_add_task_log/migration.sql @@ -0,0 +1,9 @@ +CREATE TABLE TaskLog ( + id VARCHAR(191) NOT NULL, queueName VARCHAR(64) NOT NULL, jobId VARCHAR(100) NOT NULL, + status VARCHAR(16) NOT NULL DEFAULT 'enqueued', payload JSON, + error TEXT, attempts INT NOT NULL DEFAULT 0, + createdAt DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + updatedAt DATETIME(3) NOT NULL, + INDEX TaskLog_queueName_idx(queueName), INDEX TaskLog_status_idx(status), + INDEX TaskLog_createdAt_idx(createdAt), PRIMARY KEY (id) +) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index cc71e86..ea494a0 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -962,3 +962,19 @@ model ApiMetric { @@index([path]) @@index([createdAt]) } + +model TaskLog { + id String @id @default(cuid()) + queueName String @db.VarChar(64) + jobId String @db.VarChar(100) + status String @default("enqueued") @db.VarChar(16) + payload Json? + error String? @db.Text + attempts Int @default(0) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([queueName]) + @@index([status]) + @@index([createdAt]) +} diff --git a/src/infrastructure/queue/queue.module.ts b/src/infrastructure/queue/queue.module.ts index ed2ae2e..6fb3cc3 100644 --- a/src/infrastructure/queue/queue.module.ts +++ b/src/infrastructure/queue/queue.module.ts @@ -1,6 +1,7 @@ import { Global, Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { ConfigService } from '@nestjs/config'; +import { PrismaService } from '../database/prisma.service'; import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_AUDIT_LOG, QUEUE_FILE_CLEANUP, QUEUE_DOCUMENT_IMPORT, QUEUE_NOTIFICATION } from './queue.service'; @Global() @@ -31,7 +32,7 @@ import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_AUDIT_LOG, QUEUE_FILE_CLEANUP, Q { name: QUEUE_FILE_CLEANUP }, ), ], - providers: [QueueService], + providers: [QueueService, PrismaService], exports: [QueueService, BullModule], }) export class QueueModule {} diff --git a/src/infrastructure/queue/queue.service.ts b/src/infrastructure/queue/queue.service.ts index 7685ae4..e85bfef 100644 --- a/src/infrastructure/queue/queue.service.ts +++ b/src/infrastructure/queue/queue.service.ts @@ -1,5 +1,8 @@ import { Injectable, Logger, Optional } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; +import { EventBusService } from '../../common/event-bus/event-bus.service'; +import { BaseDomainEvent } from '../../common/events/base-domain.event'; +import { PrismaService } from '../database/prisma.service'; import { Queue } from 'bullmq'; export const QUEUE_AI_ANALYSIS = 'ai-analysis'; @@ -13,6 +16,8 @@ export class QueueService { private readonly logger = new Logger(QueueService.name); constructor( + private readonly eventBus: EventBusService, + private readonly prisma: PrismaService, @InjectQueue(QUEUE_AI_ANALYSIS) private readonly aiQueue: Queue, @InjectQueue(QUEUE_DOCUMENT_IMPORT) private readonly importQueue: Queue, @InjectQueue(QUEUE_NOTIFICATION) private readonly notifyQueue: Queue, @@ -20,7 +25,12 @@ export class QueueService { async add(queueName: string, data: any, opts?: { jobId?: string; attempts?: number; backoff?: number }) { const queue = this.getQueue(queueName); - const job = await queue.add(queueName, data, opts || {}); + const job = await queue.add(queueName, data, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, ...opts }); + + // Log to DB + await this.prisma.taskLog.create({ data: { queueName, jobId: job.id || '', status: 'enqueued', payload: JSON.parse(JSON.stringify(data)) } }).catch(() => {}); + + this.eventBus.publish(new (class extends BaseDomainEvent { eventType = 'task.enqueued'; queueName: string; jobId: string; constructor(q: string, j: string) { super(); this.queueName = q; this.jobId = j; } })(queueName, job.id || '')); this.logger.log(`Job ${job.id} added to ${queueName}`); return job; } diff --git a/src/modules/admin-events/admin-events.controller.ts b/src/modules/admin-events/admin-events.controller.ts index 43abb0f..5054fbb 100644 --- a/src/modules/admin-events/admin-events.controller.ts +++ b/src/modules/admin-events/admin-events.controller.ts @@ -2,6 +2,7 @@ import { Controller, Get, Post, Param, UseGuards } from '@nestjs/common'; import { ApiTags, ApiBearerAuth, ApiOperation } from '@nestjs/swagger'; import { InjectQueue, InjectFlowProducer } from '@nestjs/bullmq'; import { Queue, Job } from 'bullmq'; +import { PrismaService } from '../../infrastructure/database/prisma.service'; import { WorkerHeartbeat } from '../../infrastructure/queue/worker-heartbeat'; import { AdminAuthGuard } from '../../common/guards/admin-auth.guard'; import { AdminRolesGuard } from '../../common/guards/admin-roles.guard'; @@ -16,6 +17,7 @@ const QUEUES = ['ai-analysis', 'document-import', 'notification', 'domain-events @ApiBearerAuth() export class AdminEventsController { constructor(private readonly heartbeat: WorkerHeartbeat, + private readonly prisma: PrismaService, @InjectQueue('ai-analysis') private aiQ: Queue, @InjectQueue('document-import') private importQ: Queue, @@ -69,6 +71,8 @@ export class AdminEventsController { const job = await q.getJob(jobId); if (!job) return { error: 'Job not found' }; await job.retry(); + // Audit + await this.prisma.taskLog.updateMany({ where: { jobId }, data: { status: 'retried', updatedAt: new Date() } }).catch(() => {}); return { success: true }; } diff --git a/src/modules/admin-events/admin-events.module.ts b/src/modules/admin-events/admin-events.module.ts index 93ac334..927bf4b 100644 --- a/src/modules/admin-events/admin-events.module.ts +++ b/src/modules/admin-events/admin-events.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { AdminEventsController } from './admin-events.controller'; +import { PrismaService } from '../../infrastructure/database/prisma.service'; import { WorkerHeartbeat } from '../../infrastructure/queue/worker-heartbeat'; import { RedisService } from '../../infrastructure/redis/redis.service'; import { AdminAuthGuard } from '../../common/guards/admin-auth.guard'; @@ -14,6 +15,6 @@ import { AdminRolesGuard } from '../../common/guards/admin-roles.guard'; ), ], controllers: [AdminEventsController], - providers: [WorkerHeartbeat, RedisService, AdminAuthGuard, AdminRolesGuard], + providers: [PrismaService, WorkerHeartbeat, RedisService, AdminAuthGuard, AdminRolesGuard], }) export class AdminEventsModule {}