From dbc6fa32379bb22753c266717f558e7a76c9cd88 Mon Sep 17 00:00:00 2001 From: Br1an67 <932039080@qq.com> Date: Mon, 2 Mar 2026 00:48:42 +0800 Subject: [PATCH] fix: avoid releasing un-acquired lock during blob rotation Extract blob rotation logic into a dedicated _rotate_blob() method instead of calling self.__init__() which re-initializes the logging.Handler and its lock, causing 'cannot release un-acquired lock' error after hitting 25k rows. Fixes #2170 --- .../graphrag/logger/blob_workflow_logger.py | 34 +++---- tests/unit/test_blob_workflow_logger.py | 97 +++++++++++++++++++ 2 files changed, 113 insertions(+), 18 deletions(-) create mode 100644 tests/unit/test_blob_workflow_logger.py diff --git a/packages/graphrag/graphrag/logger/blob_workflow_logger.py b/packages/graphrag/graphrag/logger/blob_workflow_logger.py index fd7ccac4f..c5261a268 100644 --- a/packages/graphrag/graphrag/logger/blob_workflow_logger.py +++ b/packages/graphrag/graphrag/logger/blob_workflow_logger.py @@ -56,18 +56,10 @@ def __init__( credential=DefaultAzureCredential(), ) - if blob_name == "": - blob_name = f"report/{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d-%H:%M:%S:%f')}.logs.json" - - self._blob_name = str(Path(base_dir or "") / blob_name) self._container_name = container_name - self._blob_client = self._blob_service_client.get_blob_client( - self._container_name, self._blob_name - ) - if not self._blob_client.exists(): - self._blob_client.create_append_blob() + self._base_dir = base_dir - self._num_blocks = 0 # refresh block counter + self._rotate_blob(blob_name or None) def emit(self, record) -> None: """Emit a log record to blob storage.""" @@ -98,17 +90,23 @@ def _get_log_type(self, level: int) -> str: return "warning" return "log" + def _rotate_blob(self, blob_name: str | None = None): + """Create a new blob file when the current one reaches max block count.""" + if not blob_name: + blob_name = f"report/{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d-%H:%M:%S:%f')}.logs.json" + self._blob_name = str(Path(self._base_dir or "") / blob_name) + self._blob_client = self._blob_service_client.get_blob_client( + self._container_name, self._blob_name + ) + if not self._blob_client.exists(): + self._blob_client.create_append_blob() + self._num_blocks = 0 + def _write_log(self, log: dict[str, Any]): """Write log data to blob storage.""" # create a new file when block count hits close 25k - if ( - self._num_blocks >= self._max_block_count - ): # Check if block count exceeds 25k - self.__init__( - self._connection_string, - self._container_name, - account_url=self.account_url, - ) + if self._num_blocks >= self._max_block_count: + self._rotate_blob() blob_client = self._blob_service_client.get_blob_client( self._container_name, self._blob_name diff --git a/tests/unit/test_blob_workflow_logger.py b/tests/unit/test_blob_workflow_logger.py new file mode 100644 index 000000000..c7ae4ce99 --- /dev/null +++ b/tests/unit/test_blob_workflow_logger.py @@ -0,0 +1,97 @@ +# Copyright (c) 2024 Microsoft Corporation. +# Licensed under the MIT License + +"""Tests for BlobWorkflowLogger blob rotation logic.""" + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from graphrag.logger.blob_workflow_logger import BlobWorkflowLogger + + +@pytest.fixture +def mock_blob_service(): + """Create a mock BlobServiceClient and related objects.""" + with patch( + "graphrag.logger.blob_workflow_logger.BlobServiceClient" + ) as mock_bsc_cls: + mock_blob_client = MagicMock() + mock_blob_client.exists.return_value = False + + mock_bsc = MagicMock() + mock_bsc.get_blob_client.return_value = mock_blob_client + + mock_bsc_cls.return_value = mock_bsc + + yield mock_bsc, mock_blob_client + + +@pytest.fixture +def mock_credential(): + """Mock DefaultAzureCredential.""" + with patch( + "graphrag.logger.blob_workflow_logger.DefaultAzureCredential" + ) as mock_cred: + yield mock_cred + + +def test_rotate_blob_does_not_reinitialize_handler( + mock_blob_service, mock_credential +): + """Test that blob rotation does not call __init__ on the handler. + + This verifies the fix for issue #2170 where calling self.__init__() + during rotation caused 'cannot release un-acquired lock' errors. + """ + mock_bsc, mock_blob_client = mock_blob_service + + logger = BlobWorkflowLogger( + connection_string=None, + container_name="test-container", + blob_name="test.logs.json", + base_dir="logs", + account_url="https://test.blob.core.windows.net", + ) + + # Simulate reaching max block count + logger._num_blocks = logger._max_block_count + + # Store reference to the original lock to verify it's not replaced + original_lock = logger.lock + + # Write a log entry, which should trigger rotation + logger._write_log({"type": "log", "data": "test message"}) + + # Verify the lock was NOT replaced (i.e., __init__ was not called) + assert logger.lock is original_lock + + # Verify block counter was reset (rotation happened) + # After rotation (reset to 0) + 1 write = 1 + assert logger._num_blocks == 1 + + # Verify a new blob client was created during rotation + assert mock_bsc.get_blob_client.call_count > 1 + + +def test_rotate_blob_creates_new_blob_name(mock_blob_service, mock_credential): + """Test that rotation generates a new blob name.""" + mock_bsc, mock_blob_client = mock_blob_service + + logger = BlobWorkflowLogger( + connection_string=None, + container_name="test-container", + blob_name="test.logs.json", + base_dir="logs", + account_url="https://test.blob.core.windows.net", + ) + + original_blob_name = logger._blob_name + + # Trigger rotation + logger._rotate_blob() + + # New blob name should be different (auto-generated with timestamp) + assert logger._blob_name != original_blob_name + assert logger._num_blocks == 0