WangDL fbdae9078f
Some checks failed
Deploy API Server / build-and-deploy (push) Failing after 22s
feat: Python RAG Worker + NestJS 内部 API(文档解析/切片/embedding/Qdrant/候选生成)
2026-05-19 22:35:12 +08:00

138 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""文档解析PDF / DOCX / TXT / MD / CSV / XLSX"""
import os
import io
import base64
import httpx
from config import SILICONFLOW_API_KEY, SILICONFLOW_BASE_URL
async def download_file(url: str, local_path: str) -> str:
    """Download a file from a COS pre-signed URL to the local filesystem.

    Args:
        url: Pre-signed URL to fetch; redirects are followed.
        local_path: Destination path. Missing parent directories are created.

    Returns:
        ``local_path`` unchanged, once the file has been written.

    Raises:
        httpx.HTTPStatusError: If the server responds with an error status.
    """
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        resp = await client.get(url)
        resp.raise_for_status()

        # A bare filename has an empty dirname and os.makedirs("") raises
        # FileNotFoundError, so only create the parent when there is one.
        parent_dir = os.path.dirname(local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(local_path, "wb") as f:
            f.write(resp.content)
    return local_path
def parse_txt(file_path: str) -> str:
    """Return the full contents of a text file decoded as UTF-8.

    Bytes that are not valid UTF-8 are substituted with U+FFFD instead of
    raising, so a partially corrupt file still yields text.
    """
    with open(file_path, encoding="utf-8", errors="replace") as handle:
        content = handle.read()
    return content
def parse_markdown(file_path: str) -> str:
    """Return the raw Markdown source of *file_path*.

    The Markdown is neither rendered nor stripped; the file is read the same
    way as plain text by delegating to ``parse_txt``.
    """
    markdown_source = parse_txt(file_path)
    return markdown_source
def parse_docx(file_path: str) -> str:
    """Extract the paragraph text of a .docx file.

    Only ``Document.paragraphs`` is read. Paragraphs whose text is empty or
    whitespace-only are skipped, and the rest are joined with blank lines.
    """
    from docx import Document

    kept = []
    for paragraph in Document(file_path).paragraphs:
        content = paragraph.text
        if content.strip():
            kept.append(content)
    return "\n\n".join(kept)
def parse_pdf_text(file_path: str) -> str:
    """Extract the text layer of a PDF with PyMuPDF.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The text of every page that contains non-whitespace text, joined
        with blank lines. An empty string when no page has any text.
    """
    import fitz

    doc = fitz.open(file_path)
    try:
        page_texts = [page.get_text() for page in doc]
    finally:
        # Release the document even if text extraction fails on some page.
        doc.close()
    return "\n\n".join(text for text in page_texts if text.strip())
def pdf_needs_ocr(file_path: str, min_chars_per_page: int = 50) -> bool:
    """Decide whether a PDF needs OCR because its text layer is empty or sparse.

    Args:
        file_path: Path of the PDF to inspect.
        min_chars_per_page: Average number of stripped text characters per
            page below which the PDF is treated as a scan.

    Returns:
        True when the average text length per page is below
        ``min_chars_per_page``. A PDF with zero pages counts as one page and
        therefore returns True.
    """
    import fitz

    doc = fitz.open(file_path)
    try:
        # Read the page count while the document is still open; it cannot be
        # queried once the document has been closed.
        page_count = max(
            doc.page_count if hasattr(doc, "page_count") else len(doc), 1
        )
        total_len = sum(len(page.get_text().strip()) for page in doc)
    finally:
        doc.close()
    return (total_len / page_count) < min_chars_per_page
def _sniff_image_mime(image_bytes: bytes) -> str:
    """Guess an image MIME type from magic bytes, defaulting to image/png."""
    if image_bytes.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if image_bytes[:4] == b"RIFF" and image_bytes[8:12] == b"WEBP":
        return "image/webp"
    if image_bytes.startswith(b"BM"):
        return "image/bmp"
    if image_bytes[4:8] == b"ftyp":
        brand = image_bytes[8:12]
        if brand in (b"heic", b"heix", b"hevc", b"hevx"):
            return "image/heic"
        if brand in (b"mif1", b"msf1"):
            return "image/heif"
    # PNG, and anything unrecognized, keeps the label that was always sent.
    return "image/png"


async def ocr_with_siliconflow(image_bytes: bytes) -> str:
    """Run OCR / image-to-text with a SiliconFlow multimodal model.

    The image is sent inline as a base64 data URL to the chat-completions
    endpoint, with a prompt asking for all text (tables as Markdown).

    Args:
        image_bytes: Raw encoded image bytes (PNG, JPEG, WebP, ...).

    Returns:
        The ``content`` of the first choice's message in the response.

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
        KeyError, IndexError: If the response body lacks the expected
            ``choices[0].message.content`` structure.
    """
    b64 = base64.b64encode(image_bytes).decode()
    # Label the data URL with the real image type instead of always claiming
    # PNG; callers also pass JPEG/WebP/BMP/HEIC files through this function.
    mime = _sniff_image_mime(image_bytes)
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            f"{SILICONFLOW_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {SILICONFLOW_API_KEY}"},
            json={
                "model": "Qwen/Qwen3-VL-32B-Instruct",
                "messages": [{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "请识别并提取这张图片中的所有文字内容。如果有表格,请用 Markdown 表格格式输出。不要添加任何解释。"},
                        {"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}},
                    ],
                }],
                "max_tokens": 4096,
            },
        )
        # Fail loudly on HTTP errors rather than with a KeyError on the
        # error payload below.
        resp.raise_for_status()
        data = resp.json()
    return data["choices"][0]["message"]["content"]
async def parse_image_with_ocr(file_path: str) -> str:
    """Read an image file from disk and return the text recognized by OCR."""
    with open(file_path, "rb") as image_file:
        payload = image_file.read()
    recognized = await ocr_with_siliconflow(payload)
    return recognized
def parse_csv(file_path: str) -> str:
    """Load a CSV file with pandas and render it as a Markdown table.

    The DataFrame index is left out of the rendered table.
    """
    import pandas as pd

    return pd.read_csv(file_path).to_markdown(index=False)
def parse_xlsx(file_path: str) -> str:
    """Load an Excel workbook with pandas and render it as a Markdown table.

    ``read_excel`` is called without ``sheet_name``, so pandas' default sheet
    selection applies. The DataFrame index is left out of the rendered table.
    """
    import pandas as pd

    sheet_frame = pd.read_excel(file_path)
    markdown_table = sheet_frame.to_markdown(index=False)
    return markdown_table
async def _ocr_pdf_pages(file_path: str) -> str:
    """Render every PDF page to PNG at 150 DPI, OCR it, and join the results."""
    import fitz

    doc = fitz.open(file_path)
    try:
        results = []
        for page in doc:
            pix = page.get_pixmap(dpi=150)
            results.append(await ocr_with_siliconflow(pix.tobytes("png")))
    finally:
        # Close the document even when an OCR request fails mid-way.
        doc.close()
    return "\n\n".join(results)


async def _parse_pdf(file_path: str) -> str:
    """Return the PDF text layer, using per-page OCR for scanned PDFs."""
    needs_ocr = pdf_needs_ocr(file_path)
    text = parse_pdf_text(file_path)
    # Looks like a scan: the text layer was still tried first, and OCR is
    # used only when that layer is (almost) empty.
    if needs_ocr and len(text.strip()) < 100:
        return await _ocr_pdf_pages(file_path)
    return text


async def parse_document(file_path: str, mime_type: str) -> str:
    """Route a file to the matching parser based on its extension.

    Args:
        file_path: Path of the file to parse. The lower-cased extension
            selects the parser.
        mime_type: Accepted for the caller's convenience; currently unused,
            routing is by file extension only.

    Returns:
        The extracted text (Markdown tables for CSV/XLSX, OCR output for
        images and scanned PDFs).

    Raises:
        ValueError: If the file extension is not supported.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".txt":
        return parse_txt(file_path)
    if ext in (".md", ".markdown"):
        return parse_markdown(file_path)
    if ext == ".docx":
        return parse_docx(file_path)
    if ext == ".csv":
        return parse_csv(file_path)
    if ext == ".xlsx":
        return parse_xlsx(file_path)
    if ext == ".pdf":
        return await _parse_pdf(file_path)
    if ext in (".png", ".jpg", ".jpeg", ".webp", ".heic", ".bmp"):
        return await parse_image_with_ocr(file_path)
    raise ValueError(f"不支持的文件类型: {ext}")