Skip to content

Commit 4c302aa

Browse files
authored
Bugfix/prod 1477 dbfs filesize wait (#76)
* wait for non-empty event log files in dbfs before downloading * bump version * wait for eventlog directory to be unchanging with nonzero file sizes * fix logic. Add logging statement * add type hint
1 parent 73aaf53 commit 4c302aa

File tree

3 files changed

+35
-3
lines changed

3 files changed

+35
-3
lines changed

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,9 @@ endif
1919
.PHONY: tidy
2020
tidy: format lint
2121

22+
# Removes the directory that contains bytecode cache files
23+
# that are automatically generated by python.
24+
.PHONY: clean
25+
clean:
26+
find . -type f -name "*.pyc" | xargs rm -fr
27+
find . -type d -name __pycache__ | xargs rm -fr

sync/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""Library for leveraging the power of Sync"""
2-
__version__ = "0.5.2"
2+
__version__ = "0.5.3"
33

44
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

sync/_databricks.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,6 +1725,25 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f
17251725
)
17261726

17271727

1728+
def _dbfs_any_file_has_zero_size(dbfs_contents: Dict) -> bool:
1729+
any_zeros = any(file["file_size"] == 0 for file in dbfs_contents["files"])
1730+
if any_zeros:
1731+
logger.info("One or more dbfs event log files has a file size of zero")
1732+
return any_zeros
1733+
1734+
1735+
def _check_total_file_size_changed(
1736+
last_total_file_size: int, dbfs_contents: Dict
1737+
) -> Tuple[bool, int]:
1738+
1739+
new_total_file_size = sum([file.get("file_size", 0) for file in dbfs_contents.get("files", {})])
1740+
if new_total_file_size == last_total_file_size:
1741+
return False, new_total_file_size
1742+
else:
1743+
logger.info("Total file size of eventlog directory changed")
1744+
return True, new_total_file_size
1745+
1746+
17281747
def _event_log_poll_duration_seconds():
17291748
"""Convenience function to aid testing"""
17301749
return 15
@@ -1820,9 +1839,12 @@ def _get_eventlog_from_dbfs(
18201839
poll_num_attempts = 0
18211840
poll_max_attempts = 20 # 5 minutes / 15 seconds = 20 attempts
18221841

1823-
while (
1842+
total_file_size = 0
1843+
file_size_changed, total_file_size = _check_total_file_size_changed(0, eventlog_dir)
1844+
while (poll_num_attempts < poll_max_attempts) and (
18241845
not _dbfs_directory_has_all_rollover_logs(eventlog_dir, run_end_time_millis)
1825-
and poll_num_attempts < poll_max_attempts
1846+
or _dbfs_any_file_has_zero_size(eventlog_dir)
1847+
or file_size_changed
18261848
):
18271849
if poll_num_attempts > 0:
18281850
logger.info(
@@ -1831,6 +1853,10 @@ def _get_eventlog_from_dbfs(
18311853
sleep(poll_duration_seconds)
18321854

18331855
eventlog_dir = dbx_client.list_dbfs_directory(matching_subdirectory["path"])
1856+
file_size_changed, total_file_size = _check_total_file_size_changed(
1857+
total_file_size, eventlog_dir
1858+
)
1859+
18341860
poll_num_attempts += 1
18351861

18361862
eventlog_zip = io.BytesIO()

0 commit comments

Comments
 (0)