feat: AI Job 状态机与任务调度设计 (API-AI-002)
Some checks failed
Deploy API Server / build-and-deploy (push) Has been cancelled

定义 5 种 Job 类型、7 种状态、完整状态流转图、数据库字段、防并发锁定
机制、retryable/non-retryable 分类、超时释放、幂等规则、Poll 调度策略。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-11 20:35:59 +08:00
parent eea9e3e7c6
commit 045e0b2501

View File

@ -0,0 +1,233 @@
# AI Job 状态机与任务调度设计
## 1. Job 类型
| jobType | 说明 | 输入 | 输出 |
|---------|------|------|------|
| `learning_state_analysis` | 学习状态分析 | Snapshot | AiLearningAnalysis |
| `weak_point_analysis` | 薄弱点分析 | Snapshot | WeakPointCandidate[] |
| `next_action_planning` | 下一步建议 | Snapshot | NextActionRecommendation[] |
| `quiz_generation` | 题目候选生成 | Snapshot + params | QuizQuestion[] |
| `flashcard_generation` | 卡片候选生成 | Snapshot + params | Flashcard[] |
## 2. 状态定义
| 状态 | 含义 | 进入条件 | 退出条件 |
|------|------|----------|----------|
| `pending` | 等待消费 | API 创建 or retryable fail 回退 | 被 Runtime lock |
| `locked` | 已被 Runtime 获取 | Runtime POST /lock 成功 | lockUntil 超时 → expired / Runtime 开始执行 |
| `running` | 正在执行 | Runtime 开始执行heartbeat 或隐式) | 执行完成 → succeeded/failed |
| `succeeded` | 执行成功 | API POST /result 处理完毕 | 终态 |
| `failed` | 执行失败 | non-retryable 错误 or 超过 maxRetryCount | 终态(除非 Admin 重跑) |
| `cancelled` | 已取消 | 用户/Admin 取消 pending job | 终态 |
| `expired` | 超时 | lockUntil 超时未 heartbeat or 执行超时 | 可被 Runtime 重新 pollretryable |
## 3. 状态流转
```
┌──────────┐
│ pending │ ←──────────────────────┐
└────┬─────┘ │
│ │
POST /lock │
│ │
┌────▼─────┐ │
┌───→│ locked │──→ expired ───────────┘
│ └────┬─────┘ (lockUntil 超时)
│ │
│ heartbeat
│ │
│ ┌────▼─────┐
│ │ running │──→ expired ───────────┘
│ └────┬─────┘ (timeoutSeconds 超时)
│ │
┌───────┼─────────┼──────────┐
│ │ │ │
succeeded failed failed cancelled
(result) (non- (retry- (用户/Admin
retry) able 取消pending)
└──→ pending (retryCount++)
```
## 4. 数据库字段
```prisma
model AiRuntimeJob {
id String @id @default(cuid())
userId String
jobType String // learning_state_analysis | weak_point_analysis | next_action_planning | quiz_generation | flashcard_generation
targetType String // user | material | knowledge_point
targetId String
snapshotId String?
status String @default("pending") // pending | locked | running | succeeded | failed | cancelled | expired
priority Int @default(0) // 0=最高
idempotencyKey String? @unique
apiKeyMode String @default("platform_key") // platform_key | user_deepseek_key
credentialId String?
modelProvider String @default("deepseek")
modelName String @default("deepseek-chat")
promptVersion String?
outputSchemaVersion String?
attemptNo Int @default(0)
retriedFromJobId String?
// 锁定
lockedBy String? // runtimeInstanceId
lockedAt DateTime?
lockUntil DateTime?
// 时间
startedAt DateTime?
finishedAt DateTime?
// 重试
retryCount Int @default(0)
maxRetryCount Int @default(3)
timeoutSeconds Int @default(120)
// 错误
errorCode String?
errorMessage String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
result AiRuntimeResult?
@@index([status])
@@index([jobType])
@@index([userId])
@@index([targetType, targetId])
@@index([lockUntil])
}
```
## 5. 锁定机制
### 5.1 Lock 流程
```
Runtime POST /internal/runtime/jobs/{jobId}/lock
→ API 检查 job.status === pending
→ API 检查 job.lockUntil < now (未被其他 Runtime 持有)
→ API 设置 lockedBy, lockedAt, lockUntil=now+60s, status=locked
→ 返回 lockUntil
```
### 5.2 防并发
基于数据库行级写操作保证只有一个 Runtime 锁定成功:
- `UPDATE ... WHERE status='pending' AND (lockUntil IS NULL OR lockUntil < NOW())`
- 影响行数 = 0 则锁定失败JOB_ALREADY_LOCKED
### 5.3 Heartbeat
```
Runtime POST /internal/runtime/jobs/{jobId}/heartbeat
→ API 检查 lockedBy === runtimeInstanceId
→ API 延长 lockUntil = now + 60s
→ 204 No Content
```
### 5.4 超时释放
`lockUntil` 超时后:
- 原 Runtime 的 lock 失效
- job 状态变为 `expired`
- 其他 Runtime poll 时可重新获取retryable
- 如 retryCount < maxRetryCountjob 自动回到 `pending`
## 6. 重试策略
### 6.1 重试触发
| 场景 | 处理 |
|------|------|
| Runtime 提交 retryable fail | job → pending, retryCount++ |
| Runtime lock 后无 heartbeat 超时 | job → expired → pending, retryCount++ |
| Runtime 执行超时 | job → expired → pending, retryCount++ |
### 6.2 重试上限
- `retryCount >= maxRetryCount`job → failed终态
- `maxRetryCount` 默认 3可配置
- Admin 可手动重跑 failed job创建新 job记录 retriedFromJobId
### 6.3 retryable vs non-retryable
| 错误类型 | retryable | 示例 |
|---------|-----------|------|
| MODEL_TIMEOUT | true | DeepSeek 超时 |
| MODEL_RATE_LIMIT | true | 限流 |
| NETWORK_ERROR | true | 网络中断 |
| TEMPORARY_PROVIDER_ERROR | true | 5xx |
| INVALID_SNAPSHOT | false | 快照结构错 |
| INVALID_SCHEMA | false | 输出 schema 错 |
| INVALID_CREDENTIAL | false | Key 无效 |
| JOB_TIMEOUT | true | 执行超时 |
## 7. 超时
| 超时类型 | 默认值 | 说明 |
|---------|--------|------|
| lockUntil | 60s | lock 后未 heartbeat 自动释放 |
| timeoutSeconds | 120s | 总执行超时 |
| heartbeat 间隔 | Runtime 自行决定 | 建议 15-30s |
## 8. 幂等
### 8.1 Job 创建幂等
`idempotencyKey` 唯一索引:相同 `userId + jobType + targetType + targetId + idempotencyKey` 的 job 不重复创建。如果没有传 idempotencyKey则允许重复创建。
### 8.2 Result 提交幂等
```
resultIdempotencyKey = jobId + ":" + attemptNo + ":" + outputHash
```
- 相同 key 重复提交:返回 200幂等不重复落库
- 已有 succeeded result 但 outputHash 不同:返回 409 RESULT_ALREADY_EXISTS
### 8.3 Admin 重跑
Admin 重跑创建新 job记录 `retriedFromJobId`,不复用旧 job。
## 9. Cancelled / Expired
| 状态 | 能否被 Runtime 消费 | 处理 |
|------|-------------------|------|
| cancelled | 否 | API 直接设置,不进入 poll 结果 |
| expired | 是(如 retryable | lockUntil 超时后自动变为 expiredretryable 时回到 pending |
用户关闭 AI 授权时:
- 所有 pending job → cancelled
- 所有 running job → cancelRequestedRuntime 下次 heartbeat 获知)
## 10. 任务调度
### Poll 规则
```
POST /internal/runtime/jobs/poll
→ 返回 status=pending 的 job
→ 按 priority ASC, createdAt ASC 排序
→ 只返回 Runtime capabilities 支持的 jobType
→ limit 最大 50
```
### 无可用 job 时
返回空数组。Runtime 按 pollIntervalMs 等待后重试。
## 11. 验收清单
- [x] 输出 Job 状态机设计文档
- [x] 明确每个状态的进入条件和退出条件
- [x] 明确 Runtime 如何锁定任务DB 行级写 + lockUntil
- [x] 明确 lockUntil 超时后如何释放
- [x] 明确 retryCount / maxRetryCount 规则
- [x] 明确 idempotencyKey 防重复
- [x] 明确 Admin 可重跑 failed job
- [x] 明确 cancelled / expired 不应被 Runtime 再次消费