138 lines
4.6 KiB
Python
138 lines
4.6 KiB
Python
"""文档解析:PDF / DOCX / TXT / MD / CSV / XLSX"""
|
||
|
||
import os
|
||
import io
|
||
import base64
|
||
import httpx
|
||
from config import SILICONFLOW_API_KEY, SILICONFLOW_BASE_URL
|
||
|
||
|
||
async def download_file(url: str, local_path: str) -> str:
    """Download a file from a COS pre-signed URL to the local filesystem.

    Args:
        url: URL to fetch. Redirects are followed.
        local_path: Destination file path. A missing parent directory is
            created first.

    Returns:
        ``local_path``, unchanged, once the response body has been written.

    Raises:
        httpx.HTTPStatusError: If the server responds with an error status.
    """
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        resp = await client.get(url)
        resp.raise_for_status()

        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create the parent when there is one.
        parent_dir = os.path.dirname(local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(local_path, "wb") as f:
            f.write(resp.content)
    return local_path
|
||
|
||
|
||
def parse_txt(file_path: str) -> str:
    """Return the whole content of a text file decoded as UTF-8.

    Bytes that are not valid UTF-8 are substituted with the replacement
    character instead of raising a decode error.
    """
    with open(file_path, mode="r", encoding="utf-8", errors="replace") as source:
        content = source.read()
    return content
|
||
|
||
|
||
def parse_markdown(file_path: str) -> str:
    """Return the raw text of a Markdown file.

    No Markdown-specific processing is applied; the file is read through
    ``parse_txt``.
    """
    markdown_source = parse_txt(file_path)
    return markdown_source
|
||
|
||
|
||
def parse_docx(file_path: str) -> str:
    """Extract the non-blank paragraphs of a DOCX file as plain text.

    Only the paragraphs exposed by ``Document.paragraphs`` are read.
    Paragraphs whose text is empty or whitespace-only are dropped, and the
    remaining ones are separated by a blank line.
    """
    from docx import Document

    paragraphs = Document(file_path).paragraphs
    kept_texts = [para.text for para in paragraphs if para.text.strip()]
    return "\n\n".join(kept_texts)
|
||
|
||
|
||
def parse_pdf_text(file_path: str) -> str:
    """Extract the text layer of a PDF with PyMuPDF.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The text of every page that contains non-whitespace text, with pages
        separated by a blank line. An empty string if no page has text.
    """
    import fitz

    doc = fitz.open(file_path)
    try:
        page_texts = [page.get_text() for page in doc]
    finally:
        # Close the document even if extraction fails part-way through.
        doc.close()
    return "\n\n".join(text for text in page_texts if text.strip())
|
||
|
||
|
||
def pdf_needs_ocr(file_path: str, min_chars_per_page: int = 50) -> bool:
    """Decide whether a PDF needs OCR because its text layer is empty or sparse.

    Args:
        file_path: Path of the PDF file to inspect.
        min_chars_per_page: Average number of stripped text characters per
            page below which the PDF is treated as a scan.

    Returns:
        True when the average text length per page is below
        ``min_chars_per_page``.
    """
    import fitz

    doc = fitz.open(file_path)
    try:
        # The page count has to be read while the document is still open;
        # querying it after close() fails in PyMuPDF. Clamp to 1 so a
        # zero-page document does not cause a division by zero.
        page_count = max(len(doc), 1)
        total_len = sum(len(page.get_text().strip()) for page in doc)
    finally:
        doc.close()
    return (total_len / page_count) < min_chars_per_page
|
||
|
||
|
||
async def ocr_with_siliconflow(image_bytes: bytes, mime_type: str = "image/png") -> str:
    """Run OCR / image-text recognition with a SiliconFlow multimodal model.

    The image is sent inline as a base64 data URL in a chat-completions
    request, together with a prompt asking for all text in the image
    (tables as Markdown tables, no explanations).

    Args:
        image_bytes: Raw bytes of the encoded image.
        mime_type: Media type declared in the data URL. Defaults to
            ``image/png``; pass the real type for other image formats.

    Returns:
        The ``content`` of the first choice's message in the response.

    Raises:
        httpx.HTTPStatusError: If the API responds with an error status.
    """
    b64 = base64.b64encode(image_bytes).decode()
    payload = {
        "model": "Qwen/Qwen3-VL-32B-Instruct",
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": "请识别并提取这张图片中的所有文字内容。如果有表格,请用 Markdown 表格格式输出。不要添加任何解释。"},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64}"}},
            ],
        }],
        "max_tokens": 4096,
    }
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            f"{SILICONFLOW_BASE_URL}/chat/completions",
            headers={"Authorization": f"Bearer {SILICONFLOW_API_KEY}"},
            json=payload,
        )
    # Fail with the HTTP error itself instead of an opaque KeyError when the
    # error body has no "choices" field.
    resp.raise_for_status()
    data = resp.json()
    return data["choices"][0]["message"]["content"]
|
||
|
||
|
||
async def parse_image_with_ocr(file_path: str) -> str:
    """Read an image file from disk and return the text recognized by OCR.

    The raw file bytes are handed unchanged to ``ocr_with_siliconflow``.
    """
    with open(file_path, "rb") as image_file:
        raw_image = image_file.read()
    recognized_text = await ocr_with_siliconflow(raw_image)
    return recognized_text
|
||
|
||
|
||
def parse_csv(file_path: str) -> str:
|
||
import pandas as pd
|
||
df = pd.read_csv(file_path)
|
||
return df.to_markdown(index=False)
|
||
|
||
|
||
def parse_xlsx(file_path: str) -> str:
    """Load an XLSX workbook with pandas and render it as a Markdown table.

    ``pd.read_excel`` is called with its default arguments, and the
    DataFrame index is left out of the rendered table.
    """
    import pandas as pd

    frame = pd.read_excel(file_path)
    markdown_table = frame.to_markdown(index=False)
    return markdown_table
|
||
|
||
|
||
async def _ocr_pdf_pages(file_path: str) -> str:
    """Render each PDF page to a PNG at 150 dpi and OCR the pages in order."""
    import fitz

    doc = fitz.open(file_path)
    try:
        results = []
        for page in doc:
            pix = page.get_pixmap(dpi=150)
            results.append(await ocr_with_siliconflow(pix.tobytes("png")))
    finally:
        # Close the document even when an OCR request fails mid-way.
        doc.close()
    return "\n\n".join(results)


async def parse_document(file_path: str, mime_type: str) -> str:
    """Route a file to the matching parser based on its extension.

    Args:
        file_path: Path of the file to parse. Its lower-cased extension
            selects the parser.
        mime_type: Currently unused; routing relies on the extension only.

    Returns:
        The extracted text. CSV and XLSX files come back as Markdown tables;
        images and scanned PDFs come back as OCR output.

    Raises:
        ValueError: If the file extension is not supported.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext == ".txt":
        return parse_txt(file_path)
    if ext in (".md", ".markdown"):
        return parse_markdown(file_path)
    if ext == ".docx":
        return parse_docx(file_path)
    if ext == ".csv":
        return parse_csv(file_path)
    if ext == ".xlsx":
        return parse_xlsx(file_path)
    if ext == ".pdf":
        needs_ocr = pdf_needs_ocr(file_path)
        text = parse_pdf_text(file_path)
        # OCR only when the PDF looks like a scan AND the text layer is
        # nearly empty overall; otherwise the extracted text is good enough.
        if needs_ocr and len(text.strip()) < 100:
            return await _ocr_pdf_pages(file_path)
        return text
    if ext in (".png", ".jpg", ".jpeg", ".webp", ".heic", ".bmp"):
        return await parse_image_with_ocr(file_path)

    raise ValueError(f"不支持的文件类型: {ext}")
|