fix: job-reaper batch-loop instead of single take:500 to avoid missing jobs
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 45s

- Replace single findMany(take:500) with cursor-based while loop
- REAP_BATCH_SIZE=500 constant; processes all stuck running + expired jobs
- Prevents missing jobs when >500 are stuck simultaneously
- Update tests: reset mocks before custom chains, explicit call ordering

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-18 11:45:36 +08:00
parent c0594c518d
commit cb24e5fb96
2 changed files with 98 additions and 64 deletions

View File

@ -6,8 +6,8 @@ describe('JobReaperService', () => {
let mockFindMany: jest.Mock;
beforeEach(() => {
mockUpdateMany = jest.fn();
mockFindMany = jest.fn();
mockUpdateMany = jest.fn().mockResolvedValue({ count: 0 });
mockFindMany = jest.fn().mockResolvedValue([]); // default: empty batch → breaks loop
const mockPrisma = {
aiRuntimeJob: { updateMany: mockUpdateMany, findMany: mockFindMany },
} as any;
@ -20,8 +20,7 @@ describe('JobReaperService', () => {
});
it('expires locked jobs past lockUntil', async () => {
mockUpdateMany.mockResolvedValue({ count: 2 });
mockFindMany.mockResolvedValue([]);
mockUpdateMany.mockResolvedValueOnce({ count: 2 }); // locked step
const result = await service.reap();
@ -34,12 +33,13 @@ describe('JobReaperService', () => {
it('expires running jobs past timeout', async () => {
const now = Date.now();
mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked
// Running batch 1: has a stuck job
mockFindMany.mockResolvedValueOnce([
{ id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120, retryCount: 0, maxRetryCount: 3 },
{ id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120 },
]);
mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // running expired
mockFindMany.mockResolvedValueOnce([]); // expired jobs
// Running batch 2: empty → loop breaks (default mock handles this)
mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked step
mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // expire running
const result = await service.reap();
@ -51,14 +51,18 @@ describe('JobReaperService', () => {
});
it('retries expired jobs with remaining retries', async () => {
mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked
mockFindMany.mockResolvedValueOnce([]); // running
mockFindMany.mockResolvedValueOnce([
{ id: 'j1', retryCount: 0, maxRetryCount: 3 },
{ id: 'j2', retryCount: 3, maxRetryCount: 3 },
]);
mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // retry j1
mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // fail j2
// Running loop: empty on first call → breaks
// Expired batch 1: has 2 expired jobs
mockFindMany
.mockResolvedValueOnce([]) // running batch 1 (empty → skip)
.mockResolvedValueOnce([
{ id: 'j1', retryCount: 0, maxRetryCount: 3 },
{ id: 'j2', retryCount: 3, maxRetryCount: 3 },
]);
mockUpdateMany
.mockResolvedValueOnce({ count: 0 }) // locked
.mockResolvedValueOnce({ count: 1 }) // retry j1
.mockResolvedValueOnce({ count: 1 }); // fail j2
const result = await service.reap();
@ -67,11 +71,23 @@ describe('JobReaperService', () => {
});
it('handles no stuck jobs gracefully', async () => {
mockUpdateMany.mockResolvedValue({ count: 0 });
mockFindMany.mockResolvedValue([]);
const result = await service.reap();
expect(result).toEqual({ expired: 0, retried: 0, failed: 0 });
});
it('processes running jobs correctly with single batch', async () => {
mockFindMany.mockReset().mockResolvedValue([]);
mockUpdateMany.mockReset().mockResolvedValue({ count: 0 });
const now = Date.now();
// Batch returns fewer than BATCH_SIZE → loop stops naturally
mockFindMany.mockResolvedValueOnce([
{ id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120 },
{ id: 'j2', startedAt: new Date(now - 180_000), timeoutSeconds: 120 },
]);
mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked step
mockUpdateMany.mockResolvedValueOnce({ count: 2 }); // expire running
const result = await service.reap();
expect(result.expired).toBe(2);
});
});

View File

@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/commo
import { PrismaService } from '../../infrastructure/database/prisma.service';
const REAP_INTERVAL_MS = 30_000; // every 30 seconds
const REAP_BATCH_SIZE = 500;
@Injectable()
export class JobReaperService implements OnModuleInit, OnModuleDestroy {
@ -30,59 +31,76 @@ export class JobReaperService implements OnModuleInit, OnModuleDestroy {
});
// 2. Expire stuck running jobs (startedAt + timeoutSeconds < now)
// We can't do arithmetic in Prisma where, so fetch IDs and update
const stuckRunning = await this.prisma.aiRuntimeJob.findMany({
where: { status: 'running' },
select: { id: true, startedAt: true, timeoutSeconds: true, retryCount: true, maxRetryCount: true },
take: 500,
});
const stuckIds = stuckRunning
.filter(j => j.startedAt && (now.getTime() - j.startedAt.getTime()) > j.timeoutSeconds * 1000)
.map(j => j.id);
// We can't do arithmetic in Prisma where, so fetch IDs in batches and filter in memory
let expiredRunning = 0;
if (stuckIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: stuckIds }, status: 'running' },
data: { status: 'expired' },
let cursor: string | undefined;
let hasMore = true;
while (hasMore) {
const batch = await this.prisma.aiRuntimeJob.findMany({
where: { status: 'running' },
select: { id: true, startedAt: true, timeoutSeconds: true },
take: REAP_BATCH_SIZE,
...(cursor ? { cursor: { id: cursor }, skip: 1 } : {}),
orderBy: { id: 'asc' },
});
expiredRunning = result.count;
if (batch.length === 0) { hasMore = false; break; }
const stuckIds = batch
.filter(j => j.startedAt && (now.getTime() - j.startedAt.getTime()) > j.timeoutSeconds * 1000)
.map(j => j.id);
if (stuckIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: stuckIds }, status: 'running' },
data: { status: 'expired' },
});
expiredRunning += result.count;
}
cursor = batch[batch.length - 1].id;
if (batch.length < REAP_BATCH_SIZE) hasMore = false;
}
// 3. Retry expired jobs where retryCount < maxRetryCount
// Prisma doesn't support comparing two columns in where, so fetch and batch
const expiredJobs = await this.prisma.aiRuntimeJob.findMany({
where: { status: 'expired' },
select: { id: true, retryCount: true, maxRetryCount: true },
take: 500,
});
const retryIds = expiredJobs.filter(j => j.retryCount < j.maxRetryCount).map(j => j.id);
const failIds = expiredJobs.filter(j => j.retryCount >= j.maxRetryCount).map(j => j.id);
let retried = 0;
let failed = 0;
if (retryIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: retryIds }, status: 'expired' },
data: {
status: 'pending',
lockedBy: null,
lockedAt: null,
lockUntil: null,
retryCount: { increment: 1 },
},
cursor = undefined;
hasMore = true;
while (hasMore) {
const batch = await this.prisma.aiRuntimeJob.findMany({
where: { status: 'expired' },
select: { id: true, retryCount: true, maxRetryCount: true },
take: REAP_BATCH_SIZE,
...(cursor ? { cursor: { id: cursor }, skip: 1 } : {}),
orderBy: { id: 'asc' },
});
retried = result.count;
}
if (batch.length === 0) { hasMore = false; break; }
if (failIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: failIds }, status: 'expired' },
data: { status: 'failed', finishedAt: new Date() },
});
failed = result.count;
const retryIds = batch.filter(j => j.retryCount < j.maxRetryCount).map(j => j.id);
const failIds = batch.filter(j => j.retryCount >= j.maxRetryCount).map(j => j.id);
if (retryIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: retryIds }, status: 'expired' },
data: {
status: 'pending',
lockedBy: null,
lockedAt: null,
lockUntil: null,
retryCount: { increment: 1 },
},
});
retried += result.count;
}
if (failIds.length > 0) {
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: { in: failIds }, status: 'expired' },
data: { status: 'failed', finishedAt: new Date() },
});
failed += result.count;
}
cursor = batch[batch.length - 1].id;
if (batch.length < REAP_BATCH_SIZE) hasMore = false;
}
const total = expiredLocks.count + expiredRunning + retried + failed;