diff --git a/docker/nginx/conf.d/default.conf b/docker/nginx/conf.d/default.conf index dad9b45cf..078ec56a9 100644 --- a/docker/nginx/conf.d/default.conf +++ b/docker/nginx/conf.d/default.conf @@ -50,4 +50,15 @@ server { rewrite ^/workspace(/.*)$ $1 break; proxy_pass http://minio:9000; } + + # 添加 /bucket/ 路径的 MinIO 代理配置 + location ~ ^/bucket/(bisheng|tmp-dir)/ { + rewrite ^/bucket/(.*)$ /$1 break; + proxy_pass http://minio:9000; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_read_timeout 300s; + proxy_connect_timeout 75s; + } } \ No newline at end of file diff --git a/src/backend/bisheng/api/services/knowledge_imp.py b/src/backend/bisheng/api/services/knowledge_imp.py index f095e772e..86fe041d9 100644 --- a/src/backend/bisheng/api/services/knowledge_imp.py +++ b/src/backend/bisheng/api/services/knowledge_imp.py @@ -2,7 +2,7 @@ import os import re import time -from typing import Any, Dict, List, Optional, BinaryIO, Union +from typing import Any, Dict, List, Optional, BinaryIO, Union, Tuple import requests from bisheng_langchain.rag.extract_info import extract_title @@ -25,6 +25,8 @@ from bisheng.api.errcode.knowledge import KnowledgeSimilarError from bisheng.api.services.etl4lm_loader import Etl4lmLoader +from bisheng.api.services.mineru_loader import MineruLoader +from bisheng.api.services.mineru_text_splitter import MinerUTextSplitter from bisheng.api.services.handler.impl.xls_split_handle import XlsSplitHandle from bisheng.api.services.handler.impl.xlsx_split_handle import XlsxSplitHandle from bisheng.api.services.libreoffice_converter import ( @@ -56,7 +58,7 @@ from bisheng.interface.embeddings.custom import FakeEmbedding from bisheng.interface.importing.utils import import_vectorstore from bisheng.interface.initialize.loading import instantiate_vectorstore -from bisheng.settings import settings +from bisheng.settings import settings as bisheng_settings from 
bisheng.utils.embedding import decide_embeddings from bisheng.utils.minio_client import minio_client @@ -67,6 +69,9 @@ "md": TextLoader, "docx": UnstructuredWordDocumentLoader, "pptx": UnstructuredPowerPointLoader, + "png": TextLoader, # 图片格式使用TextLoader作为默认处理 + "jpg": TextLoader, + "jpeg": TextLoader, } split_handles = [ @@ -324,7 +329,7 @@ def decide_vectorstores( param: dict = {"embedding": embedding} if vector_store == "ElasticKeywordsSearch": - vector_config = settings.get_vectors_conf().elasticsearch.model_dump() + vector_config = bisheng_settings.get_vectors_conf().elasticsearch.model_dump() if not vector_config: # 无相关配置 raise RuntimeError("vector_stores.elasticsearch not find in config.yaml") @@ -333,7 +338,7 @@ def decide_vectorstores( vector_config["ssl_verify"] = eval(vector_config["ssl_verify"]) elif vector_store == "Milvus": - vector_config = settings.get_vectors_conf().milvus.model_dump() + vector_config = bisheng_settings.get_vectors_conf().milvus.model_dump() if not vector_config: # 无相关配置 raise RuntimeError("vector_stores.milvus not find in config.yaml") @@ -644,56 +649,118 @@ def parse_document_title(title: str) -> str: def read_chunk_text( - input_file, - file_name, - separator: Optional[List[str]], - separator_rule: Optional[List[str]], + input_file: str, + file_name: str, + separator: List[str], + separator_rule: List[str], chunk_size: int, chunk_overlap: int, - knowledge_id: Optional[int] = None, + knowledge_id: int = None, retain_images: int = 1, enable_formula: int = 1, - force_ocr: int = 1, + force_ocr: int = 0, filter_page_header_footer: int = 0, excel_rule: ExcelRule = None, - no_summary: bool = False, - -) -> (List[str], List[dict], str, Any): # type: ignore +) -> Tuple[List[str], List[Dict], str, List]: """ - 0:chunks text - 1:chunks metadata - 2:parse_type: etl4lm or un_etl4lm - 3: ocr bbox data: maybe None + 读取文件内容并切分 """ - # 获取文档总结标题的llm - llm = None - if not no_summary: - try: - llm = decide_knowledge_llm() - knowledge_llm = 
LLMService.get_knowledge_llm() - except Exception as e: - logger.exception("knowledge_llm_error:") - raise Exception( - f"文档知识库总结模型已失效,请前往模型管理-系统模型设置中进行配置。{str(e)}" - ) - - text_splitter = ElemCharacterTextSplitter( - separators=separator, - separator_rule=separator_rule, - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - is_separator_regex=True, - ) - # 加载文档内容 - logger.info(f"start_file_loader file_name={file_name}") - parse_type = ParseType.UN_ETL4LM.value - # excel 文件的处理单独出来 + # 调试日志:记录传入的参数 + logger.info(f"read_chunk_text called with: file_name={file_name}, knowledge_id={knowledge_id}, retain_images={retain_images}") + + if not os.path.exists(input_file): + raise FileNotFoundError(f"文件不存在: {input_file}") + + # 获取文件扩展名 + file_extension_name = file_name.split(".")[-1].lower() + logger.info(f"Processing file: {file_name}, extension: {file_extension_name}") + + # 获取知识库配置 + settings = bisheng_settings + etl_for_lm_url = settings.get_knowledge().get("etl4lm", {}).get("url", "") + provider = settings.get_knowledge().get("etl4lm", {}).get("provider", "etl4lm") + + logger.info(f"ETL4LM settings: url={etl_for_lm_url}, provider={provider}") + + # 初始化 parse_type 和 partitions 变量 + parse_type = ParseType.ETL4LM.value partitions = [] + + # 初始化 texts 变量,避免 NameError texts = [] - etl_for_lm_url = settings.get_knowledge().get("etl4lm", {}).get("url", None) - file_extension_name = file_name.split(".")[-1].lower() + documents = [] + + # 根据文件类型选择加载器 + if file_extension_name in ["pdf", "png", "jpg", "jpeg"]: + # mineru只支持PDF和图片格式 + if provider == "mineru": + # 通过 MinerU FastAPI 服务解析PDF和图片 + logger.info(f"Using MinerU loader for {file_extension_name} with knowledge_id={knowledge_id}") + loader = MineruLoader( + file_name, + input_file, + base_url=etl_for_lm_url, # 例如 http://172.19.0.3:8009 + timeout=settings.get_knowledge().get("etl4lm", {}).get("timeout", 600), + backend=settings.get_knowledge().get("etl4lm", {}).get("backend", "pipeline"), + knowledge_id=knowledge_id, + # 
pipeline 后端可选配置 + parse_method=settings.get_knowledge().get("etl4lm", {}).get("parse_method", "auto"), + lang=settings.get_knowledge().get("etl4lm", {}).get("lang", "ch"), + formula_enable=settings.get_knowledge().get("etl4lm", {}).get("formula_enable", True), + table_enable=settings.get_knowledge().get("etl4lm", {}).get("table_enable", True), + # vlm-sglang-client 后端可选配置 + server_url=settings.get_knowledge().get("etl4lm", {}).get("server_url", None), + ) + documents = loader.load() + parse_type = ParseType.ETL4LM.value + partitions = getattr(loader, "partitions", None) or [] + + # 使用MinerU专用切分器 + logger.info(f"Using MinerU text splitter for {file_extension_name}") + mineru_splitter = MinerUTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + preserve_headers=True, + preserve_tables=True, + preserve_formulas=True, + min_chunk_size=100, + max_chunk_size=2000 + ) + + # 直接返回切分结果,跳过后续的通用切分 + split_documents, metadatas = mineru_splitter.split_mineru_documents( + documents, knowledge_id + ) + + # 提取文本内容 + raw_texts = [doc.page_content for doc in split_documents] + + # 设置parse_type和partitions + parse_type = ParseType.ETL4LM.value + partitions = parse_partitions(partitions) + + logger.info(f"MinerU processing completed. 
Generated {len(raw_texts)} chunks") + return raw_texts, metadatas, parse_type, partitions + else: + # 使用 ETL4LM 处理 PDF 和图片 + logger.info(f"Using ETL4LM loader for {file_extension_name} with knowledge_id={knowledge_id}") + loader = Etl4lmLoader( + file_name, + input_file, + unstructured_api_url=etl_for_lm_url, + ocr_sdk_url=settings.get_knowledge().get("etl4lm", {}).get("ocr_sdk_url", ""), + force_ocr=bool(force_ocr), + enable_formular=bool(enable_formula), + timeout=settings.get_knowledge().get("etl4lm", {}).get("timeout", 60), + filter_page_header_footer=bool(filter_page_header_footer), + knowledge_id=knowledge_id, + ) + documents = loader.load() + parse_type = ParseType.ETL4LM.value + partitions = loader.partitions + partitions = parse_partitions(partitions) - if file_extension_name in ["xls", "xlsx", "csv"]: + elif file_extension_name in ["xls", "xlsx", "csv"]: # set default values. if not excel_rule: excel_rule = ExcelRule() @@ -713,6 +780,9 @@ def read_chunk_text( # skip following processes and return splited values. 
texts, documents = combine_multiple_md_files_to_raw_texts(path=md_files_path) + + # 设置 Excel 文件的 parse_type + parse_type = ParseType.UN_ETL4LM.value elif file_extension_name in ["doc", "docx", "html", "mhtml", "ppt", "pptx"]: @@ -754,10 +824,17 @@ def read_chunk_text( # 沿用原来的方法处理md文件 loader = filetype_load_map["md"](file_path=md_file_name) documents = loader.load() + + # 设置 Office 文档的 parse_type + parse_type = ParseType.UN_ETL4LM.value elif file_extension_name in ["txt", "md"]: loader = filetype_load_map[file_extension_name](file_path=input_file, autodetect_encoding=True) documents = loader.load() + + # 设置文本文件的 parse_type + parse_type = ParseType.UN_ETL4LM.value + else: if etl_for_lm_url: if file_extension_name in ["pdf"]: @@ -765,48 +842,89 @@ def read_chunk_text( if is_pdf_damaged(input_file): raise Exception('The file is damaged.') etl4lm_settings = settings.get_knowledge().get("etl4lm", {}) - loader = Etl4lmLoader( - file_name, - input_file, - unstructured_api_url=etl4lm_settings.get("url", ""), - ocr_sdk_url=etl4lm_settings.get("ocr_sdk_url", ""), - force_ocr=bool(force_ocr), - enable_formular=bool(enable_formula), - timeout=etl4lm_settings.get("timeout", 60), - filter_page_header_footer=bool(filter_page_header_footer), - knowledge_id=knowledge_id, - ) - documents = loader.load() - parse_type = ParseType.ETL4LM.value - partitions = loader.partitions - partitions = parse_partitions(partitions) - else: - if file_extension_name in ['pdf']: - md_file_name, local_image_dir, doc_id = convert_file_to_md( - file_name=file_name, - input_file_name=input_file, + provider = etl4lm_settings.get("provider", "etl4lm").lower() + + # 只有在 mineru 配置下且文件格式是 mineru 支持的格式时,才使用 mineru 处理 + if provider == "mineru" and file_extension_name in ["pdf", "png", "jpg", "jpeg"]: + # 通过 MinerU FastAPI 服务解析PDF和图片 + loader = MineruLoader( + file_name, + input_file, + base_url=etl4lm_settings.get("url", ""), # 例如 http://172.19.0.3:8009 + timeout=etl4lm_settings.get("timeout", 600), + 
backend=etl4lm_settings.get("backend", "pipeline"), knowledge_id=knowledge_id, - retain_images=bool(retain_images), + # pipeline 后端可选配置 + parse_method=etl4lm_settings.get("parse_method", "auto"), + lang=etl4lm_settings.get("lang", "ch"), + formula_enable=etl4lm_settings.get("formula_enable", True), + table_enable=etl4lm_settings.get("table_enable", True), + # vlm-sglang-client 后端可选配置 + server_url=etl4lm_settings.get("server_url", None), ) - if not md_file_name: raise Exception(f"failed to parse {file_name}, please check backend log") - - # save images to minio - if local_image_dir and retain_images == 1: - put_images_to_minio( - local_image_dir=local_image_dir, - knowledge_id=knowledge_id, - doc_id=doc_id, - ) - # 沿用原来的方法处理md文件 - loader = filetype_load_map["md"](file_path=md_file_name) documents = loader.load() + parse_type = ParseType.ETL4LM.value + partitions = getattr(loader, "partitions", None) or [] + + # 使用MinerU专用切分器 + logger.info(f"Using MinerU text splitter for {file_extension_name}") + mineru_splitter = MinerUTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + preserve_headers=True, + preserve_tables=True, + preserve_formulas=True, + min_chunk_size=100, + max_chunk_size=2000 + ) + + # 直接返回切分结果,跳过后续的通用切分 + split_documents, metadatas = mineru_splitter.split_mineru_documents( + documents, knowledge_id + ) + + # 提取文本内容 + raw_texts = [doc.page_content for doc in split_documents] + + # 设置parse_type和partitions + parse_type = ParseType.ETL4LM.value + partitions = parse_partitions(partitions) + + logger.info(f"MinerU processing completed. 
Generated {len(raw_texts)} chunks") + return raw_texts, metadatas, parse_type, partitions + elif provider == "etl4lm": + # 只有在明确配置为 etl4lm 时才使用 Etl4lmLoader + logger.info(f"Using ETL4LM loader for {file_extension_name} with knowledge_id={knowledge_id}") + loader = Etl4lmLoader( + file_name, + input_file, + unstructured_api_url=etl4lm_settings.get("url", ""), + ocr_sdk_url=etl4lm_settings.get("ocr_sdk_url", ""), + force_ocr=bool(force_ocr), + enable_formular=bool(enable_formula), + timeout=etl4lm_settings.get("timeout", 60), + filter_page_header_footer=bool(filter_page_header_footer), + knowledge_id=knowledge_id, + ) + documents = loader.load() + parse_type = ParseType.ETL4LM.value + partitions = loader.partitions + partitions = parse_partitions(partitions) else: + # 其他情况(包括 mineru 配置但不支持的文件格式)使用默认处理逻辑 + logger.info(f"Provider {provider} not supported for {file_extension_name}, using default processing") if file_extension_name not in filetype_load_map: raise Exception("类型不支持") loader = filetype_load_map[file_extension_name](file_path=input_file) documents = loader.load() + parse_type = ParseType.UN_ETL4LM.value logger.info(f"start_extract_title file_name={file_name}") + + # 获取知识库 LLM 配置和对象 + llm = decide_knowledge_llm() + knowledge_llm = LLMService.get_knowledge_llm() if llm else None + if llm: t = time.time() for one in documents: @@ -820,12 +938,109 @@ def read_chunk_text( one.metadata["title"] = parse_document_title(title) logger.info("file_extract_title=success timecost={}", time.time() - t) + # 为 Office 文档类型优化分隔符策略 + if file_extension_name in ["doc", "docx", "html", "mhtml", "ppt", "pptx"]: + # 使用传入的 separator 参数,为所有 Office 文档类型提供更好的切分器选择 + + # 修复分隔符问题:将转义的分隔符转换为真正的换行符 + processed_separators = [] + if separator: + for sep in separator: + if sep == '\\n\\n': + processed_separators.append('\n\n') + elif sep == '\\n': + processed_separators.append('\n') + else: + processed_separators.append(sep) + else: + # 如果没有提供分隔符,使用默认值 + processed_separators = ['\n\n', '\n'] + 
+ # 参数验证和默认值保护 - 只在参数无效时才使用默认值 + original_chunk_size = chunk_size + original_chunk_overlap = chunk_overlap + + if not chunk_size or chunk_size <= 0: + chunk_size = 1000 + logger.warning(f"WARNING: Invalid chunk_size ({original_chunk_size}), using default value: {chunk_size}") + + # 修复 chunk_overlap = 0 的问题 + if not chunk_overlap or chunk_overlap < 0: + chunk_overlap = 100 + logger.warning(f"WARNING: Invalid chunk_overlap ({original_chunk_overlap}), using default value: {chunk_overlap}") + elif chunk_overlap == 0: + # chunk_overlap = 0 会导致切分失败,强制使用合理值 + chunk_overlap = min(chunk_size // 10, 200) # 使用 chunk_size 的 1/10,但不超过 200 + logger.warning(f"WARNING: chunk_overlap is 0, which will cause splitting failure. Adjusted to: {chunk_overlap}") + + # 参数合理性检查 - 只给出警告,不强制修改 + if chunk_size > 10000: + logger.warning(f"WARNING: chunk_size ({chunk_size}) is very large, this may cause issues") + if chunk_overlap >= chunk_size: + chunk_overlap = min(chunk_size // 2, 200) + logger.warning(f"WARNING: chunk_overlap ({original_chunk_overlap}) >= chunk_size ({chunk_size}), adjusted to: {chunk_overlap}") + + # 为所有 Office 文档类型使用 RecursiveCharacterTextSplitter 以支持多个分隔符 + from langchain.text_splitter import RecursiveCharacterTextSplitter + text_splitter = RecursiveCharacterTextSplitter( + separators=processed_separators, # 使用处理后的分隔符 + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + add_start_index=True, + # 添加更多配置选项以确保正确切分 + length_function=len, # 使用字符长度函数 + is_separator_regex=False, # 分隔符不是正则表达式 + ) + else: + # 其他文件使用 CharacterTextSplitter + separator_str = separator[0] if separator else "\n\n" + + # 参数验证和默认值保护 - 只在参数无效时才使用默认值 + original_chunk_size = chunk_size + original_chunk_overlap = chunk_overlap + + if not chunk_size or chunk_size <= 0: + chunk_size = 1000 + logger.warning(f"WARNING: Invalid chunk_size ({original_chunk_size}), using default value: {chunk_size}") + if not chunk_overlap or chunk_overlap < 0: + chunk_overlap = 100 + logger.warning(f"WARNING: Invalid 
chunk_overlap ({original_chunk_overlap}), using default value: {chunk_overlap}") + + # 参数合理性检查 - 只给出警告,不强制修改 + if chunk_size > 10000: + logger.warning(f"WARNING: chunk_size ({chunk_size}) is very large, this may cause issues") + if chunk_overlap >= chunk_size: + logger.warning(f"WARNING: chunk_overlap ({chunk_overlap}) >= chunk_size ({chunk_size}), this may cause issues") + + text_splitter = CharacterTextSplitter( + separator=separator_str, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + add_start_index=True, + ) + + # 统一进行文本切分 if file_extension_name in ["xls", "xlsx", "csv"]: - for one in texts: - one.metadata["title"] = documents[0].metadata.get("title", "") + # Excel 文件已经在前面处理过了,只需要设置标题 + if texts and len(texts) > 0: + for one in texts: + if hasattr(one, 'metadata') and one.metadata: + one.metadata["title"] = documents[0].metadata.get("title", "") if documents and len(documents) > 0 else "" + logger.info(f"Excel file processing completed - generated {len(texts)} chunks") else: - logger.info(f"start_split_text file_name={file_name}") - texts = text_splitter.split_documents(documents) + # 其他文件类型使用 text_splitter 进行切分 + if documents and len(documents) > 0: + logger.info(f"start_split_text file_name={file_name}") + texts = text_splitter.split_documents(documents) + + # 检查是否有过大的chunk + for i, text in enumerate(texts): + chunk_size_actual = len(text.page_content) if hasattr(text, 'page_content') else len(str(text)) + if chunk_size_actual > 10000: + logger.warning(f"WARNING: Chunk {i+1} is very large ({chunk_size_actual} characters), may indicate splitting failure") + else: + logger.warning(f"WARNING: No documents to split for {file_name}") + texts = [] raw_texts = [t.page_content for t in texts] logger.info(f"start_process_metadata file_name={file_name}") diff --git a/src/backend/bisheng/api/services/mineru_loader.py b/src/backend/bisheng/api/services/mineru_loader.py new file mode 100644 index 000000000..a1a12acdb --- /dev/null +++ 
# bisheng/api/services/mineru_loader.py
class MineruLoader(BasePDFLoader):
    """Parse a PDF/image file through a MinerU ``/file_parse`` HTTP service.

    The service response carries the document as markdown (``md_content``),
    per-page layout data (``middle_json``) and base64-encoded images.  Images
    are persisted to MinIO and their relative links inside the markdown are
    rewritten to accessible URLs; ``middle_json`` is flattened into
    ``self.partitions`` so downstream code can highlight source locations.
    """

    def __init__(
        self,
        file_name: str,
        file_path: str,
        base_url: str,
        timeout: int = 600,
        backend: str = "pipeline",
        knowledge_id: int | None = None,
        **kwargs: Any,
    ) -> None:
        # file_name: display name used in document metadata.
        # base_url: MinerU service root, e.g. http://host:8009 (trailing "/" stripped).
        # backend: MinerU parsing backend ("pipeline" or "vlm-sglang-client").
        # knowledge_id: when set, images go to the knowledge-base MinIO folder;
        #   when None the loader runs in "preview" mode and uses a tmp folder.
        # kwargs: forwarded verbatim as extra form fields of the parse request.
        self.file_name = file_name
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.backend = backend
        self.knowledge_id = knowledge_id
        self.extra = kwargs
        self.partitions: List[Dict[str, Any]] = []
        super().__init__(file_path)

    def _store_images_to_minio(self, images_data: Dict[str, Any], doc_id: str) -> Dict[str, str]:
        """Store base64 images returned by MinerU into MinIO.

        Returns a mapping ``{image_name: accessible_url}`` for every image
        uploaded successfully; individual failures are logged and skipped so a
        single bad image never aborts the whole document.
        """
        if not images_data:
            return {}

        logger.info(f"Storing images to MinIO: knowledge_id={self.knowledge_id}, doc_id={doc_id}, images count={len(images_data)}")

        minio_client = MinioClient()
        image_path_mapping: Dict[str, str] = {}

        for image_name, image_info in images_data.items():
            try:
                # Only data-URI encoded payloads are handled; anything else is skipped.
                if isinstance(image_info, str) and image_info.startswith('data:image/'):
                    header, base64_data = image_info.split(',', 1)
                    # Derive the image format from the data-URI header, default to jpg.
                    format_match = re.search(r'data:image/(\w+)', header)
                    image_format = format_match.group(1) if format_match else 'jpg'

                    image_bytes = base64.b64decode(base64_data)

                    if self.knowledge_id:
                        # Knowledge-base mode: store under the knowledge/doc folder.
                        minio_path = f"knowledge/images/{self.knowledge_id}/{doc_id}/{image_name}"
                        bucket_name = minio_client.bucket
                    else:
                        # Preview mode: use a tmp folder in the main bucket to avoid
                        # access-policy problems with the temporary bucket.
                        minio_path = f"tmp/preview_images/{doc_id}/{image_name}"
                        bucket_name = minio_client.bucket

                    minio_client.upload_minio_data(
                        object_name=minio_path,
                        data=image_bytes,
                        length=len(image_bytes),
                        content_type=f"image/{image_format}"
                    )

                    # Best-effort sanity check that the uploaded object is reachable.
                    try:
                        test_url = f"http://minio:9000/{bucket_name}/{minio_path}"
                        logger.info(f"Testing MinIO access: {test_url}")
                        if minio_client.object_exists(bucket_name, minio_path):
                            logger.info(f"Image {image_name} successfully uploaded and accessible")
                        else:
                            logger.warning(f"Image {image_name} uploaded but not accessible")
                    except Exception as e:
                        logger.error(f"Failed to test MinIO access for {image_name}: {str(e)}")

                    # Build the URL served through the nginx /bucket/ proxy.
                    if self.knowledge_id:
                        image_url = f"/bucket/bisheng/knowledge/images/{self.knowledge_id}/{doc_id}/{image_name}"
                    else:
                        image_url = f"/bucket/bisheng/tmp/preview_images/{doc_id}/{image_name}"
                        logger.info(f"Using MinIO path for {image_name} in preview mode: {image_url}")

                    image_path_mapping[image_name] = image_url

                    logger.info(f"Successfully stored image {image_name} to MinIO: {minio_path} (bucket: {bucket_name})")

            except Exception as e:
                logger.error(f"Failed to store image {image_name} to MinIO: {str(e)}")
                continue

        return image_path_mapping

    def _replace_image_links_in_markdown(self, markdown_content: str, image_path_mapping: Dict[str, str]) -> str:
        """Rewrite relative ``images/<name>`` links in the markdown to the
        accessible URLs collected by ``_store_images_to_minio``.

        NOTE(review): plain ``str.replace`` is used, so an image name that is a
        substring of another link could be over-replaced — acceptable for the
        hash-style names MinerU emits, but worth confirming.
        """
        if not image_path_mapping:
            # No mapping (preview mode without uploads): keep original links.
            logger.info("No image mapping available, keeping original image links in markdown")
            return markdown_content

        logger.info(f"Replacing {len(image_path_mapping)} image links with accessible URLs")
        for image_name, image_url in image_path_mapping.items():
            relative_pattern = f"images/{image_name}"
            markdown_content = markdown_content.replace(relative_pattern, image_url)
            logger.debug(f"Replaced {relative_pattern} with: {image_url}")

        return markdown_content

    @staticmethod
    def _blocks_to_partitions(blocks: List[Dict[str, Any]], page_num: int) -> List[Dict[str, Any]]:
        """Turn one page's MinerU layout blocks into partition dicts.

        Each partition holds a block's concatenated span text plus, for every
        span carrying a bbox, the bbox and the span's character start offset in
        that text (used downstream for original-text highlighting).  Shared by
        the preproc and discarded block lists, which previously duplicated this
        logic verbatim.
        """
        partitions: List[Dict[str, Any]] = []
        for block in blocks:
            if "lines" not in block:
                continue
            text_parts: List[str] = []
            bboxes: List[Any] = []
            indexes: List[int] = []
            # Running length of "".join(text_parts): avoids re-joining the
            # accumulated text for every span (previously O(n^2) per block).
            offset = 0
            for line in block["lines"]:
                for span in line.get("spans", []):
                    if "text" in span:
                        text_parts.append(span["text"])
                        if "bbox" in span:
                            bboxes.append(span["bbox"])
                            indexes.append(offset)  # start offset of this span's text
                        offset += len(span["text"])
            if text_parts:
                partitions.append({
                    "text": "".join(text_parts),
                    "metadata": {
                        "extra_data": {
                            "bboxes": bboxes,
                            "indexes": indexes,
                            "pages": [page_num + 1],
                            "types": [block.get("type", "Paragraph")],
                        }
                    },
                })
        return partitions

    def load(self) -> List[Document]:
        """POST the file to MinerU and return a single Document.

        Builds ``self.partitions`` from the layout data when available, and
        prefers ``merge_partitions`` output (when partitions and a
        knowledge_id exist), falling back to the rewritten markdown on any
        failure or empty result.
        """
        with open(self.file_path, "rb") as f:
            files = [("files", f)]
            data: Dict[str, Any] = {
                "backend": self.backend,
                "return_md": True,
                "return_content_list": True,
                "return_info": True,
                "return_layout": False,
                "return_images": True,
                "is_json_md_dump": False,
                "output_dir": "output",
                **self.extra,
            }
            # Close the response explicitly so the pooled connection is released.
            with requests.post(
                f"{self.base_url}/file_parse", files=files, data=data, timeout=self.timeout
            ) as resp:
                resp.raise_for_status()
                result = resp.json()

        # MinerU responds with {"results": {"<file_hash>": {...}}}.
        results = result.get("results", {})
        if not results:
            logger.warning("MinerU API returned empty results")
            return [Document(page_content="", metadata={"source": self.file_name})]

        # Only one file is uploaded per request, so take the first entry.
        first_result = next(iter(results.values()))
        md_content = first_result.get("md_content", "")
        middle_json = first_result.get("middle_json", {})
        images_data = first_result.get("images", {})

        logger.info(f"Successfully extracted from MinerU: md_content length={len(md_content)}, images count={len(images_data)}")

        # Document id groups this parse's images in MinIO.
        doc_id = str(uuid4())

        image_path_mapping = self._store_images_to_minio(images_data, doc_id)

        if image_path_mapping:
            md_content = self._replace_image_links_in_markdown(md_content, image_path_mapping)

        # Build partitions (original-text location data) from the layout info.
        # NOTE(review): some MinerU versions return middle_json as a JSON
        # *string* and expose pdf_info as a list of pages rather than a dict
        # with a "pages" key — in those cases no partitions are produced here;
        # confirm against the deployed MinerU version.
        if middle_json and "pdf_info" in middle_json:
            pdf_info = middle_json["pdf_info"]
            partitions: List[Dict[str, Any]] = []
            for page_num, page_data in enumerate(pdf_info.get("pages", [])):
                # Preproc and discarded blocks are flattened identically.
                partitions.extend(self._blocks_to_partitions(page_data.get("preproc_blocks", []), page_num))
                partitions.extend(self._blocks_to_partitions(page_data.get("discarded_blocks", []), page_num))
            self.partitions = partitions
            logger.info(f"Built {len(partitions)} partitions from middle_json")

        # Without markdown there is nothing useful to return.
        if not md_content or not md_content.strip():
            logger.error("MinerU returned empty md_content")
            return [Document(page_content="", metadata={"source": self.file_name})]

        # Prefer merge_partitions output (handles images / final formatting);
        # fall back to the processed markdown on any failure or empty content.
        try:
            if self.partitions and self.knowledge_id:
                content, metadata = merge_partitions(self.file_path, self.partitions, self.knowledge_id)
                if not content or not str(content).strip():
                    logger.info("merge_partitions returned empty content, using md_content")
                    final_doc = Document(page_content=md_content, metadata={"source": self.file_name})
                else:
                    logger.info("Successfully merged partitions")
                    final_doc = Document(page_content=content, metadata=metadata)
            else:
                logger.info(f"No partitions or knowledge_id, using md_content directly. partitions: {len(self.partitions)}, knowledge_id: {self.knowledge_id}")
                final_doc = Document(page_content=md_content, metadata={"source": self.file_name})
        except Exception as e:
            logger.warning(f"Failed to merge partitions, falling back to md_content: {str(e)}")
            final_doc = Document(page_content=md_content, metadata={"source": self.file_name})

        logger.info(f"Final document content length: {len(final_doc.page_content)}")
        return [final_doc]
", # 英文问号 + ";", # 中文分号 + "; ", # 英文分号 + ] + self.preserve_headers = preserve_headers + self.preserve_tables = preserve_tables + self.preserve_formulas = preserve_formulas + self.min_chunk_size = min_chunk_size + self.max_chunk_size = max_chunk_size + + def split_mineru_documents( + self, + documents: List[Document], + knowledge_id: int = None + ) -> Tuple[List[Document], List[Dict]]: + """ + 专门处理MinerU返回的Document对象 + + Args: + documents: MinerU返回的Document列表 + knowledge_id: 知识库ID + + Returns: + Tuple[List[Document], List[Dict]]: 切分后的文档和元数据 + """ + split_documents = [] + metadatas = [] + + logger.info(f"Starting MinerU text splitting for {len(documents)} documents") + + for doc_index, doc in enumerate(documents): + logger.info(f"Processing document {doc_index + 1}/{len(documents)}") + + # 分析文档结构 + doc_structure = self._analyze_document_structure(doc.page_content) + + # 智能切分 + chunks = self._smart_split_text(doc.page_content, doc_structure) + + logger.info(f"Document {doc_index + 1} split into {len(chunks)} chunks") + + # 为每个chunk创建Document对象 + for chunk_index, chunk in enumerate(chunks): + chunk_doc = Document( + page_content=chunk, + metadata={ + "source": doc.metadata.get("source", ""), + "page": doc.metadata.get("page", 1), + "chunk_index": chunk_index, + "total_chunks": len(chunks), + "doc_index": doc_index, + "knowledge_id": knowledge_id, + "parse_type": "mineru", + "chunk_bboxes": doc.metadata.get("chunk_bboxes", []), + "bbox": json.dumps({"chunk_bboxes": doc.metadata.get("chunk_bboxes", "")}), + "title": doc.metadata.get("title", ""), + "extra": "", + } + ) + split_documents.append(chunk_doc) + + # 创建对应的metadata + metadata = { + "bbox": json.dumps({"chunk_bboxes": doc.metadata.get("chunk_bboxes", "")}), + "page": doc.metadata.get("page", 1), + "source": doc.metadata.get("source", ""), + "title": doc.metadata.get("title", ""), + "chunk_index": chunk_index, + "extra": "", + } + metadatas.append(metadata) + + logger.info(f"MinerU text splitting completed. 
Total chunks: {len(split_documents)}") + return split_documents, metadatas + + def _analyze_document_structure(self, text: str) -> Dict: + """分析文档结构,识别标题层级关系、表格、公式和内容段落""" + structure = { + "headers": [], + "header_hierarchy": {}, # 标题层级关系 + "content_sections": [], # 内容段落 + "tables": [], + "formulas": [], + "code_blocks": [] # 代码块 + } + + lines = text.split('\n') + current_header = None + current_content = [] + in_table = False + in_formula = False + in_code_block = False + current_table = [] + current_formula = [] + current_code_block = [] + + for line_num, line in enumerate(lines): + original_line = line + line = line.strip() + + # 检查是否进入或退出代码块 + if line.startswith('```'): + if in_code_block: + # 退出代码块 + current_code_block.append(original_line) + structure["code_blocks"].append({ + "content": "\n".join(current_code_block), + "start_line": current_code_block[0].split('\n')[0] if current_code_block else line_num, + "end_line": line_num + }) + current_code_block = [] + in_code_block = False + else: + # 进入代码块 + in_code_block = True + current_code_block = [original_line] + continue + + # 如果在代码块中,直接添加 + if in_code_block: + current_code_block.append(original_line) + continue + + # 检查是否进入或退出公式 + if line.startswith('$$') or line.startswith('$'): + if in_formula: + # 退出公式 + current_formula.append(original_line) + structure["formulas"].append({ + "content": "\n".join(current_formula), + "start_line": current_formula[0].split('\n')[0] if current_formula else line_num, + "end_line": line_num, + "type": "block" if line.startswith('$$') else "inline" + }) + current_formula = [] + in_formula = False + else: + # 进入公式 + in_formula = True + current_formula = [original_line] + continue + + # 如果在公式中,直接添加 + if in_formula: + current_formula.append(original_line) + continue + + # 检查表格 + if self._is_table_line(line): + if not in_table: + # 开始新表格 + in_table = True + current_table = [original_line] + else: + # 继续当前表格 + current_table.append(original_line) + continue + elif in_table: + # 
退出表格 + if current_table: + structure["tables"].append({ + "content": "\n".join(current_table), + "start_line": current_table[0].split('\n')[0] if current_table else line_num - len(current_table), + "end_line": line_num - 1, + "rows": len(current_table) + }) + current_table = [] + in_table = False + + # 检查标题 + if line.startswith('#'): + # 保存之前的内容段落 + if current_header and current_content: + structure["content_sections"].append({ + "header": current_header, + "content": "\n".join(current_content), + "start_line": current_header["line"], + "end_line": line_num - 1 + }) + + # 识别新标题 + header_level = len(line) - len(line.lstrip('#')) + header_text = line.lstrip('# ').strip() + + current_header = { + "level": header_level, + "text": header_text, + "line": line_num, + "full_line": original_line + } + + structure["headers"].append(current_header) + current_content = [] + else: + # 非标题行,添加到当前内容 + if current_header: + current_content.append(original_line) + else: + # 文档开头没有标题的内容 + current_content.append(original_line) + + # 保存最后一个内容段落 + if current_header and current_content: + structure["content_sections"].append({ + "header": current_header, + "content": "\n".join(current_content), + "start_line": current_header["line"], + "end_line": len(lines) - 1 + }) + elif current_content and not current_header: + # 处理没有标题的内容 + structure["content_sections"].append({ + "header": None, + "content": "\n".join(current_content), + "start_line": 0, + "end_line": len(lines) - 1 + }) + + # 处理未闭合的表格、公式、代码块 + if in_table and current_table: + structure["tables"].append({ + "content": "\n".join(current_table), + "start_line": current_table[0].split('\n')[0] if current_table else len(lines) - len(current_table), + "end_line": len(lines) - 1, + "rows": len(current_table) + }) + + if in_formula and current_formula: + structure["formulas"].append({ + "content": "\n".join(current_formula), + "start_line": current_formula[0].split('\n')[0] if current_formula else len(lines) - len(current_formula), + 
"end_line": len(lines) - 1, + "type": "unclosed" + }) + + if in_code_block and current_code_block: + structure["code_blocks"].append({ + "content": "\n".join(current_code_block), + "start_line": current_code_block[0].split('\n')[0] if current_code_block else len(lines) - len(current_code_block), + "end_line": len(lines) - 1 + }) + + logger.info(f"Document structure analysis completed: {len(structure['headers'])} headers, {len(structure['tables'])} tables, {len(structure['formulas'])} formulas, {len(structure['code_blocks'])} code blocks") + return structure + + def _is_table_line(self, line: str) -> bool: + """判断是否为表格行""" + # 检查是否包含表格分隔符 + if '|' in line: + # 计算分隔符数量,至少需要2个分隔符才能形成表格 + pipe_count = line.count('|') + if pipe_count >= 2: + return True + + # 检查是否包含表格对齐标记(如 :---, ---: 等) + if re.match(r'^[\s]*:?[-]+:?[\s]*$', line): + return True + + return False + + def _smart_split_text(self, text: str, structure: Dict) -> List[str]: + """智能切分文本,保持标题、表格、公式和内容的完整性""" + chunks = [] + + # 基于内容段落进行智能切分 + content_sections = structure.get("content_sections", []) + tables = structure.get("tables", []) + formulas = structure.get("formulas", []) + code_blocks = structure.get("code_blocks", []) + + logger.info(f"Found {len(content_sections)} content sections, {len(tables)} tables, {len(formulas)} formulas, {len(code_blocks)} code blocks") + + # 处理内容段落 + for i, section in enumerate(content_sections): + header = section.get("header") + content = section.get("content", "") + + if header: + # 有标题的段落:标题 + 内容 + section_text = header["full_line"] + "\n" + content if content else header["full_line"] + logger.info(f"Section {i+1}: Header '{header['text']}' (level {header['level']}) with {len(content)} chars content") + else: + # 没有标题的段落:只有内容 + section_text = content + logger.info(f"Section {i+1}: No header, content length: {len(content)} chars") + + if not section_text.strip(): + continue + + # 如果单个段落超过限制,需要进一步切分 + if len(section_text) > self.chunk_size: + logger.info(f"Section {i+1} 
exceeds chunk size ({len(section_text)} > {self.chunk_size}), splitting...") + if header: + # 有标题的段落,在保持标题完整性的前提下切分 + sub_chunks = self._split_chunk_with_header_preservation(section_text, header) + logger.info(f"Header-preserved splitting resulted in {len(sub_chunks)} sub-chunks") + chunks.extend(sub_chunks) + else: + # 没有标题的段落,按句子切分 + sub_chunks = self._split_long_paragraph(section_text) + logger.info(f"Paragraph splitting resulted in {len(sub_chunks)} sub-chunks") + chunks.extend(sub_chunks) + else: + # 段落大小合适,直接添加 + chunks.append(section_text.strip()) + logger.info(f"Section {i+1} fits in single chunk ({len(section_text)} chars)") + + # 处理表格 - 表格应该保持完整,不被分割 + for i, table in enumerate(tables): + table_content = table["content"] + if len(table_content) > self.chunk_size: + logger.warning(f"Table {i+1} is very large ({len(table_content)} chars), attempting to split while preserving structure") + # 尝试切分表格,保持结构完整性 + table_chunks = self._split_large_table(table_content) + chunks.extend(table_chunks) + logger.info(f"Table {i+1} split into {len(table_chunks)} chunks while preserving structure") + else: + # 表格作为一个独立的chunk添加 + chunks.append(table_content) + logger.info(f"Added table {i+1} as complete chunk ({len(table_content)} chars)") + + # 处理公式 - 公式应该保持完整 + for i, formula in enumerate(formulas): + formula_content = formula["content"] + formula_type = formula.get("type", "unknown") + + if len(formula_content) > self.chunk_size: + logger.warning(f"Formula {i+1} ({formula_type}) is very large ({len(formula_content)} chars), but will be kept intact") + + # 公式作为一个独立的chunk添加 + chunks.append(formula_content) + logger.info(f"Added formula {i+1} ({formula_type}) as complete chunk ({len(formula_content)} chars)") + + # 处理代码块 - 代码块应该保持完整 + for i, code_block in enumerate(code_blocks): + code_content = code_block["content"] + + if len(code_content) > self.chunk_size: + logger.warning(f"Code block {i+1} is very large ({len(code_content)} chars), but will be kept intact") + + # 
代码块作为一个独立的chunk添加 + chunks.append(code_content) + logger.info(f"Added code block {i+1} as complete chunk ({len(code_content)} chars)") + + # 如果chunks为空,回退到原来的切分方法 + if not chunks: + logger.warning("No chunks generated from smart splitting, falling back to original method") + chunks = self._fallback_split_text(text) + + # 确保chunk大小在合理范围内 + chunks = self._adjust_chunk_sizes(chunks) + + logger.info(f"Smart text splitting completed. Generated {len(chunks)} chunks") + return chunks + + def _split_large_table(self, table_content: str, max_chunk_size: int = None) -> List[str]: + """切分过大的表格,保持表格结构完整性""" + if max_chunk_size is None: + max_chunk_size = self.chunk_size + + if len(table_content) <= max_chunk_size: + return [table_content] + + lines = table_content.split('\n') + chunks = [] + current_chunk = [] + current_size = 0 + + for line in lines: + line_size = len(line) + 1 # +1 for newline + + # 如果当前行是表格分隔符(如 | --- | --- |),应该与前面的内容保持在一起 + if re.match(r'^[\s]*:?[-]+:?[\s]*$', line): + # 分隔符行,强制添加到当前chunk + current_chunk.append(line) + current_size += line_size + elif current_size + line_size <= max_chunk_size: + # 可以添加到当前chunk + current_chunk.append(line) + current_size += line_size + else: + # 当前chunk已满,保存并开始新的 + if current_chunk: + chunks.append('\n'.join(current_chunk)) + + # 开始新的chunk + current_chunk = [line] + current_size = line_size + + # 添加最后一个chunk + if current_chunk: + chunks.append('\n'.join(current_chunk)) + + return chunks + + def _split_long_paragraph(self, paragraph: str) -> List[str]: + """切分过长的段落""" + chunks = [] + current_chunk = "" + + # 按句子切分 + sentences = re.split(r'([。!?.!?])', paragraph) + + for i in range(0, len(sentences), 2): + sentence = sentences[i] + if i + 1 < len(sentences): + sentence += sentences[i + 1] # 加上标点符号 + + if len(current_chunk) + len(sentence) <= self.chunk_size: + current_chunk += sentence + else: + if current_chunk: + chunks.append(current_chunk.strip()) + current_chunk = sentence + + if current_chunk: + 
chunks.append(current_chunk.strip()) + + return chunks + + def _adjust_chunk_sizes(self, chunks: List[str]) -> List[str]: + """调整chunk大小,确保在合理范围内""" + adjusted_chunks = [] + + for chunk in chunks: + if len(chunk) < self.min_chunk_size: + # 如果chunk太小,尝试合并 + if adjusted_chunks and len(adjusted_chunks[-1]) + len(chunk) <= self.max_chunk_size: + adjusted_chunks[-1] += "\n\n" + chunk + else: + adjusted_chunks.append(chunk) + elif len(chunk) > self.max_chunk_size: + # 如果chunk太大,进一步切分 + sub_chunks = self._split_long_paragraph(chunk) + adjusted_chunks.extend(sub_chunks) + else: + adjusted_chunks.append(chunk) + + return adjusted_chunks + + def _split_chunk_with_header_preservation(self, chunk: str, header: dict) -> List[str]: + """在保持标题完整性的前提下切分chunk""" + if not header: + # 如果没有标题,按段落切分 + return self._split_long_paragraph(chunk) + + chunks = [] + lines = chunk.split('\n') + + # 找到标题行 + header_index = -1 + for i, line in enumerate(lines): + if line.strip() == header["full_line"].strip(): + header_index = i + break + + if header_index == -1: + return [chunk] + + # 从标题开始,按段落切分 + current_chunk = header["full_line"] + current_size = len(header["full_line"]) + + for i in range(header_index + 1, len(lines)): + line = lines[i] + line_size = len(line) + 1 # +1 for newline + + if current_size + line_size <= self.chunk_size: + current_chunk += "\n" + line + current_size += line_size + else: + # 当前chunk已满,保存并开始新的 + if current_chunk: + chunks.append(current_chunk.strip()) + + # 如果下一行也是标题,从标题开始新chunk + if line.startswith('#'): + current_chunk = line + current_size = len(line) + else: + # 否则从内容开始,但保持上下文 + current_chunk = header["full_line"] + "\n" + line + current_size = len(header["full_line"]) + 1 + len(line) + + # 添加最后一个chunk + if current_chunk: + chunks.append(current_chunk.strip()) + + return chunks + + def _fallback_split_text(self, text: str) -> List[str]: + """回退到原来的切分方法""" + chunks = [] + current_chunk = "" + + # 按段落切分 + paragraphs = text.split('\n\n') + + for paragraph in 
paragraphs: + if not paragraph.strip(): + continue + + # 如果当前chunk加上新段落不超过限制,则添加 + if len(current_chunk) + len(paragraph) <= self.chunk_size: + if current_chunk: + current_chunk += "\n\n" + paragraph + else: + current_chunk = paragraph + else: + # 当前chunk已满,保存并开始新的chunk + if current_chunk: + chunks.append(current_chunk.strip()) + + # 如果单个段落就超过限制,需要进一步切分 + if len(paragraph) > self.chunk_size: + sub_chunks = self._split_long_paragraph(paragraph) + chunks.extend(sub_chunks) + current_chunk = "" + else: + current_chunk = paragraph + + # 添加最后一个chunk + if current_chunk: + chunks.append(current_chunk.strip()) + + return chunks diff --git a/src/backend/bisheng/initdb_config.yaml b/src/backend/bisheng/initdb_config.yaml index e09bb1d56..4fe04e39a 100644 --- a/src/backend/bisheng/initdb_config.yaml +++ b/src/backend/bisheng/initdb_config.yaml @@ -1,9 +1,19 @@ knowledges: # 知识库相关配置 etl4lm: # 文档解析模型服务配置,包括OCR、版式分析、表格识别、公式识别等 - url: "" # http://192.168.106.12:8180/v1/etl4llm/predict + # 当 provider=etl4lm 时:url 建议配置为 etl4lm 的 partition 接口地址 + # 当 provider=mineru 时:url 配置为 MinerU FastAPI 的基础地址(不带 /file_parse),例如 http://172.19.0.3:8009 + url: "http://172.19.0.3:8009" # http://192.168.106.12:8180/v1/etl4llm/predict timeout: 600 - # OCR SDK服务地址,默认为空则使用ETL4LM自带的轻量OCR模型(速度快,对于困难场景效果一般),若填写OCR SDK服务地址则使用高精度的OCR模型。 + # 解析后端提供方:etl4lm 或 mineru + provider: "mineru" + # 以下参数主要用于 provider=mineru 且 backend=pipeline 的场景 + backend: "pipeline" # pipeline 或 vlm-sglang-client + parse_method: "auto" # auto/ocr/txt 等(pipeline 后端) + lang: "ch" + formula_enable: true + table_enable: true + # OCR SDK服务地址,仅 provider=etl4lm 时生效;为空则使用内置OCR ocr_sdk_url: "" llm_request: @@ -69,6 +79,6 @@ linsight: # 灵思任务执行过程中模型调用重试间隔时间(秒) retry_sleep: 5 # 生成SOP时,prompt里放的用户上传文件信息的数量 - max_file_num: 5 + max_file_num: 50 # 生成SOP时,prompt里放的组织知识库的最大数量 - max_knowledge_num: 20 + max_knowledge_num: 200