diff --git a/src/modules/document-import/document-import.service.ts b/src/modules/document-import/document-import.service.ts index e75bdc2..6711782 100644 --- a/src/modules/document-import/document-import.service.ts +++ b/src/modules/document-import/document-import.service.ts @@ -22,34 +22,37 @@ export class DocumentImportService { }) { this.logger.log(`createImport called: userId=${dto.userId}, kbId=${dto.knowledgeBaseId}, fileName=${dto.fileName}, sourceType=${dto.sourceType}`); try { - const lockKey = `lock:document-import:${dto.fileName || Date.now()}`; - this.logger.log(`Attempting Redis lock: ${lockKey}`); - const lockToken = await this.redis.lock(lockKey, 1800); + // Use unique lock key per upload attempt to avoid blocking retries + const lockKey = `lock:document-import:${dto.fileName || 'file'}:${Date.now()}`; + this.logger.log(`Acquiring lock: ${lockKey}`); + const lockToken = await this.redis.lock(lockKey, 300); // 5 min TTL, enough for one upload if (!lockToken) { throw new Error('相同文件正在导入中,请稍候'); } - this.logger.log(`Creating DB record...`); - const job = await this.repository.create(dto); - this.logger.log(`DB record created: ${job.id}`); + try { + this.logger.log(`Creating DB record...`); + const job = await this.repository.create(dto); + this.logger.log(`DB record created: ${job.id}`); - await this.redis.set(`job:document-import:${job.id}:status`, 'pending', 86400); - await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400); - await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400); + await this.redis.set(`job:document-import:${job.id}:status`, 'pending', 86400); + await this.redis.set(`job:document-import:${job.id}:progress`, '0', 86400); + await this.redis.set(`job:document-import:${job.id}:message`, '任务已加入队列', 86400); - this.logger.log(`Enqueuing job...`); - await this.queue.add('document-import', { - importId: job.id, - userId: dto.userId || 'anonymous', - knowledgeBaseId: dto.knowledgeBaseId, - rawText: dto.rawText, - fileName: dto.fileName, - }); - this.logger.log(`Job enqueued: ${job.id}`); + this.logger.log(`Enqueuing job...`); + await this.queue.add('document-import', { + importId: job.id, + userId: dto.userId || 'anonymous', + knowledgeBaseId: dto.knowledgeBaseId, + rawText: dto.rawText, + fileName: dto.fileName, + }); + this.logger.log(`Job enqueued: ${job.id}`); - await this.redis.unlock(lockKey, lockToken); - - return { jobId: job.id, status: 'queued' }; + return { jobId: job.id, status: 'queued' }; + } finally { + await this.redis.unlock(lockKey, lockToken); + } } catch (err: any) { this.logger.error(`createImport failed: ${err.message}`, err.stack); throw err;