Files
WeKnora/docreader/parser/doc_parser.py
wizardchen 959eba2136 fix(doc_parser): enhance DOC to DOCX conversion reliability
- Implemented a retry mechanism for DOC to DOCX conversion to handle concurrent `soffice` invocations, ensuring each attempt uses a dedicated user profile directory.
- Added logging for each conversion attempt, including success and failure messages, to improve visibility into the conversion process.
- Adjusted the handling of temporary directories for both conversion output and user profiles, enhancing robustness against conversion failures.
2026-06-01 20:50:02 +08:00

363 lines
13 KiB
Python

import logging
import os
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import List, Optional

import textract

from docreader.config import CONFIG
from docreader.models.document import Document
from docreader.parser.docx2_parser import Docx2Parser
from docreader.utils.tempfile import TempDirContext, TempFileContext
logger = logging.getLogger(__name__)
class SandboxExecutor:
    """Run external commands with proxy variables forced into their environment.

    Every command is started with ``http_proxy``/``https_proxy`` (both lower
    and upper case) pointing at ``self.proxy`` and is bounded by
    ``self.default_timeout`` seconds.
    """

    def __init__(self, proxy: Optional[str] = None, default_timeout: int = 60):
        """Initialize sandbox executor with configuration

        Args:
            proxy: Proxy URL exported to child processes. When falsy, falls
                back to ``CONFIG.external_https_proxy`` and finally to
                ``http://128.0.0.1:1``.
            default_timeout: Timeout in seconds applied to every command.
        """
        # `or` treats an empty string like None, so an empty config value
        # still falls through to the last fallback.
        # NOTE(review): the final fallback is presumably an intentionally
        # unreachable "blocking" proxy so children cannot reach the network
        # -- confirm the address is the intended one.
        self.proxy = proxy or CONFIG.external_https_proxy or "http://128.0.0.1:1"
        self.default_timeout = default_timeout

    def execute_in_sandbox(self, cmd: List[str]) -> tuple:
        """Execute command in sandbox with proxy configuration

        Args:
            cmd: Command to execute, as an argv list.

        Returns:
            Tuple of (stdout, stderr, returncode); stdout/stderr are bytes.

        Raises:
            RuntimeError: If every sandbox method raised. The last underlying
                error is attached as ``__cause__``.
        """
        # Sandbox methods in order of preference (currently only one).
        sandbox_methods = [
            self._execute_with_proxy,
        ]
        last_error: Optional[Exception] = None
        for method in sandbox_methods:
            try:
                return method(cmd)
            except Exception as e:
                logger.warning(f"Sandbox method {method.__name__} failed: {e}")
                last_error = e
        raise RuntimeError("All sandbox methods failed") from last_error

    def _execute_with_proxy(self, cmd: List[str]) -> tuple:
        """Execute command with proxy configuration

        Args:
            cmd: Command to execute, as an argv list.

        Returns:
            Tuple of (stdout, stderr, returncode); stdout/stderr are bytes.

        Raises:
            RuntimeError: If the command does not finish within
                ``self.default_timeout`` seconds.
        """
        # Child inherits the current environment plus the proxy overrides.
        env = os.environ.copy()
        if self.proxy:
            for key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"):
                env[key] = self.proxy

        logger.info(f"Executing command with proxy: {' '.join(cmd)}")
        if self.proxy:
            logger.info(f"Using proxy: {self.proxy}")

        # The context manager closes both pipes and waits for the child on
        # exit, so a killed process is always reaped (no zombie, no leaked
        # file descriptors). Closing the pipes before waiting also avoids
        # blocking on grandchildren that inherited them.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
        ) as process:
            try:
                stdout, stderr = process.communicate(timeout=self.default_timeout)
            except subprocess.TimeoutExpired as exc:
                process.kill()
                raise RuntimeError(
                    f"Command execution timeout after {self.default_timeout} seconds"
                ) from exc
            return stdout, stderr, process.returncode
# NOTE(review): redundant -- this rebinds `logger` to the same
# `logging.getLogger(__name__)` object already assigned right after the imports.
logger = logging.getLogger(__name__)
class DocParser(Docx2Parser):
    """DOC document parser.

    Tries, in order, a LibreOffice/OpenOffice DOC -> DOCX conversion followed
    by DOCX parsing, then plain-text extraction with ``antiword``. External
    tools are run through a ``SandboxExecutor``.
    """

    # Number of soffice conversion attempts before giving up.
    _CONVERT_MAX_ATTEMPTS = 3
    # Base backoff in seconds; the wait after attempt N is N times this value.
    _CONVERT_BACKOFF_SECONDS = 0.5

    def __init__(self, *args, **kwargs):
        """Initialize DOC parser with sandbox executor"""
        super().__init__(*args, **kwargs)
        self.sandbox_executor = SandboxExecutor()

    def parse_into_text(self, content: bytes) -> Document:
        """Parse raw DOC bytes into a Document.

        Args:
            content: Raw bytes of the .doc file.

        Returns:
            The first truthy Document produced by the handler chain, or a
            Document with empty content if every handler failed.
        """
        logger.info(f"Parsing DOC document, content size: {len(content)} bytes")
        handle_chain = [
            # 1. Try to convert to docx format to extract images
            self._parse_with_docx,
            # 2. If image extraction is not needed or conversion failed,
            #    try using antiword to extract text
            self._parse_with_antiword,
            # NOTE: _parse_with_textract is disabled due to SSRF vulnerability
        ]
        # Save byte content as a temporary file shared by all handlers
        with TempFileContext(content, ".doc") as temp_file_path:
            for handle in handle_chain:
                try:
                    document = handle(temp_file_path)
                    if document:
                        return document
                except Exception as e:
                    # Best effort: a failing handler just hands over to the next.
                    logger.warning(f"Failed to parse DOC with {handle.__name__} {e}")
        return Document(content="")

    def _parse_with_docx(self, temp_file_path: str) -> Document:
        """Convert the DOC file to DOCX and parse the result.

        Args:
            temp_file_path: Path of the DOC file on disk.

        Returns:
            Document produced by the DOCX parsing step.

        Raises:
            RuntimeError: If the DOC -> DOCX conversion produced no content.
        """
        logger.info("Multimodal enabled, attempting to extract images from DOC")
        docx_content = self._try_convert_doc_to_docx(temp_file_path)
        if not docx_content:
            raise RuntimeError("Failed to convert DOC to DOCX")
        logger.info("Successfully converted DOC to DOCX, using DocxParser")
        # super(Docx2Parser, self) skips Docx2Parser.parse_into_text and
        # dispatches to the next class in the MRO.
        # NOTE(review): presumably that is the DocxParser named in the log
        # messages -- confirm against Docx2Parser's base class.
        document = super(Docx2Parser, self).parse_into_text(docx_content)
        logger.info(f"Extracted {len(document.content)} characters using DocxParser")
        return document

    def _parse_with_antiword(self, temp_file_path: str) -> Document:
        """Extract plain text from the DOC file with antiword.

        Args:
            temp_file_path: Path of the DOC file on disk.

        Returns:
            Document whose content is antiword's stdout decoded as UTF-8
            (undecodable bytes are dropped).

        Raises:
            RuntimeError: If antiword cannot be found or exits non-zero.
        """
        logger.info("Attempting to parse DOC file with antiword")
        antiword_path = self._try_find_antiword()
        if not antiword_path:
            raise RuntimeError("antiword not found in PATH")
        # Use antiword to extract text directly in sandbox
        cmd = [antiword_path, temp_file_path]
        logger.info("Executing antiword in sandbox with proxy configuration")
        stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd)
        if returncode != 0:
            raise RuntimeError(
                f"antiword extraction failed: {stderr.decode('utf-8', errors='ignore')}"
            )
        text = stdout.decode("utf-8", errors="ignore")
        logger.info(f"Successfully extracted {len(text)} characters using antiword")
        return Document(content=text)

    def _parse_with_textract(self, temp_file_path: str) -> Document:
        """Extract text from the DOC file with textract (antiword method).

        Not part of the handler chain in ``parse_into_text``: it is disabled
        there due to an SSRF vulnerability.

        Args:
            temp_file_path: Path of the DOC file on disk.

        Returns:
            Document containing the extracted text.
        """
        logger.info(f"Parsing DOC file with textract: {temp_file_path}")
        text = textract.process(temp_file_path, method="antiword").decode("utf-8")
        logger.info(f"Successfully extracted {len(text)} bytes of DOC using textract")
        return Document(content=text)

    def _try_convert_doc_to_docx(self, doc_path: str) -> Optional[bytes]:
        """Convert DOC file to DOCX format

        Uses LibreOffice/OpenOffice for conversion, retrying a few times.

        Args:
            doc_path: DOC file path

        Returns:
            Byte stream of DOCX file content, or None if soffice is missing or
            every attempt failed.
        """
        logger.info(f"Converting DOC to DOCX: {doc_path}")
        # Check if LibreOffice or OpenOffice is installed
        soffice_path = self._try_find_soffice()
        if not soffice_path:
            return None
        logger.info(f"Using {soffice_path} to convert DOC to DOCX")
        # LibreOffice shares a single user profile by default, so concurrent
        # `soffice` invocations contend for the same profile lock and the loser
        # silently fails to convert. Give each attempt a dedicated profile dir
        # and retry a few times so concurrent requests don't fall back to the
        # lower-fidelity antiword path.
        max_attempts = self._CONVERT_MAX_ATTEMPTS
        for attempt in range(1, max_attempts + 1):
            docx_content = self._convert_doc_to_docx_once(
                soffice_path, doc_path, attempt, max_attempts
            )
            if docx_content is not None:
                return docx_content
            # Linear backoff between attempts; by now the attempt's temporary
            # directories have already been released.
            if attempt < max_attempts:
                time.sleep(self._CONVERT_BACKOFF_SECONDS * attempt)
        return None

    def _convert_doc_to_docx_once(
        self, soffice_path: str, doc_path: str, attempt: int, max_attempts: int
    ) -> Optional[bytes]:
        """Run a single soffice conversion attempt in fresh temp directories.

        Args:
            soffice_path: Path of the soffice executable.
            doc_path: DOC file path.
            attempt: 1-based attempt number (used for logging only).
            max_attempts: Total number of attempts (used for logging only).

        Returns:
            Bytes of the converted DOCX, or None if soffice exited non-zero or
            produced no .docx file.
        """
        # One directory for the converted output, one for the per-attempt
        # LibreOffice user profile.
        with TempDirContext() as temp_dir, TempDirContext() as profile_dir:
            user_installation = Path(profile_dir).as_uri()
            cmd = [
                soffice_path,
                "--headless",
                f"-env:UserInstallation={user_installation}",
                "--convert-to",
                "docx",
                "--outdir",
                temp_dir,
                doc_path,
            ]
            logger.info(
                f"Running command in sandbox (attempt {attempt}/{max_attempts}): "
                f"{' '.join(cmd)}"
            )
            # Execute in sandbox with proxy configuration
            stdout, stderr, returncode = self.sandbox_executor.execute_in_sandbox(cmd)
            if returncode != 0:
                logger.warning(
                    f"Error converting DOC to DOCX (attempt {attempt}/"
                    f"{max_attempts}): {stderr.decode('utf-8', errors='ignore')}"
                )
                return None

            # Find the converted file
            docx_files = [
                name for name in os.listdir(temp_dir) if name.endswith(".docx")
            ]
            logger.info(f"Found {len(docx_files)} DOCX file(s) in temporary directory")
            if not docx_files:
                # Conversion reported success but produced no docx.
                logger.warning(
                    f"No DOCX produced despite success (attempt {attempt}/"
                    f"{max_attempts})"
                )
                return None

            converted_file = os.path.join(temp_dir, docx_files[0])
            logger.info(f"Found converted file: {converted_file}")
            # Read the content before the temporary directory is removed
            with open(converted_file, "rb") as f:
                docx_content = f.read()
            logger.info(f"Successfully read DOCX file, size: {len(docx_content)}")
            return docx_content

    def _try_find_executable_path(
        self,
        executable_name: str,
        possible_path: Optional[List[str]] = None,
        environment_variable: Optional[List[str]] = None,
    ) -> Optional[str]:
        """Find executable path

        Candidates are checked in a deterministic order: paths taken from the
        given environment variables first (so an explicit override wins), then
        ``possible_path``, and finally a lookup on PATH.

        Args:
            executable_name: Executable name
            possible_path: List of possible paths
            environment_variable: List of environment variables to check

        Returns:
            Executable path, or None if not found
        """
        env_paths = [os.environ.get(name, "") for name in environment_variable or []]
        # dict.fromkeys de-duplicates while keeping order; unset/empty
        # environment variables are dropped.
        candidates = dict.fromkeys(
            path for path in [*env_paths, *(possible_path or [])] if path
        )
        for path in candidates:
            # Require an executable regular file so that a directory or a
            # non-executable file is never returned as the command to run.
            if os.path.isfile(path) and os.access(path, os.X_OK):
                logger.info(f"Found {executable_name} at {path}")
                return path

        # Try to find in PATH; shutil.which works without an external `which`
        # binary, so this cannot raise on platforms that lack one.
        path = shutil.which(executable_name)
        if path:
            logger.info(f"Found {executable_name} at {path}")
            return path

        logger.warning(f"Failed to find {executable_name}")
        return None

    def _try_find_soffice(self) -> Optional[str]:
        """Find LibreOffice/OpenOffice executable path

        Returns:
            Executable path, or None if not found
        """
        # Common LibreOffice/OpenOffice executable paths
        possible_paths = [
            # Linux
            "/usr/bin/soffice",
            "/usr/lib/libreoffice/program/soffice",
            "/opt/libreoffice25.2/program/soffice",
            # macOS
            "/Applications/LibreOffice.app/Contents/MacOS/soffice",
            # Windows
            "C:\\Program Files\\LibreOffice\\program\\soffice.exe",
            "C:\\Program Files (x86)\\LibreOffice\\program\\soffice.exe",
        ]
        return self._try_find_executable_path(
            executable_name="soffice",
            possible_path=possible_paths,
            environment_variable=["LIBREOFFICE_PATH"],
        )

    def _try_find_antiword(self) -> Optional[str]:
        """Find antiword executable path

        Returns:
            Executable path, or None if not found
        """
        # Common antiword executable paths
        possible_paths = [
            # Linux/macOS
            "/usr/bin/antiword",
            "/usr/local/bin/antiword",
            # Windows
            "C:\\Program Files\\Antiword\\antiword.exe",
            "C:\\Program Files (x86)\\Antiword\\antiword.exe",
        ]
        return self._try_find_executable_path(
            executable_name="antiword",
            possible_path=possible_paths,
            environment_variable=["ANTIWORD_PATH"],
        )
if __name__ == "__main__":
    # Manual smoke test: parse one DOC file from disk and log a short preview.
    logging.basicConfig(level=logging.DEBUG)

    file_name = "/path/to/your/test.doc"
    logger.info(f"Processing file: {file_name}")

    # Parser is built before the file is read, as in a normal request flow.
    parser_options = {
        "file_name": file_name,
        "enable_multimodal": True,
        "chunk_size": 512,
        "chunk_overlap": 60,
    }
    doc_parser = DocParser(**parser_options)

    content = Path(file_name).read_bytes()
    document = doc_parser.parse_into_text(content)

    logger.info(f"Processing complete, extracted text length: {len(document.content)}")
    logger.info(f"Sample text: {document.content[:200]}...")