Commit a74dcf5

refactor: optimize docparse chunking and move resolve_full_path to operator
- Move resolve_full_path to operator.py for reusability
- Optimize ai_parse_document chunking to respect DEFAULT_CHUNK_SIZE
- Add a tokens count to chunk output and num_tokens to metadata
- Remove fallback mechanism in ai_parse_document
- Unify response format for success and error cases
1 parent 2af6a35 commit a74dcf5

File tree: 2 files changed, 108 additions & 58 deletions

databend_aiserver/stages/operator.py

Lines changed: 22 additions & 0 deletions
@@ -249,3 +249,25 @@ def load_stage_file(stage: StageLocation, path: str, *, on_missing: Callable[[st
 
 def stage_file_suffix(path: str) -> str:
     return Path(path).suffix or ".bin"
+
+
+def resolve_full_path(stage_location: StageLocation, path: str) -> str:
+    """
+    Resolve the full path (URI) of a file in the stage.
+    Useful for returning the full S3 path in metadata.
+    """
+    resolved_path = resolve_stage_subpath(stage_location, path)
+    storage = stage_location.storage or {}
+    storage_root = str(storage.get("root", "") or "")
+    bucket = storage.get("bucket") or storage.get("name")
+
+    if storage_root.startswith("s3://"):
+        base = storage_root.rstrip("/")
+        return f"{base}/{resolved_path}"
+    elif bucket:
+        base = f"s3://{bucket}"
+        if storage_root:
+            base = f"{base}/{storage_root.strip('/')}"
+        return f"{base}/{resolved_path}"
+
+    return resolved_path or path
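
For reference, the branching above reduces to a simple URI join. A minimal standalone sketch follows; join_stage_uri, the bucket names, and the paths are invented for illustration, while the real function derives the subpath via resolve_stage_subpath and reads root/bucket from stage_location.storage.

# Hypothetical stand-in for the branching in resolve_full_path above.
def join_stage_uri(storage_root: str, bucket: str, subpath: str, path: str) -> str:
    if storage_root.startswith("s3://"):
        # Root is already a full URI: append the subpath to it.
        return f"{storage_root.rstrip('/')}/{subpath}"
    elif bucket:
        # Build an s3:// URI from the bucket plus an optional relative root.
        base = f"s3://{bucket}"
        if storage_root:
            base = f"{base}/{storage_root.strip('/')}"
        return f"{base}/{subpath}"
    # No S3 information: fall back to the resolved subpath (or the raw path).
    return subpath or path

print(join_stage_uri("s3://data-lake/stage", "", "docs/report.pdf", "docs/report.pdf"))
# -> s3://data-lake/stage/docs/report.pdf
print(join_stage_uri("stage", "my-bucket", "docs/report.pdf", "docs/report.pdf"))
# -> s3://my-bucket/stage/docs/report.pdf
print(join_stage_uri("", "", "docs/report.pdf", "docs/report.pdf"))
# -> docs/report.pdf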

databend_aiserver/udfs/docparse.py

Lines changed: 86 additions & 58 deletions
@@ -20,7 +20,7 @@
 import tempfile
 from pathlib import Path
 from time import perf_counter, perf_counter_ns
-from typing import Any, Dict, List, Optional, Protocol, Tuple
+from typing import Any, Dict, List, Optional, Protocol, Tuple, Sequence
 
 from databend_udf import StageLocation, udf
 from docling.document_converter import DocumentConverter, PdfFormatOption
@@ -39,6 +39,7 @@
     load_stage_file,
     stage_file_suffix,
     resolve_stage_subpath,
+    resolve_full_path,
 )
 from databend_aiserver.config import DEFAULT_EMBED_MODEL, DEFAULT_CHUNK_SIZE
 
@@ -153,41 +154,67 @@ def _get_hf_tokenizer(model_name: str) -> HuggingFaceTokenizer:
     return _TOKENIZER_CACHE[model_name]
 
 
-def _resolve_full_path(stage_location: StageLocation, path: str) -> str:
-    resolved_path = resolve_stage_subpath(stage_location, path)
-    storage = stage_location.storage or {}
-    storage_root = str(storage.get("root", "") or "")
-    bucket = storage.get("bucket") or storage.get("name")
-
-    if storage_root.startswith("s3://"):
-        base = storage_root.rstrip("/")
-        return f"{base}/{resolved_path}"
-    elif bucket:
-        base = f"s3://{bucket}"
-        if storage_root:
-            base = f"{base}/{storage_root.strip('/')}"
-        return f"{base}/{resolved_path}"
-
-    return resolved_path or path
 
 
-def _chunk_document(doc: Any) -> Tuple[List[Dict[str, Any]], bool]:
-    """Chunk the document and return pages/chunks and a fallback flag."""
-    markdown = doc.export_to_markdown()
+
+def _chunk_document(doc: Any) -> Tuple[List[Dict[str, Any]], int]:
+    """Chunk the document and return pages/chunks and total tokens."""
     tokenizer = _get_hf_tokenizer(DEFAULT_EMBED_MODEL)
     chunker = HybridChunker(tokenizer=tokenizer)
 
-    try:
-        chunks = list(chunker.chunk(dl_doc=doc))
-        if not chunks:
-            return [{"index": 0, "content": markdown}], True
-
-        return [
-            {"index": idx, "content": chunker.contextualize(chunk)}
-            for idx, chunk in enumerate(chunks)
-        ], False
-    except Exception:
-        return [{"index": 0, "content": markdown}], True
+    chunks = list(chunker.chunk(dl_doc=doc))
+    if not chunks:
+        raise ValueError("HybridChunker returned no chunks")
+
+    logger.info(
+        "HybridChunker produced %d chunks. Merging to fit %d tokens...",
+        len(chunks),
+        DEFAULT_CHUNK_SIZE,
+    )
+
+    merged_chunks = []
+    current_chunk_text = ""
+    current_tokens = 0
+    total_tokens = 0
+    delimiter = "\n\n"
+    delimiter_tokens = tokenizer.count_tokens(delimiter)
+
+    for chunk in chunks:
+        text = chunker.contextualize(chunk)
+        text_tokens = tokenizer.count_tokens(text)
+
+        if current_tokens + delimiter_tokens + text_tokens > DEFAULT_CHUNK_SIZE and current_chunk_text:
+            merged_chunks.append({
+                "index": len(merged_chunks),
+                "content": current_chunk_text,
+                "tokens": current_tokens,
+            })
+            total_tokens += current_tokens
+            current_chunk_text = text
+            current_tokens = text_tokens
+        else:
+            if current_chunk_text:
+                current_chunk_text += delimiter + text
+                current_tokens += delimiter_tokens + text_tokens
+            else:
+                current_chunk_text = text
+                current_tokens = text_tokens
+
+    if current_chunk_text:
+        merged_chunks.append({
+            "index": len(merged_chunks),
+            "content": current_chunk_text,
+            "tokens": current_tokens,
+        })
+        total_tokens += current_tokens
+
+    logger.info(
+        "Merged chunks: %d -> %d",
+        len(chunks),
+        len(merged_chunks),
+    )
+
+    return merged_chunks, total_tokens
 
 
 def _format_response(
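
To make the merging loop in the hunk above concrete: raw HybridChunker chunks are appended greedily to a buffer until adding the next chunk (plus the "\n\n" delimiter) would push it past DEFAULT_CHUNK_SIZE, at which point the buffer is flushed as one output chunk carrying its token count. A minimal sketch of the same loop, using a whitespace word count as a stand-in for the HuggingFace tokenizer and an assumed budget of 512 tokens:

# Simplified stand-ins: count_tokens() is a whitespace word count here, not the
# real HuggingFaceTokenizer, and CHUNK_SIZE plays the role of DEFAULT_CHUNK_SIZE.
CHUNK_SIZE = 512
DELIM = "\n\n"

def count_tokens(text: str) -> int:
    return len(text.split())

def merge(texts):
    merged, buf, buf_tokens = [], "", 0
    for text in texts:
        tokens = count_tokens(text)
        if buf_tokens + count_tokens(DELIM) + tokens > CHUNK_SIZE and buf:
            # Buffer is full: emit it as one chunk and start a new one.
            merged.append({"index": len(merged), "content": buf, "tokens": buf_tokens})
            buf, buf_tokens = text, tokens
        elif buf:
            buf += DELIM + text
            buf_tokens += count_tokens(DELIM) + tokens
        else:
            buf, buf_tokens = text, tokens
    if buf:
        merged.append({"index": len(merged), "content": buf, "tokens": buf_tokens})
    return merged

# Raw chunks of 300, 250 and 200 "tokens": the first is flushed alone
# (300 + 250 > 512), the last two merge into a single 450-token chunk.
print([(c["index"], c["tokens"]) for c in merge(["a " * 300, "b " * 250, "c " * 200])])
# -> [(0, 300), (1, 450)]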
@@ -196,31 +223,31 @@ def _format_response(
     pages: List[Dict[str, Any]],
     file_size: int,
     timings: Dict[str, float],
-    fallback: bool
+    num_tokens: int,
+    error: Optional[Dict[str, str]] = None,
 ) -> Dict[str, Any]:
-    duration_ms = timings["total"]
-    payload: Dict[str, Any] = {
-        "metadata": {
-            "chunk_count": len(pages),
-            "chunk_size": DEFAULT_CHUNK_SIZE,
-            "duration_ms": duration_ms,
-            "file_size": file_size,
-            "filename": Path(path).name,
-            "path": full_path,
-            "timings_ms": timings,
-            "version": 1,
-        },
+    """Format the response with a fixed template."""
+    metadata = {
+        "chunk_count": len(pages),
+        "chunk_size": DEFAULT_CHUNK_SIZE,
+        "duration_ms": timings.get("total", 0.0),
+        "file_size": file_size,
+        "filename": Path(path).name,
+        "num_tokens": num_tokens,
+        "path": full_path,
+        "timings_ms": timings,
+        "version": 1,
+    }
+
+    response = {
+        "metadata": metadata,
         "chunks": pages,
     }
-
-    if fallback:
-        payload["error_information"] = [
-            {
-                "type": "ChunkingFallback",
-                "message": "chunker failed or returned empty; returned full markdown instead",
-            }
-        ]
-    return payload
+
+    if error:
+        response["error_information"] = [error]
+
+    return response
 
 
 @udf(
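
The hunk above gives success and error results one shape: a metadata block plus chunks, with error_information appended only when the caller passes an error dict. A sketch of the resulting payload with invented values (the concrete numbers, the DEFAULT_CHUNK_SIZE of 512, and the error entry are illustrative; this diff does not show where an error dict would be produced):

# Illustrative only: every value here is invented, but the keys mirror the
# metadata/chunks layout assembled by _format_response in the hunk above.
payload = {
    "metadata": {
        "chunk_count": 2,
        "chunk_size": 512,  # assumed DEFAULT_CHUNK_SIZE
        "duration_ms": 1834.2,
        "file_size": 482133,
        "filename": "report.pdf",
        "num_tokens": 903,
        "path": "s3://my-bucket/stage/docs/report.pdf",
        "timings_ms": {"convert": 1700.5, "chunk": 133.7, "total": 1834.2},
        "version": 1,
    },
    "chunks": [
        {"index": 0, "content": "...", "tokens": 498},
        {"index": 1, "content": "...", "tokens": 405},
    ],
}

# When a caller supplies an error dict, the same payload simply gains
# payload["error_information"] = [{"type": "...", "message": "..."}].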
@@ -247,25 +274,26 @@ def ai_parse_document(stage_location: StageLocation, path: str) -> Dict[str, Any
     result, file_size = backend.convert(stage_location, path)
     t_convert_end_ns = perf_counter_ns()
 
-    pages, fallback = _chunk_document(result.document)
+    pages, num_tokens = _chunk_document(result.document)
     t_chunk_end_ns = perf_counter_ns()
 
-    full_path = _resolve_full_path(stage_location, path)
+    full_path = resolve_full_path(stage_location, path)
 
     timings = {
         "convert": (t_convert_end_ns - t_convert_start_ns) / 1_000_000.0,
         "chunk": (t_chunk_end_ns - t_convert_end_ns) / 1_000_000.0,
         "total": (t_chunk_end_ns - t_total_ns) / 1_000_000.0,
     }
 
-    payload = _format_response(path, full_path, pages, file_size, timings, fallback)
+    payload = _format_response(
+        path, full_path, pages, file_size, timings, num_tokens
+    )
 
     logger.info(
-        "ai_parse_document path=%s backend=%s chunks=%s fallback=%s duration_ms=%.1f",
+        "ai_parse_document path=%s backend=%s chunks=%s duration_ms=%.1f",
         path,
         getattr(backend, "name", "unknown"),
         len(pages),
-        fallback,
        timings["total"],
     )
     return payload
