feat: implement P1 async — AI analysis + document import via BullMQ workers
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 59s

B12: AI analysis now async — POST /ai-analysis queues job, returns immediately.
     Worker supports both active-recall and feynman-evaluation types.
B13: DocumentImportWorker fully implemented — all processing moved from
     service to worker. Service only queues and returns.
B14: NotificationWorker already complete (no changes needed).
B15: All 3 workers now fully functional.

New endpoint: GET /ai-analysis/jobs/:id for job status polling.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
WangDL 2026-05-18 10:17:06 +08:00
parent 597c7b2310
commit b1a6160d29
7 changed files with 196 additions and 108 deletions

View File

@ -10,23 +10,41 @@ export class AiAnalysisController {
constructor(private readonly service: AiAnalysisService) {} constructor(private readonly service: AiAnalysisService) {}
@Post() @Post()
@ApiOperation({ summary: '提交主动回忆分析' }) @ApiOperation({ summary: '提交主动回忆分析(异步)' })
async analyze( async analyze(
@CurrentUser() user: UserPayload, @CurrentUser() user: UserPayload,
@Body() body: { questionText: string; knowledgeItemContent: string; userAnswer: string }, @Body() body: {
questionText: string;
knowledgeItemContent: string;
userAnswer: string;
sessionId?: string;
answerId?: string;
},
) { ) {
return this.service.analyze(String(user?.id || 'anonymous'), body); return this.service.analyze(String(user?.id || 'anonymous'), body);
} }
@Post('feynman') @Post('feynman')
@ApiOperation({ summary: '提交费曼解释评估' }) @ApiOperation({ summary: '提交费曼解释评估(异步)' })
async evaluateFeynman( async evaluateFeynman(
@CurrentUser() user: UserPayload, @CurrentUser() user: UserPayload,
@Body() body: { knowledgeItemTitle: string; knowledgeItemContent: string; userExplanation: string }, @Body() body: {
knowledgeItemTitle: string;
knowledgeItemContent: string;
userExplanation: string;
sessionId?: string;
answerId?: string;
},
) { ) {
return this.service.evaluateFeynman(String(user?.id || 'anonymous'), body); return this.service.evaluateFeynman(String(user?.id || 'anonymous'), body);
} }
@Get('jobs/:id')
@ApiOperation({ summary: '查询 AI 分析任务状态' })
async getJobStatus(@Param('id') id: string) {
return this.service.getJobStatus(id);
}
@Get(':id') @Get(':id')
@ApiOperation({ summary: '获取分析结果' }) @ApiOperation({ summary: '获取分析结果' })
async findOne(@Param('id') id: string) { async findOne(@Param('id') id: string) {

View File

@ -5,11 +5,39 @@ import { PrismaService } from '../../infrastructure/database/prisma.service';
export class AiAnalysisRepository { export class AiAnalysisRepository {
constructor(private readonly prisma: PrismaService) {} constructor(private readonly prisma: PrismaService) {}
async createResult(userId: string, result: Record<string, any>) { async createJob(userId: string, jobType: string, sessionId?: string, answerId?: string) {
return this.prisma.aiAnalysisJob.create({
data: {
userId,
jobType,
sessionId: sessionId ?? null,
answerId: answerId ?? null,
status: 'pending',
queuedAt: new Date(),
},
});
}
async updateJobStatus(id: string, status: string, errorMessage?: string) {
const data: Record<string, any> = { status };
if (status === 'processing') data.startedAt = new Date();
if (status === 'completed' || status === 'failed') data.completedAt = new Date();
if (errorMessage) data.errorMessage = errorMessage;
return this.prisma.aiAnalysisJob.update({ where: { id }, data });
}
async findJobById(id: string) {
return this.prisma.aiAnalysisJob.findUnique({
where: { id },
include: { results: true },
});
}
async createResult(userId: string, jobId: string, result: Record<string, any>) {
return this.prisma.aiAnalysisResult.create({ return this.prisma.aiAnalysisResult.create({
data: { data: {
userId, userId,
jobId: '', jobId,
summary: result.summary ?? '', summary: result.summary ?? '',
masteryScore: result.score ?? null, masteryScore: result.score ?? null,
strengths: (result.strengths ?? []) as any, strengths: (result.strengths ?? []) as any,

View File

@ -1,48 +1,69 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, NotFoundException } from '@nestjs/common';
import { ActiveRecallAnalysisWorkflow } from '../ai/workflows/active-recall-analysis.workflow';
import { FeynmanEvaluationWorkflow } from '../ai/workflows/feynman-evaluation.workflow';
import { AiAnalysisRepository } from './ai-analysis.repository'; import { AiAnalysisRepository } from './ai-analysis.repository';
import { QueueService } from '../../infrastructure/queue/queue.service';
@Injectable() @Injectable()
export class AiAnalysisService { export class AiAnalysisService {
private readonly logger = new Logger(AiAnalysisService.name);
constructor( constructor(
private readonly workflow: ActiveRecallAnalysisWorkflow,
private readonly feynmanWorkflow: FeynmanEvaluationWorkflow,
private readonly repository: AiAnalysisRepository, private readonly repository: AiAnalysisRepository,
private readonly queue: QueueService,
) {} ) {}
async analyze(userId: string, input: { async analyze(userId: string, input: {
questionText: string; questionText: string;
knowledgeItemContent: string; knowledgeItemContent: string;
userAnswer: string; userAnswer: string;
sessionId?: string;
answerId?: string;
}) { }) {
const result = await this.workflow.execute({ const job = await this.repository.createJob(userId, 'active-recall', input.sessionId, input.answerId);
await this.queue.add('ai-analysis', {
jobId: job.id,
userId, userId,
type: 'active-recall',
questionText: input.questionText, questionText: input.questionText,
knowledgeItemContent: input.knowledgeItemContent, knowledgeItemContent: input.knowledgeItemContent,
userAnswer: input.userAnswer, userAnswer: input.userAnswer,
}); });
const saved = await this.repository.createResult(userId, result); return { jobId: job.id, status: 'queued' };
return { resultId: saved.id, ...result };
} }
async evaluateFeynman(userId: string, input: { async evaluateFeynman(userId: string, input: {
knowledgeItemTitle: string; knowledgeItemTitle: string;
knowledgeItemContent: string; knowledgeItemContent: string;
userExplanation: string; userExplanation: string;
sessionId?: string;
answerId?: string;
}) { }) {
const result = await this.feynmanWorkflow.execute({ const job = await this.repository.createJob(userId, 'feynman-evaluation', input.sessionId, input.answerId);
await this.queue.add('ai-analysis', {
jobId: job.id,
userId, userId,
type: 'feynman-evaluation',
knowledgeItemTitle: input.knowledgeItemTitle, knowledgeItemTitle: input.knowledgeItemTitle,
knowledgeItemContent: input.knowledgeItemContent, knowledgeItemContent: input.knowledgeItemContent,
userExplanation: input.userExplanation, userExplanation: input.userExplanation,
}); });
const saved = await this.repository.createResult(userId, result); return { jobId: job.id, status: 'queued' };
return { resultId: saved.id, ...result }; }
async getJobStatus(id: string) {
const job = await this.repository.findJobById(id);
if (!job) throw new NotFoundException('任务不存在');
return {
id: job.id,
type: job.jobType,
status: job.status,
queuedAt: job.queuedAt,
startedAt: job.startedAt,
completedAt: job.completedAt,
errorMessage: job.errorMessage,
results: job.results,
};
} }
async getResult(id: string) { async getResult(id: string) {

View File

@ -1,12 +1,9 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { AiModule } from '../ai/ai.module';
import { KnowledgeItemsModule } from '../knowledge-items/knowledge-items.module';
import { DocumentImportController } from './document-import.controller'; import { DocumentImportController } from './document-import.controller';
import { DocumentImportService } from './document-import.service'; import { DocumentImportService } from './document-import.service';
import { DocumentImportRepository } from './document-import.repository'; import { DocumentImportRepository } from './document-import.repository';
@Module({ @Module({
imports: [AiModule, KnowledgeItemsModule],
controllers: [DocumentImportController], controllers: [DocumentImportController],
providers: [DocumentImportService, DocumentImportRepository], providers: [DocumentImportService, DocumentImportRepository],
exports: [DocumentImportService, DocumentImportRepository], exports: [DocumentImportService, DocumentImportRepository],

View File

@ -1,18 +1,12 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { DocumentImportRepository } from './document-import.repository'; import { DocumentImportRepository } from './document-import.repository';
import { KnowledgeItemsRepository } from '../knowledge-items/knowledge-items.repository';
import { KnowledgeImportWorkflow } from '../ai/workflows/knowledge-import.workflow';
import { RedisService } from '../../infrastructure/redis/redis.service'; import { RedisService } from '../../infrastructure/redis/redis.service';
import { QueueService } from '../../infrastructure/queue/queue.service'; import { QueueService } from '../../infrastructure/queue/queue.service';
@Injectable() @Injectable()
export class DocumentImportService { export class DocumentImportService {
private readonly logger = new Logger(DocumentImportService.name);
constructor( constructor(
private readonly repository: DocumentImportRepository, private readonly repository: DocumentImportRepository,
private readonly knowledgeItemsRepo: KnowledgeItemsRepository,
private readonly workflow: KnowledgeImportWorkflow,
private readonly redis: RedisService, private readonly redis: RedisService,
private readonly queue: QueueService, private readonly queue: QueueService,
) {} ) {}
@ -36,7 +30,7 @@ export class DocumentImportService {
await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400); await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400); await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400);
this.queue.add('document-import', { await this.queue.add('document-import', {
importId: job.id, importId: job.id,
userId: dto.userId || 'anonymous', userId: dto.userId || 'anonymous',
knowledgeBaseId: dto.knowledgeBaseId, knowledgeBaseId: dto.knowledgeBaseId,
@ -44,68 +38,10 @@ export class DocumentImportService {
fileName: dto.fileName, fileName: dto.fileName,
}); });
this.processImport(job, dto.rawText, dto.knowledgeBaseId, lockKey, lockToken); // Release the lock — the worker will re-lock if needed
await this.redis.unlock(lockKey, lockToken);
return job; return { jobId: job.id, status: 'queued' };
}
private async processImport(
job: { id: string; userId?: string },
rawText: string | undefined,
knowledgeBaseId: string | undefined,
lockKey: string,
lockToken: string,
) {
try {
if (!rawText) {
await this.repository.updateStatus(job.id, 'completed');
await this.redis.set(`job:document-import:${job.id}:status`, 'completed', 86400);
await this.redis.set(`job:document-import:${job.id}:progress`, '100', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, '无需解析的空文件', 86400);
await this.redis.unlock(lockKey, lockToken);
return;
}
await this.repository.updateStatus(job.id, 'processing');
await this.redis.set(`job:document-import:${job.id}:status`, 'parsing', 86400);
await this.redis.set(`job:document-import:${job.id}:progress`, '25', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, 'AI 正在分析文本,提取知识点...', 86400);
const result = await this.workflow.execute({
userId: job.userId || 'anonymous',
rawText,
sourceName: undefined,
});
await this.redis.set(`job:document-import:${job.id}:status`, 'saving', 86400);
await this.redis.set(`job:document-import:${job.id}:progress`, '80', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, `正在保存 ${result.knowledgePoints.length} 个知识点...`, 86400);
if (knowledgeBaseId && result.knowledgePoints.length > 0) {
for (let i = 0; i < result.knowledgePoints.length; i++) {
const kp = result.knowledgePoints[i];
await this.knowledgeItemsRepo.create(job.userId || 'anonymous', knowledgeBaseId, {
title: kp.title,
content: kp.content,
itemType: 'lesson',
orderIndex: kp.suggestedOrder ?? i + 1,
});
}
}
await this.repository.updateStatus(job.id, 'completed');
await this.redis.set(`job:document-import:${job.id}:status`, 'completed', 86400);
await this.redis.set(`job:document-import:${job.id}:progress`, '100', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, `成功提取 ${result.knowledgePoints.length} 个知识点`, 86400);
await this.redis.unlock(lockKey, lockToken);
this.logger.log(`Import ${job.id} completed: ${result.knowledgePoints.length} knowledge points`);
} catch (error: any) {
this.logger.error(`Import ${job.id} failed: ${error.message}`);
await this.repository.updateStatus(job.id, 'failed');
await this.redis.set(`job:document-import:${job.id}:status`, 'failed', 86400);
await this.redis.set(`job:document-import:${job.id}:message`, `导入失败: ${error.message}`, 86400);
await this.redis.unlock(lockKey, lockToken);
}
} }
async getStatus(id: string) { async getStatus(id: string) {

View File

@ -3,6 +3,7 @@ import { Logger } from '@nestjs/common';
import { Job } from 'bullmq'; import { Job } from 'bullmq';
import { QUEUE_AI_ANALYSIS } from '../infrastructure/queue/queue.service'; import { QUEUE_AI_ANALYSIS } from '../infrastructure/queue/queue.service';
import { ActiveRecallAnalysisWorkflow } from '../modules/ai/workflows/active-recall-analysis.workflow'; import { ActiveRecallAnalysisWorkflow } from '../modules/ai/workflows/active-recall-analysis.workflow';
import { FeynmanEvaluationWorkflow } from '../modules/ai/workflows/feynman-evaluation.workflow';
import { AiAnalysisRepository } from '../modules/ai-analysis/ai-analysis.repository'; import { AiAnalysisRepository } from '../modules/ai-analysis/ai-analysis.repository';
@Processor(QUEUE_AI_ANALYSIS) @Processor(QUEUE_AI_ANALYSIS)
@ -10,22 +11,59 @@ export class AiAnalysisWorker extends WorkerHost {
private readonly logger = new Logger(AiAnalysisWorker.name); private readonly logger = new Logger(AiAnalysisWorker.name);
constructor( constructor(
private readonly workflow: ActiveRecallAnalysisWorkflow, private readonly recallWorkflow: ActiveRecallAnalysisWorkflow,
private readonly feynmanWorkflow: FeynmanEvaluationWorkflow,
private readonly repository: AiAnalysisRepository, private readonly repository: AiAnalysisRepository,
) { ) {
super(); super();
} }
async process(job: Job<{ async process(job: Job<{
jobId: string;
userId: string; userId: string;
questionText: string; type: 'active-recall' | 'feynman-evaluation';
knowledgeItemContent: string; // active-recall fields
userAnswer: string; questionText?: string;
knowledgeItemContent?: string;
userAnswer?: string;
// feynman fields
knowledgeItemTitle?: string;
userExplanation?: string;
}>) { }>) {
this.logger.log(`Processing AI analysis job ${job.id}`); const { jobId, userId, type, knowledgeItemContent } = job.data;
const result = await this.workflow.execute(job.data); this.logger.log(`Processing AI analysis job ${job.id}, dbJobId=${jobId}, type=${type}`);
await this.repository.createResult(job.data.userId, result);
this.logger.log(`AI analysis job ${job.id} completed, score=${result.score}`); try {
return result; await this.repository.updateJobStatus(jobId, 'processing');
if (type === 'feynman-evaluation') {
const result = await this.feynmanWorkflow.execute({
userId,
knowledgeItemTitle: job.data.knowledgeItemTitle || '',
knowledgeItemContent: knowledgeItemContent || '',
userExplanation: job.data.userExplanation || '',
});
await this.repository.createResult(userId, jobId, result);
await this.repository.updateJobStatus(jobId, 'completed');
this.logger.log(`AI analysis job ${job.id} completed (feynman), score=${result.score}`);
return result;
}
// active-recall (default)
const result = await this.recallWorkflow.execute({
userId,
questionText: job.data.questionText || '',
knowledgeItemContent: knowledgeItemContent || '',
userAnswer: job.data.userAnswer || '',
});
await this.repository.createResult(userId, jobId, result);
await this.repository.updateJobStatus(jobId, 'completed');
this.logger.log(`AI analysis job ${job.id} completed (recall), score=${result.score}`);
return result;
} catch (err: any) {
this.logger.error(`AI analysis job ${job.id} failed: ${err.message}`);
await this.repository.updateJobStatus(jobId, 'failed', err.message);
throw err;
}
} }
} }

View File

@ -3,6 +3,9 @@ import { Logger } from '@nestjs/common';
import { Job } from 'bullmq'; import { Job } from 'bullmq';
import { QUEUE_DOCUMENT_IMPORT } from '../infrastructure/queue/queue.service'; import { QUEUE_DOCUMENT_IMPORT } from '../infrastructure/queue/queue.service';
import { DocumentImportRepository } from '../modules/document-import/document-import.repository'; import { DocumentImportRepository } from '../modules/document-import/document-import.repository';
import { KnowledgeItemsRepository } from '../modules/knowledge-items/knowledge-items.repository';
import { KnowledgeImportWorkflow } from '../modules/ai/workflows/knowledge-import.workflow';
import { RedisService } from '../infrastructure/redis/redis.service';
@Processor(QUEUE_DOCUMENT_IMPORT) @Processor(QUEUE_DOCUMENT_IMPORT)
export class DocumentImportWorker extends WorkerHost { export class DocumentImportWorker extends WorkerHost {
@ -10,22 +13,69 @@ export class DocumentImportWorker extends WorkerHost {
constructor( constructor(
private readonly repository: DocumentImportRepository, private readonly repository: DocumentImportRepository,
private readonly knowledgeItemsRepo: KnowledgeItemsRepository,
private readonly workflow: KnowledgeImportWorkflow,
private readonly redis: RedisService,
) { ) {
super(); super();
} }
async process(job: Job<{ importId: string; userId: string }>) { async process(job: Job<{
this.logger.log(`Processing document import job ${job.id}, importId=${job.data.importId}`); importId: string;
await this.repository.updateStatus(job.data.importId, 'processing'); userId: string;
knowledgeBaseId?: string;
rawText?: string;
fileName?: string;
}>) {
const { importId, userId, knowledgeBaseId, rawText, fileName } = job.data;
this.logger.log(`Processing document import job ${job.id}, importId=${importId}`);
try { try {
// TODO: actual file parsing + AI knowledge generation if (!rawText) {
await new Promise((resolve) => setTimeout(resolve, 1000)); await this.repository.updateStatus(importId, 'completed');
await this.repository.updateStatus(job.data.importId, 'completed'); await this.redis.set(`job:document-import:${importId}:status`, 'completed', 86400);
this.logger.log(`Document import job ${job.id} completed`); await this.redis.set(`job:document-import:${importId}:progress`, '100', 86400);
await this.redis.set(`job:document-import:${importId}:message`, '无需解析的空文件', 86400);
return;
}
await this.repository.updateStatus(importId, 'processing');
await this.redis.set(`job:document-import:${importId}:status`, 'parsing', 86400);
await this.redis.set(`job:document-import:${importId}:progress`, '25', 86400);
await this.redis.set(`job:document-import:${importId}:message`, 'AI 正在分析文本,提取知识点...', 86400);
const result = await this.workflow.execute({
userId,
rawText,
sourceName: fileName,
});
await this.redis.set(`job:document-import:${importId}:status`, 'saving', 86400);
await this.redis.set(`job:document-import:${importId}:progress`, '80', 86400);
await this.redis.set(`job:document-import:${importId}:message`, `正在保存 ${result.knowledgePoints.length} 个知识点...`, 86400);
if (knowledgeBaseId && result.knowledgePoints.length > 0) {
for (let i = 0; i < result.knowledgePoints.length; i++) {
const kp = result.knowledgePoints[i];
await this.knowledgeItemsRepo.create(userId, knowledgeBaseId, {
title: kp.title,
content: kp.content,
itemType: 'lesson',
orderIndex: kp.suggestedOrder ?? i + 1,
});
}
}
await this.repository.updateStatus(importId, 'completed');
await this.redis.set(`job:document-import:${importId}:status`, 'completed', 86400);
await this.redis.set(`job:document-import:${importId}:progress`, '100', 86400);
await this.redis.set(`job:document-import:${importId}:message`, `成功提取 ${result.knowledgePoints.length} 个知识点`, 86400);
this.logger.log(`Document import job ${job.id} completed: ${result.knowledgePoints.length} knowledge points`);
} catch (err: any) { } catch (err: any) {
this.logger.error(`Document import job ${job.id} failed: ${err.message}`); this.logger.error(`Document import job ${job.id} failed: ${err.message}`);
await this.repository.updateStatus(job.data.importId, 'failed'); await this.repository.updateStatus(importId, 'failed');
await this.redis.set(`job:document-import:${importId}:status`, 'failed', 86400);
await this.redis.set(`job:document-import:${importId}:message`, `导入失败: ${err.message}`, 86400);
throw err; throw err;
} }
} }