diff --git a/src/modules/ai-runtime/internal/runtime-internal.service.ts b/src/modules/ai-runtime/internal/runtime-internal.service.ts index 75f83cd..759dec9 100644 --- a/src/modules/ai-runtime/internal/runtime-internal.service.ts +++ b/src/modules/ai-runtime/internal/runtime-internal.service.ts @@ -35,6 +35,24 @@ export class RuntimeInternalService { }, }); + // Register / update RuntimeInstance with capabilities + if (supSnapshot.length > 0 || supOutput.length > 0) { + await this.prisma.runtimeInstance.upsert({ + where: { runtimeInstanceId }, + create: { + runtimeInstanceId, + status: 'active', + lastHeartbeatAt: new Date(), + capabilities: capabilities as any, + }, + update: { + status: 'active', + lastHeartbeatAt: new Date(), + capabilities: capabilities as any, + }, + }); + } + // Post-filter by snapshotVersion if needed (requires snapshot join) if (supSnapshot.length > 0) { const snapshotIds = [...new Set(jobs.map(j => j.snapshotId).filter(Boolean))]; @@ -92,14 +110,14 @@ export class RuntimeInternalService { const now = new Date(); const lockUntil = new Date(now.getTime() + 60_000); - // First heartbeat: locked → running; subsequent heartbeats: extend lockUntil + // First heartbeat: locked → running + set startedAt; subsequent: extend lockUntil const result = await this.prisma.aiRuntimeJob.updateMany({ where: { id: jobId, lockedBy: runtimeInstanceId, status: { in: ['locked', 'running'] }, }, - data: { lockUntil, status: 'running' }, + data: { lockUntil, status: 'running', startedAt: new Date() }, }); if (result.count === 0) { @@ -176,6 +194,14 @@ export class RuntimeInternalService { throw new ConflictException({ errorCode: 'JOB_NOT_ACTIVE', message: `Job is ${job.status}, cannot accept result` }); } + // Validate schema version compatibility + if (job.outputSchemaVersion && dto.schemaVersion !== job.outputSchemaVersion) { + throw new BadRequestException({ + errorCode: 'RESULT_SCHEMA_UNSUPPORTED', + message: `Result schemaVersion ${dto.schemaVersion} does not match job outputSchemaVersion ${job.outputSchemaVersion}`, + }); + } + const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`; // Check duplicate