From 9244db05b4a046e79df021402419a8e33840883b Mon Sep 17 00:00:00 2001 From: WangDL Date: Sat, 23 May 2026 09:47:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20M0-09=20=E2=80=94=20BullMQ=20cleanup=20?= =?UTF-8?q?queue=20+=20Domain=20Events=20+=20async=20COS=20delete?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/infrastructure/queue/queue.module.ts | 3 ++- src/infrastructure/queue/queue.service.ts | 1 + src/modules/files/admin-files.controller.ts | 7 +++++- src/modules/files/file-cleanup.processor.ts | 24 +++++++++++++++++++++ src/modules/files/files.module.ts | 3 +++ 5 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 src/modules/files/file-cleanup.processor.ts diff --git a/src/infrastructure/queue/queue.module.ts b/src/infrastructure/queue/queue.module.ts index 6662a7f..ed2ae2e 100644 --- a/src/infrastructure/queue/queue.module.ts +++ b/src/infrastructure/queue/queue.module.ts @@ -1,7 +1,7 @@ import { Global, Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { ConfigService } from '@nestjs/config'; -import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_AUDIT_LOG, QUEUE_DOCUMENT_IMPORT, QUEUE_NOTIFICATION } from './queue.service'; +import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_AUDIT_LOG, QUEUE_FILE_CLEANUP, QUEUE_DOCUMENT_IMPORT, QUEUE_NOTIFICATION } from './queue.service'; @Global() @Module({ @@ -28,6 +28,7 @@ import { QueueService, QUEUE_AI_ANALYSIS, QUEUE_AUDIT_LOG, QUEUE_DOCUMENT_IMPORT { name: QUEUE_DOCUMENT_IMPORT }, { name: QUEUE_NOTIFICATION }, { name: QUEUE_AUDIT_LOG }, + { name: QUEUE_FILE_CLEANUP }, ), ], providers: [QueueService], diff --git a/src/infrastructure/queue/queue.service.ts b/src/infrastructure/queue/queue.service.ts index 01c7d8c..86b76df 100644 --- a/src/infrastructure/queue/queue.service.ts +++ b/src/infrastructure/queue/queue.service.ts @@ -6,6 +6,7 @@ export const QUEUE_AI_ANALYSIS = 'ai-analysis'; export const QUEUE_DOCUMENT_IMPORT = 'document-import'; export const QUEUE_NOTIFICATION = 'notification'; export const QUEUE_AUDIT_LOG = 'audit-logs'; +export const QUEUE_FILE_CLEANUP = 'file-cleanup'; @Injectable() export class QueueService { diff --git a/src/modules/files/admin-files.controller.ts b/src/modules/files/admin-files.controller.ts index 9fe21e3..6e4da8e 100644 --- a/src/modules/files/admin-files.controller.ts +++ b/src/modules/files/admin-files.controller.ts @@ -1,6 +1,7 @@ import { Controller, Get, Delete, Param, Query, UseGuards } from '@nestjs/common'; import { ApiTags, ApiBearerAuth, ApiOperation } from '@nestjs/swagger'; import { PrismaService } from '../../infrastructure/database/prisma.service'; +import { QueueService, QUEUE_FILE_CLEANUP } from '../../infrastructure/queue/queue.service'; import { AdminAuthGuard } from '../../common/guards/admin-auth.guard'; import { AdminRolesGuard } from '../../common/guards/admin-roles.guard'; import { AdminRoles } from '../../common/decorators/admin-roles.decorator'; @@ -11,7 +12,7 @@ import type { AdminRole } from '../../common/types/admin-role.enum'; @UseGuards(AdminAuthGuard, AdminRolesGuard) @ApiBearerAuth() export class AdminFilesController { - constructor(private readonly prisma: PrismaService) {} + constructor(private readonly prisma: PrismaService, private readonly queue: QueueService) {} @Get() @AdminRoles('SUPER_ADMIN' as AdminRole) @@ -34,6 +35,10 @@ export class AdminFilesController { @AdminRoles('SUPER_ADMIN' as AdminRole) @ApiOperation({ summary: '软删除文件' }) async remove(@Param('id') id: string) { + const file = await this.prisma.uploadedFile.findUnique({ where: { id } }); + if (file?.objectKey) { + await this.queue.add(QUEUE_FILE_CLEANUP, { objectKey: file.objectKey, bucket: file.bucket, region: 'ap-beijing' }); + } await this.prisma.uploadedFile.delete({ where: { id } }); return { success: true }; } diff --git a/src/modules/files/file-cleanup.processor.ts b/src/modules/files/file-cleanup.processor.ts new file mode 100644 index 0000000..5b5b590 --- /dev/null +++ b/src/modules/files/file-cleanup.processor.ts @@ -0,0 +1,24 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Job } from 'bullmq'; +import { Logger } from '@nestjs/common'; +import { QUEUE_FILE_CLEANUP } from '../../infrastructure/queue/queue.service'; +import { CosStorageProvider } from '../../infrastructure/storage/cos-storage.provider'; + +@Processor(QUEUE_FILE_CLEANUP) +export class FileCleanupProcessor extends WorkerHost { + private readonly logger = new Logger(FileCleanupProcessor.name); + + constructor(private readonly cos: CosStorageProvider) { super(); } + + async process(job: Job) { + const { objectKey, bucket, region } = job.data; + if (!objectKey) return; + try { + await this.cos.deleteObject(objectKey); + this.logger.log(`Cleaned up COS: ${objectKey}`); + } catch (err: any) { + this.logger.warn(`Cleanup failed for ${objectKey}: ${err.message}`); + throw err; // BullMQ will retry + } + } +} diff --git a/src/modules/files/files.module.ts b/src/modules/files/files.module.ts index b6a41a3..09a80d2 100644 --- a/src/modules/files/files.module.ts +++ b/src/modules/files/files.module.ts @@ -1,4 +1,7 @@ import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; +import { FileCleanupProcessor } from './file-cleanup.processor'; +import { QUEUE_FILE_CLEANUP } from '../../infrastructure/queue/queue.service'; import { FilesController } from './files.controller'; import { AdminFilesController } from './admin-files.controller'; import { FilesService } from './files.service';