// api-server/src/modules/document-import/document-import.service.ts
// (78 lines · 3.2 KiB · TypeScript)

import { Injectable, Logger } from '@nestjs/common';
import { DocumentImportRepository } from './document-import.repository';
import { RedisService } from '../../infrastructure/redis/redis.service';
import { QueueService } from '../../infrastructure/queue/queue.service';
/**
 * Creates document-import jobs and reports their progress.
 *
 * Each job is persisted through the repository, mirrored into three Redis
 * keys (`job:document-import:<id>:status|progress|message`) and guarded by a
 * Redis lock keyed on the file name, so the same file name cannot be imported
 * twice concurrently.
 */
@Injectable()
export class DocumentImportService {
  private readonly logger = new Logger(DocumentImportService.name);

  /** TTL handed to `redis.lock` (presumably seconds — confirm against RedisService). */
  private readonly lockTtl = 1800;

  /** TTL handed to every `redis.set` on a job key (presumably seconds — confirm). */
  private readonly jobKeyTtl = 86400;

  /** Pause between two pipeline stages, in milliseconds. */
  private readonly stageDelayMs = 1000;

  constructor(
    private readonly repository: DocumentImportRepository,
    private readonly redis: RedisService,
    private readonly queue: QueueService,
  ) {}

  /**
   * Registers a new import job and starts advancing it in the background.
   *
   * @param dto Import request; `fileName` and `userId` are read here, the
   *   whole object is forwarded to `repository.create`.
   * @returns The job record returned by `repository.create`.
   * @throws Error when the lock for this file name is already held, or when
   *   creating, publishing or enqueueing the job fails (the lock is released
   *   before the error is rethrown).
   */
  async createImport(dto: any) {
    // Without a file name the key falls back to a timestamp, which in
    // practice never collides, so such requests are not de-duplicated.
    const lockKey = `lock:document-import:${dto.fileName || Date.now()}`;
    const lockToken = await this.redis.lock(lockKey, this.lockTtl);
    if (!lockToken) {
      throw new Error('相同文件正在导入中,请稍候');
    }

    let job: any;
    try {
      job = await this.repository.create(dto);
      await this.publishState(job.id, {
        status: 'pending',
        progress: '0',
        message: '任务已加入队列',
      });
      // Awaited so that an enqueue failure surfaces to the caller instead of
      // becoming an unhandled rejection.
      // NOTE(review): the job is both enqueued and advanced in-process below;
      // confirm whether the queue consumer also drives these status keys.
      await this.queue.add('document-import', {
        importId: job.id,
        userId: dto.userId || 'anonymous',
      });
    } catch (err: unknown) {
      // Setup failed after the lock was taken: do not leave the file locked
      // until the TTL expires, and do not leave a created job looking pending.
      if (job !== undefined) {
        await this.markFailed(job.id);
      }
      await this.releaseLock(lockKey, lockToken);
      throw err;
    }

    // Deliberately not awaited: the caller gets the job back immediately.
    // processImport handles its own errors and never rejects.
    void this.processImport(job, lockKey, lockToken);
    return job;
  }

  /**
   * Walks a job through the parsing → chunking → generating → completed
   * status markers, pausing `stageDelayMs` between stages, and releases the
   * import lock when done. No file content is processed in this method; it
   * only updates the repository status and the Redis job keys.
   *
   * Never rejects: a failure in any step is logged, the job is marked failed
   * on a best-effort basis, and the lock is still released.
   */
  private async processImport(job: any, lockKey: string, lockToken: string): Promise<void> {
    try {
      await this.repository.updateStatus(job.id, 'processing');
      await this.publishState(job.id, {
        status: 'parsing',
        progress: '25',
        message: '正在解析文件',
      });

      await this.delay(this.stageDelayMs);
      await this.publishState(job.id, {
        status: 'chunking',
        progress: '50',
        message: '正在分段提取',
      });

      await this.delay(this.stageDelayMs);
      await this.publishState(job.id, {
        status: 'generating',
        progress: '75',
        message: '正在生成知识点',
      });

      await this.delay(this.stageDelayMs);
      await this.repository.updateStatus(job.id, 'completed');
      // No message is written here, as before.
      // NOTE(review): the message key therefore still holds the "generating"
      // text after completion — consider publishing a completion message.
      await this.publishState(job.id, { status: 'completed', progress: '100' });
      this.logger.log(`Import ${job.id} completed`);
    } catch (err: unknown) {
      this.logger.error(
        `Import ${job.id} failed: ${this.describeError(err)}`,
        err instanceof Error ? err.stack : undefined,
      );
      await this.markFailed(job.id);
    } finally {
      await this.releaseLock(lockKey, lockToken);
    }
  }

  /**
   * Reports the current state of an import job.
   *
   * The Redis status wins over the repository status; `'unknown'` is returned
   * when neither exists.
   *
   * @param id Import job identifier.
   * @returns `{ id, fileName, status, progress, message }`, where `progress`
   *   is always a finite integer and `message` is `null` when absent.
   */
  async getStatus(id: string) {
    // The four lookups are independent, so issue them together.
    const [redisStatus, redisProgress, redisMessage, dbJob] = await Promise.all([
      this.redis.get(this.jobKey(id, 'status')),
      this.redis.get(this.jobKey(id, 'progress')),
      this.redis.get(this.jobKey(id, 'message')),
      this.repository.findById(id),
    ]);

    const status = redisStatus || dbJob?.status || 'unknown';

    // When the Redis progress key is missing (e.g. expired), a completed job
    // should still read as 100 rather than 0.
    const fallbackProgress = status === 'completed' ? 100 : 0;
    const parsedProgress = redisProgress ? parseInt(redisProgress, 10) : fallbackProgress;

    return {
      id,
      fileName: dbJob?.sourceName,
      status,
      // Guard against a malformed stored value leaking NaN to callers.
      progress: Number.isNaN(parsedProgress) ? fallbackProgress : parsedProgress,
      message: redisMessage || null,
    };
  }

  /** Builds the Redis key for one field of a job. */
  private jobKey(jobId: string, field: 'status' | 'progress' | 'message'): string {
    return `job:document-import:${jobId}:${field}`;
  }

  /** Writes status and progress (and message, when given) for a job to Redis. */
  private async publishState(
    jobId: string,
    state: { status: string; progress: string; message?: string },
  ): Promise<void> {
    await this.redis.set(this.jobKey(jobId, 'status'), state.status, this.jobKeyTtl);
    await this.redis.set(this.jobKey(jobId, 'progress'), state.progress, this.jobKeyTtl);
    if (state.message !== undefined) {
      await this.redis.set(this.jobKey(jobId, 'message'), state.message, this.jobKeyTtl);
    }
  }

  /**
   * Best-effort: flags a job as failed in the repository and in Redis.
   * Never throws, so it is safe to call from error paths.
   *
   * NOTE(review): 'failed' is a status value introduced with this error
   * handling — confirm the repository and status consumers accept it.
   */
  private async markFailed(jobId: string): Promise<void> {
    try {
      await this.repository.updateStatus(jobId, 'failed');
    } catch (err: unknown) {
      this.logger.warn(
        `Could not mark import ${jobId} as failed in repository: ${this.describeError(err)}`,
      );
    }
    try {
      await this.redis.set(this.jobKey(jobId, 'status'), 'failed', this.jobKeyTtl);
      await this.redis.set(this.jobKey(jobId, 'message'), '导入失败', this.jobKeyTtl);
    } catch (err: unknown) {
      this.logger.warn(
        `Could not mark import ${jobId} as failed in Redis: ${this.describeError(err)}`,
      );
    }
  }

  /** Best-effort unlock; a failure is logged and the lock is left to its TTL. */
  private async releaseLock(lockKey: string, lockToken: string): Promise<void> {
    try {
      await this.redis.unlock(lockKey, lockToken);
    } catch (err: unknown) {
      this.logger.warn(`Could not release ${lockKey}: ${this.describeError(err)}`);
    }
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }
}