api-server/src/modules/ai-runtime/job-reaper.service.ts

99 lines
3.4 KiB
TypeScript
Raw Normal View History

import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaService } from '../../infrastructure/database/prisma.service';
/** Delay between two consecutive reaper passes, in milliseconds (30 seconds). */
const REAP_INTERVAL_MS = 30 * 1_000;
/**
 * Periodically recovers `aiRuntimeJob` rows that are stuck in the `locked`
 * or `running` state.
 *
 * A pass runs once when the module initialises and then every
 * `REAP_INTERVAL_MS` milliseconds until the module is destroyed. Failures of
 * a pass are logged and never propagate out of the lifecycle hook or timer.
 */
@Injectable()
export class JobReaperService implements OnModuleInit, OnModuleDestroy {
  /** Upper bound on the number of rows fetched by each scan of a single pass. */
  private static readonly BATCH_SIZE = 500;

  private readonly logger = new Logger(JobReaperService.name);
  private timer: ReturnType<typeof setInterval> | null = null;

  constructor(private readonly prisma: PrismaService) {}

  /** Runs an initial pass, then schedules the recurring one. */
  async onModuleInit(): Promise<void> {
    await this.reapSafely();
    this.timer = setInterval(() => {
      void this.reapSafely();
    }, REAP_INTERVAL_MS);
  }

  /** Stops the recurring pass and drops the timer handle. */
  onModuleDestroy(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Recover jobs stuck in locked or running state past their timeout.
   *
   * Steps, in order:
   *  1. `locked` jobs whose `lockUntil` is in the past become `expired`.
   *  2. `running` jobs whose `startedAt + timeoutSeconds` is in the past
   *     become `expired`.
   *  3. `expired` jobs with `retryCount < maxRetryCount` go back to `pending`
   *     (lock fields cleared, `retryCount` incremented); the remaining ones
   *     become `failed` with `finishedAt` set.
   *
   * Steps 2 and 3 each look at no more than `BATCH_SIZE` rows per pass.
   *
   * @returns Row counts for this pass: `expired` (steps 1 + 2), `retried`
   *          and `failed` (step 3).
   * @throws Whatever the underlying Prisma calls reject with.
   */
  async reap(): Promise<{ expired: number; retried: number; failed: number }> {
    const now = new Date();

    // 1. Expire stuck locked jobs (lockUntil passed)
    const expiredLocks = await this.prisma.aiRuntimeJob.updateMany({
      where: { status: 'locked', lockUntil: { lt: now } },
      data: { status: 'expired' },
    });

    // 2. Expire stuck running jobs (startedAt + timeoutSeconds < now).
    // The comparison needs arithmetic on two columns, which cannot be put in
    // a Prisma `where`, so candidates are fetched and filtered in memory.
    // Candidates are read oldest-first: with an unordered `take`, a backlog
    // larger than the batch could keep the stuck rows out of the window
    // indefinitely. `startedAt < now` also skips rows with no start time,
    // which the in-memory check below ignores anyway.
    const runningJobs = await this.prisma.aiRuntimeJob.findMany({
      where: { status: 'running', startedAt: { lt: now } },
      select: { id: true, startedAt: true, timeoutSeconds: true },
      orderBy: { startedAt: 'asc' },
      take: JobReaperService.BATCH_SIZE,
    });
    const stuckIds = runningJobs
      .filter(
        (job) =>
          job.startedAt &&
          now.getTime() - job.startedAt.getTime() > job.timeoutSeconds * 1000,
      )
      .map((job) => job.id);

    let expiredRunning = 0;
    if (stuckIds.length > 0) {
      // Re-check the status so a job that finished in the meantime is left alone.
      const result = await this.prisma.aiRuntimeJob.updateMany({
        where: { id: { in: stuckIds }, status: 'running' },
        data: { status: 'expired' },
      });
      expiredRunning = result.count;
    }

    // 3. Retry expired jobs where retryCount < maxRetryCount, fail the rest.
    // Prisma doesn't support comparing two columns in where, so fetch and batch.
    const expiredJobs = await this.prisma.aiRuntimeJob.findMany({
      where: { status: 'expired' },
      select: { id: true, retryCount: true, maxRetryCount: true },
      take: JobReaperService.BATCH_SIZE,
    });
    const retryIds: Array<(typeof expiredJobs)[number]['id']> = [];
    const failIds: Array<(typeof expiredJobs)[number]['id']> = [];
    for (const job of expiredJobs) {
      (job.retryCount < job.maxRetryCount ? retryIds : failIds).push(job.id);
    }

    let retried = 0;
    let failed = 0;
    if (retryIds.length > 0) {
      const result = await this.prisma.aiRuntimeJob.updateMany({
        where: { id: { in: retryIds }, status: 'expired' },
        data: {
          status: 'pending',
          lockedBy: null,
          lockedAt: null,
          lockUntil: null,
          retryCount: { increment: 1 },
        },
      });
      retried = result.count;
    }
    if (failIds.length > 0) {
      const result = await this.prisma.aiRuntimeJob.updateMany({
        where: { id: { in: failIds }, status: 'expired' },
        data: { status: 'failed', finishedAt: new Date() },
      });
      failed = result.count;
    }

    const total = expiredLocks.count + expiredRunning + retried + failed;
    if (total > 0) {
      this.logger.log(
        `Reaped: ${expiredLocks.count} locked expired, ${expiredRunning} running expired, ` +
          `${retried} retried → pending, ${failed} failed`,
      );
    }
    return { expired: expiredLocks.count + expiredRunning, retried, failed };
  }

  /**
   * Runs one pass and logs a failure instead of propagating it, so that an
   * error can neither abort module start-up nor surface as an unhandled
   * rejection from the timer callback.
   */
  private async reapSafely(): Promise<void> {
    try {
      await this.reap();
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      const stack = err instanceof Error ? err.stack : undefined;
      this.logger.error(`Job reap pass failed: ${message}`, stack);
    }
  }
}