diff --git a/src/modules/ai-runtime/ai-runtime.module.ts b/src/modules/ai-runtime/ai-runtime.module.ts index 6b92009..18255bc 100644 --- a/src/modules/ai-runtime/ai-runtime.module.ts +++ b/src/modules/ai-runtime/ai-runtime.module.ts @@ -4,11 +4,13 @@ import { PrismaModule } from '../../infrastructure/database/prisma.module'; import { UserAiController } from './user-ai.controller'; import { UserAiService } from './user-ai.service'; import { CredentialEncryptionService } from './credential-encryption.service'; +import { RuntimeInternalController } from './internal/runtime-internal.controller'; +import { RuntimeInternalService } from './internal/runtime-internal.service'; @Module({ imports: [ConfigModule, PrismaModule], - controllers: [UserAiController], - providers: [UserAiService, CredentialEncryptionService], - exports: [UserAiService, CredentialEncryptionService], + controllers: [UserAiController, RuntimeInternalController], + providers: [UserAiService, CredentialEncryptionService, RuntimeInternalService], + exports: [UserAiService, CredentialEncryptionService, RuntimeInternalService], }) export class AiRuntimeModule {} diff --git a/src/modules/ai-runtime/internal/runtime-internal.controller.ts b/src/modules/ai-runtime/internal/runtime-internal.controller.ts new file mode 100644 index 0000000..c79030a --- /dev/null +++ b/src/modules/ai-runtime/internal/runtime-internal.controller.ts @@ -0,0 +1,77 @@ +import { Controller, Get, Post, Param, Body, Req, UseGuards, HttpCode, HttpStatus } from '@nestjs/common'; +import { InternalAuthGuard } from '../../../common/guards/internal-auth.guard'; +import { RuntimeInternalService } from './runtime-internal.service'; +import { + RuntimePollJobsRequestDto, RuntimeLockJobRequestDto, RuntimeHeartbeatRequestDto, + RuntimeResolveCredentialRequestDto, RuntimeSubmitResultRequestDto, + RuntimeSubmitFailureRequestDto, RuntimeSubmitInvocationLogsRequestDto, +} from './dto/runtime-internal.dto'; + +@Controller('internal/runtime') +@UseGuards(InternalAuthGuard) +export class RuntimeInternalController { + constructor(private readonly service: RuntimeInternalService) {} + + private instanceId(req: any): string { + return (req.headers['x-runtime-instance-id'] as string) || 'unknown'; + } + + // ── Poll ── + + @Post('jobs/poll') + async pollJobs(@Req() req: any, @Body() dto: RuntimePollJobsRequestDto) { + return this.service.pollJobs(this.instanceId(req), dto.supportedJobTypes, dto.limit ?? 5, dto.capabilities); + } + + // ── Lock ── + + @Post('jobs/:jobId/lock') + async lockJob(@Req() req: any, @Param('jobId') jobId: string, @Body() dto: RuntimeLockJobRequestDto) { + return this.service.lockJob(jobId, dto.runtimeInstanceId || this.instanceId(req)); + } + + // ── Heartbeat ── + + @Post('jobs/:jobId/heartbeat') + @HttpCode(HttpStatus.NO_CONTENT) + async heartbeatJob(@Req() req: any, @Param('jobId') jobId: string, @Body() dto: RuntimeHeartbeatRequestDto) { + await this.service.heartbeatJob(jobId, dto.runtimeInstanceId || this.instanceId(req)); + } + + // ── Snapshot ── + + @Get('jobs/:jobId/snapshot') + async getSnapshot(@Param('jobId') jobId: string) { + return this.service.getSnapshot(jobId); + } + + // ── Credential Resolve ── + + @Post('model-credentials/resolve') + async resolveCredential(@Body() dto: RuntimeResolveCredentialRequestDto) { + return this.service.resolveCredential(dto.jobId, dto.apiKeyMode, dto.provider, dto.credentialId); + } + + // ── Submit Result ── + + @Post('jobs/:jobId/result') + @HttpCode(HttpStatus.CREATED) + async submitResult(@Param('jobId') jobId: string, @Body() dto: RuntimeSubmitResultRequestDto) { + return this.service.submitResult(jobId, dto); + } + + // ── Submit Failure ── + + @Post('jobs/:jobId/fail') + async submitFailure(@Param('jobId') jobId: string, @Body() dto: RuntimeSubmitFailureRequestDto) { + return this.service.submitFailure(jobId, dto); + } + + // ── Submit Invocation Logs ── + + @Post('invocation-logs') + @HttpCode(HttpStatus.CREATED) + async submitInvocationLogs(@Body() dto: RuntimeSubmitInvocationLogsRequestDto) { + return this.service.submitInvocationLogs(dto.logs); + } +} diff --git a/src/modules/ai-runtime/internal/runtime-internal.service.ts b/src/modules/ai-runtime/internal/runtime-internal.service.ts new file mode 100644 index 0000000..2dcb9cf --- /dev/null +++ b/src/modules/ai-runtime/internal/runtime-internal.service.ts @@ -0,0 +1,268 @@ +import { Injectable, NotFoundException, ConflictException, BadRequestException } from '@nestjs/common'; +import { PrismaService } from '../../../infrastructure/database/prisma.service'; +import { UserAiService } from '../user-ai.service'; + +@Injectable() +export class RuntimeInternalService { + constructor( + private readonly prisma: PrismaService, + private readonly userAi: UserAiService, + ) {} + + // ── Poll ── + + async pollJobs(runtimeInstanceId: string, supportedJobTypes: string[], limit: number, capabilities?: Record) { + const jobs = await this.prisma.aiRuntimeJob.findMany({ + where: { + status: 'pending', + jobType: { in: supportedJobTypes }, + }, + orderBy: [{ priority: 'asc' }, { createdAt: 'asc' }], + take: Math.min(limit || 5, 50), + select: { + id: true, jobType: true, targetType: true, targetId: true, + priority: true, snapshotId: true, promptVersion: true, outputSchemaVersion: true, + }, + }); + return { jobs }; + } + + // ── Lock ── + + async lockJob(jobId: string, runtimeInstanceId: string) { + const now = new Date(); + const lockUntil = new Date(now.getTime() + 60_000); + + const result = await this.prisma.aiRuntimeJob.updateMany({ + where: { + id: jobId, + status: 'pending', + OR: [ + { lockUntil: null }, + { lockUntil: { lt: now } }, + ], + }, + data: { + status: 'locked', + lockedBy: runtimeInstanceId, + lockedAt: now, + lockUntil, + }, + }); + + if (result.count === 0) { + throw new ConflictException({ + errorCode: 'JOB_ALREADY_LOCKED', + message: 'Job is already locked or not in pending status', + }); + } + + return { jobId, status: 'locked', lockUntil: lockUntil.getTime() }; + } + + // ── Heartbeat ── + + async heartbeatJob(jobId: string, runtimeInstanceId: string) { + const now = new Date(); + const lockUntil = new Date(now.getTime() + 60_000); + + const result = await this.prisma.aiRuntimeJob.updateMany({ + where: { id: jobId, lockedBy: runtimeInstanceId, status: 'locked' }, + data: { lockUntil }, + }); + + if (result.count === 0) { + throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found or not locked by this runtime' }); + } + } + + // ── Snapshot ── + + async getSnapshot(jobId: string) { + const job = await this.prisma.aiRuntimeJob.findUnique({ + where: { id: jobId }, + select: { id: true, snapshotId: true }, + }); + if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + if (!job.snapshotId) throw new NotFoundException({ errorCode: 'SNAPSHOT_NOT_FOUND', message: 'No snapshot bound to this job' }); + + const snapshot = await this.prisma.learningAnalysisSnapshot.findUnique({ + where: { id: job.snapshotId }, + }); + if (!snapshot) throw new NotFoundException({ errorCode: 'SNAPSHOT_NOT_FOUND', message: 'Snapshot not found' }); + + if (snapshot.expiresAt && new Date(snapshot.expiresAt) < new Date()) { + throw new NotFoundException({ errorCode: 'SNAPSHOT_EXPIRED', message: 'Snapshot has expired for this job' }); + } + + return { + jobId: job.id, + snapshotId: snapshot.id, + snapshotVersion: snapshot.snapshotVersion, + privacyScope: snapshot.privacyScope, + userProfile: snapshot.userProfile, + aiSettings: snapshot.aiSettings, + deviceContext: snapshot.deviceContext, + learningBehaviorSummary: snapshot.learningBehaviorSummary, + materialProgressSummary: snapshot.materialProgressSummary, + contentStructureSummary: snapshot.contentStructureSummary, + behaviorSignals: snapshot.behaviorSignals, + scoreSignals: snapshot.scoreSignals, + constraints: snapshot.constraints, + allowedModelFields: snapshot.allowedModelFields, + }; + } + + // ── Credential Resolve ── + + async resolveCredential(jobId: string, apiKeyMode: string, provider: string, credentialId?: string) { + const job = await this.prisma.aiRuntimeJob.findUnique({ + where: { id: jobId }, + select: { userId: true }, + }); + if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + + if (apiKeyMode === 'user_deepseek_key') { + if (!credentialId) throw new BadRequestException({ errorCode: 'CREDENTIAL_NOT_FOUND', message: 'credentialId required for user_deepseek_key mode' }); + const { provider: resolvedProvider, apiKey } = await this.userAi.resolveCredentialForJob(job.userId, credentialId); + return { provider: resolvedProvider, model: 'deepseek-chat', apiKey, apiKeyMode: 'user_deepseek_key' }; + } + + // platform_key: Runtime should use its own env var as primary; API returns empty key as signal + return { provider, model: 'deepseek-chat', apiKey: '', apiKeyMode: 'platform_key' }; + } + + // ── Result ── + + async submitResult(jobId: string, dto: { + runtimeInstanceId: string; schemaVersion: string; status: string; + rawOutput?: any; validatedOutput?: any; validationErrors?: string[]; + usage?: any; attemptNo: number; outputHash?: string; + }) { + const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } }); + if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + + const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`; + + // Check duplicate + const existing = await this.prisma.aiRuntimeResult.findFirst({ + where: { resultIdempotencyKey }, + }); + if (existing) return { status: 'ok', duplicate: true }; + + // Check already succeeded with different hash + const existingResult = await this.prisma.aiRuntimeResult.findUnique({ where: { jobId } }); + if (existingResult && existingResult.status === 'succeeded') { + throw new ConflictException({ errorCode: 'RESULT_ALREADY_EXISTS', message: 'Job already has a succeeded result' }); + } + + await this.prisma.aiRuntimeResult.create({ + data: { + jobId, userId: job.userId, + runtimeInstanceId: dto.runtimeInstanceId, + status: dto.status, + attemptNo: dto.attemptNo, + resultIdempotencyKey, + outputHash: dto.outputHash, + rawOutput: dto.rawOutput as any, + validatedOutput: dto.validatedOutput as any, + schemaVersion: dto.schemaVersion, + validationErrors: dto.validationErrors as any, + }, + }); + + await this.prisma.aiRuntimeJob.update({ + where: { id: jobId }, + data: { status: 'succeeded', finishedAt: new Date() }, + }); + + return { status: 'ok', duplicate: false }; + } + + // ── Fail ── + + async submitFailure(jobId: string, dto: { + runtimeInstanceId: string; errorCode: string; errorMessage: string; + retryable: boolean; rawError?: string; + }) { + const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } }); + if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + + const newRetryCount = job.retryCount + 1; + const exceeded = newRetryCount > job.maxRetryCount; + + if (dto.retryable && !exceeded) { + await this.prisma.aiRuntimeJob.update({ + where: { id: jobId }, + data: { + status: 'pending', + lockedBy: null, lockedAt: null, lockUntil: null, + retryCount: newRetryCount, + errorCode: dto.errorCode, + errorMessage: dto.errorMessage, + }, + }); + } else { + await this.prisma.aiRuntimeJob.update({ + where: { id: jobId }, + data: { + status: 'failed', + finishedAt: new Date(), + retryCount: newRetryCount, + errorCode: dto.errorCode, + errorMessage: dto.errorMessage, + }, + }); + } + + return { status: exceeded ? 'failed' : 'pending', retryCount: newRetryCount }; + } + + // ── Invocation Logs ── + + async submitInvocationLogs(logs: Array<{ + jobId: string; provider: string; model: string; apiKeyMode: string; + credentialId?: string; promptName: string; promptVersion: string; + outputSchemaVersion: string; inputTokens: number; outputTokens: number; + totalTokens: number; latencyMs: number; costEstimate?: number; + success: boolean; errorCode?: string; errorMessage?: string; + retryCount: number; runtimeInstanceId: string; + traceId?: string; correlationId?: string; + }>) { + const created = await Promise.all( + logs.map(async (log) => { + const job = await this.prisma.aiRuntimeJob.findUnique({ + where: { id: log.jobId }, select: { userId: true }, + }); + if (!job) return null; + + return this.prisma.modelInvocationLog.create({ + data: { + userId: job.userId, + jobId: log.jobId, + provider: log.provider, + model: log.model, + apiKeyMode: log.apiKeyMode, + credentialId: log.credentialId, + promptName: log.promptName, + promptVersion: log.promptVersion, + outputSchemaVersion: log.outputSchemaVersion, + inputTokens: log.inputTokens, + outputTokens: log.outputTokens, + totalTokens: log.totalTokens, + latencyMs: log.latencyMs, + costEstimate: log.costEstimate, + success: log.success, + errorCode: log.errorCode, + errorMessage: log.errorMessage, + retryCount: log.retryCount, + runtimeInstanceId: log.runtimeInstanceId, + traceId: log.traceId, + correlationId: log.correlationId, + }, + }); + }), + ); + return { accepted: created.filter(Boolean).length }; + } +}