From 18e63557ddd9d0156a5e9b1e9c0040419fc2b1b6 Mon Sep 17 00:00:00 2001 From: Renan Souza Date: Tue, 3 May 2022 18:02:26 -0300 Subject: [PATCH 01/19] Updating for release Signed-off-by: Raphael Melo Thiago --- README.md | 12 +- requirements.txt | 2 - setup.py | 6 +- src/provlake/utils/__deprecated.py | 262 ----------------------------- version | 2 +- 5 files changed, 14 insertions(+), 270 deletions(-) delete mode 100644 src/provlake/utils/__deprecated.py diff --git a/README.md b/README.md index eaf23ff..3d6b1a2 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,10 @@ This is part of ProvLake Project. See [ibm.biz/provlake](http://ibm.biz/provlake ## Installation -`pip install git+git://github.com/IBM/multi-data-lineage-capture-py.git` +`pip install provlake` -### A Python lib to capture multiworkflow provenance data from Python Scripts +### A Python lib to capture data processed by multiple workflows using provenance Use this library for code instrumentation to collect provenance data of function calls in a script. Input arguments or output values from functions can come from distributed data storages, including file systems and database systems. @@ -52,7 +52,7 @@ If you prefer, you can use it without context management: ```python from provlake import ProvLake from provlake.capture import ProvWorkflow, ProvTask -prov = ProvLake.get_persister("factorial_dataflow_without_ctx_mgmt") +prov = ProvLake.get_persister("factorial_dataflow") prov_workflow = ProvWorkflow(prov) prov_workflow.begin() @@ -67,3 +67,9 @@ prov_task.end(out_args) prov_workflow.end() ``` + +By executing this file, the library generates raw provenance data in JSON format. If you don't specify the backend using the `service_url` argument in the `ProvLake.get_persister` builder, the JSON provenance data will be dumped to a local file, updated during workflow execution. +Users can analyze the file using their preferred tools. + +To use the analytics tools provided by IBM, such as the Knowledge Exploration System (KES) tool or run more complex queries, users need to use the ProvLake backend service. ProvLake backend processes the JSON provenance data and transforms it into a knowledge graph using W3C standards (PROV, OWL, and RDF) and stores in a knowledge graph system. This backend service is proprietary by IBM. Please contact {rfsouza, lga, rcerq, mstelmar}@br.ibm.com for more information. + diff --git a/requirements.txt b/requirements.txt index e3f747a..dd2be4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ -pyyaml requests-futures numpy pandas - diff --git a/setup.py b/setup.py index 42f147a..f84acb5 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,9 @@ setup(name='provlake', version=VERSION, - description='A Python lib to capture multiworkflow provenance data from Python Scripts', + description='A Python lib to capture multiworkflow provenance data', + long_description='ProvLake is a system that captures and integrates data processed by multiple workflows using provenance data.', + long_description_content_type='text/x-rst', url='http://ibm.biz/provlake', author='IBM Research', author_email='rfsouza@br.ibm.com', @@ -16,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6', + python_requires='>=3.6,<3.9', include_package_data=True, zip_safe=False) diff --git a/src/provlake/utils/__deprecated.py b/src/provlake/utils/__deprecated.py deleted file mode 100644 index 53c8178..0000000 --- a/src/provlake/utils/__deprecated.py +++ /dev/null @@ -1,262 +0,0 @@ - -""" -##### -# Exemplary Content of a prov.yaml: -##### -context_name: seismic_pipelines -dataflow_name: - train: seismic_stratigraphic_train - create_data: seismic_stratigraphic_create_training_data - analyze_seismic: seismic_stratigraphic_analyze_seismic -file_storage: -with_validation: false -should_send_to_file: true -log_dir: . -align_terms: - segy_path: seismic_file - horizons_paths: horizon_file -graph_uri: -stringify_params: ["unite_category"] -not_tracked_params: ["prov", "prov_path", "input_path", "log_level", "dataset", "prov_config", "data_path" ] -bag_size: 1 -service: http://localhost:5000 -should_send_to_service: false - log_level: info -""" - -class MLSemantics(Enum): - hyper_parameter = "hyper_parameter" - data_characteristic = "data_characteristic" - dataset = "dataset" - evaluation_measure = "evaluation_measure" - model = "model" - - def __str__(self): - return self.value - - -class DTYPE(Enum): - int = "integer" - str = "string" - bool = "boolean" - list = "list" - float = "float" - complex = "complex_attribute" - any = "any" - - def __str__(self): - return self.value - - -class ProvConf: - conf_data: dict = None - custom_attributes: dict = None - - def __init__(self, prov_conf_path: str = None): - if not prov_conf_path: - logger.warning("[Prov] You are not capturing provenance.") - return - with open(prov_conf_path, 'r') as stream: - try: - ProvConf.conf_data = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise exc - - if "custom_attributes" in ProvConf.conf_data: - with open(ProvConf.conf_data["custom_attributes"], 'r') as stream: - try: - ProvConf.custom_attributes = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise exc - - -def get_dtype_from_val(value, should_stringfy=False) -> str: - if should_stringfy: - return DTYPE.str - elif type(value) == str: - return DTYPE.str - elif type(value) == list: - return DTYPE.list - elif type(value) == int: - return DTYPE.int - elif type(value) == float: - return DTYPE.float - elif type(value) == bool: - return DTYPE.bool - elif type(value) == dict: - return DTYPE.complex - else: - return DTYPE.any - - -def get_dtype_from_type(_type: type, should_stringfy=False) -> str: - if should_stringfy: - return str(DTYPE.str) - elif _type == str: - return str(DTYPE.str) - elif _type == list: - return str(DTYPE.list) - elif _type == int: - return str(DTYPE.int) - elif _type == float: - return str(DTYPE.float) - elif _type == bool: - return str(DTYPE.bool) - elif _type == dict: - return str(DTYPE.complex) - else: - return "any" - - -def build_generic_prospective(dataflow_name: str): - prospective_prov = dict() - prospective_prov["dataflow_name"] = ProvConf.conf_data["dataflow_name"][dataflow_name] - prospective_prov["context_name"] = ProvConf.conf_data["context_name"] - prospective_prov["storages"] = {"main_filesystem": {"type": "PhysicalMachine"}} - prospective_prov["data_transformations"] = {} - storage_configuration = { - "configuration": { - "storages": { - "main_filesystem": { - "type": "PhysicalMachine", - "host_address": ProvConf.conf_data["file_storage"] - } - } - } - } - return prospective_prov, storage_configuration - - -# @Deprecated -- this is going to be depricated. Please avoid using it. -def build_prov_input_from_dict(dict_params: dict): - retrospective_input_prov = {} - attributes = [] - - for key in dict_params: - # Remove params not tracked: - if "not_tracked_params" in ProvConf.conf_data and key in ProvConf.conf_data["not_tracked_params"]: - continue - - value = dict_params[key] - if value is None or (value is not None and value == ''): - continue - - # Renaming param names in case we need: - attr_name = key - if "align_terms" in ProvConf.conf_data and key in ProvConf.conf_data["align_terms"]: - attr_name = ProvConf.conf_data["align_terms"][key] - - should_stringify = __get_should_stringify(attr_name) - if should_stringify: - retrospective_input_prov[attr_name] = str(value) - else: - retrospective_input_prov[attr_name] = value - attr = { - "name": attr_name, - #"description": "" - } - if "path" in attr_name or "file" in attr_name: - attr["semantics"] = "FILE_REFERENCE" - attr["storage_references"] = { - "key": "main_filesystem" - } - else: - attr["semantics"] = "PARAMETER" - attr["ml_semantics"] = str(MLSemantics.hyper_parameter) - - # if "_slices" in attr_name: - # # we have a special case for slices in a different method - # continue - - dtype = get_dtype_from_val(value, should_stringify) - if dtype == "list": - attr["dtype"] = dtype - if "REFERENCE" not in attr["semantics"] and len(value) > 0: - attr["elementdtype"] = str(get_dtype_from_val(value[0], should_stringify)) - elif "REFERENCE" not in attr["semantics"]: - attr["dtype"] = dtype - - attributes.append(attr) - - return attributes, retrospective_input_prov - - -def __get_should_stringify(key): - if "stringify_params" not in ProvConf.conf_data: - return False - if key not in ProvConf.conf_data["stringify_params"]: - return False - return True - - -def get_prospective_attribute(key, value): - ml_semantics = None - - if type(value) == type: - dtype = get_dtype_from_type(value) - elif type(value) == DTYPE: - dtype = value - elif type(value) == dict: - dtype = value.get("dtype", "any") - ml_semantics = value.get("ml_semantics", None) - else: - dtype = get_dtype_from_val(value) - - attr = { - "name": key, - "dtype": str(dtype) - } - - if ml_semantics: - attr["ml_semantics"] = str(ml_semantics) - - if "path" in key or "file" in key: - attr["semantics"] = "FILE_REFERENCE" - attr["storage_references"] = {"key": "main_filesystem"} - - if dtype == DTYPE.list: - if len(value) > 0: - attr["elementdtype"] = str(get_dtype_from_val(value[0])) - elif dtype == DTYPE.complex: - attr["attributes"] = get_prospective_from_args(value) - - return attr - - -def get_prospective_from_args(prov_args: dict) -> list: - args_schema = [] - - for k in prov_args: - # removing unwanted attributes - if k == "self" or k.startswith('_') or "prov" in k: - continue - - attr = get_prospective_attribute(k, prov_args[k]) - args_schema.append(attr) - return args_schema - - -def build_prov_for_transformation(prospective_prov: dict, transformation): - # IN - prospective_in, prospective_out = [], [] - if hasattr(transformation, "static_prospective_prov_attributes_in") \ - and transformation.static_prospective_prov_attributes_in: - prospective_in = transformation.static_prospective_prov_attributes_in - elif transformation.static_schema_args_in: - prospective_in = get_prospective_from_args(transformation.static_schema_args_in) - else: - transformation.static_schema_args_in = transformation.get_static_schema_args_from_init() - prospective_in = get_prospective_from_args(transformation.static_schema_args_in) - - # OUT - if transformation.static_schema_args_out: - prospective_out = get_prospective_from_args(transformation.static_schema_args_out) - # elif hasattr(transformation, "prospective_args_out") and transformation.prospective_args_out: - # prospective_out = transformation.prospective_args_out - prospective_prov["data_transformations"].update({ - transformation.dt_name(): { - "input_datasets": [{"attributes": prospective_in}], - "output_datasets": [{"attributes": prospective_out}] - } - }) - return prospective_prov \ No newline at end of file diff --git a/version b/version index c2f0a83..8c60186 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.2.98 +0.2.982 From 01eef33712c81905e5ca3e6cccea00c9019b0d5f Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 25 May 2022 18:01:56 -0300 Subject: [PATCH 02/19] Changing setup to allow python 3.9 to be used Signed-off-by: Raphael Melo Thiago --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f84acb5..df3990b 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<3.9', + python_requires='>=3.6,<=3.9', include_package_data=True, zip_safe=False) From 35dcd7dc141c18e5479a60aa0dcf60c0ed78306f Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 25 May 2022 18:15:40 -0300 Subject: [PATCH 03/19] U Signed-off-by: Raphael Melo Thiago --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index df3990b..3fc5745 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<=3.9', + python_requires='>=3.6,<=3.9.x', include_package_data=True, zip_safe=False) From 43bf5ea82e70c82fb330569d1787d260c49f32dd Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 25 May 2022 18:20:56 -0300 Subject: [PATCH 04/19] U Signed-off-by: Raphael Melo Thiago --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3fc5745..861350b 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<=3.9.x', + python_requires='>=3.6,<=3.9.7', include_package_data=True, zip_safe=False) From f805c06e913f91a35a7ff413df4d2eb60232ad4c Mon Sep 17 00:00:00 2001 From: Valesca Moura Date: Tue, 20 Dec 2022 15:42:24 -0300 Subject: [PATCH 05/19] Added py.typed Signed-off-by: Valesca Moura Signed-off-by: Raphael Melo Thiago --- src/provlake/py.typed | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/provlake/py.typed diff --git a/src/provlake/py.typed b/src/provlake/py.typed new file mode 100644 index 0000000..e69de29 From 403d28ecd82f68f6f8290580adbf73a126bbd148 Mon Sep 17 00:00:00 2001 From: Valesca Moura Date: Wed, 11 Jan 2023 09:15:58 -0300 Subject: [PATCH 06/19] Added MANIFEST.in Signed-off-by: Valesca Moura Signed-off-by: valescamoura Signed-off-by: Raphael Melo Thiago --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) create mode 100644 MANIFEST.in diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..ff70094 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +global-include py.typed \ No newline at end of file From e21d24b315ff80e788947d9f5acc6076fa1f563a Mon Sep 17 00:00:00 2001 From: Valesca Moura Date: Wed, 11 Jan 2023 09:19:13 -0300 Subject: [PATCH 07/19] Added MANIFEST.in Signed-off-by: Valesca Moura Signed-off-by: valescamoura Signed-off-by: Raphael Melo Thiago --- MANIFEST.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/MANIFEST.in b/MANIFEST.in index ff70094..e03a7fe 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -global-include py.typed \ No newline at end of file +global-include py.typed \ No newline at end of file From 351a67bb4c1476cc3b265c2aeae8df81649c3c86 Mon Sep 17 00:00:00 2001 From: Valesca Moura <71470557+valescamoura@users.noreply.github.com> Date: Wed, 11 Jan 2023 09:59:47 -0300 Subject: [PATCH 08/19] Update version Signed-off-by: valescamoura Signed-off-by: Raphael Melo Thiago --- version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version b/version index 8c60186..73e44b1 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.2.982 +0.2.983 From 035504253a86652e423420f79fe6bc26c9f950c9 Mon Sep 17 00:00:00 2001 From: valescamoura Date: Wed, 11 Jan 2023 10:33:41 -0300 Subject: [PATCH 09/19] Update version Signed-off-by: valescamoura Signed-off-by: Raphael Melo Thiago --- version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version b/version index 73e44b1..28fbf1a 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.2.983 +0.2.983 \ No newline at end of file From 7e8744d4f3e9e2f257209e3f55f51adbc988cce2 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Mon, 10 Apr 2023 13:56:12 -0300 Subject: [PATCH 10/19] Allowing module to be used in python 3.11 Signed-off-by: Raphael Melo Thiago --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 861350b..72b4e7b 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<=3.9.7', + python_requires='>=3.6,<=3.11', include_package_data=True, zip_safe=False) From 7c1865ab4bafed25f4c3a66162b923168fd99642 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Mon, 24 Apr 2023 09:55:52 -0300 Subject: [PATCH 11/19] fix: flexibilizing python version to any 3.11.* --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 72b4e7b..b03546d 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<=3.11', + python_requires='>=3.6,<=3.11.*', include_package_data=True, zip_safe=False) From 0c3a59e188fc5ac73a249b428aa1080ad14705c0 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Mon, 24 Apr 2023 09:57:42 -0300 Subject: [PATCH 12/19] fix: invalid python version format --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b03546d..1e40629 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6,<=3.11.*', + python_requires='>=3.6,<3.12', include_package_data=True, zip_safe=False) From c1c22c4a78f2fc031ec007576026ad1535a632c7 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Tue, 10 Sep 2024 16:51:32 -0300 Subject: [PATCH 13/19] feat: synchronous post --- src/provlake/__init__.py | 16 ++++++++++------ src/provlake/persistence/managed_persister.py | 5 ++++- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/provlake/__init__.py b/src/provlake/__init__.py index ff84372..d4358db 100644 --- a/src/provlake/__init__.py +++ b/src/provlake/__init__.py @@ -22,7 +22,8 @@ def _build_managed_persister( context: str, with_validation: bool, db_name: str, - log_file_path: str + log_file_path: str, + synchronous: bool, ) -> ManagedPersister: should_send_to_service = False if service_url is not None: @@ -45,7 +46,8 @@ def _build_managed_persister( should_send_to_file=should_send_to_file, log_dir=log_dir, log_file_path=log_file_path, - should_send_to_service=should_send_to_service) + should_send_to_service=should_send_to_service, + synchronous=synchronous,) @staticmethod def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister: @@ -56,14 +58,15 @@ def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister: def get_persister( log_file_path=None, managed_persistence=True, - context: str = None, + context: str | None = None, with_validation: bool = False, log_level: str = 'error', should_send_to_file=True, log_dir='.', service_url=None, bag_size=None, - db_name: str = None + db_name: str | None = None, + synchronous: bool = False, ) -> Persister: if ProvLake._persister_singleton_instance is None: @@ -81,11 +84,12 @@ def get_persister( context=context, with_validation=with_validation, db_name=db_name, - log_file_path=log_file_path + log_file_path=log_file_path, + synchronous=synchronous, ) else: persister = ProvLake._build_unmanaged_persister( - log_file_path + log_file_path, ) ProvLake._persister_singleton_instance = persister return ProvLake._persister_singleton_instance diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 1e5e28a..0bf080b 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -23,7 +23,7 @@ class ManagedPersister(Persister): def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, - log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, + log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False ): super().__init__(log_file_path) self.retrospective_url = urljoin(service_url, "retrospective-provenance") @@ -35,6 +35,7 @@ def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, contex self.bag_size = bag_size self.should_send_to_service = should_send_to_service self.should_send_to_file = should_send_to_file + self.synchronous = synchronous self._session = None self._offline_prov_log = None @@ -121,6 +122,8 @@ def _flush(self, all_and_wait: bool = False): def _send_to_service(self, to_flush: List[dict]): params = {"with_validation": str(self.with_validation), "db_name": self.db_name} + if self.synchronous: + params["synchronous"] = "true" try: logger.debug("[Prov-Persistence]" + json.dumps(to_flush)) # TODO: check whether we need this result() below From dad22180dcdf87fbf8f4e87751b7c19867519518 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 11 Sep 2024 11:45:12 -0300 Subject: [PATCH 14/19] fix: making storage more robust --- src/provlake/persistence/managed_persister.py | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 0bf080b..758040b 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -23,7 +23,8 @@ class ManagedPersister(Persister): def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, - log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False + log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False, + retries_on_connection_error: int = 5 ): super().__init__(log_file_path) self.retrospective_url = urljoin(service_url, "retrospective-provenance") @@ -36,6 +37,7 @@ def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, contex self.should_send_to_service = should_send_to_service self.should_send_to_file = should_send_to_file self.synchronous = synchronous + self.retries_on_connection_error = retries_on_connection_error self._session = None self._offline_prov_log = None @@ -124,22 +126,28 @@ def _send_to_service(self, to_flush: List[dict]): params = {"with_validation": str(self.with_validation), "db_name": self.db_name} if self.synchronous: params["synchronous"] = "true" - try: - logger.debug("[Prov-Persistence]" + json.dumps(to_flush)) - # TODO: check whether we need this result() below - r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() - except ConnectionError as ex: - logger.error( - "[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex)) - r = None - pass - except Exception as ex: - traceback.print_exc() - logger.error( - "[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__ - + "->" + str(ex)) - r = None - pass + r = None + for i in range(self.retries_on_connection_error): + try: + logger.debug("[Prov-Persistence] [Retry#" + str(i) + "]" + json.dumps(to_flush)) + # TODO: check whether we need this result() below + r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() + break + except ConnectionError as ex: + logger.error( + "[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex)) + sleep_for = (2 ** i) + logging.debug(f'Exponential backoff ({sleep_for})') + sleep(sleep_for) + r = None + pass + except Exception as ex: + traceback.print_exc() + logger.error( + "[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__ + + "->" + str(ex)) + r = None + pass # If requests were validated, check for errors if r and self.with_validation: self._log_validation_message(r) From 0e44da41cb3b1856f4cfc981c63c3c1e40ce501b Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 11 Sep 2024 11:54:15 -0300 Subject: [PATCH 15/19] chore: increasing lib send request retries and improved logging for cases where retrying failed --- src/provlake/persistence/managed_persister.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 758040b..92e8100 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -24,7 +24,7 @@ class ManagedPersister(Persister): def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False, - retries_on_connection_error: int = 5 + retries_on_connection_error: int = 8 ): super().__init__(log_file_path) self.retrospective_url = urljoin(service_url, "retrospective-provenance") @@ -147,7 +147,9 @@ def _send_to_service(self, to_flush: List[dict]): "[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__ + "->" + str(ex)) r = None - pass + break + if r is None: + logger.info('Could not process provenance trace {}'.format(json.dumps(to_flush))) # If requests were validated, check for errors if r and self.with_validation: self._log_validation_message(r) From e91f2d8b2030c96288c2546be71cc154c78a0c11 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 11 Sep 2024 12:35:23 -0300 Subject: [PATCH 16/19] fix: using good ol' requests --- src/provlake/persistence/managed_persister.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 92e8100..9fe68dc 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -5,6 +5,7 @@ import traceback from time import sleep import urllib3 +import requests from requests.exceptions import ConnectionError from requests_futures.sessions import FuturesSession @@ -131,7 +132,10 @@ def _send_to_service(self, to_flush: List[dict]): try: logger.debug("[Prov-Persistence] [Retry#" + str(i) + "]" + json.dumps(to_flush)) # TODO: check whether we need this result() below - r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() + if self.synchronous: + r = requests.post(self.retrospective_url, json=to_flush, params=params, verify=False) + else: + r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() break except ConnectionError as ex: logger.error( From a86b21836cdc5dd9a171817892fcdd9d45b213b3 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 11 Sep 2024 15:17:56 -0300 Subject: [PATCH 17/19] chore: increase store retries --- src/provlake/persistence/managed_persister.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 9fe68dc..e162f4d 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -25,7 +25,7 @@ class ManagedPersister(Persister): def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False, - retries_on_connection_error: int = 8 + retries_on_connection_error: int = 10 ): super().__init__(log_file_path) self.retrospective_url = urljoin(service_url, "retrospective-provenance") From b28bbc1dfe37247da9ba31c63c4cfe5a78da1ad3 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 16 Apr 2025 17:54:50 -0300 Subject: [PATCH 18/19] feat: patch dte's custom metadata --- src/provlake/capture/__init__.py | 7 +++++++ src/provlake/persistence/managed_persister.py | 12 ++++++++++++ src/provlake/persistence/persister.py | 2 ++ 3 files changed, 21 insertions(+) diff --git a/src/provlake/capture/__init__.py b/src/provlake/capture/__init__.py index c4ce681..01e64ad 100644 --- a/src/provlake/capture/__init__.py +++ b/src/provlake/capture/__init__.py @@ -177,6 +177,9 @@ def end(self, output_args=dict(), stdout=None, stderr=None, end_time: float = No self.prov_obj.dt_name + " args: " + str(self.prov_obj.values)) return None + def patch_custom_metadata(self, custom_metadata: dict): + self._prov_persister.patch_custom_metadata(self, custom_metadata) + def __enter__(self): self.begin() return self @@ -186,6 +189,10 @@ def __exit__(self, *args): # There is no output, but end of task should be recorded anyway. self.end() + @property + def dte_id(self, ) -> str: + return f'{self.prov_obj.workflow_name}_exec_{self.prov_obj.wf_exec_id}_{self.prov_obj.dt_name}_{self.prov_obj.task_id}' + class ProvCycle(ActivityCapture): diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index e162f4d..4ea4d24 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -28,6 +28,7 @@ def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, contex retries_on_connection_error: int = 10 ): super().__init__(log_file_path) + self._service_url = service_url self.retrospective_url = urljoin(service_url, "retrospective-provenance") self.prospective_url = urljoin(service_url, "prospective-provenance") self.context = context @@ -90,6 +91,17 @@ def add_request(self, persistence_request: ProvRequestObj): traceback.print_exc() pass + def patch_custom_metadata(self, prov_task, custom_metadata: dict): + prov_task_id = prov_task.dte_id + url = urljoin(self._service_url, f"/provenance/api/data-transformation-executions/{prov_task_id}/custom-metadata") + + if self.synchronous: + r = requests.patch(url, json=custom_metadata, verify=False) + else: + r = self.session.patch(url, json=custom_metadata, verify=False).result() + + assert r.status_code in [200, 300], f'[{r.status_code}] {r.content=}' + def close(self): if self.session: logger.info("Waiting to get response from all submitted provenance tasks...") diff --git a/src/provlake/persistence/persister.py b/src/provlake/persistence/persister.py index 6f475ae..9a04f4b 100644 --- a/src/provlake/persistence/persister.py +++ b/src/provlake/persistence/persister.py @@ -22,4 +22,6 @@ def close(self): del ProvLake._persister_singleton_instance ProvLake._persister_singleton_instance = None + def patch_custom_metadata(self, prov_task, custom_metadata): + raise NotImplementedError From bab5742b05bf30a5f7e6a307504df87cc534fa35 Mon Sep 17 00:00:00 2001 From: Raphael Melo Thiago Date: Wed, 14 May 2025 16:36:18 -0300 Subject: [PATCH 19/19] feat: replacing attr value iri heuristics to always use hash --- src/provlake/utils/constants.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index eeb871e..c4b3f3f 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -248,16 +248,16 @@ def get_id_dataset(dte_id): def get_id_atv(attribute_id, value, value_type=None): if value_type: if value_type in {Vocabulary.DATA_REFERENCE_TYPE, Vocabulary.KG_REFERENCE_TYPE}: - return attribute_id + "_" + str(value) + return attribute_id + "_" + id_hash(str(value)) elif value_type == Vocabulary.DATASET_ITEM: return "dataset_item_"+str(uuid.uuid4()) else: - return attribute_id + "_" + str(value) + return attribute_id + "_" + id_hash(str(value)) else: if type(value) in [dict, list]: return attribute_id + "_" + id_hash(str(value)) else: - # TODO if its a float, replace the dots + # TODO: if its a float, replace the dots return attribute_id + "_" + str(value) @staticmethod