diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..e03a7fe --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +global-include py.typed \ No newline at end of file diff --git a/README.md b/README.md index eaf23ff..3d6b1a2 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,10 @@ This is part of ProvLake Project. See [ibm.biz/provlake](http://ibm.biz/provlake ## Installation -`pip install git+git://github.com/IBM/multi-data-lineage-capture-py.git` +`pip install provlake` -### A Python lib to capture multiworkflow provenance data from Python Scripts +### A Python lib to capture data processed by multiple workflows using provenance Use this library for code instrumentation to collect provenance data of function calls in a script. Input arguments or output values from functions can come from distributed data storages, including file systems and database systems. @@ -52,7 +52,7 @@ If you prefer, you can use it without context management: ```python from provlake import ProvLake from provlake.capture import ProvWorkflow, ProvTask -prov = ProvLake.get_persister("factorial_dataflow_without_ctx_mgmt") +prov = ProvLake.get_persister("factorial_dataflow") prov_workflow = ProvWorkflow(prov) prov_workflow.begin() @@ -67,3 +67,9 @@ prov_task.end(out_args) prov_workflow.end() ``` + +By executing this file, the library generates raw provenance data in JSON format. If you don't specify the backend using the `service_url` argument in the `ProvLake.get_persister` builder, the JSON provenance data will be dumped to a local file, updated during workflow execution. +Users can analyze the file using their preferred tools. + +To use the analytics tools provided by IBM, such as the Knowledge Exploration System (KES) tool, or to run more complex queries, users need to use the ProvLake backend service. ProvLake backend processes the JSON provenance data and transforms it into a knowledge graph using W3C standards (PROV, OWL, and RDF) and stores it in a knowledge graph system. 
This backend service is proprietary to IBM. Please contact {rfsouza, lga, rcerq, mstelmar}@br.ibm.com for more information. + diff --git a/requirements.txt b/requirements.txt index e3f747a..dd2be4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ -pyyaml requests-futures numpy pandas - diff --git a/setup.py b/setup.py index 42f147a..1e40629 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,9 @@ setup(name='provlake', version=VERSION, - description='A Python lib to capture multiworkflow provenance data from Python Scripts', + description='A Python lib to capture multiworkflow provenance data', + long_description='ProvLake is a system that captures and integrates data processed by multiple workflows using provenance data.', + long_description_content_type='text/x-rst', url='http://ibm.biz/provlake', author='IBM Research', author_email='rfsouza@br.ibm.com', @@ -16,6 +18,6 @@ install_requires=requires, package_dir={'': 'src'}, packages=find_packages(where='src'), - python_requires='>=3.6', + python_requires='>=3.6,<3.12', include_package_data=True, zip_safe=False) diff --git a/src/provlake/__init__.py b/src/provlake/__init__.py index ff84372..d4358db 100644 --- a/src/provlake/__init__.py +++ b/src/provlake/__init__.py @@ -22,7 +22,8 @@ def _build_managed_persister( context: str, with_validation: bool, db_name: str, - log_file_path: str + log_file_path: str, + synchronous: bool, ) -> ManagedPersister: should_send_to_service = False if service_url is not None: @@ -45,7 +46,8 @@ def _build_managed_persister( should_send_to_file=should_send_to_file, log_dir=log_dir, log_file_path=log_file_path, - should_send_to_service=should_send_to_service) + should_send_to_service=should_send_to_service, + synchronous=synchronous,) @staticmethod def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister: @@ -56,14 +58,15 @@ def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister: def get_persister( log_file_path=None, 
managed_persistence=True, - context: str = None, + context: str | None = None, with_validation: bool = False, log_level: str = 'error', should_send_to_file=True, log_dir='.', service_url=None, bag_size=None, - db_name: str = None + db_name: str | None = None, + synchronous: bool = False, ) -> Persister: if ProvLake._persister_singleton_instance is None: @@ -81,11 +84,12 @@ def get_persister( context=context, with_validation=with_validation, db_name=db_name, - log_file_path=log_file_path + log_file_path=log_file_path, + synchronous=synchronous, ) else: persister = ProvLake._build_unmanaged_persister( - log_file_path + log_file_path, ) ProvLake._persister_singleton_instance = persister return ProvLake._persister_singleton_instance diff --git a/src/provlake/capture/__init__.py b/src/provlake/capture/__init__.py index c4ce681..01e64ad 100644 --- a/src/provlake/capture/__init__.py +++ b/src/provlake/capture/__init__.py @@ -177,6 +177,9 @@ def end(self, output_args=dict(), stdout=None, stderr=None, end_time: float = No self.prov_obj.dt_name + " args: " + str(self.prov_obj.values)) return None + def patch_custom_metadata(self, custom_metadata: dict): + self._prov_persister.patch_custom_metadata(self, custom_metadata) + def __enter__(self): self.begin() return self @@ -186,6 +189,10 @@ def __exit__(self, *args): # There is no output, but end of task should be recorded anyway. 
self.end() + @property + def dte_id(self, ) -> str: + return f'{self.prov_obj.workflow_name}_exec_{self.prov_obj.wf_exec_id}_{self.prov_obj.dt_name}_{self.prov_obj.task_id}' + class ProvCycle(ActivityCapture): diff --git a/src/provlake/persistence/managed_persister.py b/src/provlake/persistence/managed_persister.py index 1e5e28a..4ea4d24 100644 --- a/src/provlake/persistence/managed_persister.py +++ b/src/provlake/persistence/managed_persister.py @@ -5,6 +5,7 @@ import traceback from time import sleep import urllib3 +import requests from requests.exceptions import ConnectionError from requests_futures.sessions import FuturesSession @@ -23,9 +24,11 @@ class ManagedPersister(Persister): def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None, with_validation: bool = False, db_name: str = None, bag_size: int = 1, - log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, + log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False, + retries_on_connection_error: int = 10 ): super().__init__(log_file_path) + self._service_url = service_url self.retrospective_url = urljoin(service_url, "retrospective-provenance") self.prospective_url = urljoin(service_url, "prospective-provenance") self.context = context @@ -35,6 +38,8 @@ def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, contex self.bag_size = bag_size self.should_send_to_service = should_send_to_service self.should_send_to_file = should_send_to_file + self.synchronous = synchronous + self.retries_on_connection_error = retries_on_connection_error self._session = None self._offline_prov_log = None @@ -86,6 +91,17 @@ def add_request(self, persistence_request: ProvRequestObj): traceback.print_exc() pass + def patch_custom_metadata(self, prov_task, custom_metadata: dict): + prov_task_id = prov_task.dte_id + url = urljoin(self._service_url, 
f"/provenance/api/data-transformation-executions/{prov_task_id}/custom-metadata") + + if self.synchronous: + r = requests.patch(url, json=custom_metadata, verify=False) + else: + r = self.session.patch(url, json=custom_metadata, verify=False).result() + + assert r.status_code in [200, 300], f'[{r.status_code}] {r.content=}' + def close(self): if self.session: logger.info("Waiting to get response from all submitted provenance tasks...") @@ -121,22 +137,35 @@ def _flush(self, all_and_wait: bool = False): def _send_to_service(self, to_flush: List[dict]): params = {"with_validation": str(self.with_validation), "db_name": self.db_name} - try: - logger.debug("[Prov-Persistence]" + json.dumps(to_flush)) - # TODO: check whether we need this result() below - r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() - except ConnectionError as ex: - logger.error( - "[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex)) - r = None - pass - except Exception as ex: - traceback.print_exc() - logger.error( - "[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__ - + "->" + str(ex)) - r = None - pass + if self.synchronous: + params["synchronous"] = "true" + r = None + for i in range(self.retries_on_connection_error): + try: + logger.debug("[Prov-Persistence] [Retry#" + str(i) + "]" + json.dumps(to_flush)) + # TODO: check whether we need this result() below + if self.synchronous: + r = requests.post(self.retrospective_url, json=to_flush, params=params, verify=False) + else: + r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result() + break + except ConnectionError as ex: + logger.error( + "[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex)) + sleep_for = (2 ** i) + logging.debug(f'Exponential backoff ({sleep_for})') + sleep(sleep_for) + r = None + pass + except Exception as ex: + 
traceback.print_exc() + logger.error( + "[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__ + + "->" + str(ex)) + r = None + break + if r is None: + logger.info('Could not process provenance trace {}'.format(json.dumps(to_flush))) # If requests were validated, check for errors if r and self.with_validation: self._log_validation_message(r) diff --git a/src/provlake/persistence/persister.py b/src/provlake/persistence/persister.py index 6f475ae..9a04f4b 100644 --- a/src/provlake/persistence/persister.py +++ b/src/provlake/persistence/persister.py @@ -22,4 +22,6 @@ def close(self): del ProvLake._persister_singleton_instance ProvLake._persister_singleton_instance = None + def patch_custom_metadata(self, prov_task, custom_metadata): + raise NotImplementedError diff --git a/src/provlake/py.typed b/src/provlake/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/src/provlake/utils/__deprecated.py b/src/provlake/utils/__deprecated.py deleted file mode 100644 index 53c8178..0000000 --- a/src/provlake/utils/__deprecated.py +++ /dev/null @@ -1,262 +0,0 @@ - -""" -##### -# Exemplary Content of a prov.yaml: -##### -context_name: seismic_pipelines -dataflow_name: - train: seismic_stratigraphic_train - create_data: seismic_stratigraphic_create_training_data - analyze_seismic: seismic_stratigraphic_analyze_seismic -file_storage: -with_validation: false -should_send_to_file: true -log_dir: . 
-align_terms: - segy_path: seismic_file - horizons_paths: horizon_file -graph_uri: -stringify_params: ["unite_category"] -not_tracked_params: ["prov", "prov_path", "input_path", "log_level", "dataset", "prov_config", "data_path" ] -bag_size: 1 -service: http://localhost:5000 -should_send_to_service: false - log_level: info -""" - -class MLSemantics(Enum): - hyper_parameter = "hyper_parameter" - data_characteristic = "data_characteristic" - dataset = "dataset" - evaluation_measure = "evaluation_measure" - model = "model" - - def __str__(self): - return self.value - - -class DTYPE(Enum): - int = "integer" - str = "string" - bool = "boolean" - list = "list" - float = "float" - complex = "complex_attribute" - any = "any" - - def __str__(self): - return self.value - - -class ProvConf: - conf_data: dict = None - custom_attributes: dict = None - - def __init__(self, prov_conf_path: str = None): - if not prov_conf_path: - logger.warning("[Prov] You are not capturing provenance.") - return - with open(prov_conf_path, 'r') as stream: - try: - ProvConf.conf_data = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise exc - - if "custom_attributes" in ProvConf.conf_data: - with open(ProvConf.conf_data["custom_attributes"], 'r') as stream: - try: - ProvConf.custom_attributes = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise exc - - -def get_dtype_from_val(value, should_stringfy=False) -> str: - if should_stringfy: - return DTYPE.str - elif type(value) == str: - return DTYPE.str - elif type(value) == list: - return DTYPE.list - elif type(value) == int: - return DTYPE.int - elif type(value) == float: - return DTYPE.float - elif type(value) == bool: - return DTYPE.bool - elif type(value) == dict: - return DTYPE.complex - else: - return DTYPE.any - - -def get_dtype_from_type(_type: type, should_stringfy=False) -> str: - if should_stringfy: - return str(DTYPE.str) - elif _type == str: - return str(DTYPE.str) - elif _type == list: - return str(DTYPE.list) - 
elif _type == int: - return str(DTYPE.int) - elif _type == float: - return str(DTYPE.float) - elif _type == bool: - return str(DTYPE.bool) - elif _type == dict: - return str(DTYPE.complex) - else: - return "any" - - -def build_generic_prospective(dataflow_name: str): - prospective_prov = dict() - prospective_prov["dataflow_name"] = ProvConf.conf_data["dataflow_name"][dataflow_name] - prospective_prov["context_name"] = ProvConf.conf_data["context_name"] - prospective_prov["storages"] = {"main_filesystem": {"type": "PhysicalMachine"}} - prospective_prov["data_transformations"] = {} - storage_configuration = { - "configuration": { - "storages": { - "main_filesystem": { - "type": "PhysicalMachine", - "host_address": ProvConf.conf_data["file_storage"] - } - } - } - } - return prospective_prov, storage_configuration - - -# @Deprecated -- this is going to be depricated. Please avoid using it. -def build_prov_input_from_dict(dict_params: dict): - retrospective_input_prov = {} - attributes = [] - - for key in dict_params: - # Remove params not tracked: - if "not_tracked_params" in ProvConf.conf_data and key in ProvConf.conf_data["not_tracked_params"]: - continue - - value = dict_params[key] - if value is None or (value is not None and value == ''): - continue - - # Renaming param names in case we need: - attr_name = key - if "align_terms" in ProvConf.conf_data and key in ProvConf.conf_data["align_terms"]: - attr_name = ProvConf.conf_data["align_terms"][key] - - should_stringify = __get_should_stringify(attr_name) - if should_stringify: - retrospective_input_prov[attr_name] = str(value) - else: - retrospective_input_prov[attr_name] = value - attr = { - "name": attr_name, - #"description": "" - } - if "path" in attr_name or "file" in attr_name: - attr["semantics"] = "FILE_REFERENCE" - attr["storage_references"] = { - "key": "main_filesystem" - } - else: - attr["semantics"] = "PARAMETER" - attr["ml_semantics"] = str(MLSemantics.hyper_parameter) - - # if "_slices" in attr_name: 
- # # we have a special case for slices in a different method - # continue - - dtype = get_dtype_from_val(value, should_stringify) - if dtype == "list": - attr["dtype"] = dtype - if "REFERENCE" not in attr["semantics"] and len(value) > 0: - attr["elementdtype"] = str(get_dtype_from_val(value[0], should_stringify)) - elif "REFERENCE" not in attr["semantics"]: - attr["dtype"] = dtype - - attributes.append(attr) - - return attributes, retrospective_input_prov - - -def __get_should_stringify(key): - if "stringify_params" not in ProvConf.conf_data: - return False - if key not in ProvConf.conf_data["stringify_params"]: - return False - return True - - -def get_prospective_attribute(key, value): - ml_semantics = None - - if type(value) == type: - dtype = get_dtype_from_type(value) - elif type(value) == DTYPE: - dtype = value - elif type(value) == dict: - dtype = value.get("dtype", "any") - ml_semantics = value.get("ml_semantics", None) - else: - dtype = get_dtype_from_val(value) - - attr = { - "name": key, - "dtype": str(dtype) - } - - if ml_semantics: - attr["ml_semantics"] = str(ml_semantics) - - if "path" in key or "file" in key: - attr["semantics"] = "FILE_REFERENCE" - attr["storage_references"] = {"key": "main_filesystem"} - - if dtype == DTYPE.list: - if len(value) > 0: - attr["elementdtype"] = str(get_dtype_from_val(value[0])) - elif dtype == DTYPE.complex: - attr["attributes"] = get_prospective_from_args(value) - - return attr - - -def get_prospective_from_args(prov_args: dict) -> list: - args_schema = [] - - for k in prov_args: - # removing unwanted attributes - if k == "self" or k.startswith('_') or "prov" in k: - continue - - attr = get_prospective_attribute(k, prov_args[k]) - args_schema.append(attr) - return args_schema - - -def build_prov_for_transformation(prospective_prov: dict, transformation): - # IN - prospective_in, prospective_out = [], [] - if hasattr(transformation, "static_prospective_prov_attributes_in") \ - and 
transformation.static_prospective_prov_attributes_in: - prospective_in = transformation.static_prospective_prov_attributes_in - elif transformation.static_schema_args_in: - prospective_in = get_prospective_from_args(transformation.static_schema_args_in) - else: - transformation.static_schema_args_in = transformation.get_static_schema_args_from_init() - prospective_in = get_prospective_from_args(transformation.static_schema_args_in) - - # OUT - if transformation.static_schema_args_out: - prospective_out = get_prospective_from_args(transformation.static_schema_args_out) - # elif hasattr(transformation, "prospective_args_out") and transformation.prospective_args_out: - # prospective_out = transformation.prospective_args_out - prospective_prov["data_transformations"].update({ - transformation.dt_name(): { - "input_datasets": [{"attributes": prospective_in}], - "output_datasets": [{"attributes": prospective_out}] - } - }) - return prospective_prov \ No newline at end of file diff --git a/src/provlake/utils/constants.py b/src/provlake/utils/constants.py index eeb871e..c4b3f3f 100644 --- a/src/provlake/utils/constants.py +++ b/src/provlake/utils/constants.py @@ -248,16 +248,16 @@ def get_id_dataset(dte_id): def get_id_atv(attribute_id, value, value_type=None): if value_type: if value_type in {Vocabulary.DATA_REFERENCE_TYPE, Vocabulary.KG_REFERENCE_TYPE}: - return attribute_id + "_" + str(value) + return attribute_id + "_" + id_hash(str(value)) elif value_type == Vocabulary.DATASET_ITEM: return "dataset_item_"+str(uuid.uuid4()) else: - return attribute_id + "_" + str(value) + return attribute_id + "_" + id_hash(str(value)) else: if type(value) in [dict, list]: return attribute_id + "_" + id_hash(str(value)) else: - # TODO if its a float, replace the dots + # TODO: if its a float, replace the dots return attribute_id + "_" + str(value) @staticmethod diff --git a/version b/version index c2f0a83..28fbf1a 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.2.98 +0.2.983 \ No 
newline at end of file