2026-06-08 21:09:13 +08:00
|
|
|
import { Injectable } from '@nestjs/common';
|
2026-06-08 21:12:36 +08:00
|
|
|
import { PrismaService } from '../../infrastructure/database/prisma.service';
|
2026-06-08 21:09:13 +08:00
|
|
|
import { LearningSessionRepository } from '../learning-session/learning-session.repository';
|
|
|
|
|
import { LearningActivityRepository } from '../learning-activity/learning-activity.repository';
|
|
|
|
|
import { LearningRecordService } from '../learning-record/learning-record.service';
|
|
|
|
|
import { MaterialReadingProgressService } from '../material-reading-progress/material-reading-progress.service';
|
|
|
|
|
import { ReadingEventErrorCode, ReadingEventWarningCode } from './reading-event-codes';
|
|
|
|
|
|
|
|
|
|
/** Event types accepted by validateEvent; any other value makes validation return null. */
const VALID_EVENT_TYPES: Set<string> = new Set<string>([
  'material_opened',
  'material_closed',
  'position_changed',
  'heartbeat',
  'marked_as_read',
]);

/** Reading target kinds accepted by validateEvent. */
const VALID_TARGET_TYPES: Set<string> = new Set<string>([
  'knowledge_source',
  'temporary_file',
]);

/** Ceiling applied to a single event's activeSecondsDelta; larger values are clamped to it. */
const MAX_DELTA = 300;
|
|
|
|
|
|
2026-06-08 21:12:36 +08:00
|
|
|
/** Tally returned by processBatch for one batch of reading events. */
export interface ProcessResult {
  /** Number of events whose outcome was 'processed'. */
  processed: number;
  /** Number of events whose outcome was 'duplicate'. */
  duplicate: number;
  /** Number of events whose outcome was 'failed'. */
  failed: number;
  /** Warnings gathered across the batch, in the order the events were handled. */
  warnings: { eventId?: string; code: string; message: string }[];
}
|
|
|
|
|
|
|
|
|
|
/** Normalised form of a raw client event, as produced by validateEvent. */
interface ValidatedEvent {
  // ── Identity ──
  eventId: string;
  clientSessionId: string;
  materialId: string;
  /** One of VALID_TARGET_TYPES. */
  readingTargetType: string;
  /** One of VALID_EVENT_TYPES. */
  eventType: string;

  // ── Payload ──
  /** Reader position; null when absent or when its `type` was not recognised. */
  position: any | null;
  /** Non-negative and clamped to MAX_DELTA. */
  activeSecondsDelta: number;

  // ── Client clock ──
  clientTimestampMs: bigint;
  clientTimezoneOffsetMinutes: number | null;
  /** Defaults to 0 when the client omits it. */
  sequence: number;

  // ── Client metadata ──
  platform: string | null;
  appVersion: string | null;
}
|
|
|
|
|
|
|
|
|
|
@Injectable()
|
|
|
|
|
export class ReadingEventProcessorService {
|
|
|
|
|
/**
 * Collaborators are captured as private readonly fields through parameter
 * properties. The parameter order is part of the constructor's interface and
 * is kept as-is.
 */
constructor(
  private readonly prisma: PrismaService,
  private readonly sessionRepo: LearningSessionRepository,
  private readonly progressSvc: MaterialReadingProgressService,
  private readonly activityRepo: LearningActivityRepository,
  private readonly recordSvc: LearningRecordService,
) {}
|
|
|
|
|
|
|
|
|
|
/**
 * Processes a batch of raw reading events for one user.
 *
 * Stale sessions are closed first, then each event is handled one at a time
 * in input order (events are awaited sequentially, never in parallel).
 *
 * @param userId Owner of every event in the batch.
 * @param events Raw, unvalidated event payloads.
 * @returns Counts per outcome plus all warnings emitted along the way.
 */
async processBatch(userId: string, events: Array<Record<string, any>>): Promise<ProcessResult> {
  // Lazy cleanup: interrupted sessions are marked before any new event is applied.
  await this.cleanupInterruptedSessions(userId);

  const summary: ProcessResult = { processed: 0, duplicate: 0, failed: 0, warnings: [] };

  for (const rawEvent of events) {
    const single = await this.processOne(userId, rawEvent);
    // The outcome names double as the counter keys on ProcessResult.
    summary[single.outcome] += 1;
    summary.warnings.push(...single.warnings);
  }

  return summary;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Flags a user's abandoned sessions as interrupted.
 *
 * A session qualifies when its status is 'active' and its lastEventAt is more
 * than 30 minutes in the past; it is then set to 'interrupted' with endedAt
 * stamped at the time of the update.
 *
 * @param userId User whose sessions are inspected.
 * @returns How many sessions were updated.
 */
async cleanupInterruptedSessions(userId: string): Promise<number> {
  const idleLimitMs = 30 * 60 * 1000; // 30 min
  const staleBefore = new Date(Date.now() - idleLimitMs);

  const { count } = await this.prisma.learningSession.updateMany({
    where: {
      userId,
      status: 'active',
      lastEventAt: { lt: staleBefore },
    },
    data: { status: 'interrupted', endedAt: new Date() },
  });

  return count;
}
|
|
|
|
|
|
|
|
|
|
/**
 * Runs a single raw event through the pipeline: validate, dedup, access
 * check, then persist and aggregate inside one database transaction.
 *
 * Errors thrown by the lookups or by the transaction are not caught here and
 * propagate to the caller.
 *
 * @param userId Owner of the event.
 * @param e Raw, unvalidated event payload.
 * @returns The outcome for this event and the warnings attached to it.
 */
async processOne(
  userId: string,
  e: Record<string, any>,
): Promise<{ outcome: 'processed' | 'duplicate' | 'failed'; warnings: Array<{ eventId?: string; code: string; message: string }> }> {
  const rawEventId = e.eventId;

  // 1. Validate
  // NOTE(review): every validation failure is reported with INVALID_EVENT_TYPE,
  // whatever the actual cause (missing id, bad delta, bad timestamp, ...).
  const validated = this.validateEvent(e);
  if (!validated) {
    return {
      outcome: 'failed',
      warnings: [{ eventId: rawEventId, code: ReadingEventErrorCode.INVALID_EVENT_TYPE, message: 'Validation failed' }],
    };
  }

  // 2. Dedup — only events already stored as 'processed' are skipped.
  // NOTE(review): a row that exists with any other status is not treated as a
  // duplicate, so the create() in step 4 runs against the same
  // (userId, eventId) key — confirm that is intended.
  if (await this.checkDuplicate(userId, rawEventId)) {
    return {
      outcome: 'duplicate',
      warnings: [{ eventId: rawEventId, code: ReadingEventWarningCode.DUPLICATE_EVENT, message: 'Duplicate eventId' }],
    };
  }

  // 3. Validate access and resolve target
  const access = await this.validateReadingAccess(userId, e.readingTargetType, e.materialId);
  const warningCodes = this.collectWarnings(e);
  let knowledgeBaseId: string | null = null;
  if (access.allowed) {
    knowledgeBaseId = access.knowledgeBaseId;
  } else {
    // NOTE(review): a denied target does not stop processing; its code is
    // recorded as a warning and the event is still stored and aggregated with
    // a null knowledgeBaseId. Confirm this is intended for the error codes.
    warningCodes.push(access.errorCode);
  }

  const { clientSessionId, materialId, readingTargetType, eventType, activeSecondsDelta, position, clientTimestampMs } = validated;
  const isOpen = eventType === 'material_opened';

  // 4. Insert event + aggregate + mark processed in one transaction (F6)
  await this.prisma.$transaction(async (tx) => {
    await this.insertReadingEvent(tx, userId, validated, knowledgeBaseId, warningCodes);

    // 5a. Aggregate → LearningSession
    await this.sessionRepo.upsertFromReadingEvent(tx, {
      userId,
      clientSessionId,
      materialId,
      readingTargetType,
      knowledgeBaseId,
      eventType,
      activeSecondsDelta,
      position,
      timestampMs: clientTimestampMs,
    });

    // 5b. Aggregate → MaterialReadingProgress
    await this.progressSvc.upsertFromReadingEvent(tx, {
      userId,
      readingTargetType,
      materialId,
      knowledgeBaseId,
      eventType,
      activeSecondsDelta,
      position,
      isNewSession: isOpen,
    });

    // 5c. Aggregate → DailyLearningActivity
    await this.activityRepo.upsertFromReadingEvent(tx, {
      userId,
      clientTimestampMs,
      clientTimezoneOffsetMinutes: validated.clientTimezoneOffsetMinutes,
      activeSecondsDelta,
      isNewMaterial: isOpen,
      isMarkedRead: eventType === 'marked_as_read',
    });

    // 5d. Write LearningRecord (first open / closed / marked read)
    let recordTitle: string | null = null;
    switch (eventType) {
      case 'material_opened':
        recordTitle = 'Reading started';
        break;
      case 'material_closed':
        recordTitle = 'Reading ended';
        break;
      case 'marked_as_read':
        recordTitle = 'Marked as read';
        break;
    }
    if (recordTitle !== null) {
      await this.recordSvc.createReadingRecordTx(tx, {
        userId,
        sessionId: clientSessionId,
        materialId,
        readingTargetType,
        knowledgeBaseId,
        title: recordTitle,
        totalActiveSeconds: activeSecondsDelta,
        lastPosition: position,
        occurredAt: new Date(Number(clientTimestampMs)),
      });
    }

    // 6. Mark processed
    await tx.readingEvent.update({
      where: { userId_eventId: { userId, eventId: rawEventId } },
      data: { status: 'processed', processedAt: new Date() },
    });
  });

  // Each warning code is echoed as its own message.
  return {
    outcome: 'processed',
    warnings: warningCodes.map((code) => ({ eventId: rawEventId, code, message: code })),
  };
}
|
|
|
|
|
|
|
|
|
|
// ── Step 1: Validate ──
|
|
|
|
|
|
|
|
|
|
/**
 * Checks a raw client event and converts it to its normalised form.
 *
 * Rejected (null) when any of eventId / clientSessionId / materialId is
 * missing, when the target or event type is not in the allowed sets, when
 * activeSecondsDelta is not a number or is negative, or when
 * clientTimestampMs is not a finite number of at least one whole millisecond.
 *
 * Never throws on malformed numeric input: the timestamp is truncated to
 * whole milliseconds before the BigInt conversion, because BigInt() raises a
 * RangeError for fractional or non-finite numbers.
 *
 * @param e Raw, unvalidated event payload.
 * @returns The normalised event, or null when the payload is rejected.
 */
validateEvent(e: Record<string, any>): ValidatedEvent | null {
  if (!e.eventId || !e.clientSessionId || !e.materialId) return null;
  if (!VALID_TARGET_TYPES.has(e.readingTargetType)) return null;
  if (!VALID_EVENT_TYPES.has(e.eventType)) return null;

  const delta = Number(e.activeSecondsDelta ?? 0);
  if (Number.isNaN(delta) || delta < 0) return null;

  // Drop any sub-millisecond fraction; NaN and ±Infinity survive Math.trunc
  // unchanged and are rejected by the finiteness check below.
  const ts = Math.trunc(Number(e.clientTimestampMs ?? 0));
  if (!Number.isFinite(ts) || ts <= 0) return null;

  let position = e.position ?? null;
  if (position && !this.isValidPosition(position)) {
    position = null; // save event but don't update progress
  }

  return {
    eventId: e.eventId,
    clientSessionId: e.clientSessionId,
    materialId: e.materialId,
    readingTargetType: e.readingTargetType,
    eventType: e.eventType,
    position,
    // Oversized deltas are clamped rather than rejected.
    activeSecondsDelta: Math.min(delta, MAX_DELTA),
    clientTimestampMs: BigInt(ts),
    clientTimezoneOffsetMinutes: e.clientTimezoneOffsetMinutes ?? null,
    sequence: Number(e.sequence ?? 0),
    platform: e.platform ?? null,
    appVersion: e.appVersion ?? null,
  };
}
|
|
|
|
|
|
|
|
|
|
// ── Step 2: Dedup ──
|
|
|
|
|
|
|
|
|
|
/**
 * Reports whether this (userId, eventId) pair has already been handled.
 *
 * @returns true only when a stored row exists with status 'processed';
 *          a missing row or a row in any other status yields false.
 */
private async checkDuplicate(userId: string, eventId: string): Promise<boolean> {
  const stored = await this.prisma.readingEvent.findUnique({
    where: { userId_eventId: { userId, eventId } },
    select: { id: true, status: true },
  });

  // Rows in any other status are not treated as duplicates.
  return stored?.status === 'processed';
}
|
|
|
|
|
|
|
|
|
|
// ── Step 3: Resolve target (with access validation) ──
|
|
|
|
|
|
|
|
|
|
/**
 * Decides whether the user may record reading activity against a material.
 *
 * Checks run in a fixed order — existence, ownership, soft deletion, and
 * (temporary files only) expiry — and the first failing check determines the
 * returned code.
 *
 * @param userId User the event belongs to.
 * @param readingTargetType 'knowledge_source' or 'temporary_file'; anything else is denied.
 * @param materialId Id of the knowledge source or temporary material.
 * @returns `allowed: true` with the knowledgeBaseId (always null for temporary
 *          files), or `allowed: false` with the code describing the refusal.
 */
async validateReadingAccess(
  userId: string,
  readingTargetType: string,
  materialId: string,
): Promise<{ allowed: true; knowledgeBaseId: string | null } | { allowed: false; errorCode: string }> {
  const deny = (errorCode: string): { allowed: false; errorCode: string } => ({ allowed: false, errorCode });

  switch (readingTargetType) {
    case 'knowledge_source': {
      const source = await this.prisma.knowledgeSource.findUnique({
        where: { id: materialId },
        select: { userId: true, knowledgeBaseId: true, deletedAt: true },
      });
      if (!source) return deny(ReadingEventErrorCode.MATERIAL_NOT_FOUND);
      if (source.userId !== userId) return deny(ReadingEventErrorCode.MATERIAL_ACCESS_DENIED);
      if (source.deletedAt) return deny(ReadingEventWarningCode.SOURCE_DELETED);
      return { allowed: true, knowledgeBaseId: source.knowledgeBaseId };
    }

    case 'temporary_file': {
      const material = await this.prisma.temporaryReadingMaterial.findUnique({
        where: { id: materialId },
        select: { userId: true, deletedAt: true, expiresAt: true, sourceStatus: true },
      });
      if (!material) return deny(ReadingEventErrorCode.TEMPORARY_MATERIAL_NOT_FOUND);
      if (material.userId !== userId) return deny(ReadingEventErrorCode.MATERIAL_ACCESS_DENIED);
      if (material.deletedAt) return deny(ReadingEventWarningCode.SOURCE_DELETED);
      // Expired either by clock (expiresAt in the past) or by an explicit status flag.
      if ((material.expiresAt && material.expiresAt < new Date()) || material.sourceStatus === 'expired') {
        return deny(ReadingEventErrorCode.TEMPORARY_MATERIAL_EXPIRED);
      }
      return { allowed: true, knowledgeBaseId: null };
    }

    default:
      return deny(ReadingEventErrorCode.INVALID_TARGET_TYPE);
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Resolves the knowledge base behind a reading target, or null when the
 * target cannot be used by this user.
 *
 * NOTE(review): unlike validateReadingAccess, the knowledge_source branch
 * here does not look at deletedAt, so a soft-deleted source still resolves.
 * Confirm whether that difference is intentional.
 *
 * @param userId User the lookup is performed for.
 * @param readingTargetType 'knowledge_source' or 'temporary_file'; anything else yields null.
 * @param materialId Id of the knowledge source or temporary material.
 * @returns `{ knowledgeBaseId }` (null for temporary files) or null when the
 *          target is missing, owned by someone else, or — for temporary
 *          files — deleted or expired.
 */
async resolveReadingTarget(
  userId: string,
  readingTargetType: string,
  materialId: string,
): Promise<{ knowledgeBaseId: string | null } | null> {
  if (readingTargetType === 'knowledge_source') {
    const source = await this.prisma.knowledgeSource.findUnique({
      where: { id: materialId },
      select: { userId: true, knowledgeBaseId: true },
    });
    return source && source.userId === userId
      ? { knowledgeBaseId: source.knowledgeBaseId }
      : null;
  }

  if (readingTargetType === 'temporary_file') {
    const material = await this.prisma.temporaryReadingMaterial.findUnique({
      where: { id: materialId },
      select: { userId: true, deletedAt: true, expiresAt: true, sourceStatus: true },
    });
    // Short-circuit order mirrors the individual guards: present, owned,
    // not deleted, not past expiresAt, not flagged expired.
    const usable =
      !!material &&
      material.userId === userId &&
      !material.deletedAt &&
      !(material.expiresAt && material.expiresAt < new Date()) &&
      material.sourceStatus !== 'expired';
    return usable ? { knowledgeBaseId: null } : null;
  }

  return null;
}
|
|
|
|
|
|
|
|
|
|
// ── Step 4: Insert ──
|
|
|
|
|
|
|
|
|
|
/**
 * Writes the raw reading-event row with status 'processing'.
 *
 * @param tx Client the row is created through (the caller passes its transaction client).
 * @param userId Owner of the event.
 * @param e Validated event to persist.
 * @param knowledgeBaseId Resolved knowledge base, or null.
 * @param warnings Warning codes to store on the row; omitted when empty.
 */
private async insertReadingEvent(
  tx: any,
  userId: string,
  e: ValidatedEvent,
  knowledgeBaseId: string | null,
  warnings: string[],
): Promise<void> {
  const hasWarnings = warnings.length > 0;

  const row = {
    userId,
    eventId: e.eventId,
    clientSessionId: e.clientSessionId,
    readingTargetType: e.readingTargetType,
    materialId: e.materialId,
    knowledgeBaseId,
    eventType: e.eventType,
    // A null position is passed as undefined so the field is left out of the create.
    position: e.position ?? undefined,
    activeSecondsDelta: e.activeSecondsDelta,
    clientTimestampMs: e.clientTimestampMs,
    clientTimezoneOffsetMinutes: e.clientTimezoneOffsetMinutes,
    sequence: e.sequence,
    platform: e.platform,
    appVersion: e.appVersion,
    status: 'processing',
    warningCodes: hasWarnings ? warnings : undefined,
    serverReceivedAt: new Date(),
  };

  await tx.readingEvent.create({ data: row });
}
|
|
|
|
|
|
|
|
|
|
// ── Helpers ──
|
|
|
|
|
|
|
|
|
|
/**
 * Tells whether a position payload is an object whose `type` is one of the
 * recognised reader kinds. Only `type` is inspected; no other field is checked.
 */
private isValidPosition(pos: any): boolean {
  if (!pos || typeof pos !== 'object') {
    return false;
  }

  switch (pos.type) {
    case 'Markdown':
    case 'Text':
    case 'Pdf':
    case 'Image':
    case 'Epub':
    case 'Unknown':
      return true;
    default:
      return false;
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Derives the non-fatal warning codes for a raw event.
 *
 * Codes are appended in a fixed order: capped delta, skewed client clock,
 * ignored position.
 *
 * @param e Raw event payload (read as received, before any clamping).
 * @returns Warning codes, possibly empty.
 */
private collectWarnings(e: Record<string, any>): string[] {
  const codes: string[] = [];

  // The delta exceeds the cap that validateEvent clamps it to.
  if (Number(e.activeSecondsDelta ?? 0) > MAX_DELTA) {
    codes.push(ReadingEventWarningCode.ACTIVE_SECONDS_CAPPED);
  }

  // The client clock is more than five minutes ahead of the server.
  const futureToleranceMs = 5 * 60 * 1000;
  if (Number(e.clientTimestampMs ?? 0) > Date.now() + futureToleranceMs) {
    codes.push(ReadingEventWarningCode.CLIENT_TIMESTAMP_SKEWED);
  }

  // A position was supplied but its shape is not recognised.
  const rawPosition = e.position;
  if (rawPosition && !this.isValidPosition(rawPosition)) {
    codes.push(ReadingEventWarningCode.POSITION_IGNORED);
  }

  return codes;
}
|
|
|
|
|
}
|