Some checks failed
Deploy API Server / build-and-deploy (push) Has been cancelled
定义 5 种 Job 类型、7 种状态、完整状态流转图、数据库字段、防并发锁定 机制、retryable/non-retryable 分类、超时释放、幂等规则、Poll 调度策略。 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
8.2 KiB
8.2 KiB
AI Job 状态机与任务调度设计
1. Job 类型
| jobType | 说明 | 输入 | 输出 |
|---|---|---|---|
learning_state_analysis |
学习状态分析 | Snapshot | AiLearningAnalysis |
weak_point_analysis |
薄弱点分析 | Snapshot | WeakPointCandidate[] |
next_action_planning |
下一步建议 | Snapshot | NextActionRecommendation[] |
quiz_generation |
题目候选生成 | Snapshot + params | QuizQuestion[] |
flashcard_generation |
卡片候选生成 | Snapshot + params | Flashcard[] |
2. 状态定义
| 状态 | 含义 | 进入条件 | 退出条件 |
|---|---|---|---|
pending |
等待消费 | API 创建 or retryable fail 回退 | 被 Runtime lock |
locked |
已被 Runtime 获取 | Runtime POST /lock 成功 | lockUntil 超时 → expired / Runtime 开始执行 |
running |
正在执行 | Runtime 开始执行(heartbeat 或隐式) | 执行完成 → succeeded/failed |
succeeded |
执行成功 | API POST /result 处理完毕 | 终态 |
failed |
执行失败 | non-retryable 错误 or 超过 maxRetryCount | 终态(除非 Admin 重跑) |
cancelled |
已取消 | 用户/Admin 取消 pending job | 终态 |
expired |
超时 | lockUntil 超时未 heartbeat or 执行超时 | 可被 Runtime 重新 poll(retryable) |
3. 状态流转
┌──────────┐
│ pending │ ←──────────────────────┐
└────┬─────┘ │
│ │
POST /lock │
│ │
┌────▼─────┐ │
┌───→│ locked │──→ expired ───────────┘
│ └────┬─────┘ (lockUntil 超时)
│ │
│ heartbeat
│ │
│ ┌────▼─────┐
│ │ running │──→ expired ───────────┘
│ └────┬─────┘ (timeoutSeconds 超时)
│ │
┌───────┼─────────┼──────────┐
│ │ │ │
succeeded failed failed cancelled
(result) (non- (retry- (用户/Admin
retry) able 取消pending)
│
└──→ pending (retryCount++)
4. 数据库字段
model AiRuntimeJob {
id String @id @default(cuid())
userId String
jobType String // learning_state_analysis | weak_point_analysis | next_action_planning | quiz_generation | flashcard_generation
targetType String // user | material | knowledge_point
targetId String
snapshotId String?
status String @default("pending") // pending | locked | running | succeeded | failed | cancelled | expired
priority Int @default(0) // 0=最高
idempotencyKey String? @unique
apiKeyMode String @default("platform_key") // platform_key | user_deepseek_key
credentialId String?
modelProvider String @default("deepseek")
modelName String @default("deepseek-chat")
promptVersion String?
outputSchemaVersion String?
attemptNo Int @default(0)
retriedFromJobId String?
// 锁定
lockedBy String? // runtimeInstanceId
lockedAt DateTime?
lockUntil DateTime?
// 时间
startedAt DateTime?
finishedAt DateTime?
// 重试
retryCount Int @default(0)
maxRetryCount Int @default(3)
timeoutSeconds Int @default(120)
// 错误
errorCode String?
errorMessage String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
result AiRuntimeResult?
@@index([status])
@@index([jobType])
@@index([userId])
@@index([targetType, targetId])
@@index([lockUntil])
}
5. 锁定机制
5.1 Lock 流程
Runtime POST /internal/runtime/jobs/{jobId}/lock
→ API 检查 job.status === pending
→ API 检查 job.lockUntil < now (未被其他 Runtime 持有)
→ API 设置 lockedBy, lockedAt, lockUntil=now+60s, status=locked
→ 返回 lockUntil
5.2 防并发
基于数据库行级写操作保证只有一个 Runtime 锁定成功:
UPDATE ... WHERE status='pending' AND (lockUntil IS NULL OR lockUntil < NOW())- 影响行数 = 0 则锁定失败(JOB_ALREADY_LOCKED)
5.3 Heartbeat
Runtime POST /internal/runtime/jobs/{jobId}/heartbeat
→ API 检查 lockedBy === runtimeInstanceId
→ API 延长 lockUntil = now + 60s
→ 204 No Content
5.4 超时释放
lockUntil 超时后:
- 原 Runtime 的 lock 失效
- job 状态变为
expired - 其他 Runtime poll 时可重新获取(retryable)
- 如 retryCount < maxRetryCount,job 自动回到
pending
6. 重试策略
6.1 重试触发
| 场景 | 处理 |
|---|---|
| Runtime 提交 retryable fail | job → pending, retryCount++ |
| Runtime lock 后无 heartbeat 超时 | job → expired → pending, retryCount++ |
| Runtime 执行超时 | job → expired → pending, retryCount++ |
6.2 重试上限
retryCount >= maxRetryCount:job → failed(终态)maxRetryCount默认 3,可配置- Admin 可手动重跑 failed job(创建新 job,记录 retriedFromJobId)
6.3 retryable vs non-retryable
| 错误类型 | retryable | 示例 |
|---|---|---|
| MODEL_TIMEOUT | true | DeepSeek 超时 |
| MODEL_RATE_LIMIT | true | 限流 |
| NETWORK_ERROR | true | 网络中断 |
| TEMPORARY_PROVIDER_ERROR | true | 5xx |
| INVALID_SNAPSHOT | false | 快照结构错 |
| INVALID_SCHEMA | false | 输出 schema 错 |
| INVALID_CREDENTIAL | false | Key 无效 |
| JOB_TIMEOUT | true | 执行超时 |
7. 超时
| 超时类型 | 默认值 | 说明 |
|---|---|---|
| lockUntil | 60s | lock 后未 heartbeat 自动释放 |
| timeoutSeconds | 120s | 总执行超时 |
| heartbeat 间隔 | Runtime 自行决定 | 建议 15-30s |
8. 幂等
8.1 Job 创建幂等
idempotencyKey 唯一索引:相同 userId + jobType + targetType + targetId + idempotencyKey 的 job 不重复创建。如果没有传 idempotencyKey,则允许重复创建。
8.2 Result 提交幂等
resultIdempotencyKey = jobId + ":" + attemptNo + ":" + outputHash
- 相同 key 重复提交:返回 200(幂等,不重复落库)
- 已有 succeeded result 但 outputHash 不同:返回 409 RESULT_ALREADY_EXISTS
8.3 Admin 重跑
Admin 重跑创建新 job,记录 retriedFromJobId,不复用旧 job。
9. Cancelled / Expired
| 状态 | 能否被 Runtime 消费 | 处理 |
|---|---|---|
| cancelled | 否 | API 直接设置,不进入 poll 结果 |
| expired | 是(如 retryable) | lockUntil 超时后自动变为 expired,retryable 时回到 pending |
用户关闭 AI 授权时:
- 所有 pending job → cancelled
- 所有 running job → cancelRequested(Runtime 下次 heartbeat 获知)
10. 任务调度
Poll 规则
POST /internal/runtime/jobs/poll
→ 返回 status=pending 的 job
→ 按 priority ASC, createdAt ASC 排序
→ 只返回 Runtime capabilities 支持的 jobType
→ limit 最大 50
无可用 job 时
返回空数组。Runtime 按 pollIntervalMs 等待后重试。
11. 验收清单
- 输出 Job 状态机设计文档
- 明确每个状态的进入条件和退出条件
- 明确 Runtime 如何锁定任务(DB 行级写 + lockUntil)
- 明确 lockUntil 超时后如何释放
- 明确 retryCount / maxRetryCount 规则
- 明确 idempotencyKey 防重复
- 明确 Admin 可重跑 failed job
- 明确 cancelled / expired 不应被 Runtime 再次消费