From 76bdba330d488a1a4b7ad66714fa6b931e3bc2d3 Mon Sep 17 00:00:00 2001 From: wangdl Date: Thu, 18 Jun 2026 21:51:49 +0800 Subject: [PATCH] fix(API-AI-080): enqueue import job + pass sourceId in KnowledgeSource pipeline KnowledgeSourceService.addSource() was creating Source + DocumentImport records but never enqueuing the job for processing. Also the worker wasn't linking created KnowledgeItems back to their source. Changes: - Inject QueueService + RedisService into KnowledgeSourceService - Enqueue document-import job after creating Source + DocumentImport - Set Redis status keys (matching DocumentImportService pattern) - Worker: resolve sourceId from job data or DocumentImport record - Worker: pass sourceRef when creating KnowledgeItems Co-Authored-By: Claude Opus 4.7 --- .../knowledge-source.service.ts | 22 +++++++++++++++++-- src/workers/document-import.worker.ts | 10 ++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/modules/knowledge-source/knowledge-source.service.ts b/src/modules/knowledge-source/knowledge-source.service.ts index 378733c..dd8a431 100644 --- a/src/modules/knowledge-source/knowledge-source.service.ts +++ b/src/modules/knowledge-source/knowledge-source.service.ts @@ -1,12 +1,16 @@ import { Injectable, NotFoundException, BadRequestException } from '@nestjs/common'; import { KnowledgeSourceRepository } from './knowledge-source.repository'; import { DocumentImportRepository } from '../document-import/document-import.repository'; +import { QueueService } from '../../infrastructure/queue/queue.service'; +import { RedisService } from '../../infrastructure/redis/redis.service'; @Injectable() export class KnowledgeSourceService { constructor( private readonly repository: KnowledgeSourceRepository, private readonly importRepo: DocumentImportRepository, + private readonly queue: QueueService, + private readonly redis: RedisService, ) {} async addSource(userId: string, knowledgeBaseId: string, dto: { @@ -20,8 +24,8 @@ export class KnowledgeSourceService { }) { const source = await this.repository.create(userId, knowledgeBaseId, dto); - // 自动创建 import 任务 - await this.importRepo.create({ + // 自动创建 import 任务并入队 + const importJob = await this.importRepo.create({ userId, knowledgeBaseId, sourceId: source.id, @@ -31,6 +35,20 @@ export class KnowledgeSourceService { status: 'QUEUED', }); + // 设置 Redis 状态 + await this.redis.set(`job:document-import:${importJob.id}:status`, 'pending', 86400); + await this.redis.set(`job:document-import:${importJob.id}:progress`, '0', 86400); + await this.redis.set(`job:document-import:${importJob.id}:message`, '任务已加入队列', 86400); + + // 入队处理 + await this.queue.add('document-import', { + importId: importJob.id, + userId, + knowledgeBaseId, + sourceId: source.id, + fileName: dto.originalFilename ?? dto.title, + }); + return source; } diff --git a/src/workers/document-import.worker.ts b/src/workers/document-import.worker.ts index 8da2dd7..941b077 100644 --- a/src/workers/document-import.worker.ts +++ b/src/workers/document-import.worker.ts @@ -24,11 +24,18 @@ export class DocumentImportWorker extends WorkerHost { importId: string; userId: string; knowledgeBaseId?: string; + sourceId?: string; rawText?: string; fileName?: string; }>) { const { importId, userId, knowledgeBaseId, rawText, fileName } = job.data; - this.logger.log(`Processing document import job ${job.id}, importId=${importId}`); + // Resolve sourceId: from job data first, then fallback to DocumentImport record + let sourceId = job.data.sourceId; + if (!sourceId) { + const importRecord = await this.repository.findById(importId); + sourceId = importRecord?.sourceId ?? undefined; + } + this.logger.log(`Processing document import job ${job.id}, importId=${importId}, sourceId=${sourceId ?? 'none'}`); try { if (!rawText) { @@ -62,6 +69,7 @@ export class DocumentImportWorker extends WorkerHost { content: kp.content, itemType: 'lesson', orderIndex: kp.suggestedOrder ?? i + 1, + sourceRef: sourceId, }); } }