Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions examples/basics/hello_v2_with_progress_bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from tqdm.asyncio import tqdm

import flyte

env = flyte.TaskEnvironment(
name="hello_v2_with_progress_bar",
)


@env.task()
async def hello_worker(id: int) -> str:
ctx = flyte.ctx()
assert ctx is not None
return f"hello, my id is: {id} and I am being run by Action: {ctx.action}"


@env.task()
async def hello_driver(ids: list[int] = [1, 2, 3]) -> list[str]:
coros = []
with flyte.group("fanout-group"):
for id in ids:
coros.append(hello_worker(id))

vals = await tqdm.gather(*coros,desc="Running Tasks")

return vals


if __name__ == "__main__":
flyte.init_from_config()

run = flyte.run(hello_driver)
print(run.name)
print(run.url)
run.wait()
4 changes: 3 additions & 1 deletion maint_tools/build_default_image.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import asyncio
from tqdm.asyncio import tqdm
from pathlib import Path

import flyte
Expand Down Expand Up @@ -72,9 +73,10 @@ async def build_flyte_connector_image(


async def build_all(registry: str | None = None, name: str | None = None, builder: str | None = "local"):
await asyncio.gather(
await tqdm.gather(
build_flyte_image(registry=registry, name=name, builder=builder),
build_flyte_connector_image(registry=registry, name=name, builder=builder),
desc="Building Images"
)


Expand Down
5 changes: 3 additions & 2 deletions maint_tools/build_plugin_image.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
import asyncio

from tqdm.asyncio import tqdm
import flyte
from flyte.extend import ImageBuildEngine

Expand Down Expand Up @@ -33,9 +33,10 @@ async def build_flyte_sglang_image(registry: str | None = None, name: str | None


async def build_all(registry: str | None = None, name: str | None = None, builder: str | None = "local"):
await asyncio.gather(
await tqdm.gather(
build_flyte_vllm_image(registry=registry, name=name, builder=builder),
build_flyte_sglang_image(registry=registry, name=name, builder=builder),
desc="Building plugin images"
)


Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = [
"aiolimiter>=1.2.1",
"flyteidl2==2.0.2",
"packaging",
"tqdm>=4.67.3",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really prefer if you use rich progress if at all. No new dependencies please

]

[project.scripts]
Expand Down
9 changes: 5 additions & 4 deletions src/flyte/_deploy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from tqdm.asyncio import tqdm
import hashlib
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
Expand Down Expand Up @@ -396,7 +397,7 @@ async def _build_images(deployment: DeploymentPlan, image_refs: Dict[str, str] |
continue
auto_image = Image.from_debian_base()
images.append(_build_image_bg(env_name, auto_image))
final_images = await asyncio.gather(*images)
final_images = await tqdm.gather(*images,desc="Building Images")

for env_name, image_uri in final_images:
logger.warning(f"Built Image for environment {env_name}, image: {image_uri}")
Expand All @@ -417,7 +418,7 @@ async def _deploy_task_env(context: DeploymentContext) -> DeployedTaskEnvironmen
task_coros = []
for task in env.tasks.values():
task_coros.append(_deploy_task(task, context.serialization_context, dryrun=context.dryrun))
deployed_task_vals = await asyncio.gather(*task_coros)
deployed_task_vals = await tqdm.gather(*task_coros,desc="Deploying Task Environment")
deployed_tasks = []
for t in deployed_task_vals:
deployed_tasks.append(t)
Expand Down Expand Up @@ -466,7 +467,7 @@ async def apply(deployment_plan: DeploymentPlan, copy_style: CopyFiles, dryrun:
deployer = get_deployer(type(env))
context = DeploymentContext(environment=env, serialization_context=sc, dryrun=dryrun)
deployment_coros.append(deployer(context))
deployed_envs = await asyncio.gather(*deployment_coros)
deployed_envs = await tqdm.gather(*deployment_coros,desc="Deploying Environment")
envs = {}
for d in deployed_envs:
envs[d.get_name()] = d
Expand Down Expand Up @@ -533,7 +534,7 @@ async def deploy(
deployments = []
for deployment_plan in deployment_plans:
deployments.append(apply(deployment_plan, copy_style=copy_style, dryrun=dryrun))
return await asyncio.gather(*deployments)
return await tqdm.gather(*deployments,desc="Deployment in progress")


@syncify
Expand Down
3 changes: 2 additions & 1 deletion src/flyte/_serve.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
from tqdm.asyncio import tqdm
import atexit
import hashlib
import os
Expand Down Expand Up @@ -663,7 +664,7 @@ async def _serve_remote(self, app_env: "AppEnvironment") -> "App":
app_envs_to_deploy.append(dep_env)

# Deploy all apps concurrently
deployed_apps = await asyncio.gather(*deployment_coros)
deployed_apps = await tqdm.gather(*deployment_coros,desc=f"Deploying Apps")

# Find the deployed app corresponding to the requested app_env
deployed_app = None
Expand Down
3 changes: 2 additions & 1 deletion src/flyte/remote/_data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from tqdm.asyncio import tqdm
import hashlib
import os
import typing
Expand Down Expand Up @@ -234,7 +235,7 @@ async def upload_dir(dir_path: Path, verify: bool = True, prefix: str | None = N
if file.is_file():
uploaded_files.append(_upload_single_file(cfg, file, verify=verify, basedir=prefix))

urls = await asyncio.gather(*uploaded_files)
urls = await tqdm.gather(*uploaded_files,desc=f"Uploading files to {dir_path}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rendering this becomes a big challenge - we need to think about the rendering and overhead architecture of runtime vs cli

native_url = urls[0][1] # Assuming all files are uploaded to the same prefix
# native_url is of the form s3://my-s3-bucket/flytesnacks/development/{prefix}/source/empty.md
uri = native_url.split(prefix)[0]
Expand Down
Loading
Loading