From f3906ff6d6539c55171464e724109c0c255097a4 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Thu, 7 Aug 2025 14:13:24 +0200 Subject: [PATCH] add record descriptor name to writer exception message --- flow/record/adapter/elastic.py | 11 ++++++++- tests/test_elastic_adapter.py | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index d6c09324..2e8aafdf 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -1,6 +1,7 @@ from __future__ import annotations import hashlib +import json import logging import queue import threading @@ -279,7 +280,15 @@ def enrich_elastic_exception(exception: Exception) -> Exception: error_type = error_dict.get("type") error_reason = error_dict.get("reason", "") - errors.add(f"({status} {error_type} {error_reason})") + try: + data = json.loads(index_dict.get("data", "{}")) + record_metadata = data.get("_record_metadata", {}) + descriptor = record_metadata.get("descriptor", {}) + descriptor_name = descriptor.get("name", "unknown_descriptor") + except json.JSONDecodeError: + descriptor_name = "unknown_descriptor" + + errors.add(f"({descriptor_name}: {status} {error_type} {error_reason})") except Exception: errors.add("unable to extend errors") diff --git a/tests/test_elastic_adapter.py b/tests/test_elastic_adapter.py index 6df5384b..07d30c25 100644 --- a/tests/test_elastic_adapter.py +++ b/tests/test_elastic_adapter.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING import pytest +from elasticsearch.helpers import BulkIndexError from flow.record import RecordDescriptor from flow.record.adapter.elastic import ElasticWriter @@ -57,3 +58,47 @@ def test_elastic_writer_metadata(record: Record) -> None: } ), } + + +def test_elastic_writer_metadata_exception() -> None: + with ElasticWriter(uri="elasticsearch:9200") as writer: + writer.excepthook( + BulkIndexError( + "1 document(s) failed to index.", + errors=[ + { + "index": { + "_index": "example-index", + "_id": "bWFkZSB5b3UgbG9vayDwn5GA", + "status": 400, + "error": { + "type": "document_parsing_exception", + "reason": "[1:225] failed to parse field [example] of type [long] in document with id " + "'bWFkZSB5b3UgbG9vayDwn5GA'. Preview of field's value: 'Foo'", + "caused_by": { + "type": "illegal_argument_exception", + "reason": 'For input string: "Foo"', + }, + }, + "data": '{"example":"Foo","_record_metadata":{"descriptor":{"name":"example/record",' + '"hash":1234567890},"source":"/path/to/source","classification":null,' + '"generated":"2025-12-31T12:34:56.789012+00:00","version":1}}', + } + } + ], + ) + ) + + assert writer.exception.args == ( + ( + "1 document(s) failed to index. (example/record: 400 " + "document_parsing_exception [1:225] failed to parse field " + "[example] of type [long] in document with id 'bWFkZSB5b3UgbG9vayDwn5GA'. " + "Preview of field's value: 'Foo')" + ), + ) + + with pytest.raises(BulkIndexError): + writer.__exit__() + + writer.exception = None