fix: make import lock unique per attempt + release in finally
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 47s
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 47s
- Add timestamp to lock key to prevent blocking retries - Wrap create logic in try-finally to always release lock - Reduce lock TTL from 30min to 5min Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
433e0095ee
commit
aef8e2a416
@ -22,34 +22,37 @@ export class DocumentImportService {
|
|||||||
}) {
|
}) {
|
||||||
this.logger.log(`createImport called: userId=${dto.userId}, kbId=${dto.knowledgeBaseId}, fileName=${dto.fileName}, sourceType=${dto.sourceType}`);
|
this.logger.log(`createImport called: userId=${dto.userId}, kbId=${dto.knowledgeBaseId}, fileName=${dto.fileName}, sourceType=${dto.sourceType}`);
|
||||||
try {
|
try {
|
||||||
const lockKey = `lock:document-import:${dto.fileName || Date.now()}`;
|
// Use unique lock key per upload attempt to avoid blocking retries
|
||||||
this.logger.log(`Attempting Redis lock: ${lockKey}`);
|
const lockKey = `lock:document-import:${dto.fileName || 'file'}:${Date.now()}`;
|
||||||
const lockToken = await this.redis.lock(lockKey, 1800);
|
this.logger.log(`Acquiring lock: ${lockKey}`);
|
||||||
|
const lockToken = await this.redis.lock(lockKey, 300); // 5 min TTL, enough for one upload
|
||||||
if (!lockToken) {
|
if (!lockToken) {
|
||||||
throw new Error('相同文件正在导入中,请稍候');
|
throw new Error('相同文件正在导入中,请稍候');
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(`Creating DB record...`);
|
try {
|
||||||
const job = await this.repository.create(dto);
|
this.logger.log(`Creating DB record...`);
|
||||||
this.logger.log(`DB record created: ${job.id}`);
|
const job = await this.repository.create(dto);
|
||||||
|
this.logger.log(`DB record created: ${job.id}`);
|
||||||
|
|
||||||
await this.redis.set(`job:document-import:${job.id}:status`, 'pending', 86400);
|
await this.redis.set(`job:document-import:${job.id}:status`, 'pending', 86400);
|
||||||
await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400);
|
await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400);
|
||||||
await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400);
|
await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400);
|
||||||
|
|
||||||
this.logger.log(`Enqueuing job...`);
|
this.logger.log(`Enqueuing job...`);
|
||||||
await this.queue.add('document-import', {
|
await this.queue.add('document-import', {
|
||||||
importId: job.id,
|
importId: job.id,
|
||||||
userId: dto.userId || 'anonymous',
|
userId: dto.userId || 'anonymous',
|
||||||
knowledgeBaseId: dto.knowledgeBaseId,
|
knowledgeBaseId: dto.knowledgeBaseId,
|
||||||
rawText: dto.rawText,
|
rawText: dto.rawText,
|
||||||
fileName: dto.fileName,
|
fileName: dto.fileName,
|
||||||
});
|
});
|
||||||
this.logger.log(`Job enqueued: ${job.id}`);
|
this.logger.log(`Job enqueued: ${job.id}`);
|
||||||
|
|
||||||
await this.redis.unlock(lockKey, lockToken);
|
return { jobId: job.id, status: 'queued' };
|
||||||
|
} finally {
|
||||||
return { jobId: job.id, status: 'queued' };
|
await this.redis.unlock(lockKey, lockToken);
|
||||||
|
}
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
this.logger.error(`createImport failed: ${err.message}`, err.stack);
|
this.logger.error(`createImport failed: ${err.message}`, err.stack);
|
||||||
throw err;
|
throw err;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user