Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions esrally/storage/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def main():
fmt = "json"
elif args.filebeat:
fmt = "filebeat"
ls(transfers, fmt=fmt, stats=args.stats, mirror_failures=args.mirror_failures, file_types=file_types)
ls(transfers, fmt=fmt, stats=args.stats, file_types=file_types)
case "get":
get(transfers, todo=storage.rangeset(args.range), monitor_interval=cfg.monitor_interval)
case "put":
Expand All @@ -173,7 +173,6 @@ def ls(
*,
fmt: LsFormat = "json",
stats: bool = False,
mirror_failures: bool = False,
file_types: set[storage.TransferFileType] | None = None,
) -> None:
LOG.info("Found %d transfer(s).", len(transfers))
Expand All @@ -185,19 +184,19 @@ def ls(
sys.stdout.write("\n".join(sorted(filenames)) + "\n")
case "json":
json.dump(
[transfer_to_dict(tr, stats=stats, mirror_failures=mirror_failures) for tr in transfers],
[transfer_to_dict(tr, stats=stats) for tr in transfers],
sys.stdout,
indent=2,
sort_keys=True,
)
case "filebeat":
for tr in transfers:
# Filebeat format is made of the root object (without stats), plus a separate object for each transfer stat.
for d in transfer_to_filebeat(tr, stats=stats, mirror_failures=mirror_failures):
for d in transfer_to_filebeat(tr, stats=stats):
line = json.dumps({"rally": {"storage": d}})
sys.stdout.write(f"{line}\n")
case "pretty":
json.dump([t.pretty(stats=stats, mirror_failures=mirror_failures) for t in transfers], sys.stdout, indent=2)
json.dump([t.pretty(stats=stats) for t in transfers], sys.stdout, indent=2)


class BaseTransferDict(TypedDict):
Expand Down Expand Up @@ -239,7 +238,7 @@ class StatsDict(TypedDict):
write_time: float


def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failures: bool = False) -> TransferDict:
def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False) -> TransferDict:
"""It obtains dictionaries from transfer status in the format to be serialized as JSON."""
d = TransferDict(
url=tr.url,
Expand All @@ -252,9 +251,9 @@ def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failur
todo=str(tr.todo),
finished=tr.finished,
)
if mirror_failures:
if tr.mirror_failures:
d["mirror_failures"] = [MirrorFailureDict(url=f.url, error=f.error) for f in tr.mirror_failures]
if stats:
if stats and tr.stats:
d["stats"] = [
StatsDict(
url=s.url,
Expand All @@ -270,7 +269,7 @@ def transfer_to_dict(tr: storage.Transfer, *, stats: bool = False, mirror_failur


def transfer_to_filebeat(
tr: storage.Transfer, stats: bool = False, mirror_failures: bool = False
tr: storage.Transfer, stats: bool = False
) -> Generator[TransferDict | FilebeatStatsDict | FilebeatMirrorFailureDict]:
"""It obtains dictionaries from transfer in the format to be ingested to filebeat.

Expand All @@ -279,7 +278,7 @@ def transfer_to_filebeat(
- a FilebeatMirrorFailureDict for every MirrorFailureDict in 'mirror_failures' list.
- a FilebeatStatsDict for every TransferStatsDict in 'stats' list.
"""
root: TransferDict = transfer_to_dict(tr, stats=stats, mirror_failures=mirror_failures)
root: TransferDict = transfer_to_dict(tr, stats=stats)
_mirror_failures: list[MirrorFailureDict] = root.pop("mirror_failures", [])
_stats: list[StatsDict] = root.pop("stats", [])
yield root
Expand Down
8 changes: 4 additions & 4 deletions esrally/storage/_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ def average_speed(self) -> float:
return 0.0
return (done.size - self._resumed_size) / self.duration.s()

def pretty(self, *, stats: bool = False, mirror_failures: bool = False) -> dict[str, Any]:
def pretty(self, *, stats: bool = False) -> dict[str, Any]:
details: dict[str, Any] = {
"url": self.url,
"path": self.path,
Expand All @@ -712,14 +712,14 @@ def pretty(self, *, stats: bool = False, mirror_failures: bool = False) -> dict[
"duration": self.duration and pretty.duration(self.duration),
"throughput": self.average_speed and pretty.throughput(self.average_speed),
}
if mirror_failures:
if self.mirror_failures:
details["mirror_failures"] = [dataclasses.asdict(f) for f in self.mirror_failures]
if stats:
details["stats"] = [s.pretty() for s in self.stats]
return {k: v for k, v in details.items() if v}

def info(self, *, stats: bool = False, mirror_failures: bool = False) -> str:
return json.dumps(self.pretty(stats=stats, mirror_failures=mirror_failures), indent=2)
def info(self, *, stats: bool = False) -> str:
return json.dumps(self.pretty(stats=stats), indent=2)

def wait(self, timeout: float | None = None) -> None:
"""It waits for transfer termination."""
Expand Down