Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ml/eval_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
from flyte.io import File

image = flyte.Image.from_debian_base(name="lightning-eval").with_pip_packages(
"lightning==2.6.1", "flyteplugins-pytorch==2.0.2"
"lightning==2.6.1", "flyteplugins-pytorch==2.0.3"
)

# Multi-node training: 2 nodes, 1 process per node
train_env = flyte.TaskEnvironment(
name="distributed-train",
resources=flyte.Resources(cpu=4, memory="25Gi", gpu="L4:1"),
resources=flyte.Resources(cpu=4, memory="25Gi", gpu="T4:1"),
plugin_config=Elastic(
nproc_per_node=1,
nnodes=2,
Expand Down
11 changes: 2 additions & 9 deletions examples/plugins/torch_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,11 @@

import flyte

# Install flyteplugins-torch from the wheel for development.
# In production, you would just specify the package name and version.
# from flyte._image import DIST_FOLDER, PythonWheels
# image = flyte.Image.from_debian_base(name="torch").clone(
# addl_layer=PythonWheels(wheel_dir=DIST_FOLDER, package_name="flyteplugins-pytorch", pre=True)
# )

image = flyte.Image.from_debian_base(name="torch").with_pip_packages("flyteplugins-pytorch")

torch_env = flyte.TaskEnvironment(
name="torch_env",
resources=flyte.Resources(cpu=(1, 2), memory=("1Gi", "2Gi")),
resources=flyte.Resources(cpu=(1, 2), memory=("1Gi", "2Gi"), gpu="T4:1"),
plugin_config=Elastic(
nproc_per_node=1,
# if you want to do local testing set nnodes=1
Expand Down Expand Up @@ -106,6 +99,6 @@ def torch_distributed_train(epochs: int) -> typing.Optional[float]:

if __name__ == "__main__":
flyte.init_from_config()
run = flyte.with_runcontext(mode="remote").run(torch_distributed_train, epochs=3)
run = flyte.with_runcontext(mode="remote").run(torch_distributed_train, epochs=1000)
print("run name:", run.name)
print("run url:", run.url)
20 changes: 20 additions & 0 deletions examples/scripts/hello.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
Run with:

```bash
flyte run --follow python-script hello.py --output-dir output
```
"""

import os


def main():
print("Hello, world!")
os.makedirs("output", exist_ok=True)
with open("output/hello.txt", "w") as f:
f.write("Hello, file!")


if __name__ == "__main__":
main()
16 changes: 14 additions & 2 deletions src/flyte/_code_bundle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
from ._ignore import GitIgnore, IgnoreGroup, StandardIgnore
from ._utils import CopyFiles
from .bundle import build_code_bundle, build_pkl_bundle, download_bundle
from .bundle import (
build_code_bundle,
build_code_bundle_from_relative_paths,
build_pkl_bundle,
download_bundle,
)

__all__ = ["CopyFiles", "build_code_bundle", "build_pkl_bundle", "default_ignores", "download_bundle"]
__all__ = [
"CopyFiles",
"build_code_bundle",
"build_code_bundle_from_relative_paths",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this an existing function, is it used for apps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

"build_pkl_bundle",
"default_ignores",
"download_bundle",
]


default_ignores = [GitIgnore, StandardIgnore, IgnoreGroup]
2 changes: 1 addition & 1 deletion src/flyte/_code_bundle/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from ._ignore import Ignore, IgnoreGroup, StandardIgnore

CopyFiles = Literal["loaded_modules", "all", "none"]
CopyFiles = Literal["loaded_modules", "all", "none", "custom"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow this... what does this do? Are we adding this to the public API now? how will users specify this on the command line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



def compress_scripts(source_path: str, destination: str, modules: List[ModuleType]):
Expand Down
6 changes: 6 additions & 0 deletions src/flyte/_debug/vscode.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ def prepare_launch_json(ctx: click.Context, pid: int):
ctx.params["version"],
"--run-base-dir",
ctx.params["run_base_dir"],
"--raw-data-path",
ctx.params["raw_data_path"],
"--checkpoint-path",
ctx.params["checkpoint_path"],
"--prev-checkpoint",
ctx.params["prev_checkpoint"],
"--name",
name,
"--run-name",
Expand Down
56 changes: 56 additions & 0 deletions src/flyte/_internal/resolvers/internal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Generic resolver for internal Flyte tasks.

Stores an import path to a task-builder function and arbitrary keyword
arguments. At runtime ``load_task`` dynamically imports the builder and
calls it with the stored kwargs, recreating a lightweight task without
pickling. This is the same mechanism used by ``run_python_script`` and
can be reused for prefetch, custom bundling, and other internal tasks.
"""

import importlib
from pathlib import Path
from typing import Any, Dict, List, Optional

from flyte._internal.resolvers.common import Resolver
from flyte._task import TaskTemplate


class InternalTaskResolver(Resolver):
"""Resolve an internal task by dynamically importing its builder.

During serialization the resolver stores:

* ``task_builder`` - fully-qualified import path of a callable that
returns a :class:`TaskTemplate` (e.g.
``"flyte._run_python_script._build_script_runner_task"``).
* Arbitrary keyword arguments forwarded to the builder.

At runtime :meth:`load_task` re-imports the builder and calls it with
the stored kwargs.
"""

def __init__(self, task_builder: str = "", **kwargs: Any):
self._task_builder = task_builder
self._kwargs = kwargs

@property
def import_path(self) -> str:
return "flyte._internal.resolvers.internal.InternalTaskResolver"

def load_task(self, loader_args: List[str]) -> TaskTemplate:
args_iter = iter(loader_args)
parsed: Dict[str, str] = dict(zip(args_iter, args_iter))

builder_path = parsed.pop("task_builder")
module_path, func_name = builder_path.rsplit(".", 1)
module = importlib.import_module(module_path)
builder = getattr(module, func_name)

return builder(**parsed)

def loader_args(self, task: TaskTemplate, root_dir: Optional[Path] = None) -> List[str]:
args = ["task_builder", self._task_builder]
for key, value in self._kwargs.items():
if value is not None:
args.extend([key, str(value)])
return args
62 changes: 42 additions & 20 deletions src/flyte/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,14 @@ def __init__(
preserve_original_types: bool | None = None,
debug: bool = False,
_tracker: Any = None,
_bundle_relative_paths: tuple[str, ...] | None = None,
_bundle_from_dir: pathlib.Path | None = None,
):
from flyte._tools import ipython_check

self._tracker = _tracker
self._bundle_relative_paths = _bundle_relative_paths
self._bundle_from_dir = _bundle_from_dir
init_config = _get_init_config()
client = init_config.client if init_config else None
if not force_mode and client is not None:
Expand Down Expand Up @@ -169,7 +173,7 @@ async def _run_remote(self, obj: TaskTemplate[P, R, F] | LazyEntity, *args: P.ar
from flyte.remote import Run
from flyte.remote._task import LazyEntity, TaskDetails

from ._code_bundle import build_code_bundle, build_pkl_bundle
from ._code_bundle import build_code_bundle, build_code_bundle_from_relative_paths, build_pkl_bundle
from ._deploy import build_images
from ._internal.runtime.convert import convert_from_native_to_inputs
from ._internal.runtime.task_serde import translate_task_to_wire
Expand Down Expand Up @@ -223,16 +227,24 @@ async def _run_remote(self, obj: TaskTemplate[P, R, F] | LazyEntity, *args: P.ar
upload_to_controlplane=not self._dry_run,
copy_bundle_to=self._copy_bundle_to,
)
elif self._copy_files == "custom":
if not self._bundle_relative_paths or not self._bundle_from_dir:
raise ValueError("copy_style='custom' requires _bundle_relative_paths and _bundle_from_dir")
code_bundle = await build_code_bundle_from_relative_paths(
self._bundle_relative_paths,
from_dir=self._bundle_from_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
)
elif self._copy_files != "none":
code_bundle = await build_code_bundle(
from_dir=cfg.root_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
copy_style=self._copy_files,
)
else:
if self._copy_files != "none":
code_bundle = await build_code_bundle(
from_dir=cfg.root_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
copy_style=self._copy_files,
)
else:
code_bundle = None
code_bundle = None
if not self._disable_run_cache:
_RUN_CACHE[_CacheKey(obj_id=id(obj), dry_run=self._dry_run)] = _CacheValue(
code_bundle=code_bundle, image_cache=image_cache
Expand Down Expand Up @@ -435,7 +447,7 @@ async def _run_hybrid(self, obj: TaskTemplate[P, R, F], *args: P.args, **kwargs:
over the longer term we will productize this.
"""
import flyte.report
from flyte._code_bundle import build_code_bundle, build_pkl_bundle
from flyte._code_bundle import build_code_bundle, build_code_bundle_from_relative_paths, build_pkl_bundle
from flyte._deploy import build_images
from flyte.models import RawDataPath
from flyte.storage import ABFS, GCS, S3
Expand Down Expand Up @@ -469,16 +481,24 @@ async def _run_hybrid(self, obj: TaskTemplate[P, R, F], *args: P.args, **kwargs:
upload_to_controlplane=not self._dry_run,
copy_bundle_to=self._copy_bundle_to,
)
elif self._copy_files == "custom":
if not self._bundle_relative_paths or not self._bundle_from_dir:
raise ValueError("copy_style='custom' requires _bundle_relative_paths and _bundle_from_dir")
code_bundle = await build_code_bundle_from_relative_paths(
self._bundle_relative_paths,
from_dir=self._bundle_from_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
)
elif self._copy_files != "none":
code_bundle = await build_code_bundle(
from_dir=cfg.root_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
copy_style=self._copy_files,
)
else:
if self._copy_files != "none":
code_bundle = await build_code_bundle(
from_dir=cfg.root_dir,
dryrun=self._dry_run,
copy_bundle_to=self._copy_bundle_to,
copy_style=self._copy_files,
)
else:
code_bundle = None
code_bundle = None

version = self._version or (
code_bundle.computed_version if code_bundle and code_bundle.computed_version else None
Expand Down Expand Up @@ -802,6 +822,8 @@ async def example_task(x: int, y: str) -> str:
"""
if mode == "hybrid" and not name and not run_base_dir:
raise ValueError("Run name and run base dir are required for hybrid mode")
if copy_style == "custom":
raise ValueError("copy_style='custom' is not yet supported through with_runcontext.")
if copy_style == "none" and not version:
raise ValueError("Version is required when copy_style is 'none'")

Expand Down
Loading
Loading