api-server/src/modules/document-import/document-import.service.ts

126 lines
5.2 KiB
TypeScript
Raw Normal View History

import { Injectable, Logger } from '@nestjs/common';
import { DocumentImportRepository } from './document-import.repository';
import { KnowledgeItemsRepository } from '../knowledge-items/knowledge-items.repository';
import { KnowledgeImportWorkflow } from '../ai/workflows/knowledge-import.workflow';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { QueueService } from '../../infrastructure/queue/queue.service';
/**
 * Creates document-import jobs and runs them in the background.
 *
 * For every import the service:
 *  1. takes a Redis lock keyed by file name so the same file is not imported twice at once,
 *  2. persists a job row through the repository,
 *  3. mirrors status / progress / message into Redis keys
 *     (`job:document-import:<id>:status|progress|message`) for polling via {@link getStatus},
 *  4. runs the knowledge-import workflow and stores the extracted knowledge points.
 */
@Injectable()
export class DocumentImportService {
  /** TTL passed to `redis.lock` for the per-file import lock (presumably seconds, i.e. 30 min — confirm against RedisService). */
  private static readonly LOCK_TTL = 1800;

  /** TTL passed to `redis.set` for the job state keys (presumably seconds, i.e. 24 h — confirm against RedisService). */
  private static readonly JOB_STATE_TTL = 86400;

  private readonly logger = new Logger(DocumentImportService.name);

  constructor(
    private readonly repository: DocumentImportRepository,
    private readonly knowledgeItemsRepo: KnowledgeItemsRepository,
    private readonly workflow: KnowledgeImportWorkflow,
    private readonly redis: RedisService,
    private readonly queue: QueueService,
  ) {}

  /**
   * Registers a new import job and starts processing it in the background.
   *
   * @param dto Import request; every field is optional. When `fileName` is missing the lock key
   *            falls back to the current timestamp, so such imports are effectively never deduplicated.
   * @returns The job record created by the repository. Processing continues after this method returns;
   *          poll {@link getStatus} for progress.
   * @throws Error when the lock for this file is already held (an import of the same file is running),
   *         or when creating/announcing the job fails — in that case the lock is released first.
   */
  async createImport(dto: {
    userId?: string;
    knowledgeBaseId?: string;
    fileName?: string;
    sourceType?: string;
    rawText?: string;
  }) {
    const lockKey = `lock:document-import:${dto.fileName || Date.now()}`;
    const lockToken = await this.redis.lock(lockKey, DocumentImportService.LOCK_TTL);
    if (!lockToken) {
      throw new Error('相同文件正在导入中,请稍候');
    }

    try {
      const job = await this.repository.create(dto);
      await this.publishProgress(job.id, 'pending', '任务已加入队列', '0');

      // NOTE(review): the job is enqueued AND processed inline just below. If a consumer of the
      // 'document-import' queue also performs the import, the document would be processed twice — confirm.
      // NOTE(review): the result of queue.add is not awaited (as in the original); if it returns a
      // promise, its rejection is not observed here — confirm against QueueService.
      this.queue.add('document-import', {
        importId: job.id,
        userId: dto.userId || 'anonymous',
        knowledgeBaseId: dto.knowledgeBaseId,
        rawText: dto.rawText,
        fileName: dto.fileName,
      });

      // Deliberately fire-and-forget: processImport handles its own errors and always releases
      // the lock, so from here on lock ownership belongs to the background task.
      void this.processImport(job, dto.rawText, dto.knowledgeBaseId, lockKey, lockToken);
      return job;
    } catch (error: unknown) {
      // Nothing was started in the background, so the lock must be released here;
      // otherwise the same file would stay blocked until the lock TTL expires.
      await this.releaseLock(lockKey, lockToken);
      throw error;
    }
  }

  /**
   * Runs one import end to end and records the outcome. Never rejects: failures are logged and
   * written to the job state, and the lock is released on every path.
   *
   * @param job             Job record; `userId` falls back to `'anonymous'` when absent.
   * @param rawText         Text to analyse; when empty the job is completed immediately.
   * @param knowledgeBaseId Target knowledge base; when absent, extracted points are not stored.
   * @param lockKey         Redis key of the lock taken in `createImport`.
   * @param lockToken       Token returned by `redis.lock`, required to release the lock.
   */
  private async processImport(
    job: { id: string; userId?: string },
    rawText: string | undefined,
    knowledgeBaseId: string | undefined,
    lockKey: string,
    lockToken: string,
  ): Promise<void> {
    try {
      if (!rawText) {
        await this.repository.updateStatus(job.id, 'completed');
        await this.publishProgress(job.id, 'completed', '无需解析的空文件', '100');
        return;
      }

      const ownerId = job.userId || 'anonymous';

      await this.repository.updateStatus(job.id, 'processing');
      await this.publishProgress(job.id, 'parsing', 'AI 正在分析文本,提取知识点...', '25');

      const result = await this.workflow.execute({
        userId: ownerId,
        rawText,
        sourceName: undefined,
      });
      const pointCount = result.knowledgePoints.length;

      await this.publishProgress(job.id, 'saving', `正在保存 ${pointCount} 个知识点...`, '80');

      if (knowledgeBaseId && pointCount > 0) {
        // Stored one by one, in order, exactly as the workflow returned them.
        for (let i = 0; i < pointCount; i++) {
          const kp = result.knowledgePoints[i];
          await this.knowledgeItemsRepo.create(ownerId, knowledgeBaseId, {
            title: kp.title,
            content: kp.content,
            itemType: 'lesson',
            orderIndex: kp.suggestedOrder ?? i + 1,
          });
        }
      }

      await this.repository.updateStatus(job.id, 'completed');
      await this.publishProgress(job.id, 'completed', `成功提取 ${pointCount} 个知识点`, '100');
      this.logger.log(`Import ${job.id} completed: ${pointCount} knowledge points`);
    } catch (error: unknown) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Import ${job.id} failed: ${reason}`);
      try {
        await this.repository.updateStatus(job.id, 'failed');
        // Progress is intentionally left at its last published value on failure.
        await this.publishProgress(job.id, 'failed', `导入失败: ${reason}`);
      } catch (bookkeepingError: unknown) {
        // This method runs detached from any caller, so a throw here would surface as an
        // unhandled rejection; log it instead.
        const detail =
          bookkeepingError instanceof Error ? bookkeepingError.message : String(bookkeepingError);
        this.logger.error(`Import ${job.id}: could not record failure state: ${detail}`);
      }
    } finally {
      await this.releaseLock(lockKey, lockToken);
    }
  }

  /**
   * Returns the current state of an import job, preferring the live Redis values and falling back
   * to the database row for the status.
   *
   * @param id Import job id.
   * @returns `status` is `'unknown'` when neither Redis nor the database has one; `progress` is `0`
   *          when missing or not a number; `message` is `null` when missing.
   */
  async getStatus(id: string) {
    const keyPrefix = `job:document-import:${id}`;

    // The four lookups are independent, so they are issued concurrently.
    const [redisStatus, redisProgress, redisMessage, dbJob] = await Promise.all([
      this.redis.get(`${keyPrefix}:status`),
      this.redis.get(`${keyPrefix}:progress`),
      this.redis.get(`${keyPrefix}:message`),
      this.repository.findById(id),
    ]);

    const parsedProgress = redisProgress ? parseInt(redisProgress, 10) : 0;

    return {
      id,
      fileName: dbJob?.sourceName,
      status: redisStatus || dbJob?.status || 'unknown',
      // Guard against a malformed stored value so callers never receive NaN.
      progress: Number.isNaN(parsedProgress) ? 0 : parsedProgress,
      message: redisMessage || null,
    };
  }

  /**
   * Writes the job state keys to Redis in the order status, progress, message.
   *
   * @param jobId    Import job id used in the key names.
   * @param status   Value for the `:status` key.
   * @param message  Value for the `:message` key.
   * @param progress Value for the `:progress` key; the key is left untouched when omitted.
   */
  private async publishProgress(
    jobId: string,
    status: string,
    message: string,
    progress?: string,
  ): Promise<void> {
    const keyPrefix = `job:document-import:${jobId}`;
    const ttl = DocumentImportService.JOB_STATE_TTL;

    await this.redis.set(`${keyPrefix}:status`, status, ttl);
    if (progress !== undefined) {
      await this.redis.set(`${keyPrefix}:progress`, progress, ttl);
    }
    await this.redis.set(`${keyPrefix}:message`, message, ttl);
  }

  /**
   * Releases the import lock without ever throwing: a failed release is logged, and the lock is
   * then left to expire through its TTL.
   */
  private async releaseLock(lockKey: string, lockToken: string): Promise<void> {
    try {
      await this.redis.unlock(lockKey, lockToken);
    } catch (error: unknown) {
      const detail = error instanceof Error ? error.message : String(error);
      this.logger.warn(`Failed to release lock ${lockKey}: ${detail}`);
    }
  }
}