Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
global-include py.typed
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ This is part of ProvLake Project. See [ibm.biz/provlake](http://ibm.biz/provlake

## Installation

`pip install git+git://github.com/IBM/multi-data-lineage-capture-py.git`
`pip install provlake`


### A Python lib to capture multiworkflow provenance data from Python Scripts
### A Python lib to capture data processed by multiple workflows using provenance

Use this library for code instrumentation to collect provenance data of function calls in a script. Input arguments or output values from functions can come from distributed data stores, including file systems and database systems.

Expand Down Expand Up @@ -52,7 +52,7 @@ If you prefer, you can use it without context management:
```python
from provlake import ProvLake
from provlake.capture import ProvWorkflow, ProvTask
prov = ProvLake.get_persister("factorial_dataflow_without_ctx_mgmt")
prov = ProvLake.get_persister("factorial_dataflow")
prov_workflow = ProvWorkflow(prov)
prov_workflow.begin()

Expand All @@ -67,3 +67,9 @@ prov_task.end(out_args)

prov_workflow.end()
```

When this script is executed, the library generates raw provenance data in JSON format. If you do not specify a backend via the `service_url` argument of the `ProvLake.get_persister` builder, the JSON provenance data is dumped to a local file that is updated during workflow execution.
Users can analyze the file using their preferred tools.

To use the analytics tools provided by IBM, such as the Knowledge Exploration System (KES) tool, or to run more complex queries, users need to use the ProvLake backend service. The ProvLake backend processes the JSON provenance data, transforms it into a knowledge graph using W3C standards (PROV, OWL, and RDF), and stores it in a knowledge graph system. This backend service is proprietary to IBM. Please contact {rfsouza, lga, rcerq, mstelmar}@br.ibm.com for more information.

2 changes: 0 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pyyaml
requests-futures
numpy
pandas

6 changes: 4 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

setup(name='provlake',
version=VERSION,
description='A Python lib to capture multiworkflow provenance data from Python Scripts',
description='A Python lib to capture multiworkflow provenance data',
long_description='ProvLake is a system that captures and integrates data processed by multiple workflows using provenance data.',
long_description_content_type='text/x-rst',
url='http://ibm.biz/provlake',
author='IBM Research',
author_email='rfsouza@br.ibm.com',
license='Apache 2.0',
install_requires=requires,
package_dir={'': 'src'},
packages=find_packages(where='src'),
python_requires='>=3.6',
python_requires='>=3.6,<3.12',
include_package_data=True,
zip_safe=False)
16 changes: 10 additions & 6 deletions src/provlake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def _build_managed_persister(
context: str,
with_validation: bool,
db_name: str,
log_file_path: str
log_file_path: str,
synchronous: bool,
) -> ManagedPersister:
should_send_to_service = False
if service_url is not None:
Expand All @@ -45,7 +46,8 @@ def _build_managed_persister(
should_send_to_file=should_send_to_file,
log_dir=log_dir,
log_file_path=log_file_path,
should_send_to_service=should_send_to_service)
should_send_to_service=should_send_to_service,
synchronous=synchronous,)

@staticmethod
def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister:
Expand All @@ -56,14 +58,15 @@ def _build_unmanaged_persister(log_file_path:str) -> UnmanagedPersister:
def get_persister(
log_file_path=None,
managed_persistence=True,
context: str = None,
context: str | None = None,
with_validation: bool = False,
log_level: str = 'error',
should_send_to_file=True,
log_dir='.',
service_url=None,
bag_size=None,
db_name: str = None
db_name: str | None = None,
synchronous: bool = False,
) -> Persister:
if ProvLake._persister_singleton_instance is None:

Expand All @@ -81,11 +84,12 @@ def get_persister(
context=context,
with_validation=with_validation,
db_name=db_name,
log_file_path=log_file_path
log_file_path=log_file_path,
synchronous=synchronous,
)
else:
persister = ProvLake._build_unmanaged_persister(
log_file_path
log_file_path,
)
ProvLake._persister_singleton_instance = persister
return ProvLake._persister_singleton_instance
Expand Down
7 changes: 7 additions & 0 deletions src/provlake/capture/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def end(self, output_args=dict(), stdout=None, stderr=None, end_time: float = No
self.prov_obj.dt_name + " args: " + str(self.prov_obj.values))
return None

def patch_custom_metadata(self, custom_metadata: dict):
self._prov_persister.patch_custom_metadata(self, custom_metadata)

def __enter__(self):
self.begin()
return self
Expand All @@ -186,6 +189,10 @@ def __exit__(self, *args):
# There is no output, but end of task should be recorded anyway.
self.end()

@property
def dte_id(self, ) -> str:
return f'{self.prov_obj.workflow_name}_exec_{self.prov_obj.wf_exec_id}_{self.prov_obj.dt_name}_{self.prov_obj.task_id}'


class ProvCycle(ActivityCapture):

Expand Down
63 changes: 46 additions & 17 deletions src/provlake/persistence/managed_persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import traceback
from time import sleep
import urllib3
import requests

from requests.exceptions import ConnectionError
from requests_futures.sessions import FuturesSession
Expand All @@ -23,9 +24,11 @@ class ManagedPersister(Persister):

def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, context: str = None,
with_validation: bool = False, db_name: str = None, bag_size: int = 1,
log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True,
log_dir: str = '.', should_send_to_file: bool = False, should_send_to_service: bool = True, synchronous: bool = False,
retries_on_connection_error: int = 10
):
super().__init__(log_file_path)
self._service_url = service_url
self.retrospective_url = urljoin(service_url, "retrospective-provenance")
self.prospective_url = urljoin(service_url, "prospective-provenance")
self.context = context
Expand All @@ -35,6 +38,8 @@ def __init__(self, log_file_path: str, service_url: str, wf_exec_id=None, contex
self.bag_size = bag_size
self.should_send_to_service = should_send_to_service
self.should_send_to_file = should_send_to_file
self.synchronous = synchronous
self.retries_on_connection_error = retries_on_connection_error

self._session = None
self._offline_prov_log = None
Expand Down Expand Up @@ -86,6 +91,17 @@ def add_request(self, persistence_request: ProvRequestObj):
traceback.print_exc()
pass

def patch_custom_metadata(self, prov_task, custom_metadata: dict):
prov_task_id = prov_task.dte_id
url = urljoin(self._service_url, f"/provenance/api/data-transformation-executions/{prov_task_id}/custom-metadata")

if self.synchronous:
r = requests.patch(url, json=custom_metadata, verify=False)
else:
r = self.session.patch(url, json=custom_metadata, verify=False).result()

assert r.status_code in [200, 300], f'[{r.status_code}] {r.content=}'

def close(self):
if self.session:
logger.info("Waiting to get response from all submitted provenance tasks...")
Expand Down Expand Up @@ -121,22 +137,35 @@ def _flush(self, all_and_wait: bool = False):

def _send_to_service(self, to_flush: List[dict]):
params = {"with_validation": str(self.with_validation), "db_name": self.db_name}
try:
logger.debug("[Prov-Persistence]" + json.dumps(to_flush))
# TODO: check whether we need this result() below
r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result()
except ConnectionError as ex:
logger.error(
"[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex))
r = None
pass
except Exception as ex:
traceback.print_exc()
logger.error(
"[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__
+ "->" + str(ex))
r = None
pass
if self.synchronous:
params["synchronous"] = "true"
r = None
for i in range(self.retries_on_connection_error):
try:
logger.debug("[Prov-Persistence] [Retry#" + str(i) + "]" + json.dumps(to_flush))
# TODO: check whether we need this result() below
if self.synchronous:
r = requests.post(self.retrospective_url, json=to_flush, params=params, verify=False)
else:
r = self.session.post(self.retrospective_url, json=to_flush, params=params, verify=False).result()
break
except ConnectionError as ex:
logger.error(
"[Prov][ConnectionError] There is a communication error between client and server -> " + str(ex))
sleep_for = (2 ** i)
logging.debug(f'Exponential backoff ({sleep_for})')
sleep(sleep_for)
r = None
pass
except Exception as ex:
traceback.print_exc()
logger.error(
"[Prov] Unexpected exception while adding retrospective provenance: " + type(ex).__name__
+ "->" + str(ex))
r = None
break
if r is None:
logger.info('Could not process provenance trace {}'.format(json.dumps(to_flush)))
# If requests were validated, check for errors
if r and self.with_validation:
self._log_validation_message(r)
Expand Down
2 changes: 2 additions & 0 deletions src/provlake/persistence/persister.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ def close(self):
del ProvLake._persister_singleton_instance
ProvLake._persister_singleton_instance = None

def patch_custom_metadata(self, prov_task, custom_metadata):
raise NotImplementedError

Empty file added src/provlake/py.typed
Empty file.
Loading