diff --git a/src/modules/ai-runtime/job-reaper.service.spec.ts b/src/modules/ai-runtime/job-reaper.service.spec.ts index 9deb65a..8c547c3 100644 --- a/src/modules/ai-runtime/job-reaper.service.spec.ts +++ b/src/modules/ai-runtime/job-reaper.service.spec.ts @@ -6,8 +6,8 @@ describe('JobReaperService', () => { let mockFindMany: jest.Mock; beforeEach(() => { - mockUpdateMany = jest.fn(); - mockFindMany = jest.fn(); + mockUpdateMany = jest.fn().mockResolvedValue({ count: 0 }); + mockFindMany = jest.fn().mockResolvedValue([]); // default: empty batch → breaks loop const mockPrisma = { aiRuntimeJob: { updateMany: mockUpdateMany, findMany: mockFindMany }, } as any; @@ -20,8 +20,7 @@ describe('JobReaperService', () => { }); it('expires locked jobs past lockUntil', async () => { - mockUpdateMany.mockResolvedValue({ count: 2 }); - mockFindMany.mockResolvedValue([]); + mockUpdateMany.mockResolvedValueOnce({ count: 2 }); // locked step const result = await service.reap(); @@ -34,12 +33,13 @@ describe('JobReaperService', () => { it('expires running jobs past timeout', async () => { const now = Date.now(); - mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked + // Running batch 1: has a stuck job mockFindMany.mockResolvedValueOnce([ - { id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120, retryCount: 0, maxRetryCount: 3 }, + { id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120 }, ]); - mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // running expired - mockFindMany.mockResolvedValueOnce([]); // expired jobs + // Batch is smaller than the batch size → running loop stops; the default [] mock answers the expired-jobs query + mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked step + mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // expire running const result = await service.reap(); @@ -51,14 +51,18 @@ describe('JobReaperService', () => { }); it('retries expired jobs with remaining retries', async () => { - mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked 
- mockFindMany.mockResolvedValueOnce([]); // running - mockFindMany.mockResolvedValueOnce([ - { id: 'j1', retryCount: 0, maxRetryCount: 3 }, - { id: 'j2', retryCount: 3, maxRetryCount: 3 }, - ]); - mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // retry j1 - mockUpdateMany.mockResolvedValueOnce({ count: 1 }); // fail j2 + // Running loop: empty on first call → breaks + // Expired batch 1: has 2 expired jobs + mockFindMany + .mockResolvedValueOnce([]) // running batch 1 (empty → skip) + .mockResolvedValueOnce([ + { id: 'j1', retryCount: 0, maxRetryCount: 3 }, + { id: 'j2', retryCount: 3, maxRetryCount: 3 }, + ]); + mockUpdateMany + .mockResolvedValueOnce({ count: 0 }) // locked + .mockResolvedValueOnce({ count: 1 }) // retry j1 + .mockResolvedValueOnce({ count: 1 }); // fail j2 const result = await service.reap(); @@ -67,11 +71,23 @@ describe('JobReaperService', () => { }); it('handles no stuck jobs gracefully', async () => { - mockUpdateMany.mockResolvedValue({ count: 0 }); - mockFindMany.mockResolvedValue([]); - const result = await service.reap(); - expect(result).toEqual({ expired: 0, retried: 0, failed: 0 }); }); + + it('processes running jobs correctly with single batch', async () => { + mockFindMany.mockReset().mockResolvedValue([]); + mockUpdateMany.mockReset().mockResolvedValue({ count: 0 }); + const now = Date.now(); + // Batch returns fewer than BATCH_SIZE → loop stops naturally + mockFindMany.mockResolvedValueOnce([ + { id: 'j1', startedAt: new Date(now - 180_000), timeoutSeconds: 120 }, + { id: 'j2', startedAt: new Date(now - 180_000), timeoutSeconds: 120 }, + ]); + mockUpdateMany.mockResolvedValueOnce({ count: 0 }); // locked step + mockUpdateMany.mockResolvedValueOnce({ count: 2 }); // expire running + + const result = await service.reap(); + expect(result.expired).toBe(2); + }); }); diff --git a/src/modules/ai-runtime/job-reaper.service.ts b/src/modules/ai-runtime/job-reaper.service.ts index 7418863..861643a 100644 --- 
a/src/modules/ai-runtime/job-reaper.service.ts +++ b/src/modules/ai-runtime/job-reaper.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/commo import { PrismaService } from '../../infrastructure/database/prisma.service'; const REAP_INTERVAL_MS = 30_000; // every 30 seconds +const REAP_BATCH_SIZE = 500; @Injectable() export class JobReaperService implements OnModuleInit, OnModuleDestroy { @@ -30,59 +31,76 @@ export class JobReaperService implements OnModuleInit, OnModuleDestroy { }); // 2. Expire stuck running jobs (startedAt + timeoutSeconds < now) - // We can't do arithmetic in Prisma where, so fetch IDs and update - const stuckRunning = await this.prisma.aiRuntimeJob.findMany({ - where: { status: 'running' }, - select: { id: true, startedAt: true, timeoutSeconds: true, retryCount: true, maxRetryCount: true }, - take: 500, - }); - const stuckIds = stuckRunning - .filter(j => j.startedAt && (now.getTime() - j.startedAt.getTime()) > j.timeoutSeconds * 1000) - .map(j => j.id); - + // We can't do arithmetic in Prisma where, so fetch IDs in batches and filter in memory let expiredRunning = 0; - if (stuckIds.length > 0) { - const result = await this.prisma.aiRuntimeJob.updateMany({ - where: { id: { in: stuckIds }, status: 'running' }, - data: { status: 'expired' }, + let cursor: string | undefined; + let hasMore = true; + while (hasMore) { + const batch = await this.prisma.aiRuntimeJob.findMany({ + // Keyset pagination (id > last seen id): a row expired below stops matching the status filter, so a Prisma cursor with skip: 1 could drop a live row + where: { status: 'running', ...(cursor ? { id: { gt: cursor } } : {}) }, + select: { id: true, startedAt: true, timeoutSeconds: true }, + take: REAP_BATCH_SIZE, 
+ orderBy: { id: 'asc' }, }); - expiredRunning = result.count; + if (batch.length === 0) { hasMore = false; break; } + + const stuckIds = batch + .filter(j => j.startedAt && (now.getTime() - j.startedAt.getTime()) > j.timeoutSeconds * 1000) + .map(j => j.id); + + if (stuckIds.length > 0) { + const result = await this.prisma.aiRuntimeJob.updateMany({ + where: { id: { in: stuckIds }, status: 'running' }, + data: { status: 'expired' }, + }); + expiredRunning += result.count; + } + cursor = batch[batch.length - 1].id; + if (batch.length < REAP_BATCH_SIZE) hasMore = false; } // 3. Retry expired jobs where retryCount < maxRetryCount - // Prisma doesn't support comparing two columns in where, so fetch and batch - const expiredJobs = await this.prisma.aiRuntimeJob.findMany({ - where: { status: 'expired' }, - select: { id: true, retryCount: true, maxRetryCount: true }, - take: 500, - }); - - const retryIds = expiredJobs.filter(j => j.retryCount < j.maxRetryCount).map(j => j.id); - const failIds = expiredJobs.filter(j => j.retryCount >= j.maxRetryCount).map(j => j.id); - let retried = 0; let failed = 0; - - if (retryIds.length > 0) { - const result = await this.prisma.aiRuntimeJob.updateMany({ - where: { id: { in: retryIds }, status: 'expired' }, - data: { - status: 'pending', - lockedBy: null, - lockedAt: null, - lockUntil: null, - retryCount: { increment: 1 }, - }, + cursor = undefined; + hasMore = true; + while (hasMore) { + const batch = await this.prisma.aiRuntimeJob.findMany({ + // Prisma can't compare two columns in where, so fetch by id keyset (id > last seen id) and split in memory; rows handled below leave 'expired', so a cursor with skip: 1 could drop a live row + where: { status: 'expired', ...(cursor ? { id: { gt: cursor } } : {}) }, + select: { id: true, retryCount: true, maxRetryCount: true }, + take: REAP_BATCH_SIZE, 
+ orderBy: { id: 'asc' }, }); - retried = result.count; - } + if (batch.length === 0) { hasMore = false; break; } - if (failIds.length > 0) { - const result = await this.prisma.aiRuntimeJob.updateMany({ - where: { id: { in: failIds }, status: 'expired' }, - data: { status: 'failed', finishedAt: new Date() }, - }); - failed = result.count; + const retryIds = batch.filter(j => j.retryCount < j.maxRetryCount).map(j => j.id); + const failIds = batch.filter(j => j.retryCount >= j.maxRetryCount).map(j => j.id); + + if (retryIds.length > 0) { + const result = await this.prisma.aiRuntimeJob.updateMany({ + where: { id: { in: retryIds }, status: 'expired' }, + data: { + status: 'pending', + lockedBy: null, + lockedAt: null, + lockUntil: null, + retryCount: { increment: 1 }, + }, + }); + retried += result.count; + } + + if (failIds.length > 0) { + const result = await this.prisma.aiRuntimeJob.updateMany({ + where: { id: { in: failIds }, status: 'expired' }, + data: { status: 'failed', finishedAt: new Date() }, + }); + failed += result.count; + } + cursor = batch[batch.length - 1].id; + if (batch.length < REAP_BATCH_SIZE) hasMore = false; } const total = expiredLocks.count + expiredRunning + retried + failed;