From ebe0c3376c716d7fcd9e88cef79c55b1b8f22c2a Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Tue, 16 Dec 2025 11:55:55 +0100 Subject: [PATCH 1/2] --mirror-failures flag don't hides transfers without mirror failures. --- esrally/storage/_cli.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/esrally/storage/_cli.py b/esrally/storage/_cli.py index 50cae89ab..8ebd5b149 100644 --- a/esrally/storage/_cli.py +++ b/esrally/storage/_cli.py @@ -61,13 +61,13 @@ def main(): "--local-dir", type=str, default=cfg.local_dir, help="It specifies local destination directory for downloading files." ) p.add_argument("--base-url", type=str, default=None, help="It specifies the base URL for remote storage.") - p.add_argument("--mirror-failures", action="store_true", help="It considers only those files which have recorded mirror failures.") # It defines ls sub-command output options. for p in (parser, ls_parser): p.add_argument("--filebeat", action="store_true", help="It prints a JSON entry for each file, each separated by a newline.") p.add_argument("--json", action="store_true", help="It prints a pretty entry for each file.") - p.add_argument("--stats", action="store_true", help="It adds connectivity statistics to produced output.") + p.add_argument("--stats", action="store_true", help="It shows connectivity statistics in produced output.") + p.add_argument("--mirror-failures", action="store_true", help="It shows mirror failures in produced output.") p.add_argument("--filenames", action="store_true", help="It shows downloaded file names.") p.add_argument("--status-filenames", action="store_true", help="It shows status file names.") @@ -133,12 +133,6 @@ def main(): LOG.info("No transfers with base URL: %s.", args.base_url) return - if args.mirror_failures: - transfers = [tr for tr in transfers if tr.mirror_failures] - if not transfers: - LOG.info("No transfers with mirror failures.") - return - file_types: set[storage.TransferFileType] | None = None if args.status_filenames or args.filenames: file_types = set[storage.TransferFileType]() From 80073e961e55f6496783eab00927fb49882c0ace Mon Sep 17 00:00:00 2001 From: Federico Ressi Date: Tue, 16 Dec 2025 12:16:13 +0100 Subject: [PATCH 2/2] It always show mirror failures when there is any. --- esrally/storage/_cli.py | 29 +++++++++++++++++------------ esrally/storage/_transfer.py | 8 ++++---- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/esrally/storage/_cli.py b/esrally/storage/_cli.py index 8ebd5b149..f77d7639d 100644 --- a/esrally/storage/_cli.py +++ b/esrally/storage/_cli.py @@ -61,13 +61,13 @@ def main(): "--local-dir", type=str, default=cfg.local_dir, help="It specifies local destination directory for downloading files." ) p.add_argument("--base-url", type=str, default=None, help="It specifies the base URL for remote storage.") + p.add_argument("--mirror-failures", action="store_true", help="It considers only those files which have recorded mirror failures.") # It defines ls sub-command output options. for p in (parser, ls_parser): p.add_argument("--filebeat", action="store_true", help="It prints a JSON entry for each file, each separated by a newline.") p.add_argument("--json", action="store_true", help="It prints a pretty entry for each file.") - p.add_argument("--stats", action="store_true", help="It shows connectivity statistics in produced output.") - p.add_argument("--mirror-failures", action="store_true", help="It shows mirror failures in produced output.") + p.add_argument("--stats", action="store_true", help="It adds connectivity statistics to produced output.") p.add_argument("--filenames", action="store_true", help="It shows downloaded file names.") p.add_argument("--status-filenames", action="store_true", help="It shows status file names.") @@ -133,6 +133,12 @@ def main(): LOG.info("No transfers with base URL: %s.", args.base_url) return + if args.mirror_failures: + transfers = [tr for tr in transfers if tr.mirror_failures] + if not transfers: + LOG.info("No transfers with mirror failures.") + return + file_types: set[storage.TransferFileType] | None = None if args.status_filenames or args.filenames: file_types = set[storage.TransferFileType]() @@ -150,7 +156,7 @@ def main(): fmt = "json" elif args.filebeat: fmt = "filebeat" - ls(transfers, fmt=fmt, stats=args.stats, mirror_failures=args.mirror_failures, file_types=file_types) + ls(transfers, fmt=fmt, stats=args.stats, file_types=file_types) case "get": get(transfers, todo=storage.rangeset(args.range), monitor_interval=cfg.monitor_interval) case "put": @@ -167,7 +173,6 @@ def ls( *, fmt: LsFormat = "json", stats: bool = False, - mirror_failures: bool = False, file_types: set[storage.TransferFileType] | None = None, ) -> None: LOG.info("Found %d transfer(s).", len(transfers)) @@ -179,7 +184,7 @@ def ls( sys.stdout.write("\n".join(sorted(filenames)) + "\n") case "json": json.dump( - [transfer_to_dict(tr, stats=stats, mirror_failures=mirror_failures) for tr in transfers], + [transfer_to_dict(tr, stats=stats) for tr in transfers], sys.stdout, indent=2, sort_keys=True, @@ -187,11 +192,11 @@ def ls( case "filebeat": for tr in transfers: # Filebeat format is made of the root object (without stats), plus a separate object for each transfer stat. - for d in transfer_to_filebeat(tr, stats=stats, mirror_failures=mirror_failures): + for d in transfer_to_filebeat(tr, stats=stats): line = json.dumps({"rally": {"storage": d}}) sys.stdout.write(f"{line}\n") case "pretty": - json.dump([t.pretty(stats=stats, mirror_failures=mirror_failures) for t in transfers], sys.stdout, indent=2) + json.dump([t.pretty(stats=stats) for t in transfers], sys.stdout, indent=2) class BaseTransferDict(TypedDict): @@ -233,7 +238,7 @@ class StatsDict(TypedDict): write_time: float -def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failures: bool = False) -> TransferDict: +def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False) -> TransferDict: """It obtains dictionaries from transfer status in the format to be serialized as JSON.""" d = TransferDict( url=tr.url, @@ -246,9 +251,9 @@ def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failur todo=str(tr.todo), finished=tr.finished, ) - if mirror_failures: + if tr.mirror_failures: d["mirror_failures"] = [MirrorFailureDict(url=f.url, error=f.error) for f in tr.mirror_failures] - if stats: + if stats and tr.stats: d["stats"] = [ StatsDict( url=s.url, @@ -264,7 +269,7 @@ def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failur def transfer_to_filebeat( - tr: storage.Transfer, stats: bool = False, mirror_failures: bool = False + tr: storage.Transfer, stats: bool = False ) -> Generator[TransferDict | FilebeatStatsDict | FilebeatMirrorFailureDict]: """It obtains dictionaries from transfer in the format to be ingested to filebeat. @@ -273,7 +278,7 @@ def transfer_to_filebeat( - a FilebeatMirrorFailureDict for every MirrorFailureDict in 'mirror_failures' list. - a FilebeatStatsDict for every TransferStatsDict in 'stats' list. """ - root: TransferDict = transfer_to_dict(tr, stats=stats, mirror_failures=mirror_failures) + root: TransferDict = transfer_to_dict(tr, stats=stats) _mirror_failures: list[MirrorFailureDict] = root.pop("mirror_failures", []) _stats: list[StatsDict] = root.pop("stats", []) yield root diff --git a/esrally/storage/_transfer.py b/esrally/storage/_transfer.py index fc31a3a87..9335e22d6 100644 --- a/esrally/storage/_transfer.py +++ b/esrally/storage/_transfer.py @@ -701,7 +701,7 @@ def average_speed(self) -> float: return 0.0 return (done.size - self._resumed_size) / self.duration.s() - def pretty(self, *, stats: bool = False, mirror_failures: bool = False) -> dict[str, Any]: + def pretty(self, *, stats: bool = False) -> dict[str, Any]: details: dict[str, Any] = { "url": self.url, "path": self.path, @@ -712,14 +712,14 @@ def pretty(self, *, stats: bool = False, mirror_failures: bool = False) -> dict[ "duration": self.duration and pretty.duration(self.duration), "throughput": self.average_speed and pretty.throughput(self.average_speed), } - if mirror_failures: + if self.mirror_failures: details["mirror_failures"] = [dataclasses.asdict(f) for f in self.mirror_failures] if stats: details["stats"] = [s.pretty() for s in self.stats] return {k: v for k, v in details.items() if v} - def info(self, *, stats: bool = False, mirror_failures: bool = False) -> str: - return json.dumps(self.pretty(stats=stats, mirror_failures=mirror_failures), indent=2) + def info(self, *, stats: bool = False) -> str: + return json.dumps(self.pretty(stats=stats), indent=2) def wait(self, timeout: float | None = None) -> None: """It waits for transfer termination."""