import { Injectable } from '@nestjs/common'; import { PrismaService } from '../../infrastructure/prisma.service'; import { LearningSessionRepository } from '../learning-session/learning-session.repository'; import { LearningActivityRepository } from '../learning-activity/learning-activity.repository'; import { LearningRecordService } from '../learning-record/learning-record.service'; import { MaterialReadingProgressService } from '../material-reading-progress/material-reading-progress.service'; import { ReadingEventErrorCode, ReadingEventWarningCode } from './reading-event-codes'; const VALID_EVENT_TYPES = new Set([ 'material_opened', 'material_closed', 'position_changed', 'heartbeat', 'marked_as_read', ]); const VALID_TARGET_TYPES = new Set(['knowledge_source', 'temporary_file']); const MAX_DELTA = 300; interface ProcessResult { processed: number; duplicate: number; failed: number; warnings: Array<{ eventId?: string; code: string; message: string }>; } interface ValidatedEvent { eventId: string; clientSessionId: string; materialId: string; readingTargetType: string; eventType: string; position: any | null; activeSecondsDelta: number; clientTimestampMs: bigint; clientTimezoneOffsetMinutes: number | null; sequence: number; platform: string | null; appVersion: string | null; } @Injectable() export class ReadingEventProcessorService { constructor( private readonly prisma: PrismaService, private readonly sessionRepo: LearningSessionRepository, private readonly progressSvc: MaterialReadingProgressService, private readonly activityRepo: LearningActivityRepository, private readonly recordSvc: LearningRecordService, ) {} async processBatch(userId: string, events: Array>): Promise { // Lazy cleanup: mark interrupted sessions before processing await this.cleanupInterruptedSessions(userId); const result: ProcessResult = { processed: 0, duplicate: 0, failed: 0, warnings: [] }; for (const e of events) { const { outcome, warnings } = await this.processOne(userId, e); switch (outcome) { case 'processed': result.processed++; break; case 'duplicate': result.duplicate++; break; case 'failed': result.failed++; break; } result.warnings.push(...warnings); } return result; } /** Clean up interrupted sessions for a user before processing new events. */ async cleanupInterruptedSessions(userId: string): Promise { const cutoff = new Date(Date.now() - 30 * 60 * 1000); // 30 min ago const result = await this.prisma.learningSession.updateMany({ where: { userId, status: 'active', lastEventAt: { lt: cutoff }, }, data: { status: 'interrupted', endedAt: new Date() }, }); return result.count; } async processOne( userId: string, e: Record, ): Promise<{ outcome: 'processed' | 'duplicate' | 'failed'; warnings: Array<{ eventId?: string; code: string; message: string }> }> { // 1. Validate const validated = this.validateEvent(e); if (!validated) { return { outcome: 'failed', warnings: [{ eventId: e.eventId, code: ReadingEventErrorCode.INVALID_EVENT_TYPE, message: 'Validation failed' }] }; } // 2. Dedup const isDuplicate = await this.checkDuplicate(userId, e.eventId); if (isDuplicate) { return { outcome: 'duplicate', warnings: [{ eventId: e.eventId, code: ReadingEventWarningCode.DUPLICATE_EVENT, message: 'Duplicate eventId' }] }; } // 3. Validate access and resolve target const access = await this.validateReadingAccess(userId, e.readingTargetType, e.materialId); const eventWarnings = this.collectWarnings(e); const knowledgeBaseId = access.allowed ? access.knowledgeBaseId : null; if (!access.allowed) { eventWarnings.push(access.errorCode); } // 4. Insert event + aggregate + mark processed in one transaction (F6) await this.prisma.$transaction(async (tx) => { await this.insertReadingEvent(tx, userId, validated, knowledgeBaseId, eventWarnings); // 5a. Aggregate → LearningSession await this.sessionRepo.upsertFromReadingEvent(tx, { userId, clientSessionId: validated.clientSessionId, materialId: validated.materialId, readingTargetType: validated.readingTargetType, knowledgeBaseId: knowledgeBaseId, eventType: validated.eventType, activeSecondsDelta: validated.activeSecondsDelta, position: validated.position, timestampMs: validated.clientTimestampMs, }); // 5b. Aggregate → MaterialReadingProgress await this.progressSvc.upsertFromReadingEvent(tx, { userId, readingTargetType: validated.readingTargetType, materialId: validated.materialId, knowledgeBaseId: knowledgeBaseId, eventType: validated.eventType, activeSecondsDelta: validated.activeSecondsDelta, position: validated.position, isNewSession: validated.eventType === 'material_opened', }); // 5c. Aggregate → DailyLearningActivity await this.activityRepo.upsertFromReadingEvent(tx, { userId, clientTimestampMs: validated.clientTimestampMs, clientTimezoneOffsetMinutes: validated.clientTimezoneOffsetMinutes, activeSecondsDelta: validated.activeSecondsDelta, isNewMaterial: validated.eventType === 'material_opened', isMarkedRead: validated.eventType === 'marked_as_read', }); // 5d. Write LearningRecord (first open / closed / marked read) if (['material_opened', 'material_closed', 'marked_as_read'].includes(validated.eventType)) { const recordTitle = validated.eventType === 'material_opened' ? 'Reading started' : validated.eventType === 'material_closed' ? 'Reading ended' : 'Marked as read'; await this.recordSvc.createReadingRecordTx(tx, { userId, sessionId: validated.clientSessionId, materialId: validated.materialId, readingTargetType: validated.readingTargetType, knowledgeBaseId: knowledgeBaseId, title: recordTitle, totalActiveSeconds: validated.activeSecondsDelta, lastPosition: validated.position, occurredAt: new Date(Number(validated.clientTimestampMs)), }); } // 6. Mark processed await tx.readingEvent.update({ where: { userId_eventId: { userId, eventId: e.eventId } }, data: { status: 'processed', processedAt: new Date() }, }); }); const resultWarnings = eventWarnings.map(code => ({ eventId: e.eventId, code, message: code })); return { outcome: 'processed', warnings: resultWarnings }; } // ── Step 1: Validate ── validateEvent(e: Record): ValidatedEvent | null { if (!e.eventId || !e.clientSessionId || !e.materialId) return null; if (!VALID_TARGET_TYPES.has(e.readingTargetType)) return null; if (!VALID_EVENT_TYPES.has(e.eventType)) return null; const delta = Number(e.activeSecondsDelta ?? 0); if (isNaN(delta) || delta < 0) return null; const ts = Number(e.clientTimestampMs ?? 0); if (isNaN(ts) || ts <= 0) return null; let position = e.position ?? null; if (position && !this.isValidPosition(position)) { position = null; // save event but don't update progress } return { eventId: e.eventId, clientSessionId: e.clientSessionId, materialId: e.materialId, readingTargetType: e.readingTargetType, eventType: e.eventType, position, activeSecondsDelta: Math.min(delta, MAX_DELTA), clientTimestampMs: BigInt(ts), clientTimezoneOffsetMinutes: e.clientTimezoneOffsetMinutes ?? null, sequence: Number(e.sequence ?? 0), platform: e.platform ?? null, appVersion: e.appVersion ?? null, }; } // ── Step 2: Dedup ── private async checkDuplicate(userId: string, eventId: string): Promise { const existing = await this.prisma.readingEvent.findUnique({ where: { userId_eventId: { userId, eventId } }, select: { id: true, status: true }, }); // Only skip if already successfully processed (not failed/pending) return !!existing && existing.status === 'processed'; } // ── Step 3: Resolve target (with access validation) ── /** Validate reading access and return knowledgeBaseId or error code. */ async validateReadingAccess( userId: string, readingTargetType: string, materialId: string, ): Promise<{ allowed: true; knowledgeBaseId: string | null } | { allowed: false; errorCode: string }> { if (readingTargetType === 'knowledge_source') { const src = await this.prisma.knowledgeSource.findUnique({ where: { id: materialId }, select: { userId: true, knowledgeBaseId: true, deletedAt: true }, }); if (!src) return { allowed: false, errorCode: ReadingEventErrorCode.MATERIAL_NOT_FOUND }; if (src.userId !== userId) return { allowed: false, errorCode: ReadingEventErrorCode.MATERIAL_ACCESS_DENIED }; if (src.deletedAt) return { allowed: false, errorCode: ReadingEventWarningCode.SOURCE_DELETED }; return { allowed: true, knowledgeBaseId: src.knowledgeBaseId }; } if (readingTargetType === 'temporary_file') { const mat = await this.prisma.temporaryReadingMaterial.findUnique({ where: { id: materialId }, select: { userId: true, deletedAt: true, expiresAt: true, sourceStatus: true }, }); if (!mat) return { allowed: false, errorCode: ReadingEventErrorCode.TEMPORARY_MATERIAL_NOT_FOUND }; if (mat.userId !== userId) return { allowed: false, errorCode: ReadingEventErrorCode.MATERIAL_ACCESS_DENIED }; if (mat.deletedAt) return { allowed: false, errorCode: ReadingEventWarningCode.SOURCE_DELETED }; if (mat.expiresAt && mat.expiresAt < new Date()) { return { allowed: false, errorCode: ReadingEventErrorCode.TEMPORARY_MATERIAL_EXPIRED }; } if (mat.sourceStatus === 'expired') { return { allowed: false, errorCode: ReadingEventErrorCode.TEMPORARY_MATERIAL_EXPIRED }; } return { allowed: true, knowledgeBaseId: null }; } return { allowed: false, errorCode: ReadingEventErrorCode.INVALID_TARGET_TYPE }; } async resolveReadingTarget( userId: string, readingTargetType: string, materialId: string, ): Promise<{ knowledgeBaseId: string | null } | null> { if (readingTargetType === 'knowledge_source') { const src = await this.prisma.knowledgeSource.findUnique({ where: { id: materialId }, select: { userId: true, knowledgeBaseId: true }, }); if (!src || src.userId !== userId) return null; return { knowledgeBaseId: src.knowledgeBaseId }; } if (readingTargetType === 'temporary_file') { const mat = await this.prisma.temporaryReadingMaterial.findUnique({ where: { id: materialId }, select: { userId: true, deletedAt: true, expiresAt: true, sourceStatus: true }, }); if (!mat || mat.userId !== userId) return null; if (mat.deletedAt) return null; if (mat.expiresAt && mat.expiresAt < new Date()) return null; if (mat.sourceStatus === 'expired') return null; return { knowledgeBaseId: null }; } return null; } // ── Step 4: Insert ── private async insertReadingEvent( tx: any, userId: string, e: ValidatedEvent, knowledgeBaseId: string | null, warnings: string[], ) { await tx.readingEvent.create({ data: { userId, eventId: e.eventId, clientSessionId: e.clientSessionId, readingTargetType: e.readingTargetType, materialId: e.materialId, knowledgeBaseId, eventType: e.eventType, position: e.position ?? undefined, activeSecondsDelta: e.activeSecondsDelta, clientTimestampMs: e.clientTimestampMs, clientTimezoneOffsetMinutes: e.clientTimezoneOffsetMinutes, sequence: e.sequence, platform: e.platform, appVersion: e.appVersion, status: 'processing', warningCodes: warnings.length > 0 ? warnings : undefined, serverReceivedAt: new Date(), }, }); } // ── Helpers ── private isValidPosition(pos: any): boolean { if (!pos || typeof pos !== 'object') return false; return ['Markdown', 'Text', 'Pdf', 'Image', 'Epub', 'Unknown'].includes(pos.type); } private collectWarnings(e: Record): string[] { const warnings: string[] = []; const delta = Number(e.activeSecondsDelta ?? 0); if (delta > MAX_DELTA) warnings.push(ReadingEventWarningCode.ACTIVE_SECONDS_CAPPED); const ts = Number(e.clientTimestampMs ?? 0); if (ts > Date.now() + 5 * 60 * 1000) warnings.push(ReadingEventWarningCode.CLIENT_TIMESTAMP_SKEWED); if (e.position && !this.isValidPosition(e.position)) { warnings.push(ReadingEventWarningCode.POSITION_IGNORED); } return warnings; } }