Files
WeKnora/docreader/scripts/parse_local.py
wizardchen 7b1bb1054f feat(docreader): speed up scanned-PDF parsing, stream image results, isolate heavy async queues
Large scanned PDFs (hundreds of pages) were slow and fragile end-to-end.
This change addresses the parse, transport, and task-scheduling layers:

docreader (parse + transport):
- Parallelize per-page scanned rendering across processes (forkserver/fork),
  with serial fallback. ~4-7x faster on large scanned PDFs; pdfium is not
  thread-safe so we fan out across processes. Configurable via
  DOCREADER_PDF_RENDER_PARALLELISM.
- Add server-streaming ReadStream RPC: emit one meta frame then one frame per
  image, so documents with many page images are no longer capped by the unary
  gRPC message-size limit (a 874-page PDF produced ~193MiB of images, far over
  the 50MB cap) and memory is bounded on both ends. Unary Read is kept for
  backward compatibility; the Go production reader switches to ReadStream.

VLM:
- Make the VLM HTTP timeout configurable (VLM_HTTP_TIMEOUT_SECONDS) and raise
  the default 90s -> 180s so dense scanned-page OCR does not time out with
  "context deadline exceeded".

Async task queues:
- Isolate high-volume, model-heavy fan-out tasks into dedicated asynq queues so
  a single large document cannot saturate the shared worker pool and block
  user-facing document parsing:
    image:multimodal  -> "multimodal"
    chunk:extract     -> "graph"
    question:generation -> "question"
- Register the new queues in the server weight map and the cancel inspector's
  scanned-queue set (so cancelling a knowledge still purges its pending tasks).
2026-06-03 12:29:13 +08:00

120 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""本地解析调试脚本:直接调用 Parser 解析本地文件,不经过 gRPC 服务。
用法(在仓库根目录 WeKnora 下执行):
PYTHONPATH=. docreader/.venv/bin/python docreader/scripts/parse_local.py <文件路径> [--engine markitdown] [--out out.md]
示例:
PYTHONPATH=. docreader/.venv/bin/python docreader/scripts/parse_local.py docreader/testdata/test.md
PYTHONPATH=. docreader/.venv/bin/python docreader/scripts/parse_local.py ~/Desktop/demo.pdf --out /tmp/demo.md
"""
import argparse
import logging
import os
import sys
import time
from docreader.parser import Parser
def main() -> int:
parser = argparse.ArgumentParser(description="解析本地文件并输出 markdown")
parser.add_argument("path", help="待解析的本地文件路径")
parser.add_argument(
"--engine",
default="",
help="解析引擎名称builtin / markitdown留空使用内置引擎",
)
parser.add_argument(
"--type",
default="",
help="文件类型(如 pdf/docx/md留空则按扩展名推断",
)
parser.add_argument(
"--out",
default="",
help="将完整 markdown 写入该文件,并把图片导出到同目录的 images/ 下",
)
parser.add_argument(
"--scanned",
action="store_true",
help="跳过 markitdown 文本抽取,直接把 PDF 每页渲染成图片(扫描件用,避免 pdfminer 卡死)",
)
parser.add_argument(
"--log-level",
default="INFO",
help="日志级别DEBUG/INFO/WARNING/ERROR",
)
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
stream=sys.stderr,
)
if not os.path.isfile(args.path):
print(f"文件不存在: {args.path}", file=sys.stderr)
return 1
file_name = os.path.basename(args.path)
file_type = args.type or os.path.splitext(file_name)[1].lstrip(".")
with open(args.path, "rb") as f:
content = f.read()
started = time.monotonic()
if args.scanned:
from docreader.parser.pdf_parser import PDFScannedParser
doc = PDFScannedParser(file_name=file_name, file_type=file_type).parse_into_text(
content
)
else:
doc = Parser().parse_file(
file_name=file_name,
file_type=file_type,
content=content,
parser_engine=args.engine or None,
)
elapsed = time.monotonic() - started
print("=" * 60, file=sys.stderr)
print(f"file : {file_name}", file=sys.stderr)
print(f"type : {file_type}", file=sys.stderr)
print(f"engine : {args.engine or 'builtin'}", file=sys.stderr)
print(f"scanned : {args.scanned}", file=sys.stderr)
print(f"content_len: {len(doc.content)}", file=sys.stderr)
print(f"images : {len(doc.images)}", file=sys.stderr)
print(f"metadata : {doc.metadata}", file=sys.stderr)
print(f"elapsed : {elapsed:.2f}s", file=sys.stderr)
print("=" * 60, file=sys.stderr)
if args.out:
import base64
out_dir = os.path.dirname(os.path.abspath(args.out))
os.makedirs(out_dir, exist_ok=True)
with open(args.out, "w", encoding="utf-8") as f:
f.write(doc.content)
if doc.images:
img_root = os.path.join(out_dir, "images")
os.makedirs(img_root, exist_ok=True)
for ref_path, b64data in doc.images.items():
try:
raw = base64.b64decode(b64data)
except Exception:
raw = b64data if isinstance(b64data, bytes) else b64data.encode()
dest = os.path.join(out_dir, ref_path)
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(dest, "wb") as imgf:
imgf.write(raw)
print(f"已写入: {args.out}(图片导出到 {out_dir}/images/", file=sys.stderr)
else:
print(doc.content)
return 0
if __name__ == "__main__":
raise SystemExit(main())