Files
WeKnora/docreader/parser/markdown_parser.py
wizardchen bbd3f6324a refactor(parser): reorganize Markdown parser and enhance gRPC document reading
- Moved the _SEPARATOR_CELL regex definition to a more appropriate location in the Markdown parser.
- Implemented a fallback mechanism in the gRPC document reader to handle cases where the ReadStream RPC is unimplemented, ensuring compatibility with older versions.
- Added a readUnary method to maintain backward compatibility with the legacy unary Read RPC.
- Improved cancellation handling in the MinerUCloud and PaddleOCR-VL readers to prevent excessive API calls during context cancellation.
2026-06-03 12:29:13 +08:00

470 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Markdown Parser Module
This module provides comprehensive Markdown parsing functionality including:
- Table formatting and standardization
- Base64 image extraction and conversion
- Image path replacement and URL generation
- Pipeline-based parsing with multiple stages
The parser uses a pipeline approach to process Markdown content through
multiple stages: table formatting -> image processing.
"""
import base64
import logging
import os
import re
import uuid
from typing import Dict, List, Match, Optional, Tuple
from docreader.models.document import Document
from docreader.parser.base_parser import BaseParser
from docreader.parser.chain_parser import PipelineParser
from docreader.utils import endecode
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Matches ONE already-stripped table delimiter cell: at least three hyphens
# with an optional leading and/or trailing colon, e.g. "---", ":---",
# "---:", ":---:". Anchored at both ends, so surrounding text is rejected.
_SEPARATOR_CELL = re.compile(r"^:?-{3,}:?$")
class MarkdownTableUtil:
    """Utility class for formatting Markdown tables.

    This class standardizes Markdown table formatting by:
    - Normalizing column alignment markers (e.g., :---, :---:, ---:)
    - Adding consistent spacing around pipes (|)
    - Preserving indentation levels
    - Handling both header rows and data rows
    - Dropping empty/separator rows that precede the first real row and
      inserting a delimiter row after the first row when it is missing

    Example:
        Input:  |Name|Age|City|
                |:---|---:|:---:|
                |Alice|25|Beijing|
        Output: | Name | Age | City |
                | :--- | ---: | :---: |
                | Alice | 25 | Beijing |
    """

    def __init__(self):
        # Pattern to match an alignment row (e.g., |:---|---:|:---:|).
        # Group 1 captures the leading indentation so it can be re-emitted.
        self.align_pattern = re.compile(
            r"^([\t ]*)\|[\t ]*[:-]+(?:[\t ]*\|[\t ]*[:-]+)*[\t ]*\|[\t ]*$",
            re.MULTILINE,
        )
        # Pattern to match any pipe-delimited row (header, data, and also
        # alignment rows). Group 1 captures the leading indentation.
        self.line_pattern = re.compile(
            r"^([\t ]*)\|[\t ]*[^|\r\n]*(?:[\t ]*\|[^|\r\n]*)*\|[\t ]*$",
            re.MULTILINE,
        )

    @staticmethod
    def _split_row_cells(row_line: str) -> List[str]:
        r"""Split a Markdown table row into stripped cells.

        Empty cells are preserved. A backslash escapes the character that
        follows it, so an escaped pipe (``\|``) stays inside its cell instead
        of being treated as a column delimiter.

        Args:
            row_line: One line of text, with or without surrounding whitespace.

        Returns:
            The stripped cell texts, or an empty list when the stripped line
            does not start with ``|``.
        """
        inner = row_line.strip()
        if not inner.startswith("|"):
            return []

        parts: List[str] = []
        current: List[str] = []
        idx, end = 0, len(inner)
        while idx < end:
            ch = inner[idx]
            if ch == "\\" and idx + 1 < end:
                # Keep the escape sequence intact; the escaped character
                # (possibly a pipe) must not be seen as a delimiter.
                current.append(inner[idx : idx + 2])
                idx += 2
                continue
            if ch == "|":
                parts.append("".join(current))
                current = []
            else:
                current.append(ch)
            idx += 1
        parts.append("".join(current))

        # The outer pipes produce an empty first/last fragment; drop them so
        # only real cells remain.
        if parts and parts[0].strip() == "":
            parts = parts[1:]
        if parts and parts[-1].strip() == "":
            parts = parts[:-1]
        return [part.strip() for part in parts]

    @staticmethod
    def _is_table_row(line: str) -> bool:
        """Return True when the stripped line starts with ``|`` and has another ``|``."""
        stripped = line.strip()
        return stripped.startswith("|") and "|" in stripped[1:]

    @classmethod
    def _is_separator_row(cls, line: str) -> bool:
        """Return True when the row has cells and every cell is a delimiter cell."""
        cells = cls._split_row_cells(line)
        return bool(cells) and all(_SEPARATOR_CELL.match(cell) for cell in cells)

    @classmethod
    def _is_empty_row(cls, line: str) -> bool:
        """Return True when the row has cells and every cell is empty."""
        cells = cls._split_row_cells(line)
        return bool(cells) and all(cell == "" for cell in cells)

    @classmethod
    def _separator_row_for(cls, header_line: str) -> str:
        """Build a ``| --- | --- |`` row with one column per cell of ``header_line``."""
        cells = cls._split_row_cells(header_line)
        return "| " + " | ".join("---" for _ in cells) + " |"

    @classmethod
    def _normalize_table_block(cls, block: List[str]) -> List[str]:
        """Fix MarkItDown-style tables: drop bogus prefix rows, ensure GFM delimiter.

        Args:
            block: Consecutive table-row lines. The list is not modified.

        Returns:
            A new list with leading empty rows (and one following separator
            row) removed, and a delimiter row inserted after the first
            remaining row when the second row is not already a separator.
        """
        start = 0
        while start < len(block) and cls._is_empty_row(block[start]):
            start += 1
        if start < len(block) and cls._is_separator_row(block[start]):
            start += 1
        rows = block[start:]

        # GFM/marked need "| --- |" after the first row. Headerless Word tables
        # only have data rows after we strip the fake empty+separator prefix.
        if len(rows) >= 2 and not cls._is_separator_row(rows[1]):
            rows = [rows[0], cls._separator_row_for(rows[0]), *rows[1:]]
        return rows

    def normalize_spurious_table_prefixes(self, content: str) -> str:
        """Remove bogus empty/separator prefix rows from MarkItDown table output.

        The text is split on ``"\\n"``; each maximal run of consecutive table
        rows is normalized as one block and every other line is copied
        through unchanged.

        Args:
            content: Markdown text.

        Returns:
            The text with every table block normalized.
        """
        lines = content.split("\n")
        out: List[str] = []
        i = 0
        while i < len(lines):
            line = lines[i]
            if not self._is_table_row(line):
                out.append(line)
                i += 1
                continue
            # Collect the whole run of adjacent table rows.
            block: List[str] = []
            while i < len(lines) and self._is_table_row(lines[i]):
                block.append(lines[i])
                i += 1
            out.extend(self._normalize_table_block(block))
        return "\n".join(out)

    def format_table(self, content: str) -> str:
        """Format all Markdown tables in the content.

        Args:
            content: Raw Markdown text containing tables

        Returns:
            Formatted Markdown text with standardized table formatting
        """

        def process_align(match: Match[str]) -> str:
            """Rewrite an alignment row so every cell is ``---`` plus its colons."""
            columns = self._split_row_cells(match.group(0))
            processed = []
            for col in columns:
                # Preserve left alignment marker (:---)
                left_colon = ":" if col.startswith(":") else ""
                # Preserve right alignment marker (---:)
                right_colon = ":" if col.endswith(":") else ""
                processed.append(left_colon + "---" + right_colon)
            # Preserve original indentation
            prefix = match.group(1)
            return prefix + "| " + " | ".join(processed) + " |"

        def process_line(match: Match[str]) -> str:
            """Rewrite a row with exactly one space on each side of every cell."""
            columns = self._split_row_cells(match.group(0))
            # Preserve original indentation
            prefix = match.group(1)
            return prefix + "| " + " | ".join(columns) + " |"

        # First pass: space out every pipe row (this also touches alignment
        # rows, because line_pattern matches them too).
        formatted_content = self.line_pattern.sub(process_line, content)
        # Second pass: collapse alignment rows to the canonical "---" width.
        formatted_content = self.align_pattern.sub(process_align, formatted_content)
        return self.normalize_spurious_table_prefixes(formatted_content)

    @staticmethod
    def _self_test():
        """Format a small sample document and print the result (manual check)."""
        test_content = """
# 测试表格
普通文本---不会被匹配
## 表格1无前置空格
| 姓名 | 年龄 | 城市 |
| :---------- | -------: | :------ |
| 张三 | 25 | 北京 |
## 表格3前置4个空格+首尾|
| 产品 | 价格 | 库存 |
| :-------------: | ----------- | :-----------: |
| 手机 | 5999 | 100 |
"""
        util = MarkdownTableUtil()
        format_content = util.format_table(test_content)
        print(format_content)
class MarkdownTableFormatter(BaseParser):
    r"""Parser stage that normalizes every Markdown table in a document.

    The raw bytes are decoded to text and handed to ``MarkdownTableUtil``,
    which rewrites each table with uniform cell spacing and canonical
    alignment markers.

    Example:
        >>> formatter = MarkdownTableFormatter()
        >>> content = b"|Name|Age|\n|---|---|\n|John|30|"
        >>> doc = formatter.parse_into_text(content)
        >>> print(doc.content)
        | Name | Age |
        | --- | --- |
        | John | 30 |
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Table formatting helper used by parse_into_text.
        self.table_helper = MarkdownTableUtil()

    def parse_into_text(self, content: bytes) -> Document:
        """Decode the input and return it with all tables reformatted.

        Args:
            content: Raw Markdown content as bytes

        Returns:
            Document whose ``content`` is the table-formatted text
        """
        decoded = endecode.decode_bytes(content)
        return Document(content=self.table_helper.format_table(decoded))
class MarkdownImageUtil:
    """Utility class for handling images in Markdown.

    This class provides functionality to:
    - Extract base64-encoded images from Markdown
    - Extract image paths from Markdown
    - Replace image paths with new URLs
    - Convert base64 images to binary format

    Supported formats:
    - Base64 embedded images: ![alt](data:image/png;base64,iVBORw0...)
    - Regular image links: ![alt](path/to/image.png)
    """

    def __init__(self):
        # Pattern to match base64 embedded images
        # Captures: (1) alt text, (2) image format, (3) base64 data
        # Alt text uses .*? (non-greedy) to allow literal ] (e.g. Windows paths).
        # MIME subtype uses [^;]+ to handle types with hyphens like x-emf.
        self.b64_pattern = re.compile(
            r"!\[(.*?)\]\(data:image/([^;]+);base64,([^\)]+)\)"
        )
        # Pattern to match regular image syntax (alt text allows ])
        self.image_pattern = re.compile(r"!\[(.*?)\]\(([^)]+)\)")
        # Pattern for replacing image paths
        self.replace_pattern = re.compile(r"!\[(.*?)\]\(([^)]+)\)")

    @staticmethod
    def _safe_extension(subtype: str) -> str:
        """Turn a captured MIME subtype into a file extension safe for a path.

        The subtype comes straight from document content, so it may contain
        path separators, whitespace or other characters that must never end
        up in a generated file name. Only ASCII letters, digits, ``.``, ``+``
        and ``-`` are kept, and leading dots are removed.

        Args:
            subtype: Text captured after ``data:image/`` (e.g. ``png``).

        Returns:
            The sanitized extension, or ``"bin"`` when nothing usable remains.
        """
        cleaned = re.sub(r"[^A-Za-z0-9.+-]", "", subtype).lstrip(".")
        return cleaned or "bin"

    def extract_image(
        self,
        content: str,
        path_prefix: Optional[str] = None,
        replace: bool = True,
    ) -> Tuple[str, List[str]]:
        """Extract image paths from Markdown content.

        Args:
            content: Markdown text containing images
            path_prefix: Optional prefix to add to image paths
            replace: Whether to replace image syntax in content

        Returns:
            Tuple of (processed_text, list_of_image_paths). The returned
            paths include ``path_prefix`` even when ``replace`` is False.

        Example:
            >>> util = MarkdownImageUtil()
            >>> text, images = util.extract_image("![logo](img/logo.png)")
            >>> print(images)
            ['img/logo.png']
        """
        # List to store extracted image paths
        images: List[str] = []

        def repl(match: Match[str]) -> str:
            """Record one image path and return its replacement text."""
            title = match.group(1)  # Alt text
            image_path = match.group(2)  # Image path
            # Add prefix if specified
            if path_prefix:
                image_path = f"{path_prefix}/{image_path}"
            images.append(image_path)
            # Keep original if replace is False
            if not replace:
                return match.group(0)
            # Replace image path with potentially prefixed path
            return f"![{title}]({image_path})"

        text = self.image_pattern.sub(repl, content)
        logger.debug("Extracted %d images from markdown", len(images))
        return text, images

    def extract_base64(
        self,
        content: str,
        path_prefix: Optional[str] = None,
        replace: bool = True,
    ) -> Tuple[str, Dict[str, bytes]]:
        """Extract and decode base64 embedded images from Markdown.

        This method finds all base64-encoded images in the Markdown content,
        decodes them to binary format, generates unique filenames, and
        optionally replaces them with file path references. An image whose
        payload cannot be decoded is replaced by its alt text alone.

        Args:
            content: Markdown text containing base64 images
            path_prefix: Optional directory prefix for generated paths
            replace: Whether to replace base64 syntax with file paths

        Returns:
            Tuple of (processed_text, dict_of_path_to_bytes)

        Example:
            >>> util = MarkdownImageUtil()
            >>> text = "![logo](data:image/png;base64,iVBORw0KGg...)"
            >>> new_text, images = util.extract_base64(text, "images")
            >>> print(new_text)
            ![logo](images/uuid.png)
            >>> print(len(images))
            1
        """
        # Dictionary mapping generated file paths to binary image data
        images: Dict[str, bytes] = {}

        def repl(match: Match[str]) -> str:
            """Decode one embedded image and return its replacement text."""
            title = match.group(1)  # Alt text
            # The subtype is document-controlled; sanitize it before it
            # becomes part of a file path.
            img_ext = self._safe_extension(match.group(2))
            img_b64 = match.group(3)  # Base64 encoded data
            # Decode base64 string to bytes
            image_byte = endecode.encode_image(img_b64, errors="ignore")
            if not image_byte:
                # Log only a short preview: the payload can be very large.
                logger.error(
                    "Failed to decode base64 image, skipping it "
                    "(length=%d, prefix=%r)",
                    len(img_b64),
                    img_b64[:32],
                )
                return title  # Return just the alt text if decode fails
            # Generate unique filename with the sanitized extension
            image_path = f"{uuid.uuid4()}.{img_ext}"
            if path_prefix:
                image_path = f"{path_prefix}/{image_path}"
            images[image_path] = image_byte
            # Keep original base64 if replace is False
            if not replace:
                return match.group(0)
            # Replace base64 data with file path reference
            return f"![{title}]({image_path})"

        text = self.b64_pattern.sub(repl, content)
        logger.debug("Extracted %d base64 images from markdown", len(images))
        return text, images

    def replace_path(self, content: str, images: Dict[str, str]) -> str:
        """Replace image paths in Markdown with new URLs.

        This method is typically used to replace local file paths with
        uploaded URLs after images have been stored. A path mapped to an
        empty value is replaced by the image's alt text alone.

        Args:
            content: Markdown text with image references
            images: Mapping of old paths to new URLs

        Returns:
            Markdown text with updated image URLs

        Example:
            >>> util = MarkdownImageUtil()
            >>> content = "![logo](temp/img.png)"
            >>> mapping = {"temp/img.png": "https://cdn.com/img.png"}
            >>> result = util.replace_path(content, mapping)
            >>> print(result)
            ![logo](https://cdn.com/img.png)
        """
        # Track which paths were actually replaced
        content_replace: set = set()

        def repl(match: Match[str]) -> str:
            """Swap one image path for its mapped URL when a mapping exists."""
            title = match.group(1)  # Alt text
            image_path = match.group(2)  # Current image path
            # Only replace if path exists in mapping
            if image_path not in images:
                return match.group(0)  # Keep original
            content_replace.add(image_path)
            # Get new URL from mapping
            image_path = images[image_path]
            return f"![{title}]({image_path})" if image_path else title

        text = self.replace_pattern.sub(repl, content)
        logger.debug("Replaced %d images in markdown", len(content_replace))
        return text

    @staticmethod
    def _self_test():
        """Extract a sample base64 image and write it to disk (manual check)."""
        your_content = "test![](data:image/png;base64,iVBORw0KGgoAAAA)test"
        image_handle = MarkdownImageUtil()
        text, images = image_handle.extract_base64(your_content)
        print(text)
        for image_url, image_byte in images.items():
            with open(image_url, "wb") as f:
                f.write(image_byte)
class MarkdownImageBase64(BaseParser):
    """Parser stage that pulls base64-embedded images out of Markdown.

    Each embedded image is replaced in the text by a path reference under
    ``images/``. The decoded image bytes are re-encoded as base64 text and
    returned in ``Document.images`` keyed by that path, so that the Go-side
    ImageResolver (or main.py _resolve_images) can handle storage.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Image helper that performs the actual extraction.
        self.image_helper = MarkdownImageUtil()

    def parse_into_text(self, content: bytes) -> Document:
        """Decode the input, extract embedded images, and package both.

        Args:
            content: Raw Markdown content as bytes

        Returns:
            Document with the rewritten text and a path -> base64-string
            mapping of the extracted images
        """
        markdown = endecode.decode_bytes(content)
        markdown, extracted = self.image_helper.extract_base64(
            markdown, path_prefix="images"
        )
        # Document.images carries text, so re-encode the raw bytes as base64.
        images: Dict[str, str] = {
            path: base64.b64encode(payload).decode()
            for path, payload in extracted.items()
        }
        logger.debug("Extracted %d base64 images from markdown", len(images))
        return Document(content=markdown, images=images)
class MarkdownParser(PipelineParser):
    """Complete Markdown parser built as a two-stage pipeline.

    Stages, in declaration order:
    1. MarkdownTableFormatter: standardizes table formatting
    2. MarkdownImageBase64: extracts base64 images into Document.images

    NOTE(review): how the stages are chained (each stage's output feeding
    the next) is implemented by PipelineParser, which is not defined in this
    file; confirm the exact hand-off there.
    """

    _parser_cls = (
        MarkdownTableFormatter,
        MarkdownImageBase64,
    )
if __name__ == "__main__":
    # Manual smoke test: run the full pipeline on a tiny inline sample, then
    # the per-utility self tests.
    logging.basicConfig(level=logging.DEBUG)

    sample = "test![](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAMgA)test"
    document = MarkdownParser().parse_into_text(sample.encode())

    # Show the rewritten text and which image paths were produced.
    logger.info(document.content)
    logger.info(f"Images: {len(document.images)}, name: {document.images.keys()}")

    # Run individual utility tests
    MarkdownImageUtil._self_test()
    MarkdownTableUtil._self_test()