import { Controller, Get, Post, Param, UseGuards, NotFoundException } from '@nestjs/common';
import { ApiTags, ApiBearerAuth, ApiOperation } from '@nestjs/swagger';
import { InjectQueue, InjectFlowProducer } from '@nestjs/bullmq';
import { Queue, FlowProducer, Job } from 'bullmq';
import { AdminAuthGuard } from '../../common/guards/admin-auth.guard';
import { AdminRolesGuard } from '../../common/guards/admin-roles.guard';
import { AdminRoles } from '../../common/decorators/admin-roles.decorator';
import type { AdminRole } from '../../common/types/admin-role.enum';

/** Names of the queues this controller can inspect. */
const QUEUES = ['ai-analysis', 'document-import', 'notification', 'domain-events'] as const;

/** Union of the literal queue names listed in {@link QUEUES}. */
type QueueName = (typeof QUEUES)[number];

/** Type guard: true only when `name` is exactly one of the entries in {@link QUEUES}. */
function isQueueName(name: string): name is QueueName {
  return (QUEUES as readonly string[]).includes(name);
}

/**
 * Admin endpoints under `admin-api/events` for inspecting the injected BullMQ
 * queues and retrying jobs. Every route is guarded by `AdminAuthGuard` and
 * `AdminRolesGuard` and is annotated with the `SUPER_ADMIN` role.
 */
@ApiTags('admin-events')
@Controller('admin-api/events')
@UseGuards(AdminAuthGuard, AdminRolesGuard)
@ApiBearerAuth()
export class AdminEventsController {
  constructor(
    @InjectQueue('ai-analysis') private readonly aiQ: Queue,
    @InjectQueue('document-import') private readonly importQ: Queue,
    @InjectQueue('notification') private readonly notifyQ: Queue,
    @InjectQueue('domain-events') private readonly eventQ: Queue,
  ) {}

  /**
   * Returns per-queue job counts (waiting, active, completed, failed, delayed)
   * together with their sum as `total`, for every queue in {@link QUEUES}.
   */
  @Get()
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '队列概览' })
  async overview() {
    const queues = await Promise.all(
      QUEUES.map(async (name) => {
        const q = this.getQueue(name);
        const [waiting, active, completed, failed, delayed] = await Promise.all([
          q.getWaitingCount(),
          q.getActiveCount(),
          q.getCompletedCount(),
          q.getFailedCount(),
          q.getDelayedCount(),
        ]);
        return {
          name,
          waiting,
          active,
          completed,
          failed,
          delayed,
          total: waiting + active + completed + failed + delayed,
        };
      }),
    );
    return { queues };
  }

  /**
   * Lists failed jobs of the given queue in summary form.
   *
   * @param queueName Queue name taken from the `:queue` path segment.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Get(':queue/failed')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '失败任务列表' })
  async failed(@Param('queue') queueName: string) {
    const q = this.getQueue(queueName);
    // NOTE(review): BullMQ range arguments appear to be end-inclusive, so (0, 20)
    // may yield up to 21 jobs — confirm the intended page size.
    const jobs = await q.getFailed(0, 20);
    // Arrow wrapper keeps `this` bound should formatJob ever start using it.
    return { jobs: jobs.map((job) => this.formatJob(job)) };
  }

  /**
   * Returns the summary of a single job plus its state, payload and stacktrace.
   * Responds with `{ error: 'Job not found' }` when the queue has no such job.
   *
   * @param queueName Queue name taken from the `:queue` path segment.
   * @param jobId Job id taken from the `:jobId` path segment.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Get(':queue/jobs/:jobId')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '任务详情' })
  async jobDetail(@Param('queue') queueName: string, @Param('jobId') jobId: string) {
    const q = this.getQueue(queueName);
    const job = await q.getJob(jobId);
    // Response shape kept as-is so existing consumers of this endpoint keep working.
    if (!job) return { error: 'Job not found' };
    const state = await job.getState();
    return { ...this.formatJob(job), state, data: job.data, stacktrace: job.stacktrace };
  }

  /**
   * Calls `retry()` on the given job and returns `{ success: true }`.
   * Responds with `{ error: 'Job not found' }` when the queue has no such job.
   *
   * @param queueName Queue name taken from the `:queue` path segment.
   * @param jobId Job id taken from the `:jobId` path segment.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Post(':queue/jobs/:jobId/retry')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '重试失败任务' })
  async retry(@Param('queue') queueName: string, @Param('jobId') jobId: string) {
    const q = this.getQueue(queueName);
    const job = await q.getJob(jobId);
    if (!job) return { error: 'Job not found' };
    await job.retry();
    return { success: true };
  }

  /**
   * Resolves a queue name to the injected queue instance.
   *
   * Unknown names are rejected instead of silently falling back to the
   * `domain-events` queue: a mistyped `:queue` segment must never cause a job
   * in a different queue to be read or retried. Validating against
   * {@link QUEUES} also keeps inherited object keys such as `constructor`
   * from being treated as queue names.
   *
   * @throws NotFoundException when `name` is not one of {@link QUEUES}.
   */
  private getQueue(name: string): Queue {
    if (!isQueueName(name)) {
      throw new NotFoundException(`Queue not found: ${name}`);
    }
    const queuesByName: Record<QueueName, Queue> = {
      'ai-analysis': this.aiQ,
      'document-import': this.importQ,
      'notification': this.notifyQ,
      'domain-events': this.eventQ,
    };
    return queuesByName[name];
  }

  /**
   * Projects a job onto the summary fields returned by the list and detail
   * endpoints. `failedReason` is cut to its first 200 characters.
   */
  private formatJob(job: Job) {
    return {
      id: job.id,
      name: job.name,
      timestamp: job.timestamp,
      attemptsMade: job.attemptsMade,
      failedReason: job.failedReason?.slice(0, 200),
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
    };
  }
}