2026-05-22 22:27:07 +08:00
|
|
|
import { Controller, Get, NotFoundException, Param, Post, UseGuards } from '@nestjs/common';
|
|
|
|
|
import { ApiTags, ApiBearerAuth, ApiOperation } from '@nestjs/swagger';
|
|
|
|
|
import { InjectQueue, InjectFlowProducer } from '@nestjs/bullmq';
|
2026-05-23 19:27:46 +08:00
|
|
|
import { Queue, Job } from 'bullmq';
|
|
|
|
|
import { WorkerHeartbeat } from '../../infrastructure/queue/worker-heartbeat';
|
2026-05-22 22:27:07 +08:00
|
|
|
import { AdminAuthGuard } from '../../common/guards/admin-auth.guard';
|
|
|
|
|
import { AdminRolesGuard } from '../../common/guards/admin-roles.guard';
|
|
|
|
|
import { AdminRoles } from '../../common/decorators/admin-roles.decorator';
|
|
|
|
|
import type { AdminRole } from '../../common/types/admin-role.enum';
|
|
|
|
|
|
|
|
|
|
// Names of the queues listed by the overview endpoint. Each entry has a
// matching @InjectQueue(...) parameter in the controller constructor.
const QUEUES = ['ai-analysis', 'document-import', 'notification', 'domain-events'] as const;
|
|
|
|
|
|
|
|
|
|
/**
 * Admin endpoints for inspecting the application's BullMQ queues.
 *
 * Every route is mounted under `admin-api/events`, sits behind
 * `AdminAuthGuard` + `AdminRolesGuard`, and is annotated with the
 * `SUPER_ADMIN` role.
 */
@ApiTags('admin-events')
@Controller('admin-api/events')
@UseGuards(AdminAuthGuard, AdminRolesGuard)
@ApiBearerAuth()
export class AdminEventsController {
  constructor(
    private readonly heartbeat: WorkerHeartbeat,
    @InjectQueue('ai-analysis') private readonly aiQ: Queue,
    @InjectQueue('document-import') private readonly importQ: Queue,
    @InjectQueue('notification') private readonly notifyQ: Queue,
    @InjectQueue('domain-events') private readonly eventQ: Queue,
  ) {}

  /**
   * Returns per-queue job counts (waiting / active / completed / failed /
   * delayed plus their sum) for every queue in `QUEUES`, together with the
   * workers reported by `WorkerHeartbeat.getActiveWorkers()`.
   */
  @Get()
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '队列概览' })
  async overview() {
    const queues = await Promise.all(
      QUEUES.map(async (name) => {
        const q = this.getQueue(name);
        // The five counters are independent, so fetch them concurrently.
        const [waiting, active, completed, failed, delayed] = await Promise.all([
          q.getWaitingCount(),
          q.getActiveCount(),
          q.getCompletedCount(),
          q.getFailedCount(),
          q.getDelayedCount(),
        ]);
        return {
          name,
          waiting,
          active,
          completed,
          failed,
          delayed,
          total: waiting + active + completed + failed + delayed,
        };
      }),
    );
    const workers = await this.heartbeat.getActiveWorkers();
    return { queues, workers };
  }

  /**
   * Lists failed jobs of the given queue in summary form.
   *
   * @param queueName Queue name taken from the URL.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Get(':queue/failed')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '失败任务列表' })
  async failed(@Param('queue') queueName: string) {
    const q = this.getQueue(queueName);
    // NOTE(review): BullMQ ranges appear to be inclusive, so (0, 20) may
    // return up to 21 jobs — confirm whether 20 was the intended page size.
    const jobs = await q.getFailed(0, 20);
    // Arrow wrapper keeps `this` bound should formatJob ever rely on it.
    return { jobs: jobs.map((job) => this.formatJob(job)) };
  }

  /**
   * Returns the summary of a single job plus its state, payload and
   * stack traces.
   *
   * @param queueName Queue name taken from the URL.
   * @param jobId Job id taken from the URL.
   * @returns The job details, or `{ error: 'Job not found' }` when the queue
   *   has no job with that id.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Get(':queue/jobs/:jobId')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '任务详情' })
  async jobDetail(@Param('queue') queueName: string, @Param('jobId') jobId: string) {
    const q = this.getQueue(queueName);
    const job = await q.getJob(jobId);
    if (!job) return { error: 'Job not found' };
    const state = await job.getState();
    return { ...this.formatJob(job), state, data: job.data, stacktrace: job.stacktrace };
  }

  /**
   * Re-queues a job via `job.retry()`.
   *
   * @param queueName Queue name taken from the URL.
   * @param jobId Job id taken from the URL.
   * @returns `{ success: true }` after the retry call resolves, or
   *   `{ error: 'Job not found' }` when the queue has no job with that id.
   * @throws NotFoundException when `queueName` is not a known queue.
   */
  @Post(':queue/jobs/:jobId/retry')
  @AdminRoles('SUPER_ADMIN' as AdminRole)
  @ApiOperation({ summary: '重试失败任务' })
  async retry(@Param('queue') queueName: string, @Param('jobId') jobId: string) {
    const q = this.getQueue(queueName);
    const job = await q.getJob(jobId);
    if (!job) return { error: 'Job not found' };
    await job.retry();
    return { success: true };
  }

  /**
   * Resolves a queue name to its injected `Queue` instance.
   *
   * Unknown names are rejected instead of silently falling back to another
   * queue: a typo in the URL must never cause a job to be inspected or
   * retried on the wrong queue.
   *
   * @param name Queue name, e.g. `'ai-analysis'`.
   * @throws NotFoundException when `name` is not one of the injected queues.
   */
  private getQueue(name: string): Queue {
    // A Map (not an object literal) so inherited keys such as "constructor"
    // or "toString" can never be mistaken for a queue.
    const queuesByName = new Map<string, Queue>([
      ['ai-analysis', this.aiQ],
      ['document-import', this.importQ],
      ['notification', this.notifyQ],
      ['domain-events', this.eventQ],
    ]);
    const queue = queuesByName.get(name);
    if (!queue) {
      throw new NotFoundException(`Unknown queue: ${name}`);
    }
    return queue;
  }

  /**
   * Projects a job onto the summary fields exposed by this API.
   * `failedReason` is cut to its first 200 characters.
   */
  private formatJob(job: Job) {
    return {
      id: job.id,
      name: job.name,
      timestamp: job.timestamp,
      attemptsMade: job.attemptsMade,
      failedReason: job.failedReason?.slice(0, 200),
      finishedOn: job.finishedOn,
      processedOn: job.processedOn,
    };
  }
}
|