fix(API-AI-080): enqueue import job + pass sourceId in KnowledgeSource pipeline
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 43s

KnowledgeSourceService.addSource() was creating Source + DocumentImport
records but never enqueuing the job for processing. Also the worker
wasn't linking created KnowledgeItems back to their source.

Changes:
- Inject QueueService + RedisService into KnowledgeSourceService
- Enqueue document-import job after creating Source + DocumentImport
- Set Redis status keys (matching DocumentImportService pattern)
- Worker: resolve sourceId from job data or DocumentImport record
- Worker: pass sourceRef when creating KnowledgeItems

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-18 21:51:49 +08:00
parent 137fe36a72
commit 76bdba330d
2 changed files with 29 additions and 3 deletions

View File

@ -1,12 +1,16 @@
import { Injectable, NotFoundException, BadRequestException } from '@nestjs/common'; import { Injectable, NotFoundException, BadRequestException } from '@nestjs/common';
import { KnowledgeSourceRepository } from './knowledge-source.repository'; import { KnowledgeSourceRepository } from './knowledge-source.repository';
import { DocumentImportRepository } from '../document-import/document-import.repository'; import { DocumentImportRepository } from '../document-import/document-import.repository';
import { QueueService } from '../../infrastructure/queue/queue.service';
import { RedisService } from '../../infrastructure/redis/redis.service';
@Injectable() @Injectable()
export class KnowledgeSourceService { export class KnowledgeSourceService {
constructor( constructor(
private readonly repository: KnowledgeSourceRepository, private readonly repository: KnowledgeSourceRepository,
private readonly importRepo: DocumentImportRepository, private readonly importRepo: DocumentImportRepository,
private readonly queue: QueueService,
private readonly redis: RedisService,
) {} ) {}
async addSource(userId: string, knowledgeBaseId: string, dto: { async addSource(userId: string, knowledgeBaseId: string, dto: {
@ -20,8 +24,8 @@ export class KnowledgeSourceService {
}) { }) {
const source = await this.repository.create(userId, knowledgeBaseId, dto); const source = await this.repository.create(userId, knowledgeBaseId, dto);
// 自动创建 import 任务 // 自动创建 import 任务并入队
await this.importRepo.create({ const importJob = await this.importRepo.create({
userId, userId,
knowledgeBaseId, knowledgeBaseId,
sourceId: source.id, sourceId: source.id,
@ -31,6 +35,20 @@ export class KnowledgeSourceService {
status: 'QUEUED', status: 'QUEUED',
}); });
// 设置 Redis 状态
await this.redis.set(`job:document-import:${importJob.id}:status`, 'pending', 86400);
await this.redis.set(`job:document-import:${importJob.id}:progress`, '0', 86400);
await this.redis.set(`job:document-import:${importJob.id}:message`, '任务已加入队列', 86400);
// 入队处理
await this.queue.add('document-import', {
importId: importJob.id,
userId,
knowledgeBaseId,
sourceId: source.id,
fileName: dto.originalFilename ?? dto.title,
});
return source; return source;
} }

View File

@ -24,11 +24,18 @@ export class DocumentImportWorker extends WorkerHost {
importId: string; importId: string;
userId: string; userId: string;
knowledgeBaseId?: string; knowledgeBaseId?: string;
sourceId?: string;
rawText?: string; rawText?: string;
fileName?: string; fileName?: string;
}>) { }>) {
const { importId, userId, knowledgeBaseId, rawText, fileName } = job.data; const { importId, userId, knowledgeBaseId, rawText, fileName } = job.data;
this.logger.log(`Processing document import job ${job.id}, importId=${importId}`); // Resolve sourceId: from job data first, then fallback to DocumentImport record
let sourceId = job.data.sourceId;
if (!sourceId) {
const importRecord = await this.repository.findById(importId);
sourceId = importRecord?.sourceId ?? undefined;
}
this.logger.log(`Processing document import job ${job.id}, importId=${importId}, sourceId=${sourceId ?? 'none'}`);
try { try {
if (!rawText) { if (!rawText) {
@ -62,6 +69,7 @@ export class DocumentImportWorker extends WorkerHost {
content: kp.content, content: kp.content,
itemType: 'lesson', itemType: 'lesson',
orderIndex: kp.suggestedOrder ?? i + 1, orderIndex: kp.suggestedOrder ?? i + 1,
sourceRef: sourceId,
}); });
} }
} }