From ca1e27638ee16aa87d1ac6cb7cc93641e21fddd6 Mon Sep 17 00:00:00 2001
From: ChethanUK
Date: Sun, 25 May 2025 18:43:31 +0000
Subject: [PATCH 1/2] Upgrade Kubeflow Pipelines to version v2 sdk

---
 .gitignore                                    |   4 +
 dev-requirements.txt                          |  16 +-
 setup.py                                      |   3 +-
 .../pipelines/kfp/advanced_pipeline.py        |  58 +-
 .../examples/pipelines/kfp/dist_pipeline.py   |  41 +-
 .../kfp/dist_pipeline_v2_enhanced.py          | 176 ++++++
 .../examples/pipelines/kfp/intro_pipeline.py  |  33 +-
 .../pipelines/kfp/task_configs_pipeline.py    | 195 +++++++
 .../pipelines/kfp/test/kfp_pipeline_test.py   |  12 +
 torchx/pipelines/kfp/__init__.py              |   9 +-
 torchx/pipelines/kfp/adapter.py               | 519 ++++++++++--------
 torchx/pipelines/kfp/test/adapter_test.py     | 235 +++++---
 .../pipelines/kfp/test/adapter_v2_it_test.py  | 507 +++++++++++++++++
 .../kfp/test/adapter_v2_spec_test.py          | 515 +++++++++++++++++
 torchx/pipelines/kfp/test/version_test.py     |  14 +-
 15 files changed, 1969 insertions(+), 368 deletions(-)
 create mode 100644 torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py
 create mode 100644 torchx/examples/pipelines/kfp/task_configs_pipeline.py
 create mode 100644 torchx/pipelines/kfp/test/adapter_v2_it_test.py
 create mode 100644 torchx/pipelines/kfp/test/adapter_v2_spec_test.py

diff --git a/.gitignore b/.gitignore
index fa081af73..d393b49cf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -141,3 +141,7 @@ wordlist.dic
 pipeline.yaml
 
 /codecov
+.python-version
+enhanced_distributed_pipeline.yaml
+task_configs_pipeline.yaml
+local_outputs
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 474edd3bb..12acaa5d2 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -12,9 +12,10 @@ google-cloud-logging==3.10.0
 google-cloud-runtimeconfig==0.34.0
 hydra-core
 ipython
-kfp==1.8.22
-# pin protobuf to the version that is required by kfp
-protobuf==3.20.3
+kfp>=2.8.0
+kfp-kubernetes>=1.4.0 # For Kubernetes-specific features in KFP v2
+# kfp v2 is compatible with protobuf 4+
+protobuf>=4.21.0
 mlflow-skinny
 moto~=5.0.8
 pyre-extensions
@@ -40,8 +41,7 @@ lintrunner-adapters
 
 
 # reduce backtracking
-grpcio==1.62.1
-grpcio-status==1.48.1
-googleapis-common-protos==1.63.0
-google-api-core==2.18.0
-protobuf==3.20.3 # kfp==1.8.22 needs protobuf < 4
+grpcio>=1.62.1
+grpcio-status>=1.48.1
+googleapis-common-protos>=1.63.0
+google-api-core>=2.18.0
diff --git a/setup.py b/setup.py
index 3a37b67b7..501e60f32 100644
--- a/setup.py
+++ b/setup.py
@@ -87,7 +87,8 @@ def get_nightly_version():
            "google-cloud-logging>=3.0.0",
            "google-cloud-runtimeconfig>=0.33.2",
        ],
-        "kfp": ["kfp==1.6.2"],
+        # Requires the KFP v2 SDK, see https://github.com/pytorch/torchx/issues/123
+        "kfp": ["kfp>=2.8.0"],  # optional: required for Kubeflow Pipelines integration
        "kubernetes": ["kubernetes>=11"],
        "ray": ["ray>=1.12.1"],
        "dev": dev_reqs,
diff --git a/torchx/examples/pipelines/kfp/advanced_pipeline.py b/torchx/examples/pipelines/kfp/advanced_pipeline.py
index e84e6ee52..cef35acb1 100755
--- a/torchx/examples/pipelines/kfp/advanced_pipeline.py
+++ b/torchx/examples/pipelines/kfp/advanced_pipeline.py
@@ -28,7 +28,8 @@
 import sys
 from typing import Dict
 
-import kfp
+from kfp import dsl
+from kfp import compiler
 import torchx
 from torchx import specs
 from torchx.components.dist import ddp as dist_ddp
@@ -36,7 +37,6 @@
 from torchx.components.utils import copy as utils_copy, python as utils_python
 from torchx.pipelines.kfp.adapter import container_from_app
 
-
 parser = argparse.ArgumentParser(description="example kfp pipeline")
 
 # %%
@@ -238,48 +238,54 @@
 # cluster.
 #
 # The KFP adapter currently doesn't track the input and outputs so the
-# containers need to have their dependencies specified via `.after()`.
+# containers need to have their dependencies specified.
 #
-# We call `.set_tty()` to make the logs from the components more responsive for
-# example purposes.
+# We no longer need to call `.set_tty()` as that was a v1 feature.
 
 
+@dsl.pipeline(
+    name="TorchX Advanced Pipeline",
+    description="Advanced KFP pipeline with TorchX components"
+)
 def pipeline() -> None:
-    # container_from_app creates a KFP container from the TorchX app
+    # container_from_app creates a KFP v2 task from the TorchX app
     # definition.
-    copy = container_from_app(copy_app)
-    copy.container.set_tty()
+    copy_task = container_from_app(copy_app)
+    copy_task.set_display_name("Download Data")
 
-    datapreproc = container_from_app(datapreproc_app)
-    datapreproc.container.set_tty()
-    datapreproc.after(copy)
+    datapreproc_task = container_from_app(datapreproc_app)
+    datapreproc_task.set_display_name("Preprocess Data")
+    # In KFP v2, dependencies are inferred automatically from data flow; these
+    # components don't exchange outputs, so declare the ordering with .after().
+    datapreproc_task.after(copy_task)
 
     # For the trainer we want to log that UI metadata so you can access
     # tensorboard from the UI.
-    trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
-    trainer.container.set_tty()
-    trainer.after(datapreproc)
+    trainer_task = container_from_app(trainer_app, ui_metadata=ui_metadata)
+    trainer_task.set_display_name("Train Model")
+    trainer_task.after(datapreproc_task)
 
     if False:
-        serve = container_from_app(serve_app)
-        serve.container.set_tty()
-        serve.after(trainer)
+        serve_task = container_from_app(serve_app)
+        serve_task.set_display_name("Serve Model")
+        serve_task.after(trainer_task)
 
     if False:
         # Serve and interpret only require the trained model so we can run them
         # in parallel to each other.
-        interpret = container_from_app(interpret_app)
-        interpret.container.set_tty()
-        interpret.after(trainer)
+        interpret_task = container_from_app(interpret_app)
+        interpret_task.set_display_name("Interpret Model")
+        interpret_task.after(trainer_task)
 
 
-kfp.compiler.Compiler().compile(
-    pipeline_func=pipeline,
-    package_path="pipeline.yaml",
-)
+if __name__ == "__main__":
+    compiler.Compiler().compile(
+        pipeline_func=pipeline,
+        package_path="pipeline.yaml",
+    )
 
-with open("pipeline.yaml", "rt") as f:
-    print(f.read())
+    with open("pipeline.yaml", "rt") as f:
+        print(f.read())
 
 # %%
 # Once this has all run you should have a pipeline file (typically
diff --git a/torchx/examples/pipelines/kfp/dist_pipeline.py b/torchx/examples/pipelines/kfp/dist_pipeline.py
index 4cf8f2e05..6a48a87b7 100755
--- a/torchx/examples/pipelines/kfp/dist_pipeline.py
+++ b/torchx/examples/pipelines/kfp/dist_pipeline.py
@@ -12,17 +12,22 @@
 ======================================
 
 This is an example KFP pipeline that uses resource_from_app to launch a
-distributed operator using the kubernetes/volcano job scheduler. This only works
+distributed job using the kubernetes/volcano job scheduler. This only works
 in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
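
Once compiled (see the compile step at the bottom of this file), the resulting
pipeline.yaml can also be submitted programmatically instead of through the UI.
A minimal sketch, assuming a reachable KFP v2 API server; the host URL,
arguments and run name below are illustrative placeholders:

    from kfp.client import Client

    client = Client(host="http://localhost:8080")  # placeholder endpoint
    client.create_run_from_pipeline_package(
        "pipeline.yaml",
        arguments={},
        run_name="torchx-dist-pipeline",
    )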
""" -import kfp +from kfp import dsl +from kfp import compiler from torchx import specs from torchx.pipelines.kfp.adapter import resource_from_app +@dsl.pipeline( + name="distributed-pipeline", + description="A distributed pipeline using Volcano job scheduler" +) def pipeline() -> None: - # First we define our AppDef for the component, we set + # First we define our AppDef for the component echo_app = specs.AppDef( name="test-dist", roles=[ @@ -36,31 +41,39 @@ def pipeline() -> None: ], ) - # To convert the TorchX AppDef into a KFP container we use - # the resource_from_app adapter. This takes generates a KFP Kubernetes - # resource operator definition from the TorchX app def and instantiates it. - echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="default") + # To convert the TorchX AppDef into a KFP v2 task that creates + # a Volcano job, we use the resource_from_app adapter. + # This generates a task that uses kubectl to create the Volcano job. + echo_task = resource_from_app(echo_app, queue="default") + + # Set display name for better visualization + echo_task.set_display_name("Distributed Echo Job") # %% # To generate the pipeline definition file we need to call into the KFP compiler # with our pipeline function. -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) +if __name__ == "__main__": + compiler.Compiler().compile( + pipeline_func=pipeline, + package_path="pipeline.yaml", + ) -with open("pipeline.yaml", "rt") as f: - print(f.read()) + with open("pipeline.yaml", "rt") as f: + print(f.read()) # %% # Once this has all run you should have a pipeline file (typically # pipeline.yaml) that you can upload to your KFP cluster via the UI or # a kfp.Client. # +# Note: In KFP v2, for more advanced Kubernetes resource manipulation, +# consider using the kfp-kubernetes extension library which provides +# better integration with Kubernetes resources. +# # See the -# `KFP SDK Examples `_ +# `KFP SDK Examples `_ # for more info on launching KFP pipelines. # %% diff --git a/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py b/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py new file mode 100644 index 000000000..c8d6b9c17 --- /dev/null +++ b/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
+ +""" +Enhanced Distributed Pipeline Example for KFP v2 + +This example demonstrates advanced KFP v2 features including: +- Using kfp-kubernetes for better Kubernetes integration +- Task configuration options (display names, retries, caching) +- Volume mounting for distributed training +- Resource specifications with GPU support +""" + +import argparse +from kfp import dsl, compiler +from kfp import kubernetes # Using kfp-kubernetes extension +from torchx.pipelines.kfp.adapter import container_from_app, resource_from_app +from torchx import specs + + +def main(args: argparse.Namespace) -> None: + # Create distributed training app + ddp_app = specs.AppDef( + name="distributed-trainer", + roles=[ + specs.Role( + name="trainer", + entrypoint="bash", + args=[ + "-c", + "echo 'Starting distributed training...'; " + "echo 'Node rank: $RANK'; " + "echo 'World size: $WORLD_SIZE'; " + "python -m torch.distributed.run --nproc_per_node=2 train.py" + ], + env={ + "MASTER_ADDR": "distributed-trainer-0", + "MASTER_PORT": "29500", + }, + num_replicas=3, + image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", + resource=specs.Resource( + cpu=4, + memMB=8192, + gpu=2, + ), + ) + ], + ) + + # Create data preprocessing app + preprocess_app = specs.AppDef( + name="data-preprocessor", + roles=[ + specs.Role( + name="preprocessor", + entrypoint="python", + args=["-m", "preprocess", "--input", "/data/raw", "--output", "/data/processed"], + env={"DATA_FORMAT": "tfrecord"}, + num_replicas=1, + image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", + resource=specs.Resource( + cpu=2, + memMB=4096, + gpu=0, + ), + ) + ], + ) + + # Create model evaluation app + eval_app = specs.AppDef( + name="model-evaluator", + roles=[ + specs.Role( + name="evaluator", + entrypoint="python", + args=["-m", "evaluate", "--model", "/models/latest", "--data", "/data/test"], + env={"METRICS_OUTPUT": "/metrics/eval.json"}, + num_replicas=1, + image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", + resource=specs.Resource( + cpu=2, + memMB=4096, + gpu=1, + ), + ) + ], + ) + + @dsl.pipeline( + name="enhanced-distributed-pipeline", + description="Enhanced distributed ML pipeline with KFP v2 features" + ) + def pipeline(): + # Create persistent volume for data sharing + pvc = kubernetes.CreatePVC( + pvc_name_suffix='-shared-data', + access_modes=['ReadWriteMany'], + size='50Gi', + storage_class_name='standard', + ) + + # Data preprocessing step + preprocess_task = container_from_app( + preprocess_app, + display_name="Data Preprocessing", + retry_policy={ + 'max_retry_count': 3, + 'backoff_duration': '60s', + 'backoff_factor': 2, + }, + enable_caching=True, + ) + + # Mount volume for preprocessing + kubernetes.mount_pvc( + preprocess_task, + pvc_name=pvc.outputs['name'], + mount_path='/data', + ) + + # Distributed training using Volcano + train_task = resource_from_app( + ddp_app, + queue="training-queue", + service_account="ml-training-sa", + priority_class="high-priority", + ) + train_task.set_display_name("Distributed Training") + train_task.after(preprocess_task) + + # Model evaluation + eval_task = container_from_app( + eval_app, + display_name="Model Evaluation", + enable_caching=False, # Don't cache evaluation results + ) + eval_task.after(train_task) + + # Mount volume for evaluation + kubernetes.mount_pvc( + eval_task, + pvc_name=pvc.outputs['name'], + mount_path='/data', + ) + + # Clean up PVC after evaluation + delete_pvc = kubernetes.DeletePVC( + pvc_name=pvc.outputs['name'] + ).after(eval_task) + 
delete_pvc.set_display_name("Cleanup Shared Storage") + + # Compile the pipeline + compiler.Compiler().compile( + pipeline_func=pipeline, + package_path=args.output_path + ) + print(f"Pipeline compiled to: {args.output_path}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Enhanced distributed pipeline example") + parser.add_argument( + "--output_path", + type=str, + default="enhanced_distributed_pipeline.yaml", + help="Path to save the compiled pipeline", + ) + + args = parser.parse_args() + main(args) diff --git a/torchx/examples/pipelines/kfp/intro_pipeline.py b/torchx/examples/pipelines/kfp/intro_pipeline.py index 07130b338..b61c4a551 100755 --- a/torchx/examples/pipelines/kfp/intro_pipeline.py +++ b/torchx/examples/pipelines/kfp/intro_pipeline.py @@ -21,18 +21,23 @@ TorchX tries to leverage standard mechanisms wherever possible. For KFP we use the existing KFP pipeline definition syntax and add a single -`component_from_app` conversion step to convert a TorchX component into one +`container_from_app` conversion step to convert a TorchX component into one KFP can understand. Typically you have a separate component file but for this example we define the AppDef inline. """ -import kfp +from kfp import dsl +from kfp import compiler from torchx import specs from torchx.pipelines.kfp.adapter import container_from_app +@dsl.pipeline( + name="intro-pipeline", + description="An introductory pipeline using TorchX components" +) def pipeline() -> None: # First we define our AppDef for the component. AppDef is a core part of TorchX # and can be used to describe complex distributed multi container apps or @@ -50,22 +55,26 @@ def pipeline() -> None: ) # To convert the TorchX AppDef into a KFP container we use - # the container_from_app adapter. This takes generates a KFP component - # definition from the TorchX app def and instantiates it into a container. - echo_container: kfp.dsl.ContainerOp = container_from_app(echo_app) + # the container_from_app adapter. This generates a KFP v2 component + # definition from the TorchX app def and instantiates it into a container task. + echo_container = container_from_app(echo_app) + + # In KFP v2, you can set display name for better visualization + echo_container.set_display_name("Echo Hello TorchX") # %% # To generate the pipeline definition file we need to call into the KFP compiler # with our pipeline function. -kfp.compiler.Compiler().compile( - pipeline_func=pipeline, - package_path="pipeline.yaml", -) +if __name__ == "__main__": + compiler.Compiler().compile( + pipeline_func=pipeline, + package_path="pipeline.yaml", + ) -with open("pipeline.yaml", "rt") as f: - print(f.read()) + with open("pipeline.yaml", "rt") as f: + print(f.read()) # %% # Once this has all run you should have a pipeline file (typically @@ -73,7 +82,7 @@ def pipeline() -> None: # a kfp.Client. # # See the -# `KFP SDK Examples `_ +# `KFP SDK Examples `_ # for more info on launching KFP pipelines. # %% diff --git a/torchx/examples/pipelines/kfp/task_configs_pipeline.py b/torchx/examples/pipelines/kfp/task_configs_pipeline.py new file mode 100644 index 000000000..20ed514a4 --- /dev/null +++ b/torchx/examples/pipelines/kfp/task_configs_pipeline.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
+ +""" +Task Configuration Pipeline Example for KFP v2 + +This example demonstrates all available task configuration options in KFP v2: +- Display names +- Resource limits (CPU, memory, GPU/accelerator) +- Environment variables +- Retry policies +- Caching options +""" + +import argparse +from kfp import dsl, compiler +from torchx import specs +from torchx.pipelines.kfp.adapter import container_from_app + + +def main(args: argparse.Namespace) -> None: + # Create various apps to demonstrate different configurations + + # Basic CPU task + cpu_app = specs.AppDef( + name="cpu-task", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["-c", "print('CPU task running'); import time; time.sleep(5)"], + image="python:3.9-slim", + resource=specs.Resource(cpu=2, memMB=2048, gpu=0), + ) + ], + ) + + # GPU task + gpu_app = specs.AppDef( + name="gpu-task", + roles=[ + specs.Role( + name="trainer", + entrypoint="python", + args=[ + "-c", + "import torch; print(f'GPU available: {torch.cuda.is_available()}'); " + "print(f'GPU count: {torch.cuda.device_count()}')" + ], + image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", + resource=specs.Resource(cpu=4, memMB=8192, gpu=1), + ) + ], + ) + + # Task with environment variables + env_app = specs.AppDef( + name="env-task", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=[ + "-c", + "import os; " + "print(f'MODEL_NAME={os.getenv(\"MODEL_NAME\")}'); " + "print(f'BATCH_SIZE={os.getenv(\"BATCH_SIZE\")}'); " + "print(f'LEARNING_RATE={os.getenv(\"LEARNING_RATE\")}');" + ], + env={ + "MODEL_NAME": "resnet50", + "BATCH_SIZE": "32", + "LEARNING_RATE": "0.001", + }, + image="python:3.9-slim", + resource=specs.Resource(cpu=1, memMB=1024, gpu=0), + ) + ], + ) + + # Task that might fail (for retry demonstration) + flaky_app = specs.AppDef( + name="flaky-task", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=[ + "-c", + "import random; import sys; " + "success = random.random() > 0.7; " # 70% failure rate + "print(f'Attempt result: {\"SUCCESS\" if success else \"FAILURE\"}'); " + "sys.exit(0 if success else 1);" + ], + image="python:3.9-slim", + resource=specs.Resource(cpu=1, memMB=512, gpu=0), + ) + ], + ) + + @dsl.pipeline( + name="task-configuration-demo", + description="Demonstrates all KFP v2 task configuration options" + ) + def pipeline(): + # Basic CPU task with display name + cpu_task = container_from_app( + cpu_app, + display_name="CPU Processing Task", + enable_caching=True, + ) + + # GPU task with custom accelerator configuration + gpu_task = container_from_app( + gpu_app, + display_name="GPU Training Task", + enable_caching=False, # Don't cache GPU tasks + ) + # Note: GPU settings are automatically applied from the resource spec + # But you can override the accelerator type if needed: + # gpu_task.set_accelerator_type('nvidia-tesla-v100') + + # Task with environment variables + env_task = container_from_app( + env_app, + display_name="Environment Variables Demo", + ) + # Add additional runtime environment variables + env_task.set_env_variable('RUNTIME_VAR', 'runtime_value') + env_task.set_env_variable('EXPERIMENT_ID', 'exp-001') + + # Flaky task with retry policy + flaky_task = container_from_app( + flaky_app, + display_name="Flaky Task with Retries", + retry_policy={ + 'max_retry_count': 5, + 'backoff_duration': '30s', + 'backoff_factor': 2, + 'backoff_max_duration': '300s', + }, + enable_caching=False, # Don't cache flaky tasks + ) + + # Set task dependencies + gpu_task.after(cpu_task) + 
env_task.after(cpu_task) + flaky_task.after(gpu_task, env_task) + + # Additional task configurations + + # Set resource requests/limits explicitly (override defaults) + cpu_task.set_cpu_request('1') + cpu_task.set_memory_request('1Gi') + + # Chain multiple configurations + (gpu_task + .set_env_variable('CUDA_VISIBLE_DEVICES', '0') + .set_env_variable('TORCH_CUDA_ARCH_LIST', '7.0;7.5;8.0')) + + # Compile the pipeline + compiler.Compiler().compile( + pipeline_func=pipeline, + package_path=args.output_path + ) + print(f"Pipeline compiled to: {args.output_path}") + + # Print some helpful information + print("\nTask Configuration Features Demonstrated:") + print("1. Display names for better UI visualization") + print("2. CPU and memory resource requests/limits") + print("3. GPU/accelerator configuration") + print("4. Environment variables (from AppDef and runtime)") + print("5. Retry policies with exponential backoff") + print("6. Caching control per task") + print("7. Task dependencies and execution order") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Task configuration demonstration pipeline" + ) + parser.add_argument( + "--output_path", + type=str, + default="task_configs_pipeline.yaml", + help="Path to save the compiled pipeline", + ) + + args = parser.parse_args() + main(args) diff --git a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py index f4d26da4b..76695ef98 100644 --- a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py +++ b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py @@ -44,3 +44,15 @@ def test_dist_pipeline(self) -> None: from torchx.examples.pipelines.kfp import dist_pipeline # noqa: F401 self.assertTrue(os.path.exists("pipeline.yaml")) + + def test_task_configs_pipeline(self) -> None: + sys.argv = ["task_configs_pipeline.py"] + from torchx.examples.pipelines.kfp import task_configs_pipeline # noqa: F401 + + self.assertTrue(os.path.exists("task_configs_pipeline.yaml")) + + def test_dist_pipeline_v2_enhanced(self) -> None: + sys.argv = ["dist_pipeline_v2_enhanced.py"] + from torchx.examples.pipelines.kfp import dist_pipeline_v2_enhanced # noqa: F401 + + self.assertTrue(os.path.exists("enhanced_distributed_pipeline.yaml")) diff --git a/torchx/pipelines/kfp/__init__.py b/torchx/pipelines/kfp/__init__.py index 1adeede50..30311c266 100644 --- a/torchx/pipelines/kfp/__init__.py +++ b/torchx/pipelines/kfp/__init__.py @@ -18,12 +18,13 @@ import kfp from .version import __version__ as __version__ # noqa F401 - +import warnings def _check_kfp_version() -> None: - if not kfp.__version__.startswith("1."): - raise ImportError( - f"Only kfp version 1.x.x is supported! kfp version {kfp.__version__}" + if kfp.__version__.startswith("1."): + warnings.warn( + f"KFP version 1.x.x is deprecated! Please upgrade to kfp version 2.x.x. Current version: {kfp.__version__}", + DeprecationWarning ) diff --git a/torchx/pipelines/kfp/adapter.py b/torchx/pipelines/kfp/adapter.py index 427f25f44..8e5d3f1f4 100644 --- a/torchx/pipelines/kfp/adapter.py +++ b/torchx/pipelines/kfp/adapter.py @@ -7,247 +7,276 @@ # pyre-strict +""" +This module contains adapters for converting TorchX components to +Kubeflow Pipeline (KFP) v2 components. 
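
A minimal usage sketch (the app name, image and arguments here are illustrative;
see the docstrings of component_from_app, container_from_app and
resource_from_app below for the supported options):

    from kfp import compiler, dsl
    from torchx import specs
    from torchx.pipelines.kfp.adapter import container_from_app

    app = specs.AppDef(
        name="trainer",
        roles=[
            specs.Role(
                name="trainer",
                image="pytorch/pytorch:latest",
                entrypoint="python",
                args=["train.py"],
            )
        ],
    )

    @dsl.pipeline(name="example-pipeline")
    def pipeline() -> None:
        container_from_app(app, display_name="Train")

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.yaml")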
+""" + import json -import os -import os.path -import shlex -from typing import Mapping, Optional, Tuple +from typing import Any, Dict, Optional, Tuple import yaml -from kfp import components, dsl - -# @manual=fbsource//third-party/pypi/kfp:kfp -from kfp.components.structures import ComponentSpec, OutputSpec -from kubernetes.client.models import ( - V1ContainerPort, - V1EmptyDirVolumeSource, - V1Volume, - V1VolumeMount, -) -from torchx.schedulers.kubernetes_scheduler import app_to_resource, pod_labels -from torchx.specs import api -from typing_extensions import Protocol - -from .version import __version__ as __version__ # noqa F401 - - -def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]: - """ - component_spec_from_app takes in a TorchX component and generates the yaml - spec for it. Notably this doesn't apply resources or port_maps since those - must be applied at runtime which is why it returns the role spec as well. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_spec_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_spec_from_app(app_def) - ('description: ...', Role(...)) - """ - assert len(app.roles) == 1, f"KFP adapter only support one role, got {app.roles}" - - role = app.roles[0] - assert ( - role.num_replicas - == 1 - # pyre-fixme[16]: `AppDef` has no attribute `num_replicas`. - ), f"KFP adapter only supports one replica, got {app.num_replicas}" +from kfp import dsl +from kfp.dsl import ContainerSpec, OutputPath, PipelineTask - command = [role.entrypoint, *role.args] +import torchx.specs as api +from torchx.schedulers import kubernetes_scheduler - spec = { - "name": f"{app.name}-{role.name}", - "description": f"KFP wrapper for TorchX component {app.name}, role {role.name}", - "implementation": { - "container": { - "image": role.image, - "command": command, - "env": role.env, - } - }, - "outputs": [], - } - return yaml.dump(spec), role - - -class ContainerFactory(Protocol): - """ - ContainerFactory is a protocol that represents a function that when called produces a - kfp.dsl.ContainerOp. - """ - def __call__(self, *args: object, **kwargs: object) -> dsl.ContainerOp: ... - - -class KFPContainerFactory(ContainerFactory, Protocol): - """ - KFPContainerFactory is a ContainerFactory that also has some KFP metadata - attached to it. - """ - - component_spec: ComponentSpec - - -METADATA_FILE = "/tmp/outputs/mlpipeline-ui-metadata/data.json" +# Metadata Template for TorchX components +UI_METADATA_TEMPLATE = """ +import json +metadata = {metadata_json} +with open("{output_path}", "w") as f: + json.dump(metadata, f) +""" def component_from_app( - app: api.AppDef, ui_metadata: Optional[Mapping[str, object]] = None -) -> ContainerFactory: + app: api.AppDef, + ui_metadata: Optional[Dict[str, Any]] = None, +) -> Any: """ - component_from_app takes in a TorchX component/AppDef and returns a KFP - ContainerOp factory. This is equivalent to the - `kfp.components.load_component_from_* - `_ - methods. + component_from_app creates a KFP v2 component from a TorchX AppDef. + + In KFP v2, we use container components for single-container apps. + For multi-role apps, we use the first role as the primary container. Args: - app: The AppDef to generate a KFP container factory for. - ui_metadata: KFP UI Metadata to output so you can have model results show - up in the UI. 
See - https://www.kubeflow.org/docs/components/pipelines/legacy-v1/sdk/output-viewer/ - for more info on the format. - - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import component_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> component_from_app(app_def) - + app: The torchx AppDef to adapt. + ui_metadata: optional UI metadata to attach to the component. + + Returns: + A KFP v2 component function. + + Note: + KFP v2 uses a different component structure than v1. This function + returns a component that can be used within a pipeline function. + + Example: + >>> from torchx import specs + >>> from torchx.pipelines.kfp.adapter import component_from_app + >>> from kfp import dsl + >>> + >>> app_def = specs.AppDef( + ... name="trainer", + ... roles=[specs.Role( + ... name="trainer", + ... image="pytorch/pytorch:latest", + ... entrypoint="python", + ... args=["-m", "train", "--epochs", "10"], + ... env={"CUDA_VISIBLE_DEVICES": "0"}, + ... resource=specs.Resource(cpu=2, memMB=8192, gpu=1) + ... )], + ... ) + >>> trainer_component = component_from_app(app_def) + >>> + >>> @dsl.pipeline(name="training-pipeline") + >>> def my_pipeline(): + ... trainer_task = container_from_app(app_def) + ... trainer_task.set_display_name("Model Training") """ - - role_spec: api.Role - spec, role_spec = component_spec_from_app(app) - resources: api.Resource = role_spec.resource - assert ( - len(resources.capabilities) == 0 - ), f"KFP doesn't support capabilities, got {resources.capabilities}" - component_factory: KFPContainerFactory = components.load_component_from_text(spec) - - if ui_metadata is not None: - # pyre-fixme[16]: `ComponentSpec` has no attribute `outputs` - component_factory.component_spec.outputs.append( - OutputSpec( - name="mlpipeline-ui-metadata", - type="MLPipeline UI Metadata", - description="ui metadata", - ) + if len(app.roles) > 1: + raise ValueError( + f"KFP adapter does not support apps with more than one role. " + f"AppDef has roles: {[r.name for r in app.roles]}" ) - - def factory_wrapper(*args: object, **kwargs: object) -> dsl.ContainerOp: - c = component_factory(*args, **kwargs) - container = c.container - - if ui_metadata is not None: - # We generate the UI metadata from the sidecar so we need to make - # both the container and the sidecar share the same tmp directory so - # the outputs appear in the original container. 
- c.add_volume(V1Volume(name="tmp", empty_dir=V1EmptyDirVolumeSource())) - container.add_volume_mount( - V1VolumeMount( - name="tmp", - mount_path="/tmp/", - ) - ) - c.output_artifact_paths["mlpipeline-ui-metadata"] = METADATA_FILE - c.add_sidecar(_ui_metadata_sidecar(ui_metadata)) - - cpu = resources.cpu - if cpu >= 0: - cpu_str = f"{int(cpu*1000)}m" - container.set_cpu_request(cpu_str) - container.set_cpu_limit(cpu_str) - mem = resources.memMB - if mem >= 0: - mem_str = f"{int(mem)}M" - container.set_memory_request(mem_str) - container.set_memory_limit(mem_str) - gpu = resources.gpu - if gpu > 0: - container.set_gpu_limit(str(gpu)) - - for name, port in role_spec.port_map.items(): - container.add_port( - V1ContainerPort( - name=name, - container_port=port, - ), - ) - - c.pod_labels.update(pod_labels(app, 0, role_spec, 0, app.name)) - - return c - - return factory_wrapper - - -def _ui_metadata_sidecar( - ui_metadata: Mapping[str, object], image: str = "alpine" -) -> dsl.Sidecar: - shell_encoded = shlex.quote(json.dumps(ui_metadata)) - dirname = os.path.dirname(METADATA_FILE) - return dsl.Sidecar( - name="ui-metadata-sidecar", - image=image, - command=[ - "sh", - "-c", - f"mkdir -p {dirname}; echo {shell_encoded} > {METADATA_FILE}", - ], - mirror_volume_mounts=True, - ) + + role = app.roles[0] + + @dsl.container_component + def torchx_component(mlpipeline_ui_metadata: OutputPath(str) = None) -> ContainerSpec: + """KFP v2 wrapper for TorchX component.""" + # Basic container spec + container_spec_dict = { + "image": role.image, + } + + # Build command and args + command = [] + if role.entrypoint: + command.append(role.entrypoint) + if role.args: + command.extend(role.args) + + # Set command or args + if role.entrypoint and role.args: + # If both entrypoint and args exist, use command for full command line + container_spec_dict["command"] = command + elif role.entrypoint: + # If only entrypoint exists, use it as command + container_spec_dict["command"] = [role.entrypoint] + elif role.args: + # If only args exist, use them as args + container_spec_dict["args"] = list(role.args) + + # Handle UI metadata if provided + if ui_metadata and mlpipeline_ui_metadata: + metadata_json = json.dumps(ui_metadata) + metadata_cmd = f'echo \'{metadata_json}\' > {mlpipeline_ui_metadata}' + + # If there's an existing command, wrap it with metadata writing + if "command" in container_spec_dict: + existing_cmd = ' '.join(container_spec_dict["command"]) + container_spec_dict["command"] = ["sh", "-c", f"{metadata_cmd} && {existing_cmd}"] + else: + container_spec_dict["command"] = ["sh", "-c", metadata_cmd] + + return ContainerSpec(**container_spec_dict) + + # Set component metadata + torchx_component._component_human_name = f"{app.name}-{role.name}" + torchx_component._component_description = f"KFP v2 wrapper for TorchX component {app.name}, role {role.name}" + + # Store role resource info as a component attribute so container_from_app can use it + torchx_component._torchx_role = role + + return torchx_component + + +# Alias for clarity - matches the naming in adapter_v23.py +component_from_app_def = component_from_app def container_from_app( app: api.AppDef, - *args: object, - ui_metadata: Optional[Mapping[str, object]] = None, - **kwargs: object, -) -> dsl.ContainerOp: + *args: Any, + ui_metadata: Optional[Dict[str, Any]] = None, + display_name: Optional[str] = None, + retry_policy: Optional[Dict[str, Any]] = None, + enable_caching: bool = True, + accelerator_type: Optional[str] = None, + **kwargs: Any, +) -> 
PipelineTask: """ - container_from_app transforms the app into a KFP component and returns a - corresponding ContainerOp instance. + container_from_app transforms the app into a KFP v2 component and returns a + corresponding PipelineTask instance when called within a pipeline. - See component_from_app for description on the arguments. Any unspecified - arguments are passed through to the KFP container factory method. + Args: + app: The torchx AppDef to adapt. + ui_metadata: optional UI metadata to attach to the component. + display_name: optional display name for the task in the KFP UI. + retry_policy: optional retry configuration dict with 'max_retry_count' and/or 'backoff_duration' keys. + enable_caching: whether to enable caching for this task (default: True). + accelerator_type: optional accelerator type (e.g., 'nvidia-tesla-v100', 'nvidia-tesla-k80'). + *args: positional arguments passed to the component. + **kwargs: keyword arguments passed to the component. - >>> import kfp - >>> from torchx import specs - >>> from torchx.pipelines.kfp.adapter import container_from_app - >>> app_def = specs.AppDef( - ... name="trainer", - ... roles=[specs.Role("trainer", image="foo:latest")], - ... ) - >>> def pipeline(): - ... trainer = container_from_app(app_def) - ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ContainerOp': {... 'name': 'trainer-trainer', ...}} + Returns: + A configured PipelineTask instance. + + See component_from_app for description on the arguments. Any unspecified + arguments are passed through to the component. + + Example: + >>> import kfp + >>> from kfp import dsl + >>> from torchx import specs + >>> from torchx.pipelines.kfp.adapter import container_from_app + >>> + >>> app_def = specs.AppDef( + ... name="trainer", + ... roles=[specs.Role( + ... name="trainer", + ... image="pytorch/pytorch:latest", + ... entrypoint="python", + ... args=["train.py"], + ... resource=specs.Resource(cpu=4, memMB=16384, gpu=1) + ... )], + ... ) + >>> + >>> @dsl.pipeline(name="ml-pipeline") + >>> def pipeline(): + ... # Create a training task + ... trainer = container_from_app( + ... app_def, + ... display_name="PyTorch Training", + ... retry_policy={'max_retry_count': 3}, + ... accelerator_type='nvidia-tesla-v100' + ... ) + ... trainer.set_env_variable("WANDB_PROJECT", "my-project") + ... + ... # Create another task that depends on trainer + ... evaluator = container_from_app( + ... app_def, + ... display_name="Model Evaluation" + ... ) + ... 
evaluator.after(trainer) """ - factory = component_from_app(app, ui_metadata) - return factory(*args, **kwargs) + component = component_from_app(app, ui_metadata) + # Call the component function to create a PipelineTask + task = component(*args, **kwargs) + + # Apply resource constraints and environment variables from the role + if hasattr(component, '_torchx_role'): + role = component._torchx_role + + # Set resources + if role.resource.cpu > 0: + task.set_cpu_request(str(int(role.resource.cpu))) + task.set_cpu_limit(str(int(role.resource.cpu))) + if role.resource.memMB > 0: + task.set_memory_request(f"{role.resource.memMB}M") + task.set_memory_limit(f"{role.resource.memMB}M") + if role.resource.gpu > 0: + # Use the newer set_accelerator_limit API (set_gpu_limit is deprecated) + # Check for accelerator type in metadata or use provided one + acc_type = accelerator_type + if not acc_type and app.metadata: + acc_type = app.metadata.get('accelerator_type') + if not acc_type: + acc_type = 'nvidia-tesla-k80' # Default GPU type + + task.set_accelerator_type(acc_type) + task.set_accelerator_limit(str(int(role.resource.gpu))) + + # Set environment variables + if role.env: + for name, value in role.env.items(): + task.set_env_variable(name=name, value=str(value)) + + # Apply additional configurations + if display_name: + task.set_display_name(display_name) + + if retry_policy: + retry_args = {} + if 'max_retry_count' in retry_policy: + retry_args['num_retries'] = retry_policy['max_retry_count'] + if 'backoff_duration' in retry_policy: + retry_args['backoff_duration'] = retry_policy['backoff_duration'] + if 'backoff_factor' in retry_policy: + retry_args['backoff_factor'] = retry_policy['backoff_factor'] + if 'backoff_max_duration' in retry_policy: + retry_args['backoff_max_duration'] = retry_policy['backoff_max_duration'] + if retry_args: + task.set_retry(**retry_args) + + # Set caching options + task.set_caching_options(enable_caching=enable_caching) + + return task def resource_from_app( app: api.AppDef, queue: str, service_account: Optional[str] = None, -) -> dsl.ResourceOp: + priority_class: Optional[str] = None, +) -> PipelineTask: """ - resource_from_app generates a KFP ResourceOp from the provided app that uses - the Volcano job scheduler on Kubernetes to run distributed apps. See - https://volcano.sh/en/docs/ for more info on Volcano and how to install. + resource_from_app generates a KFP v2 component that creates Kubernetes + resources using the Volcano job scheduler for distributed apps. Args: app: The torchx AppDef to adapt. - queue: the Volcano queue to schedule the operator in. + queue: the Volcano queue to schedule the job in. + service_account: optional service account to use. + priority_class: optional priority class name. + + Note: In KFP v2, direct Kubernetes resource manipulation requires + the kfp-kubernetes extension. This function provides a basic + implementation using kubectl. >>> import kfp >>> from torchx import specs @@ -256,19 +285,69 @@ def resource_from_app( ... name="trainer", ... roles=[specs.Role("trainer", image="foo:latest", num_replicas=3)], ... ) + >>> @dsl.pipeline >>> def pipeline(): ... trainer = resource_from_app(app_def, queue="test") ... print(trainer) - >>> kfp.compiler.Compiler().compile( - ... pipeline_func=pipeline, - ... package_path="/tmp/pipeline.yaml", - ... ) - {'ResourceOp': {... 'name': 'trainer-0', ... 'name': 'trainer-1', ... 
'name': 'trainer-2', ...}} """ - return dsl.ResourceOp( - name=app.name, - action="create", - success_condition="status.state.phase = Completed", - failure_condition="status.state.phase = Failed", - k8s_resource=app_to_resource(app, queue, service_account=service_account), + @dsl.container_component + def volcano_job_component() -> ContainerSpec: + """Creates a Volcano job via kubectl.""" + resource = kubernetes_scheduler.app_to_resource( + app, queue, service_account, priority_class + ) + + # Serialize the resource to YAML + resource_yaml = yaml.dump(resource, default_flow_style=False) + + # Use kubectl to create the resource + return ContainerSpec( + image="bitnami/kubectl:latest", + command=["sh", "-c"], + args=[f"echo '{resource_yaml}' | kubectl apply -f -"], + ) + + volcano_job_component._component_human_name = f"{app.name}-volcano-job" + volcano_job_component._component_description = f"Creates Volcano job for {app.name}" + + return volcano_job_component() + + +# Backwards compatibility - map old function names to new ones +def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]: + """ + DEPRECATED: This function is maintained for backwards compatibility. + Use component_from_app instead for KFP v2. + """ + import warnings + warnings.warn( + "component_spec_from_app is deprecated. Use component_from_app for KFP v2.", + DeprecationWarning, + stacklevel=2 ) + + if len(app.roles) != 1: + raise ValueError( + f"Distributed apps are only supported via resource_from_app. " + f"{app.name} has roles: {[r.name for r in app.roles]}" + ) + + return app.name, app.roles[0] + + +def component_spec_from_role(name: str, role: api.Role) -> Dict[str, Any]: + """ + DEPRECATED: Use component_from_app for KFP v2. + """ + import warnings + warnings.warn( + "component_spec_from_role is deprecated. 
Use component_from_app for KFP v2.", + DeprecationWarning, + stacklevel=2 + ) + + # Return a minimal spec for backwards compatibility + return { + "name": f"{name}-{role.name}", + "description": f"DEPRECATED: {name} {role.name}", + } diff --git a/torchx/pipelines/kfp/test/adapter_test.py b/torchx/pipelines/kfp/test/adapter_test.py index df7b743a8..6c52a8fae 100644 --- a/torchx/pipelines/kfp/test/adapter_test.py +++ b/torchx/pipelines/kfp/test/adapter_test.py @@ -10,17 +10,15 @@ import os.path import tempfile import unittest -from typing import Callable, List +from typing import Callable import torchx import yaml -from kfp import compiler, components, dsl -from kubernetes.client.models import V1ContainerPort, V1ResourceRequirements +from kfp import compiler, dsl from torchx.pipelines.kfp.adapter import ( component_from_app, component_spec_from_app, container_from_app, - ContainerFactory, ) from torchx.specs import api @@ -48,108 +46,183 @@ def _test_app(self) -> api.AppDef: return api.AppDef("test", roles=[trainer_role]) - def _compile_pipeline(self, pipeline: Callable[[], None]) -> None: + def _compile_pipeline(self, pipeline: Callable[[], None]) -> dict: + """Compile pipeline and return the compiled structure.""" with tempfile.TemporaryDirectory() as tmpdir: pipeline_file = os.path.join(tmpdir, "pipeline.yaml") - compiler.Compiler().compile(pipeline, pipeline_file) + compiler.Compiler().compile(pipeline_func=pipeline, package_path=pipeline_file) with open(pipeline_file, "r") as f: data = yaml.safe_load(f) - - spec = data["spec"] - templates = spec["templates"] - self.assertGreaterEqual(len(templates), 2) + return data def test_component_spec_from_app(self) -> None: app = self._test_app() - spec, role = component_spec_from_app(app) - self.assertIsNotNone(components.load_component_from_text(spec)) + # component_spec_from_app is deprecated and returns app name and role + app_name, role = component_spec_from_app(app) + + # The function should return the app name and first role + self.assertEqual(app_name, "test") + self.assertEqual(role, app.roles[0]) self.assertEqual(role.resource, app.roles[0].resource) - self.assertEqual( - spec, - """description: KFP wrapper for TorchX component test, role trainer -implementation: - container: - command: - - main - - --output-path - - blah - env: - FOO: bar - image: pytorch/torchx:latest -name: test-trainer -outputs: [] -""", - ) + self.assertEqual(role.name, "trainer") def test_pipeline(self) -> None: app = self._test_app() - kfp_copy: ContainerFactory = component_from_app(app) + @dsl.pipeline(name="test-pipeline") def pipeline() -> None: - a = kfp_copy() - resources: V1ResourceRequirements = a.container.resources - self.assertEqual( - resources, - V1ResourceRequirements( - limits={ - "cpu": "2000m", - "memory": "3000M", - "nvidia.com/gpu": "4", - }, - requests={ - "cpu": "2000m", - "memory": "3000M", - }, - ), - ) - ports: List[V1ContainerPort] = a.container.ports - self.assertEqual( - ports, - [V1ContainerPort(name="foo", container_port=1234)], - ) - - b = kfp_copy() + # Create two instances of the component + a = container_from_app(app, display_name="Task A") + b = container_from_app(app, display_name="Task B") + # Set dependency b.after(a) - self._compile_pipeline(pipeline) + # Compile and check structure + data = self._compile_pipeline(pipeline) + + # KFP v2 compiled pipelines have this structure at root level + self.assertIn("components", data) + self.assertIn("deploymentSpec", data) + self.assertIn("root", data) + + # Check that we have components 
+ components = data["components"] + self.assertGreater(len(components), 0) + + # Check executors + executors = data["deploymentSpec"]["executors"] + self.assertGreater(len(executors), 0) + + # Check the task structure + self.assertIn("dag", data["root"]) + self.assertIn("tasks", data["root"]["dag"]) + + # We should have 2 tasks + tasks = data["root"]["dag"]["tasks"] + self.assertEqual(len(tasks), 2) + + # Check dependency - second task should depend on first + task_names = list(tasks.keys()) + second_task = tasks[task_names[1]] + self.assertIn("dependentTasks", second_task) def test_pipeline_metadata(self) -> None: app = self._test_app() - metadata = {} - kfp_copy: ContainerFactory = component_from_app(app, metadata) + ui_metadata = { + "outputs": [ + { + "type": "tensorboard", + "source": "gs://my-bucket/logs", + } + ] + } + @dsl.pipeline(name="test-pipeline-metadata") def pipeline() -> None: - a = kfp_copy() - self.assertEqual(len(a.volumes), 1) - self.assertEqual(len(a.container.volume_mounts), 1) - self.assertEqual(len(a.sidecars), 1) - self.assertEqual( - a.output_artifact_paths["mlpipeline-ui-metadata"], - "/tmp/outputs/mlpipeline-ui-metadata/data.json", - ) - self.assertEqual( - a.pod_labels, - { - "app.kubernetes.io/instance": "test", - "app.kubernetes.io/managed-by": "torchx.pytorch.org", - "app.kubernetes.io/name": "test", - "torchx.pytorch.org/version": torchx.__version__, - "torchx.pytorch.org/app-name": "test", - "torchx.pytorch.org/role-index": "0", - "torchx.pytorch.org/role-name": "trainer", - "torchx.pytorch.org/replica-id": "0", - }, - ) - - self._compile_pipeline(pipeline) + # Create component with UI metadata + a = container_from_app(app, ui_metadata=ui_metadata, display_name="Task with Metadata") + + # Compile pipeline + data = self._compile_pipeline(pipeline) + + # Check basic structure + self.assertIn("components", data) + self.assertIn("deploymentSpec", data) + self.assertIn("root", data) + + # Check that UI metadata affects the command + executors = data["deploymentSpec"]["executors"] + # UI metadata should be present in at least one executor + found_metadata = False + for executor in executors.values(): + if "container" in executor: + command = executor["container"].get("command", []) + # Check if metadata handling is in the command + if any("metadata" in str(cmd) for cmd in command): + found_metadata = True + break + self.assertTrue(found_metadata, "UI metadata not found in executor commands") def test_container_from_app(self) -> None: app: api.AppDef = self._test_app() + @dsl.pipeline(name="test-container-pipeline") def pipeline() -> None: - a: dsl.ContainerOp = container_from_app(app) - b: dsl.ContainerOp = container_from_app(app) + # Create two tasks from the same app + a = container_from_app(app, display_name="First Task") + b = container_from_app(app, display_name="Second Task") b.after(a) - self._compile_pipeline(pipeline) + # Compile and verify + data = self._compile_pipeline(pipeline) + self.assertIn("root", data) + + # Check tasks + tasks = data["root"]["dag"]["tasks"] + self.assertEqual(len(tasks), 2) + + # Check dependency + # The second task should have a dependency on the first + task_names = list(tasks.keys()) + second_task = tasks[task_names[1]] + self.assertIn("dependentTasks", second_task) + + # Check display names + for task_name, task in tasks.items(): + self.assertIn("taskInfo", task) + self.assertIn("name", task["taskInfo"]) + + def test_resource_configuration(self) -> None: + """Test that resources are properly configured in the 
component.""" + app = self._test_app() + + # Create a component and check that it has the right resources + component = component_from_app(app) + + # The component function should exist + self.assertIsNotNone(component) + + # Check that the component has the expected metadata + # In KFP v2, components store metadata differently + if hasattr(component, '_torchx_role'): + role = component._torchx_role + self.assertEqual(role.resource.cpu, 2) + self.assertEqual(role.resource.memMB, 3000) + self.assertEqual(role.resource.gpu, 4) + + def test_environment_variables(self) -> None: + """Test that environment variables are properly passed.""" + app = self._test_app() + + @dsl.pipeline(name="test-env-pipeline") + def pipeline() -> None: + task = container_from_app(app) + + # Compile pipeline + data = self._compile_pipeline(pipeline) + + # Check that the pipeline was compiled successfully + self.assertIn("deploymentSpec", data) + + # Find the executor and check environment variables + executors = data["deploymentSpec"]["executors"] + found_env = False + for executor_name, executor in executors.items(): + if "container" in executor: + container = executor["container"] + if "env" in container: + # Check that FOO environment variable is set + env_vars = container["env"] + for env_var in env_vars: + if env_var.get("name") == "FOO" and env_var.get("value") == "bar": + found_env = True + break + if found_env: + break + + self.assertTrue(found_env, "Environment variable FOO=bar not found in any executor") + + +if __name__ == "__main__": + unittest.main() diff --git a/torchx/pipelines/kfp/test/adapter_v2_it_test.py b/torchx/pipelines/kfp/test/adapter_v2_it_test.py new file mode 100644 index 000000000..2f6129f3c --- /dev/null +++ b/torchx/pipelines/kfp/test/adapter_v2_it_test.py @@ -0,0 +1,507 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Integration tests for KFP v2 adapter that test component creation and pipeline compilation. + +This module tests the adapter module that converts TorchX AppDef +to KFP v2 components, focusing on component creation, task configuration, +and pipeline compilation. 
+""" + +import unittest +import tempfile +import shutil +import os +import json +from pathlib import Path +from kfp import dsl, compiler, local +from torchx import specs +from torchx.pipelines.kfp.adapter import component_from_app, container_from_app + + +class TestTorchXComponentCreation(unittest.TestCase): + """Test TorchX component creation and metadata.""" + + def test_simple_component_creation(self): + """Test creating a simple container component from TorchX AppDef.""" + app = specs.AppDef( + name="echo-test", + roles=[ + specs.Role( + name="worker", + entrypoint="/bin/echo", + args=["Hello from TorchX"], + image="alpine:latest", + resource=specs.Resource(cpu=1, memMB=512, gpu=0), + ) + ], + ) + + component = component_from_app(app) + + # Verify component was created correctly + self.assertIsNotNone(component) + self.assertTrue(callable(component)) + self.assertEqual(component._component_human_name, "echo-test-worker") + self.assertIn("TorchX component", component._component_description) + + # Verify the role is attached + self.assertTrue(hasattr(component, '_torchx_role')) + self.assertEqual(component._torchx_role.entrypoint, "/bin/echo") + self.assertEqual(component._torchx_role.args, ["Hello from TorchX"]) + self.assertEqual(component._torchx_role.image, "alpine:latest") + + def test_component_with_environment_variables(self): + """Test component creation with environment variables.""" + env_vars = { + "MODEL_PATH": "/models/bert", + "BATCH_SIZE": "32", + "LEARNING_RATE": "0.001", + "CUDA_VISIBLE_DEVICES": "0,1", + } + + app = specs.AppDef( + name="ml-training", + roles=[ + specs.Role( + name="trainer", + entrypoint="python", + args=["train.py", "--distributed"], + env=env_vars, + image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime", + resource=specs.Resource(cpu=8, memMB=32768, gpu=2), + ) + ], + ) + + component = component_from_app(app) + + # Verify environment variables are stored + self.assertEqual(component._torchx_role.env, env_vars) + self.assertEqual(component._torchx_role.resource.gpu, 2) + self.assertEqual(component._torchx_role.resource.cpu, 8) + + def test_multi_gpu_component_with_metadata(self): + """Test component with multiple GPUs and accelerator metadata.""" + app = specs.AppDef( + name="distributed-training", + metadata={"accelerator_type": "nvidia-tesla-a100"}, + roles=[ + specs.Role( + name="ddp-trainer", + entrypoint="torchrun", + args=[ + "--nproc_per_node=4", + "--master_port=29500", + "train_ddp.py", + "--epochs=100", + ], + image="pytorch/pytorch:latest", + resource=specs.Resource(cpu=16, memMB=65536, gpu=4), + ) + ], + ) + + component = component_from_app(app) + + # Verify multi-GPU configuration + self.assertEqual(component._torchx_role.resource.gpu, 4) + self.assertEqual(app.metadata["accelerator_type"], "nvidia-tesla-a100") + + def test_component_with_ui_metadata(self): + """Test component with UI metadata for visualization.""" + ui_metadata = { + "outputs": [ + { + "type": "tensorboard", + "source": "gs://my-bucket/tensorboard-logs", + }, + { + "type": "markdown", + "storage": "inline", + "source": "# Training Complete\nModel saved to gs://my-bucket/model", + } + ] + } + + app = specs.AppDef( + name="viz-test", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["visualize.py"], + image="python:3.9", + resource=specs.Resource(cpu=2, memMB=4096, gpu=0), + ) + ], + ) + + component = component_from_app(app, ui_metadata=ui_metadata) + + # Component should be created successfully with UI metadata + self.assertIsNotNone(component) + + 
+class TestPipelineCompilation(unittest.TestCase): + """Test pipeline compilation with TorchX components.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + """Clean up after tests.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_compile_simple_pipeline(self): + """Test compiling a pipeline with TorchX components.""" + app1 = specs.AppDef( + name="preprocessor", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["preprocess.py", "--input=/data/raw", "--output=/data/processed"], + image="python:3.9", + resource=specs.Resource(cpu=2, memMB=4096, gpu=0), + ) + ], + ) + + app2 = specs.AppDef( + name="trainer", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["train.py", "--data=/data/processed"], + image="pytorch/pytorch:latest", + resource=specs.Resource(cpu=4, memMB=8192, gpu=1), + ) + ], + ) + + @dsl.pipeline(name="torchx-pipeline", description="Pipeline with TorchX components") + def torchx_pipeline(): + # Create tasks from TorchX apps + preprocess_task = container_from_app( + app1, + display_name="Data Preprocessing", + enable_caching=True + ) + + train_task = container_from_app( + app2, + display_name="Model Training", + retry_policy={"max_retry_count": 2}, + accelerator_type="nvidia-tesla-v100" + ) + + # Set task dependencies + train_task.after(preprocess_task) + + # Compile the pipeline + output_path = os.path.join(self.temp_dir, "pipeline.yaml") + compiler.Compiler().compile(pipeline_func=torchx_pipeline, package_path=output_path) + + # Verify the pipeline was compiled + self.assertTrue(os.path.exists(output_path)) + + # Read and verify pipeline structure + with open(output_path) as f: + pipeline_content = f.read() + + # Check that key components are in the pipeline + self.assertIn("torchx-component", pipeline_content) + self.assertIn("python:3.9", pipeline_content) + self.assertIn("pytorch/pytorch:latest", pipeline_content) + self.assertIn("Data Preprocessing", pipeline_content) + self.assertIn("Model Training", pipeline_content) + + def test_compile_ml_pipeline_with_parameters(self): + """Test compiling a complete ML pipeline with parameters.""" + @dsl.pipeline( + name="ml-training-pipeline", + description="Complete ML pipeline with parameters" + ) + def ml_pipeline( + learning_rate: float = 0.001, + batch_size: int = 32, + epochs: int = 50, + gpu_type: str = "nvidia-tesla-v100" + ): + # Preprocessing step + preprocess_app = specs.AppDef( + name="preprocess", + roles=[ + specs.Role( + name="preprocessor", + entrypoint="python", + args=["preprocess_data.py", "--batch-size", str(batch_size)], + image="python:3.9-slim", + resource=specs.Resource(cpu=4, memMB=16384, gpu=0), + ) + ], + ) + + preprocess_task = container_from_app( + preprocess_app, + display_name="Data Preprocessing", + enable_caching=True + ) + + # Training step + train_app = specs.AppDef( + name="train", + roles=[ + specs.Role( + name="trainer", + entrypoint="python", + args=[ + "train_model.py", + f"--learning-rate={learning_rate}", + f"--batch-size={batch_size}", + f"--epochs={epochs}", + ], + env={"CUDA_VISIBLE_DEVICES": "0,1"}, + image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime", + resource=specs.Resource(cpu=8, memMB=32768, gpu=2), + ) + ], + ) + + train_task = container_from_app( + train_app, + display_name=f"Model Training (LR={learning_rate})", + retry_policy={ + "max_retry_count": 3, + "backoff_duration": "300s", + "backoff_factor": 2.0 + }, + accelerator_type=gpu_type + ) + 
train_task.after(preprocess_task) + + # Evaluation step + eval_app = specs.AppDef( + name="evaluate", + roles=[ + specs.Role( + name="evaluator", + entrypoint="python", + args=["evaluate_model.py"], + image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime", + resource=specs.Resource(cpu=4, memMB=16384, gpu=1), + ) + ], + ) + + eval_task = container_from_app( + eval_app, + display_name="Model Evaluation", + enable_caching=False + ) + eval_task.after(train_task) + + # Compile the pipeline + output_path = os.path.join(self.temp_dir, "ml_pipeline.yaml") + compiler.Compiler().compile( + pipeline_func=ml_pipeline, + package_path=output_path + ) + + # Verify pipeline was compiled + self.assertTrue(os.path.exists(output_path)) + + # Read and verify content + with open(output_path) as f: + content = f.read() + + # Verify all components and parameters are present + self.assertIn("Data Preprocessing", content) + self.assertIn("Model Training", content) + self.assertIn("Model Evaluation", content) + self.assertIn("learning_rate", content) + self.assertIn("batch_size", content) + self.assertIn("epochs", content) + self.assertIn("gpu_type", content) + + # Verify resource configurations + self.assertIn("resourceCpuLimit", content) + self.assertIn("resourceMemoryLimit", content) + self.assertIn("accelerator", content) + + def test_compile_nested_pipeline(self): + """Test compiling a pipeline with nested components.""" + simple_app = specs.AppDef( + name="worker", + roles=[ + specs.Role( + name="task", + entrypoint="echo", + args=["Processing"], + image="alpine:latest", + resource=specs.Resource(cpu=1, memMB=512, gpu=0), + ) + ], + ) + + @dsl.pipeline(name="inner-pipeline") + def inner_pipeline(message: str): + task1 = container_from_app(simple_app, display_name=f"Task 1: {message}") + task2 = container_from_app(simple_app, display_name=f"Task 2: {message}") + task2.after(task1) + + @dsl.pipeline(name="outer-pipeline") + def outer_pipeline(): + # Preprocessing + preprocessing = container_from_app(simple_app, display_name="Preprocessing") + + # Inner pipeline + inner = inner_pipeline(message="Inner Processing") + inner.after(preprocessing) + + # Postprocessing + postprocessing = container_from_app(simple_app, display_name="Postprocessing") + postprocessing.after(inner) + + # Compile the pipeline + output_path = os.path.join(self.temp_dir, "nested_pipeline.yaml") + compiler.Compiler().compile( + pipeline_func=outer_pipeline, + package_path=output_path + ) + + # Verify compilation + self.assertTrue(os.path.exists(output_path)) + + # Verify structure + with open(output_path) as f: + content = f.read() + + self.assertIn("Preprocessing", content) + self.assertIn("Inner Processing", content) + self.assertIn("Postprocessing", content) + + +class TestLocalExecution(unittest.TestCase): + """Test local execution of lightweight Python components. + + Note: Container components require DockerRunner which may not be available + in all test environments, so we focus on testing with lightweight Python + components to verify the execution flow. 
+ """ + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + # Initialize local execution with SubprocessRunner + local.init(runner=local.SubprocessRunner(), pipeline_root=self.temp_dir) + + def tearDown(self): + """Clean up test environment.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_simple_component_execution(self): + """Test executing a simple Python component.""" + @dsl.component(base_image="python:3.9-slim") + def add_numbers(a: float, b: float) -> float: + """Add two numbers.""" + return a + b + + # Execute component + task = add_numbers(a=5.0, b=3.0) + + # Verify result + self.assertEqual(task.output, 8.0) + + def test_component_with_artifact_output(self): + """Test component that produces output artifacts.""" + @dsl.component(base_image="python:3.9-slim") + def generate_report(data: dict, report_name: str) -> str: + """Generate a report from data.""" + import json + + report = { + "name": report_name, + "data": data, + "summary": f"Report contains {len(data)} items" + } + + return json.dumps(report) + + # Execute component + test_data = {"metric1": 0.95, "metric2": 0.87} + task = generate_report(data=test_data, report_name="Test Report") + + # Verify output + result = json.loads(task.output) + self.assertEqual(result["name"], "Test Report") + self.assertEqual(result["data"], test_data) + self.assertIn("2 items", result["summary"]) + + def test_pipeline_execution(self): + """Test executing a pipeline with multiple components.""" + @dsl.component(base_image="python:3.9-slim") + def preprocess(value: float) -> float: + """Preprocess input value.""" + return value * 2.0 + + @dsl.component(base_image="python:3.9-slim") + def transform(value: float, factor: float = 1.5) -> float: + """Transform value by factor.""" + return value * factor + + @dsl.pipeline(name="test-pipeline") + def data_pipeline(input_value: float = 10.0) -> float: + prep_task = preprocess(value=input_value) + trans_task = transform(value=prep_task.output, factor=3.0) + return trans_task.output + + # Execute pipeline + pipeline_task = data_pipeline(input_value=5.0) + + # Verify result: 5.0 * 2.0 * 3.0 = 30.0 + self.assertEqual(pipeline_task.output, 30.0) + + def test_conditional_execution(self): + """Test conditional execution in a pipeline.""" + @dsl.component(base_image="python:3.9-slim") + def check_threshold(value: float, threshold: float = 0.5) -> str: + """Check if value exceeds threshold.""" + return "high" if value > threshold else "low" + + @dsl.component(base_image="python:3.9-slim") + def process_high(value: float) -> float: + """Process high values.""" + return value * 2.0 + + @dsl.component(base_image="python:3.9-slim") + def process_low(value: float) -> float: + """Process low values.""" + return value * 0.5 + + # Test with high value + check_task = check_threshold(value=0.8) + self.assertEqual(check_task.output, "high") + + # Test with low value + check_task = check_threshold(value=0.3) + self.assertEqual(check_task.output, "low") + + # Test processing based on condition + high_task = process_high(value=10.0) + self.assertEqual(high_task.output, 20.0) + + low_task = process_low(value=10.0) + self.assertEqual(low_task.output, 5.0) + + +if __name__ == "__main__": + unittest.main() diff --git a/torchx/pipelines/kfp/test/adapter_v2_spec_test.py b/torchx/pipelines/kfp/test/adapter_v2_spec_test.py new file mode 100644 index 000000000..a8e55f907 --- /dev/null +++ b/torchx/pipelines/kfp/test/adapter_v2_spec_test.py @@ -0,0 +1,515 @@ +#!/usr/bin/env 
python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Unit tests for KFP v2 adapter that test component creation and task configuration. + +This module tests the adapter module that converts TorchX AppDef +to KFP v2 components, focusing on proper resource allocation, +environment configuration, and pipeline task settings. +""" + +import unittest +from unittest import mock +from torchx import specs +from torchx.pipelines.kfp.adapter import component_from_app, container_from_app + + +class TestComponentCreation(unittest.TestCase): + """Test basic component creation from TorchX AppDef.""" + + def test_simple_component_creation(self): + """Test creating a basic component.""" + app = specs.AppDef( + name="test-app", + roles=[ + specs.Role( + name="worker", + entrypoint="/bin/echo", + args=["hello", "world"], + image="alpine:latest", + resource=specs.Resource(cpu=1, memMB=512, gpu=0), + ) + ], + ) + + component = component_from_app(app) + + # Verify component attributes + self.assertEqual(component._component_human_name, "test-app-worker") + self.assertIn("TorchX component", component._component_description) + self.assertTrue(hasattr(component, '_torchx_role')) + self.assertEqual(component._torchx_role.entrypoint, "/bin/echo") + self.assertEqual(component._torchx_role.args, ["hello", "world"]) + self.assertEqual(component._torchx_role.image, "alpine:latest") + + def test_component_with_environment_variables(self): + """Test component creation with environment variables.""" + env_vars = { + "MODEL_PATH": "/models/bert", + "BATCH_SIZE": "32", + "CUDA_VISIBLE_DEVICES": "0,1", + } + + app = specs.AppDef( + name="ml-app", + roles=[ + specs.Role( + name="trainer", + entrypoint="python", + args=["train.py"], + env=env_vars, + image="pytorch/pytorch:latest", + resource=specs.Resource(cpu=4, memMB=8192, gpu=2), + ) + ], + ) + + component = component_from_app(app) + + # Verify environment variables are preserved + self.assertEqual(component._torchx_role.env, env_vars) + self.assertEqual(component._torchx_role.resource.gpu, 2) + + def test_component_with_ui_metadata(self): + """Test component creation with UI metadata.""" + ui_metadata = { + "outputs": [ + { + "type": "tensorboard", + "source": "gs://my-bucket/logs", + } + ] + } + + app = specs.AppDef( + name="viz-app", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["visualize.py"], + image="python:3.9", + resource=specs.Resource(cpu=1, memMB=2048, gpu=0), + ) + ], + ) + + # Component should be created successfully with UI metadata + component = component_from_app(app, ui_metadata=ui_metadata) + self.assertIsNotNone(component) + self.assertEqual(component._component_human_name, "viz-app-worker") + + +class TestContainerTaskConfiguration(unittest.TestCase): + """Test container task configuration from AppDef.""" + + def setUp(self): + """Set up test fixtures.""" + self.base_app = specs.AppDef( + name="test-app", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["script.py"], + image="python:3.9", + resource=specs.Resource(cpu=2, memMB=4096, gpu=0), + ) + ], + ) + + def test_basic_container_task(self): + """Test basic container task creation.""" + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + 
mock_component._torchx_role = self.base_app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(self.base_app) + + # Verify component was called + self.assertEqual(task, mock_task) + mock_component.assert_called_once() + + # Verify resource settings + mock_task.set_cpu_request.assert_called_once_with("2") + mock_task.set_cpu_limit.assert_called_once_with("2") + mock_task.set_memory_request.assert_called_once_with("4096M") + mock_task.set_memory_limit.assert_called_once_with("4096M") + + def test_container_task_with_display_name(self): + """Test container task with custom display name.""" + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = self.base_app.roles[0] + mock_component_fn.return_value = mock_component + + display_name = "My Custom Task" + task = container_from_app(self.base_app, display_name=display_name) + + mock_task.set_display_name.assert_called_once_with(display_name) + + def test_container_task_with_caching(self): + """Test container task with caching configuration.""" + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = self.base_app.roles[0] + mock_component_fn.return_value = mock_component + + # Test enabling caching + task = container_from_app(self.base_app, enable_caching=True) + mock_task.set_caching_options.assert_called_once_with(enable_caching=True) + + # Reset mock + mock_task.reset_mock() + + # Test disabling caching + task = container_from_app(self.base_app, enable_caching=False) + mock_task.set_caching_options.assert_called_once_with(enable_caching=False) + + +class TestResourceConfiguration(unittest.TestCase): + """Test resource configuration for container tasks.""" + + def test_memory_size_conversions(self): + """Test memory size conversion from MB to KFP format.""" + test_cases = [ + (512, "512M"), # 512 MB + (1024, "1024M"), # 1 GB + (2048, "2048M"), # 2 GB + (4096, "4096M"), # 4 GB + (8192, "8192M"), # 8 GB + (16384, "16384M"), # 16 GB + (1536, "1536M"), # 1.5 GB (non-standard) + (0, None), # Zero memory + ] + + for memMB, expected_str in test_cases: + with self.subTest(memMB=memMB): + app = specs.AppDef( + name="memory-test", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["script.py"], + image="python:3.9", + resource=specs.Resource(cpu=1, memMB=memMB, gpu=0), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + if expected_str: + mock_task.set_memory_request.assert_called_once_with(expected_str) + mock_task.set_memory_limit.assert_called_once_with(expected_str) + else: + mock_task.set_memory_request.assert_not_called() + mock_task.set_memory_limit.assert_not_called() + + def test_gpu_configuration(self): + """Test GPU resource configuration.""" + gpu_configs = [ + (0, None, None), # No GPU + (1, "nvidia-tesla-v100", "nvidia-tesla-v100"), # Single GPU with type + (2, None, "nvidia-tesla-k80"), # Multiple GPUs without type (uses default) + (4, "nvidia-tesla-a100", "nvidia-tesla-a100"), # Multi-GPU with type + ] + 
+ for gpu_count, accelerator_type, expected_type in gpu_configs: + with self.subTest(gpu_count=gpu_count, accelerator_type=accelerator_type): + app = specs.AppDef( + name="gpu-test", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["train.py"], + image="pytorch/pytorch:latest", + resource=specs.Resource(cpu=4, memMB=8192, gpu=gpu_count), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app, accelerator_type=accelerator_type) + + if gpu_count > 0: + mock_task.set_accelerator_limit.assert_called_once_with(str(gpu_count)) + if expected_type: + mock_task.set_accelerator_type.assert_called_once_with(expected_type) + else: + mock_task.set_accelerator_limit.assert_not_called() + mock_task.set_accelerator_type.assert_not_called() + + def test_fractional_cpu_handling(self): + """Test handling of fractional CPU values.""" + app = specs.AppDef( + name="cpu-test", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["script.py"], + image="python:3.9", + resource=specs.Resource(cpu=1.5, memMB=1024, gpu=0), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + # CPU should be truncated to integer (1.5 -> 1) + mock_task.set_cpu_request.assert_called_once_with("1") + mock_task.set_cpu_limit.assert_called_once_with("1") + + +class TestRetryAndErrorHandling(unittest.TestCase): + """Test retry policies and error handling configurations.""" + + def test_retry_policy_configurations(self): + """Test various retry policy configurations.""" + app = specs.AppDef( + name="retry-test", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["script.py"], + image="python:3.9", + resource=specs.Resource(cpu=1, memMB=1024, gpu=0), + ) + ], + ) + + retry_configs = [ + ({"max_retry_count": 5}, {"num_retries": 5}), + ( + {"max_retry_count": 3, "backoff_duration": "30s"}, + {"num_retries": 3, "backoff_duration": "30s"} + ), + ( + { + "max_retry_count": 2, + "backoff_factor": 2.0, + "backoff_max_duration": "300s" + }, + { + "num_retries": 2, + "backoff_factor": 2.0, + "backoff_max_duration": "300s" + } + ), + ] + + for retry_policy, expected_args in retry_configs: + with self.subTest(retry_policy=retry_policy): + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app, retry_policy=retry_policy) + + mock_task.set_retry.assert_called_once_with(**expected_args) + + def test_timeout_configuration(self): + """Test timeout configuration for tasks.""" + # Skip this test - timeout is not currently implemented in container_from_app + self.skipTest("Timeout configuration not yet implemented in adapter") + + +class TestEnvironmentVariables(unittest.TestCase): + """Test environment variable handling.""" + + def test_environment_variable_setting(self): + """Test that environment variables are 
properly set on tasks.""" + env_vars = { + "VAR1": "value1", + "VAR2": "123", + "VAR3": "true", + "PATH_VAR": "/usr/local/bin:/usr/bin", + "EMPTY_VAR": "", + } + + app = specs.AppDef( + name="env-app", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["app.py"], + env=env_vars, + image="python:3.9", + resource=specs.Resource(cpu=1, memMB=1024, gpu=0), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + # Verify all environment variables were set + expected_calls = [ + mock.call(name=name, value=str(value)) + for name, value in env_vars.items() + ] + mock_task.set_env_variable.assert_has_calls(expected_calls, any_order=True) + self.assertEqual(mock_task.set_env_variable.call_count, len(env_vars)) + + def test_special_environment_variables(self): + """Test handling of special environment variables.""" + special_env_vars = { + "CUDA_VISIBLE_DEVICES": "0,1,2,3", + "NCCL_DEBUG": "INFO", + "PYTHONPATH": "/app:/lib", + "LD_LIBRARY_PATH": "/usr/local/cuda/lib64", + } + + app = specs.AppDef( + name="special-env-app", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["distributed_train.py"], + env=special_env_vars, + image="pytorch/pytorch:latest", + resource=specs.Resource(cpu=8, memMB=32768, gpu=4), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + # Verify special environment variables are set correctly + for name, value in special_env_vars.items(): + mock_task.set_env_variable.assert_any_call(name=name, value=value) + + +class TestEdgeCases(unittest.TestCase): + """Test edge cases and error conditions.""" + + def test_minimal_resource_spec(self): + """Test handling of minimal resource specifications.""" + app = specs.AppDef( + name="minimal-app", + roles=[ + specs.Role( + name="worker", + entrypoint="echo", + args=["test"], + image="alpine:latest", + resource=specs.Resource(cpu=0, memMB=0, gpu=0), + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + # Verify no resource methods were called for zero resources + mock_task.set_cpu_request.assert_not_called() + mock_task.set_cpu_limit.assert_not_called() + mock_task.set_memory_request.assert_not_called() + mock_task.set_memory_limit.assert_not_called() + mock_task.set_accelerator_type.assert_not_called() + mock_task.set_accelerator_limit.assert_not_called() + + def test_very_large_resources(self): + """Test handling of very large resource requests.""" + app = specs.AppDef( + name="large-app", + roles=[ + specs.Role( + name="worker", + entrypoint="python", + args=["bigdata.py"], + image="python:3.9", + resource=specs.Resource(cpu=128, memMB=524288, gpu=8), # 512 GB RAM + ) + ], + ) + + with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + mock_task = 
mock.MagicMock() + mock_component = mock.MagicMock(return_value=mock_task) + mock_component._torchx_role = app.roles[0] + mock_component_fn.return_value = mock_component + + task = container_from_app(app) + + # Verify large resources are set correctly + mock_task.set_cpu_request.assert_called_once_with("128") + mock_task.set_cpu_limit.assert_called_once_with("128") + mock_task.set_memory_request.assert_called_once_with("524288M") + mock_task.set_memory_limit.assert_called_once_with("524288M") + mock_task.set_accelerator_limit.assert_called_once_with("8") + + def test_empty_args_and_entrypoint(self): + """Test component with no args.""" + app = specs.AppDef( + name="no-args-app", + roles=[ + specs.Role( + name="worker", + entrypoint="/app/start.sh", + args=[], # Empty args + image="custom:latest", + resource=specs.Resource(cpu=1, memMB=1024, gpu=0), + ) + ], + ) + + component = component_from_app(app) + + # Verify component is created successfully + self.assertEqual(component._torchx_role.entrypoint, "/app/start.sh") + self.assertEqual(component._torchx_role.args, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/torchx/pipelines/kfp/test/version_test.py b/torchx/pipelines/kfp/test/version_test.py index f932f5b7b..e33ef535e 100644 --- a/torchx/pipelines/kfp/test/version_test.py +++ b/torchx/pipelines/kfp/test/version_test.py @@ -10,6 +10,7 @@ import importlib import unittest from unittest.mock import patch +import warnings class VersionTest(unittest.TestCase): @@ -21,9 +22,18 @@ def test_can_get_version(self) -> None: def test_kfp_1x(self) -> None: import torchx.pipelines.kfp + # KFP 2.x should not trigger any warnings with patch("kfp.__version__", "2.0.1"): - with self.assertRaisesRegex(ImportError, "Only kfp version"): + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") importlib.reload(torchx.pipelines.kfp) + self.assertEqual(len(w), 0) + # KFP 1.x should trigger a DeprecationWarning with patch("kfp.__version__", "1.5.0"): - importlib.reload(torchx.pipelines.kfp) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + importlib.reload(torchx.pipelines.kfp) + self.assertEqual(len(w), 1) + self.assertTrue(issubclass(w[-1].category, DeprecationWarning)) + self.assertIn("KFP version 1.x.x is deprecated", str(w[-1].message)) From eb054d7f371a1c15b38ce7a19bbdaa42cd7fa94b Mon Sep 17 00:00:00 2001 From: ChethanUK Date: Sun, 25 May 2025 18:45:36 +0000 Subject: [PATCH 2/2] lint fixes --- setup.py | 4 +- .../pipelines/kfp/advanced_pipeline.py | 6 +- .../examples/pipelines/kfp/dist_pipeline.py | 7 +- .../kfp/dist_pipeline_v2_enhanced.py | 81 ++++--- .../examples/pipelines/kfp/intro_pipeline.py | 7 +- .../pipelines/kfp/task_configs_pipeline.py | 68 +++--- .../pipelines/kfp/test/kfp_pipeline_test.py | 4 +- torchx/pipelines/kfp/__init__.py | 6 +- torchx/pipelines/kfp/adapter.py | 119 +++++----- torchx/pipelines/kfp/test/adapter_test.py | 57 +++-- .../pipelines/kfp/test/adapter_v2_it_test.py | 208 +++++++++--------- .../kfp/test/adapter_v2_spec_test.py | 207 +++++++++-------- torchx/pipelines/kfp/test/version_test.py | 2 +- 13 files changed, 423 insertions(+), 353 deletions(-) diff --git a/setup.py b/setup.py index 501e60f32..b6db3d8a3 100644 --- a/setup.py +++ b/setup.py @@ -88,7 +88,9 @@ def get_nightly_version(): "google-cloud-runtimeconfig>=0.33.2", ], # KFP 2.0+ is not supported yet, see https://github.com/pytorch/torchx/issues/123 - "kfp": ["kfp>=2.8.0"], # optional: required for Kubeflow Pipelines integration + "kfp": 
[ + "kfp>=2.8.0" + ], # optional: required for Kubeflow Pipelines integration "kubernetes": ["kubernetes>=11"], "ray": ["ray>=1.12.1"], "dev": dev_reqs, diff --git a/torchx/examples/pipelines/kfp/advanced_pipeline.py b/torchx/examples/pipelines/kfp/advanced_pipeline.py index cef35acb1..4a71e9a04 100755 --- a/torchx/examples/pipelines/kfp/advanced_pipeline.py +++ b/torchx/examples/pipelines/kfp/advanced_pipeline.py @@ -28,9 +28,9 @@ import sys from typing import Dict -from kfp import dsl -from kfp import compiler import torchx + +from kfp import compiler, dsl from torchx import specs from torchx.components.dist import ddp as dist_ddp from torchx.components.serve import torchserve @@ -245,7 +245,7 @@ @dsl.pipeline( name="TorchX Advanced Pipeline", - description="Advanced KFP pipeline with TorchX components" + description="Advanced KFP pipeline with TorchX components", ) def pipeline() -> None: # container_from_app creates a KFP v2 task from the TorchX app diff --git a/torchx/examples/pipelines/kfp/dist_pipeline.py b/torchx/examples/pipelines/kfp/dist_pipeline.py index 6a48a87b7..9f6909ac8 100755 --- a/torchx/examples/pipelines/kfp/dist_pipeline.py +++ b/torchx/examples/pipelines/kfp/dist_pipeline.py @@ -16,15 +16,14 @@ in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them. """ -from kfp import dsl -from kfp import compiler +from kfp import compiler, dsl from torchx import specs from torchx.pipelines.kfp.adapter import resource_from_app @dsl.pipeline( name="distributed-pipeline", - description="A distributed pipeline using Volcano job scheduler" + description="A distributed pipeline using Volcano job scheduler", ) def pipeline() -> None: # First we define our AppDef for the component @@ -45,7 +44,7 @@ def pipeline() -> None: # a Volcano job, we use the resource_from_app adapter. # This generates a task that uses kubectl to create the Volcano job. 
echo_task = resource_from_app(echo_app, queue="default") - + # Set display name for better visualization echo_task.set_display_name("Distributed Echo Job") diff --git a/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py b/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py index c8d6b9c17..38d89d71e 100644 --- a/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py +++ b/torchx/examples/pipelines/kfp/dist_pipeline_v2_enhanced.py @@ -16,10 +16,10 @@ """ import argparse -from kfp import dsl, compiler -from kfp import kubernetes # Using kfp-kubernetes extension -from torchx.pipelines.kfp.adapter import container_from_app, resource_from_app + +from kfp import compiler, dsl, kubernetes # Using kfp-kubernetes extension from torchx import specs +from torchx.pipelines.kfp.adapter import container_from_app, resource_from_app def main(args: argparse.Namespace) -> None: @@ -35,7 +35,7 @@ def main(args: argparse.Namespace) -> None: "echo 'Starting distributed training...'; " "echo 'Node rank: $RANK'; " "echo 'World size: $WORLD_SIZE'; " - "python -m torch.distributed.run --nproc_per_node=2 train.py" + "python -m torch.distributed.run --nproc_per_node=2 train.py", ], env={ "MASTER_ADDR": "distributed-trainer-0", @@ -51,7 +51,7 @@ def main(args: argparse.Namespace) -> None: ) ], ) - + # Create data preprocessing app preprocess_app = specs.AppDef( name="data-preprocessor", @@ -59,7 +59,14 @@ def main(args: argparse.Namespace) -> None: specs.Role( name="preprocessor", entrypoint="python", - args=["-m", "preprocess", "--input", "/data/raw", "--output", "/data/processed"], + args=[ + "-m", + "preprocess", + "--input", + "/data/raw", + "--output", + "/data/processed", + ], env={"DATA_FORMAT": "tfrecord"}, num_replicas=1, image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", @@ -71,7 +78,7 @@ def main(args: argparse.Namespace) -> None: ) ], ) - + # Create model evaluation app eval_app = specs.AppDef( name="model-evaluator", @@ -79,7 +86,14 @@ def main(args: argparse.Namespace) -> None: specs.Role( name="evaluator", entrypoint="python", - args=["-m", "evaluate", "--model", "/models/latest", "--data", "/data/test"], + args=[ + "-m", + "evaluate", + "--model", + "/models/latest", + "--data", + "/data/test", + ], env={"METRICS_OUTPUT": "/metrics/eval.json"}, num_replicas=1, image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", @@ -94,36 +108,36 @@ def main(args: argparse.Namespace) -> None: @dsl.pipeline( name="enhanced-distributed-pipeline", - description="Enhanced distributed ML pipeline with KFP v2 features" + description="Enhanced distributed ML pipeline with KFP v2 features", ) def pipeline(): # Create persistent volume for data sharing pvc = kubernetes.CreatePVC( - pvc_name_suffix='-shared-data', - access_modes=['ReadWriteMany'], - size='50Gi', - storage_class_name='standard', + pvc_name_suffix="-shared-data", + access_modes=["ReadWriteMany"], + size="50Gi", + storage_class_name="standard", ) - + # Data preprocessing step preprocess_task = container_from_app( preprocess_app, display_name="Data Preprocessing", retry_policy={ - 'max_retry_count': 3, - 'backoff_duration': '60s', - 'backoff_factor': 2, + "max_retry_count": 3, + "backoff_duration": "60s", + "backoff_factor": 2, }, enable_caching=True, ) - + # Mount volume for preprocessing kubernetes.mount_pvc( preprocess_task, - pvc_name=pvc.outputs['name'], - mount_path='/data', + pvc_name=pvc.outputs["name"], + mount_path="/data", ) - + # Distributed training using Volcano train_task = resource_from_app( ddp_app, @@ -133,7 +147,7 @@ def 
pipeline(): ) train_task.set_display_name("Distributed Training") train_task.after(preprocess_task) - + # Model evaluation eval_task = container_from_app( eval_app, @@ -141,36 +155,33 @@ def pipeline(): enable_caching=False, # Don't cache evaluation results ) eval_task.after(train_task) - + # Mount volume for evaluation kubernetes.mount_pvc( eval_task, - pvc_name=pvc.outputs['name'], - mount_path='/data', + pvc_name=pvc.outputs["name"], + mount_path="/data", ) - + # Clean up PVC after evaluation - delete_pvc = kubernetes.DeletePVC( - pvc_name=pvc.outputs['name'] - ).after(eval_task) + delete_pvc = kubernetes.DeletePVC(pvc_name=pvc.outputs["name"]).after(eval_task) delete_pvc.set_display_name("Cleanup Shared Storage") # Compile the pipeline - compiler.Compiler().compile( - pipeline_func=pipeline, - package_path=args.output_path - ) + compiler.Compiler().compile(pipeline_func=pipeline, package_path=args.output_path) print(f"Pipeline compiled to: {args.output_path}") if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Enhanced distributed pipeline example") + parser = argparse.ArgumentParser( + description="Enhanced distributed pipeline example" + ) parser.add_argument( "--output_path", type=str, default="enhanced_distributed_pipeline.yaml", help="Path to save the compiled pipeline", ) - + args = parser.parse_args() main(args) diff --git a/torchx/examples/pipelines/kfp/intro_pipeline.py b/torchx/examples/pipelines/kfp/intro_pipeline.py index b61c4a551..c15b0f2ee 100755 --- a/torchx/examples/pipelines/kfp/intro_pipeline.py +++ b/torchx/examples/pipelines/kfp/intro_pipeline.py @@ -28,15 +28,14 @@ AppDef inline. """ -from kfp import dsl -from kfp import compiler +from kfp import compiler, dsl from torchx import specs from torchx.pipelines.kfp.adapter import container_from_app @dsl.pipeline( name="intro-pipeline", - description="An introductory pipeline using TorchX components" + description="An introductory pipeline using TorchX components", ) def pipeline() -> None: # First we define our AppDef for the component. AppDef is a core part of TorchX @@ -58,7 +57,7 @@ def pipeline() -> None: # the container_from_app adapter. This generates a KFP v2 component # definition from the TorchX app def and instantiates it into a container task. 
echo_container = container_from_app(echo_app) - + # In KFP v2, you can set display name for better visualization echo_container.set_display_name("Echo Hello TorchX") diff --git a/torchx/examples/pipelines/kfp/task_configs_pipeline.py b/torchx/examples/pipelines/kfp/task_configs_pipeline.py index 20ed514a4..0e6ee37b4 100644 --- a/torchx/examples/pipelines/kfp/task_configs_pipeline.py +++ b/torchx/examples/pipelines/kfp/task_configs_pipeline.py @@ -17,14 +17,15 @@ """ import argparse -from kfp import dsl, compiler + +from kfp import compiler, dsl from torchx import specs from torchx.pipelines.kfp.adapter import container_from_app def main(args: argparse.Namespace) -> None: # Create various apps to demonstrate different configurations - + # Basic CPU task cpu_app = specs.AppDef( name="cpu-task", @@ -38,7 +39,7 @@ def main(args: argparse.Namespace) -> None: ) ], ) - + # GPU task gpu_app = specs.AppDef( name="gpu-task", @@ -49,14 +50,14 @@ def main(args: argparse.Namespace) -> None: args=[ "-c", "import torch; print(f'GPU available: {torch.cuda.is_available()}'); " - "print(f'GPU count: {torch.cuda.device_count()}')" + "print(f'GPU count: {torch.cuda.device_count()}')", ], image="pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime", resource=specs.Resource(cpu=4, memMB=8192, gpu=1), ) ], ) - + # Task with environment variables env_app = specs.AppDef( name="env-task", @@ -69,7 +70,7 @@ def main(args: argparse.Namespace) -> None: "import os; " "print(f'MODEL_NAME={os.getenv(\"MODEL_NAME\")}'); " "print(f'BATCH_SIZE={os.getenv(\"BATCH_SIZE\")}'); " - "print(f'LEARNING_RATE={os.getenv(\"LEARNING_RATE\")}');" + "print(f'LEARNING_RATE={os.getenv(\"LEARNING_RATE\")}');", ], env={ "MODEL_NAME": "resnet50", @@ -81,7 +82,7 @@ def main(args: argparse.Namespace) -> None: ) ], ) - + # Task that might fail (for retry demonstration) flaky_app = specs.AppDef( name="flaky-task", @@ -93,8 +94,8 @@ def main(args: argparse.Namespace) -> None: "-c", "import random; import sys; " "success = random.random() > 0.7; " # 70% failure rate - "print(f'Attempt result: {\"SUCCESS\" if success else \"FAILURE\"}'); " - "sys.exit(0 if success else 1);" + 'print(f\'Attempt result: {"SUCCESS" if success else "FAILURE"}\'); ' + "sys.exit(0 if success else 1);", ], image="python:3.9-slim", resource=specs.Resource(cpu=1, memMB=512, gpu=0), @@ -104,7 +105,7 @@ def main(args: argparse.Namespace) -> None: @dsl.pipeline( name="task-configuration-demo", - description="Demonstrates all KFP v2 task configuration options" + description="Demonstrates all KFP v2 task configuration options", ) def pipeline(): # Basic CPU task with display name @@ -113,7 +114,7 @@ def pipeline(): display_name="CPU Processing Task", enable_caching=True, ) - + # GPU task with custom accelerator configuration gpu_task = container_from_app( gpu_app, @@ -123,52 +124,51 @@ def pipeline(): # Note: GPU settings are automatically applied from the resource spec # But you can override the accelerator type if needed: # gpu_task.set_accelerator_type('nvidia-tesla-v100') - + # Task with environment variables env_task = container_from_app( env_app, display_name="Environment Variables Demo", ) # Add additional runtime environment variables - env_task.set_env_variable('RUNTIME_VAR', 'runtime_value') - env_task.set_env_variable('EXPERIMENT_ID', 'exp-001') - + env_task.set_env_variable("RUNTIME_VAR", "runtime_value") + env_task.set_env_variable("EXPERIMENT_ID", "exp-001") + # Flaky task with retry policy flaky_task = container_from_app( flaky_app, display_name="Flaky Task with 
Retries", retry_policy={ - 'max_retry_count': 5, - 'backoff_duration': '30s', - 'backoff_factor': 2, - 'backoff_max_duration': '300s', + "max_retry_count": 5, + "backoff_duration": "30s", + "backoff_factor": 2, + "backoff_max_duration": "300s", }, enable_caching=False, # Don't cache flaky tasks ) - + # Set task dependencies gpu_task.after(cpu_task) env_task.after(cpu_task) flaky_task.after(gpu_task, env_task) - + # Additional task configurations - + # Set resource requests/limits explicitly (override defaults) - cpu_task.set_cpu_request('1') - cpu_task.set_memory_request('1Gi') - + cpu_task.set_cpu_request("1") + cpu_task.set_memory_request("1Gi") + # Chain multiple configurations - (gpu_task - .set_env_variable('CUDA_VISIBLE_DEVICES', '0') - .set_env_variable('TORCH_CUDA_ARCH_LIST', '7.0;7.5;8.0')) + ( + gpu_task.set_env_variable("CUDA_VISIBLE_DEVICES", "0").set_env_variable( + "TORCH_CUDA_ARCH_LIST", "7.0;7.5;8.0" + ) + ) # Compile the pipeline - compiler.Compiler().compile( - pipeline_func=pipeline, - package_path=args.output_path - ) + compiler.Compiler().compile(pipeline_func=pipeline, package_path=args.output_path) print(f"Pipeline compiled to: {args.output_path}") - + # Print some helpful information print("\nTask Configuration Features Demonstrated:") print("1. Display names for better UI visualization") @@ -190,6 +190,6 @@ def pipeline(): default="task_configs_pipeline.yaml", help="Path to save the compiled pipeline", ) - + args = parser.parse_args() main(args) diff --git a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py index 76695ef98..b1e7aec38 100644 --- a/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py +++ b/torchx/examples/pipelines/kfp/test/kfp_pipeline_test.py @@ -53,6 +53,8 @@ def test_task_configs_pipeline(self) -> None: def test_dist_pipeline_v2_enhanced(self) -> None: sys.argv = ["dist_pipeline_v2_enhanced.py"] - from torchx.examples.pipelines.kfp import dist_pipeline_v2_enhanced # noqa: F401 + from torchx.examples.pipelines.kfp import ( # noqa: F401 + dist_pipeline_v2_enhanced, + ) self.assertTrue(os.path.exists("enhanced_distributed_pipeline.yaml")) diff --git a/torchx/pipelines/kfp/__init__.py b/torchx/pipelines/kfp/__init__.py index 30311c266..296ee588b 100644 --- a/torchx/pipelines/kfp/__init__.py +++ b/torchx/pipelines/kfp/__init__.py @@ -15,16 +15,18 @@ components. """ +import warnings + import kfp from .version import __version__ as __version__ # noqa F401 -import warnings + def _check_kfp_version() -> None: if kfp.__version__.startswith("1."): warnings.warn( f"KFP version 1.x.x is deprecated! Please upgrade to kfp version 2.x.x. Current version: {kfp.__version__}", - DeprecationWarning + DeprecationWarning, ) diff --git a/torchx/pipelines/kfp/adapter.py b/torchx/pipelines/kfp/adapter.py index 8e5d3f1f4..45f98d9d8 100644 --- a/torchx/pipelines/kfp/adapter.py +++ b/torchx/pipelines/kfp/adapter.py @@ -15,11 +15,11 @@ import json from typing import Any, Dict, Optional, Tuple +import torchx.specs as api + import yaml from kfp import dsl from kfp.dsl import ContainerSpec, OutputPath, PipelineTask - -import torchx.specs as api from torchx.schedulers import kubernetes_scheduler @@ -38,7 +38,7 @@ def component_from_app( ) -> Any: """ component_from_app creates a KFP v2 component from a TorchX AppDef. - + In KFP v2, we use container components for single-container apps. For multi-role apps, we use the first role as the primary container. 
@@ -57,11 +57,11 @@ def component_from_app( >>> from torchx import specs >>> from torchx.pipelines.kfp.adapter import component_from_app >>> from kfp import dsl - >>> + >>> >>> app_def = specs.AppDef( ... name="trainer", ... roles=[specs.Role( - ... name="trainer", + ... name="trainer", ... image="pytorch/pytorch:latest", ... entrypoint="python", ... args=["-m", "train", "--epochs", "10"], @@ -70,7 +70,7 @@ def component_from_app( ... )], ... ) >>> trainer_component = component_from_app(app_def) - >>> + >>> >>> @dsl.pipeline(name="training-pipeline") >>> def my_pipeline(): ... trainer_task = container_from_app(app_def) @@ -81,24 +81,26 @@ def component_from_app( f"KFP adapter does not support apps with more than one role. " f"AppDef has roles: {[r.name for r in app.roles]}" ) - + role = app.roles[0] - + @dsl.container_component - def torchx_component(mlpipeline_ui_metadata: OutputPath(str) = None) -> ContainerSpec: + def torchx_component( + mlpipeline_ui_metadata: OutputPath(str) = None, + ) -> ContainerSpec: """KFP v2 wrapper for TorchX component.""" # Basic container spec container_spec_dict = { "image": role.image, } - + # Build command and args command = [] if role.entrypoint: command.append(role.entrypoint) if role.args: command.extend(role.args) - + # Set command or args if role.entrypoint and role.args: # If both entrypoint and args exist, use command for full command line @@ -109,28 +111,34 @@ def torchx_component(mlpipeline_ui_metadata: OutputPath(str) = None) -> Containe elif role.args: # If only args exist, use them as args container_spec_dict["args"] = list(role.args) - + # Handle UI metadata if provided if ui_metadata and mlpipeline_ui_metadata: metadata_json = json.dumps(ui_metadata) - metadata_cmd = f'echo \'{metadata_json}\' > {mlpipeline_ui_metadata}' - + metadata_cmd = f"echo '{metadata_json}' > {mlpipeline_ui_metadata}" + # If there's an existing command, wrap it with metadata writing if "command" in container_spec_dict: - existing_cmd = ' '.join(container_spec_dict["command"]) - container_spec_dict["command"] = ["sh", "-c", f"{metadata_cmd} && {existing_cmd}"] + existing_cmd = " ".join(container_spec_dict["command"]) + container_spec_dict["command"] = [ + "sh", + "-c", + f"{metadata_cmd} && {existing_cmd}", + ] else: container_spec_dict["command"] = ["sh", "-c", metadata_cmd] - + return ContainerSpec(**container_spec_dict) - + # Set component metadata torchx_component._component_human_name = f"{app.name}-{role.name}" - torchx_component._component_description = f"KFP v2 wrapper for TorchX component {app.name}, role {role.name}" - + torchx_component._component_description = ( + f"KFP v2 wrapper for TorchX component {app.name}, role {role.name}" + ) + # Store role resource info as a component attribute so container_from_app can use it torchx_component._torchx_role = role - + return torchx_component @@ -173,29 +181,29 @@ def container_from_app( >>> from kfp import dsl >>> from torchx import specs >>> from torchx.pipelines.kfp.adapter import container_from_app - >>> + >>> >>> app_def = specs.AppDef( ... name="trainer", ... roles=[specs.Role( - ... name="trainer", + ... name="trainer", ... image="pytorch/pytorch:latest", ... entrypoint="python", ... args=["train.py"], ... resource=specs.Resource(cpu=4, memMB=16384, gpu=1) ... )], ... ) - >>> + >>> >>> @dsl.pipeline(name="ml-pipeline") >>> def pipeline(): ... # Create a training task ... trainer = container_from_app( - ... app_def, + ... app_def, ... display_name="PyTorch Training", ... retry_policy={'max_retry_count': 3}, ... 
accelerator_type='nvidia-tesla-v100' ... ) ... trainer.set_env_variable("WANDB_PROJECT", "my-project") - ... + ... ... # Create another task that depends on trainer ... evaluator = container_from_app( ... app_def, @@ -206,11 +214,11 @@ def container_from_app( component = component_from_app(app, ui_metadata) # Call the component function to create a PipelineTask task = component(*args, **kwargs) - + # Apply resource constraints and environment variables from the role - if hasattr(component, '_torchx_role'): + if hasattr(component, "_torchx_role"): role = component._torchx_role - + # Set resources if role.resource.cpu > 0: task.set_cpu_request(str(int(role.resource.cpu))) @@ -223,38 +231,38 @@ def container_from_app( # Check for accelerator type in metadata or use provided one acc_type = accelerator_type if not acc_type and app.metadata: - acc_type = app.metadata.get('accelerator_type') + acc_type = app.metadata.get("accelerator_type") if not acc_type: - acc_type = 'nvidia-tesla-k80' # Default GPU type - + acc_type = "nvidia-tesla-k80" # Default GPU type + task.set_accelerator_type(acc_type) task.set_accelerator_limit(str(int(role.resource.gpu))) - + # Set environment variables if role.env: for name, value in role.env.items(): task.set_env_variable(name=name, value=str(value)) - + # Apply additional configurations if display_name: task.set_display_name(display_name) - + if retry_policy: retry_args = {} - if 'max_retry_count' in retry_policy: - retry_args['num_retries'] = retry_policy['max_retry_count'] - if 'backoff_duration' in retry_policy: - retry_args['backoff_duration'] = retry_policy['backoff_duration'] - if 'backoff_factor' in retry_policy: - retry_args['backoff_factor'] = retry_policy['backoff_factor'] - if 'backoff_max_duration' in retry_policy: - retry_args['backoff_max_duration'] = retry_policy['backoff_max_duration'] + if "max_retry_count" in retry_policy: + retry_args["num_retries"] = retry_policy["max_retry_count"] + if "backoff_duration" in retry_policy: + retry_args["backoff_duration"] = retry_policy["backoff_duration"] + if "backoff_factor" in retry_policy: + retry_args["backoff_factor"] = retry_policy["backoff_factor"] + if "backoff_max_duration" in retry_policy: + retry_args["backoff_max_duration"] = retry_policy["backoff_max_duration"] if retry_args: task.set_retry(**retry_args) - + # Set caching options task.set_caching_options(enable_caching=enable_caching) - + return task @@ -290,26 +298,27 @@ def resource_from_app( ... trainer = resource_from_app(app_def, queue="test") ... print(trainer) """ + @dsl.container_component def volcano_job_component() -> ContainerSpec: """Creates a Volcano job via kubectl.""" resource = kubernetes_scheduler.app_to_resource( app, queue, service_account, priority_class ) - + # Serialize the resource to YAML resource_yaml = yaml.dump(resource, default_flow_style=False) - + # Use kubectl to create the resource return ContainerSpec( image="bitnami/kubectl:latest", command=["sh", "-c"], args=[f"echo '{resource_yaml}' | kubectl apply -f -"], ) - + volcano_job_component._component_human_name = f"{app.name}-volcano-job" volcano_job_component._component_description = f"Creates Volcano job for {app.name}" - + return volcano_job_component() @@ -320,18 +329,19 @@ def component_spec_from_app(app: api.AppDef) -> Tuple[str, api.Role]: Use component_from_app instead for KFP v2. """ import warnings + warnings.warn( "component_spec_from_app is deprecated. 
Use component_from_app for KFP v2.", DeprecationWarning, - stacklevel=2 + stacklevel=2, ) - + if len(app.roles) != 1: raise ValueError( f"Distributed apps are only supported via resource_from_app. " f"{app.name} has roles: {[r.name for r in app.roles]}" ) - + return app.name, app.roles[0] @@ -340,12 +350,13 @@ def component_spec_from_role(name: str, role: api.Role) -> Dict[str, Any]: DEPRECATED: Use component_from_app for KFP v2. """ import warnings + warnings.warn( "component_spec_from_role is deprecated. Use component_from_app for KFP v2.", DeprecationWarning, - stacklevel=2 + stacklevel=2, ) - + # Return a minimal spec for backwards compatibility return { "name": f"{name}-{role.name}", diff --git a/torchx/pipelines/kfp/test/adapter_test.py b/torchx/pipelines/kfp/test/adapter_test.py index 6c52a8fae..9765b26d8 100644 --- a/torchx/pipelines/kfp/test/adapter_test.py +++ b/torchx/pipelines/kfp/test/adapter_test.py @@ -50,7 +50,9 @@ def _compile_pipeline(self, pipeline: Callable[[], None]) -> dict: """Compile pipeline and return the compiled structure.""" with tempfile.TemporaryDirectory() as tmpdir: pipeline_file = os.path.join(tmpdir, "pipeline.yaml") - compiler.Compiler().compile(pipeline_func=pipeline, package_path=pipeline_file) + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=pipeline_file + ) with open(pipeline_file, "r") as f: data = yaml.safe_load(f) return data @@ -60,7 +62,7 @@ def test_component_spec_from_app(self) -> None: # component_spec_from_app is deprecated and returns app name and role app_name, role = component_spec_from_app(app) - + # The function should return the app name and first role self.assertEqual(app_name, "test") self.assertEqual(role, app.roles[0]) @@ -80,28 +82,28 @@ def pipeline() -> None: # Compile and check structure data = self._compile_pipeline(pipeline) - + # KFP v2 compiled pipelines have this structure at root level self.assertIn("components", data) self.assertIn("deploymentSpec", data) self.assertIn("root", data) - + # Check that we have components components = data["components"] self.assertGreater(len(components), 0) - + # Check executors executors = data["deploymentSpec"]["executors"] self.assertGreater(len(executors), 0) - + # Check the task structure self.assertIn("dag", data["root"]) self.assertIn("tasks", data["root"]["dag"]) - + # We should have 2 tasks tasks = data["root"]["dag"]["tasks"] self.assertEqual(len(tasks), 2) - + # Check dependency - second task should depend on first task_names = list(tasks.keys()) second_task = tasks[task_names[1]] @@ -121,16 +123,18 @@ def test_pipeline_metadata(self) -> None: @dsl.pipeline(name="test-pipeline-metadata") def pipeline() -> None: # Create component with UI metadata - a = container_from_app(app, ui_metadata=ui_metadata, display_name="Task with Metadata") + a = container_from_app( + app, ui_metadata=ui_metadata, display_name="Task with Metadata" + ) # Compile pipeline data = self._compile_pipeline(pipeline) - + # Check basic structure self.assertIn("components", data) self.assertIn("deploymentSpec", data) self.assertIn("root", data) - + # Check that UI metadata affects the command executors = data["deploymentSpec"]["executors"] # UI metadata should be present in at least one executor @@ -157,17 +161,17 @@ def pipeline() -> None: # Compile and verify data = self._compile_pipeline(pipeline) self.assertIn("root", data) - + # Check tasks tasks = data["root"]["dag"]["tasks"] self.assertEqual(len(tasks), 2) - + # Check dependency # The second task should have a dependency on the first 
task_names = list(tasks.keys()) second_task = tasks[task_names[1]] self.assertIn("dependentTasks", second_task) - + # Check display names for task_name, task in tasks.items(): self.assertIn("taskInfo", task) @@ -176,16 +180,16 @@ def pipeline() -> None: def test_resource_configuration(self) -> None: """Test that resources are properly configured in the component.""" app = self._test_app() - + # Create a component and check that it has the right resources component = component_from_app(app) - + # The component function should exist self.assertIsNotNone(component) - + # Check that the component has the expected metadata # In KFP v2, components store metadata differently - if hasattr(component, '_torchx_role'): + if hasattr(component, "_torchx_role"): role = component._torchx_role self.assertEqual(role.resource.cpu, 2) self.assertEqual(role.resource.memMB, 3000) @@ -194,17 +198,17 @@ def test_resource_configuration(self) -> None: def test_environment_variables(self) -> None: """Test that environment variables are properly passed.""" app = self._test_app() - + @dsl.pipeline(name="test-env-pipeline") def pipeline() -> None: task = container_from_app(app) # Compile pipeline data = self._compile_pipeline(pipeline) - + # Check that the pipeline was compiled successfully self.assertIn("deploymentSpec", data) - + # Find the executor and check environment variables executors = data["deploymentSpec"]["executors"] found_env = False @@ -215,13 +219,18 @@ def pipeline() -> None: # Check that FOO environment variable is set env_vars = container["env"] for env_var in env_vars: - if env_var.get("name") == "FOO" and env_var.get("value") == "bar": + if ( + env_var.get("name") == "FOO" + and env_var.get("value") == "bar" + ): found_env = True break if found_env: break - - self.assertTrue(found_env, "Environment variable FOO=bar not found in any executor") + + self.assertTrue( + found_env, "Environment variable FOO=bar not found in any executor" + ) if __name__ == "__main__": diff --git a/torchx/pipelines/kfp/test/adapter_v2_it_test.py b/torchx/pipelines/kfp/test/adapter_v2_it_test.py index 2f6129f3c..eb253d059 100644 --- a/torchx/pipelines/kfp/test/adapter_v2_it_test.py +++ b/torchx/pipelines/kfp/test/adapter_v2_it_test.py @@ -13,20 +13,21 @@ and pipeline compilation. 
""" -import unittest -import tempfile -import shutil -import os import json +import os +import shutil +import tempfile +import unittest from pathlib import Path -from kfp import dsl, compiler, local + +from kfp import compiler, dsl, local from torchx import specs from torchx.pipelines.kfp.adapter import component_from_app, container_from_app class TestTorchXComponentCreation(unittest.TestCase): """Test TorchX component creation and metadata.""" - + def test_simple_component_creation(self): """Test creating a simple container component from TorchX AppDef.""" app = specs.AppDef( @@ -41,21 +42,21 @@ def test_simple_component_creation(self): ) ], ) - + component = component_from_app(app) - + # Verify component was created correctly self.assertIsNotNone(component) self.assertTrue(callable(component)) self.assertEqual(component._component_human_name, "echo-test-worker") self.assertIn("TorchX component", component._component_description) - + # Verify the role is attached - self.assertTrue(hasattr(component, '_torchx_role')) + self.assertTrue(hasattr(component, "_torchx_role")) self.assertEqual(component._torchx_role.entrypoint, "/bin/echo") self.assertEqual(component._torchx_role.args, ["Hello from TorchX"]) self.assertEqual(component._torchx_role.image, "alpine:latest") - + def test_component_with_environment_variables(self): """Test component creation with environment variables.""" env_vars = { @@ -64,7 +65,7 @@ def test_component_with_environment_variables(self): "LEARNING_RATE": "0.001", "CUDA_VISIBLE_DEVICES": "0,1", } - + app = specs.AppDef( name="ml-training", roles=[ @@ -78,14 +79,14 @@ def test_component_with_environment_variables(self): ) ], ) - + component = component_from_app(app) - + # Verify environment variables are stored self.assertEqual(component._torchx_role.env, env_vars) self.assertEqual(component._torchx_role.resource.gpu, 2) self.assertEqual(component._torchx_role.resource.cpu, 8) - + def test_multi_gpu_component_with_metadata(self): """Test component with multiple GPUs and accelerator metadata.""" app = specs.AppDef( @@ -106,13 +107,13 @@ def test_multi_gpu_component_with_metadata(self): ) ], ) - + component = component_from_app(app) - + # Verify multi-GPU configuration self.assertEqual(component._torchx_role.resource.gpu, 4) self.assertEqual(app.metadata["accelerator_type"], "nvidia-tesla-a100") - + def test_component_with_ui_metadata(self): """Test component with UI metadata for visualization.""" ui_metadata = { @@ -125,10 +126,10 @@ def test_component_with_ui_metadata(self): "type": "markdown", "storage": "inline", "source": "# Training Complete\nModel saved to gs://my-bucket/model", - } + }, ] } - + app = specs.AppDef( name="viz-test", roles=[ @@ -141,24 +142,24 @@ def test_component_with_ui_metadata(self): ) ], ) - + component = component_from_app(app, ui_metadata=ui_metadata) - + # Component should be created successfully with UI metadata self.assertIsNotNone(component) class TestPipelineCompilation(unittest.TestCase): """Test pipeline compilation with TorchX components.""" - + def setUp(self): """Set up test fixtures.""" self.temp_dir = tempfile.mkdtemp() - + def tearDown(self): """Clean up after tests.""" shutil.rmtree(self.temp_dir, ignore_errors=True) - + def test_compile_simple_pipeline(self): """Test compiling a pipeline with TorchX components.""" app1 = specs.AppDef( @@ -167,13 +168,17 @@ def test_compile_simple_pipeline(self): specs.Role( name="worker", entrypoint="python", - args=["preprocess.py", "--input=/data/raw", "--output=/data/processed"], + args=[ + 
"preprocess.py", + "--input=/data/raw", + "--output=/data/processed", + ], image="python:3.9", resource=specs.Resource(cpu=2, memMB=4096, gpu=0), ) ], ) - + app2 = specs.AppDef( name="trainer", roles=[ @@ -186,55 +191,58 @@ def test_compile_simple_pipeline(self): ) ], ) - - @dsl.pipeline(name="torchx-pipeline", description="Pipeline with TorchX components") + + @dsl.pipeline( + name="torchx-pipeline", description="Pipeline with TorchX components" + ) def torchx_pipeline(): # Create tasks from TorchX apps preprocess_task = container_from_app( - app1, - display_name="Data Preprocessing", - enable_caching=True + app1, display_name="Data Preprocessing", enable_caching=True ) - + train_task = container_from_app( app2, display_name="Model Training", retry_policy={"max_retry_count": 2}, - accelerator_type="nvidia-tesla-v100" + accelerator_type="nvidia-tesla-v100", ) - + # Set task dependencies train_task.after(preprocess_task) - + # Compile the pipeline output_path = os.path.join(self.temp_dir, "pipeline.yaml") - compiler.Compiler().compile(pipeline_func=torchx_pipeline, package_path=output_path) - + compiler.Compiler().compile( + pipeline_func=torchx_pipeline, package_path=output_path + ) + # Verify the pipeline was compiled self.assertTrue(os.path.exists(output_path)) - + # Read and verify pipeline structure with open(output_path) as f: pipeline_content = f.read() - + # Check that key components are in the pipeline self.assertIn("torchx-component", pipeline_content) self.assertIn("python:3.9", pipeline_content) self.assertIn("pytorch/pytorch:latest", pipeline_content) self.assertIn("Data Preprocessing", pipeline_content) self.assertIn("Model Training", pipeline_content) - + def test_compile_ml_pipeline_with_parameters(self): """Test compiling a complete ML pipeline with parameters.""" + @dsl.pipeline( name="ml-training-pipeline", - description="Complete ML pipeline with parameters" + description="Complete ML pipeline with parameters", ) def ml_pipeline( learning_rate: float = 0.001, batch_size: int = 32, epochs: int = 50, - gpu_type: str = "nvidia-tesla-v100" + gpu_type: str = "nvidia-tesla-v100", ): # Preprocessing step preprocess_app = specs.AppDef( @@ -249,13 +257,11 @@ def ml_pipeline( ) ], ) - + preprocess_task = container_from_app( - preprocess_app, - display_name="Data Preprocessing", - enable_caching=True + preprocess_app, display_name="Data Preprocessing", enable_caching=True ) - + # Training step train_app = specs.AppDef( name="train", @@ -275,19 +281,19 @@ def ml_pipeline( ) ], ) - + train_task = container_from_app( train_app, display_name=f"Model Training (LR={learning_rate})", retry_policy={ "max_retry_count": 3, "backoff_duration": "300s", - "backoff_factor": 2.0 + "backoff_factor": 2.0, }, - accelerator_type=gpu_type + accelerator_type=gpu_type, ) train_task.after(preprocess_task) - + # Evaluation step eval_app = specs.AppDef( name="evaluate", @@ -301,28 +307,23 @@ def ml_pipeline( ) ], ) - + eval_task = container_from_app( - eval_app, - display_name="Model Evaluation", - enable_caching=False + eval_app, display_name="Model Evaluation", enable_caching=False ) eval_task.after(train_task) - + # Compile the pipeline output_path = os.path.join(self.temp_dir, "ml_pipeline.yaml") - compiler.Compiler().compile( - pipeline_func=ml_pipeline, - package_path=output_path - ) - + compiler.Compiler().compile(pipeline_func=ml_pipeline, package_path=output_path) + # Verify pipeline was compiled self.assertTrue(os.path.exists(output_path)) - + # Read and verify content with open(output_path) as 
f: content = f.read() - + # Verify all components and parameters are present self.assertIn("Data Preprocessing", content) self.assertIn("Model Training", content) @@ -331,12 +332,12 @@ def ml_pipeline( self.assertIn("batch_size", content) self.assertIn("epochs", content) self.assertIn("gpu_type", content) - + # Verify resource configurations self.assertIn("resourceCpuLimit", content) self.assertIn("resourceMemoryLimit", content) self.assertIn("accelerator", content) - + def test_compile_nested_pipeline(self): """Test compiling a pipeline with nested components.""" simple_app = specs.AppDef( @@ -351,40 +352,41 @@ def test_compile_nested_pipeline(self): ) ], ) - + @dsl.pipeline(name="inner-pipeline") def inner_pipeline(message: str): task1 = container_from_app(simple_app, display_name=f"Task 1: {message}") task2 = container_from_app(simple_app, display_name=f"Task 2: {message}") task2.after(task1) - + @dsl.pipeline(name="outer-pipeline") def outer_pipeline(): # Preprocessing preprocessing = container_from_app(simple_app, display_name="Preprocessing") - + # Inner pipeline inner = inner_pipeline(message="Inner Processing") inner.after(preprocessing) - + # Postprocessing - postprocessing = container_from_app(simple_app, display_name="Postprocessing") + postprocessing = container_from_app( + simple_app, display_name="Postprocessing" + ) postprocessing.after(inner) - + # Compile the pipeline output_path = os.path.join(self.temp_dir, "nested_pipeline.yaml") compiler.Compiler().compile( - pipeline_func=outer_pipeline, - package_path=output_path + pipeline_func=outer_pipeline, package_path=output_path ) - + # Verify compilation self.assertTrue(os.path.exists(output_path)) - + # Verify structure with open(output_path) as f: content = f.read() - + self.assertIn("Preprocessing", content) self.assertIn("Inner Processing", content) self.assertIn("Postprocessing", content) @@ -392,113 +394,117 @@ def outer_pipeline(): class TestLocalExecution(unittest.TestCase): """Test local execution of lightweight Python components. - + Note: Container components require DockerRunner which may not be available in all test environments, so we focus on testing with lightweight Python components to verify the execution flow. 
""" - + def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() # Initialize local execution with SubprocessRunner local.init(runner=local.SubprocessRunner(), pipeline_root=self.temp_dir) - + def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir, ignore_errors=True) - + def test_simple_component_execution(self): """Test executing a simple Python component.""" + @dsl.component(base_image="python:3.9-slim") def add_numbers(a: float, b: float) -> float: """Add two numbers.""" return a + b - + # Execute component task = add_numbers(a=5.0, b=3.0) - + # Verify result self.assertEqual(task.output, 8.0) - + def test_component_with_artifact_output(self): """Test component that produces output artifacts.""" + @dsl.component(base_image="python:3.9-slim") def generate_report(data: dict, report_name: str) -> str: """Generate a report from data.""" import json - + report = { "name": report_name, "data": data, - "summary": f"Report contains {len(data)} items" + "summary": f"Report contains {len(data)} items", } - + return json.dumps(report) - + # Execute component test_data = {"metric1": 0.95, "metric2": 0.87} task = generate_report(data=test_data, report_name="Test Report") - + # Verify output result = json.loads(task.output) self.assertEqual(result["name"], "Test Report") self.assertEqual(result["data"], test_data) self.assertIn("2 items", result["summary"]) - + def test_pipeline_execution(self): """Test executing a pipeline with multiple components.""" + @dsl.component(base_image="python:3.9-slim") def preprocess(value: float) -> float: """Preprocess input value.""" return value * 2.0 - + @dsl.component(base_image="python:3.9-slim") def transform(value: float, factor: float = 1.5) -> float: """Transform value by factor.""" return value * factor - + @dsl.pipeline(name="test-pipeline") def data_pipeline(input_value: float = 10.0) -> float: prep_task = preprocess(value=input_value) trans_task = transform(value=prep_task.output, factor=3.0) return trans_task.output - + # Execute pipeline pipeline_task = data_pipeline(input_value=5.0) - + # Verify result: 5.0 * 2.0 * 3.0 = 30.0 self.assertEqual(pipeline_task.output, 30.0) - + def test_conditional_execution(self): """Test conditional execution in a pipeline.""" + @dsl.component(base_image="python:3.9-slim") def check_threshold(value: float, threshold: float = 0.5) -> str: """Check if value exceeds threshold.""" return "high" if value > threshold else "low" - + @dsl.component(base_image="python:3.9-slim") def process_high(value: float) -> float: """Process high values.""" return value * 2.0 - + @dsl.component(base_image="python:3.9-slim") def process_low(value: float) -> float: """Process low values.""" return value * 0.5 - + # Test with high value check_task = check_threshold(value=0.8) self.assertEqual(check_task.output, "high") - + # Test with low value check_task = check_threshold(value=0.3) self.assertEqual(check_task.output, "low") - + # Test processing based on condition high_task = process_high(value=10.0) self.assertEqual(high_task.output, 20.0) - + low_task = process_low(value=10.0) self.assertEqual(low_task.output, 5.0) diff --git a/torchx/pipelines/kfp/test/adapter_v2_spec_test.py b/torchx/pipelines/kfp/test/adapter_v2_spec_test.py index a8e55f907..1e0fb6ab7 100644 --- a/torchx/pipelines/kfp/test/adapter_v2_spec_test.py +++ b/torchx/pipelines/kfp/test/adapter_v2_spec_test.py @@ -15,13 +15,14 @@ import unittest from unittest import mock + from torchx import specs from 
torchx.pipelines.kfp.adapter import component_from_app, container_from_app class TestComponentCreation(unittest.TestCase): """Test basic component creation from TorchX AppDef.""" - + def test_simple_component_creation(self): """Test creating a basic component.""" app = specs.AppDef( @@ -36,17 +37,17 @@ def test_simple_component_creation(self): ) ], ) - + component = component_from_app(app) - + # Verify component attributes self.assertEqual(component._component_human_name, "test-app-worker") self.assertIn("TorchX component", component._component_description) - self.assertTrue(hasattr(component, '_torchx_role')) + self.assertTrue(hasattr(component, "_torchx_role")) self.assertEqual(component._torchx_role.entrypoint, "/bin/echo") self.assertEqual(component._torchx_role.args, ["hello", "world"]) self.assertEqual(component._torchx_role.image, "alpine:latest") - + def test_component_with_environment_variables(self): """Test component creation with environment variables.""" env_vars = { @@ -54,7 +55,7 @@ def test_component_with_environment_variables(self): "BATCH_SIZE": "32", "CUDA_VISIBLE_DEVICES": "0,1", } - + app = specs.AppDef( name="ml-app", roles=[ @@ -68,13 +69,13 @@ def test_component_with_environment_variables(self): ) ], ) - + component = component_from_app(app) - + # Verify environment variables are preserved self.assertEqual(component._torchx_role.env, env_vars) self.assertEqual(component._torchx_role.resource.gpu, 2) - + def test_component_with_ui_metadata(self): """Test component creation with UI metadata.""" ui_metadata = { @@ -85,7 +86,7 @@ def test_component_with_ui_metadata(self): } ] } - + app = specs.AppDef( name="viz-app", roles=[ @@ -98,7 +99,7 @@ def test_component_with_ui_metadata(self): ) ], ) - + # Component should be created successfully with UI metadata component = component_from_app(app, ui_metadata=ui_metadata) self.assertIsNotNone(component) @@ -107,7 +108,7 @@ def test_component_with_ui_metadata(self): class TestContainerTaskConfiguration(unittest.TestCase): """Test container task configuration from AppDef.""" - + def setUp(self): """Set up test fixtures.""" self.base_app = specs.AppDef( @@ -122,55 +123,61 @@ def setUp(self): ) ], ) - + def test_basic_container_task(self): """Test basic container task creation.""" - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = self.base_app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(self.base_app) - + # Verify component was called self.assertEqual(task, mock_task) mock_component.assert_called_once() - + # Verify resource settings mock_task.set_cpu_request.assert_called_once_with("2") mock_task.set_cpu_limit.assert_called_once_with("2") mock_task.set_memory_request.assert_called_once_with("4096M") mock_task.set_memory_limit.assert_called_once_with("4096M") - + def test_container_task_with_display_name(self): """Test container task with custom display name.""" - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = self.base_app.roles[0] mock_component_fn.return_value = mock_component - + 
display_name = "My Custom Task" task = container_from_app(self.base_app, display_name=display_name) - + mock_task.set_display_name.assert_called_once_with(display_name) - + def test_container_task_with_caching(self): """Test container task with caching configuration.""" - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = self.base_app.roles[0] mock_component_fn.return_value = mock_component - + # Test enabling caching task = container_from_app(self.base_app, enable_caching=True) mock_task.set_caching_options.assert_called_once_with(enable_caching=True) - + # Reset mock mock_task.reset_mock() - + # Test disabling caching task = container_from_app(self.base_app, enable_caching=False) mock_task.set_caching_options.assert_called_once_with(enable_caching=False) @@ -178,20 +185,20 @@ def test_container_task_with_caching(self): class TestResourceConfiguration(unittest.TestCase): """Test resource configuration for container tasks.""" - + def test_memory_size_conversions(self): """Test memory size conversion from MB to KFP format.""" test_cases = [ - (512, "512M"), # 512 MB - (1024, "1024M"), # 1 GB - (2048, "2048M"), # 2 GB - (4096, "4096M"), # 4 GB - (8192, "8192M"), # 8 GB + (512, "512M"), # 512 MB + (1024, "1024M"), # 1 GB + (2048, "2048M"), # 2 GB + (4096, "4096M"), # 4 GB + (8192, "8192M"), # 8 GB (16384, "16384M"), # 16 GB - (1536, "1536M"), # 1.5 GB (non-standard) - (0, None), # Zero memory + (1536, "1536M"), # 1.5 GB (non-standard) + (0, None), # Zero memory ] - + for memMB, expected_str in test_cases: with self.subTest(memMB=memMB): app = specs.AppDef( @@ -206,22 +213,26 @@ def test_memory_size_conversions(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + if expected_str: - mock_task.set_memory_request.assert_called_once_with(expected_str) + mock_task.set_memory_request.assert_called_once_with( + expected_str + ) mock_task.set_memory_limit.assert_called_once_with(expected_str) else: mock_task.set_memory_request.assert_not_called() mock_task.set_memory_limit.assert_not_called() - + def test_gpu_configuration(self): """Test GPU resource configuration.""" gpu_configs = [ @@ -230,7 +241,7 @@ def test_gpu_configuration(self): (2, None, "nvidia-tesla-k80"), # Multiple GPUs without type (uses default) (4, "nvidia-tesla-a100", "nvidia-tesla-a100"), # Multi-GPU with type ] - + for gpu_count, accelerator_type, expected_type in gpu_configs: with self.subTest(gpu_count=gpu_count, accelerator_type=accelerator_type): app = specs.AppDef( @@ -245,23 +256,29 @@ def test_gpu_configuration(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = 
container_from_app(app, accelerator_type=accelerator_type) - + if gpu_count > 0: - mock_task.set_accelerator_limit.assert_called_once_with(str(gpu_count)) + mock_task.set_accelerator_limit.assert_called_once_with( + str(gpu_count) + ) if expected_type: - mock_task.set_accelerator_type.assert_called_once_with(expected_type) + mock_task.set_accelerator_type.assert_called_once_with( + expected_type + ) else: mock_task.set_accelerator_limit.assert_not_called() mock_task.set_accelerator_type.assert_not_called() - + def test_fractional_cpu_handling(self): """Test handling of fractional CPU values.""" app = specs.AppDef( @@ -276,15 +293,17 @@ def test_fractional_cpu_handling(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + # CPU should be truncated to integer (1.5 -> 1) mock_task.set_cpu_request.assert_called_once_with("1") mock_task.set_cpu_limit.assert_called_once_with("1") @@ -292,7 +311,7 @@ def test_fractional_cpu_handling(self): class TestRetryAndErrorHandling(unittest.TestCase): """Test retry policies and error handling configurations.""" - + def test_retry_policy_configurations(self): """Test various retry policy configurations.""" app = specs.AppDef( @@ -307,39 +326,41 @@ def test_retry_policy_configurations(self): ) ], ) - + retry_configs = [ ({"max_retry_count": 5}, {"num_retries": 5}), ( {"max_retry_count": 3, "backoff_duration": "30s"}, - {"num_retries": 3, "backoff_duration": "30s"} + {"num_retries": 3, "backoff_duration": "30s"}, ), ( { "max_retry_count": 2, "backoff_factor": 2.0, - "backoff_max_duration": "300s" + "backoff_max_duration": "300s", }, { "num_retries": 2, "backoff_factor": 2.0, - "backoff_max_duration": "300s" - } + "backoff_max_duration": "300s", + }, ), ] - + for retry_policy, expected_args in retry_configs: with self.subTest(retry_policy=retry_policy): - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app, retry_policy=retry_policy) - + mock_task.set_retry.assert_called_once_with(**expected_args) - + def test_timeout_configuration(self): """Test timeout configuration for tasks.""" # Skip this test - timeout is not currently implemented in container_from_app @@ -348,7 +369,7 @@ def test_timeout_configuration(self): class TestEnvironmentVariables(unittest.TestCase): """Test environment variable handling.""" - + def test_environment_variable_setting(self): """Test that environment variables are properly set on tasks.""" env_vars = { @@ -358,7 +379,7 @@ def test_environment_variable_setting(self): "PATH_VAR": "/usr/local/bin:/usr/bin", "EMPTY_VAR": "", } - + app = specs.AppDef( name="env-app", roles=[ @@ -372,15 +393,17 @@ def test_environment_variable_setting(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: 
mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + # Verify all environment variables were set expected_calls = [ mock.call(name=name, value=str(value)) @@ -388,7 +411,7 @@ def test_environment_variable_setting(self): ] mock_task.set_env_variable.assert_has_calls(expected_calls, any_order=True) self.assertEqual(mock_task.set_env_variable.call_count, len(env_vars)) - + def test_special_environment_variables(self): """Test handling of special environment variables.""" special_env_vars = { @@ -397,7 +420,7 @@ def test_special_environment_variables(self): "PYTHONPATH": "/app:/lib", "LD_LIBRARY_PATH": "/usr/local/cuda/lib64", } - + app = specs.AppDef( name="special-env-app", roles=[ @@ -411,15 +434,17 @@ def test_special_environment_variables(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + # Verify special environment variables are set correctly for name, value in special_env_vars.items(): mock_task.set_env_variable.assert_any_call(name=name, value=value) @@ -427,7 +452,7 @@ def test_special_environment_variables(self): class TestEdgeCases(unittest.TestCase): """Test edge cases and error conditions.""" - + def test_minimal_resource_spec(self): """Test handling of minimal resource specifications.""" app = specs.AppDef( @@ -442,15 +467,17 @@ def test_minimal_resource_spec(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + # Verify no resource methods were called for zero resources mock_task.set_cpu_request.assert_not_called() mock_task.set_cpu_limit.assert_not_called() @@ -458,7 +485,7 @@ def test_minimal_resource_spec(self): mock_task.set_memory_limit.assert_not_called() mock_task.set_accelerator_type.assert_not_called() mock_task.set_accelerator_limit.assert_not_called() - + def test_very_large_resources(self): """Test handling of very large resource requests.""" app = specs.AppDef( @@ -473,22 +500,24 @@ def test_very_large_resources(self): ) ], ) - - with mock.patch('torchx.pipelines.kfp.adapter.component_from_app') as mock_component_fn: + + with mock.patch( + "torchx.pipelines.kfp.adapter.component_from_app" + ) as mock_component_fn: mock_task = mock.MagicMock() mock_component = mock.MagicMock(return_value=mock_task) mock_component._torchx_role = app.roles[0] mock_component_fn.return_value = mock_component - + task = container_from_app(app) - + # Verify large resources are set correctly mock_task.set_cpu_request.assert_called_once_with("128") mock_task.set_cpu_limit.assert_called_once_with("128") mock_task.set_memory_request.assert_called_once_with("524288M") mock_task.set_memory_limit.assert_called_once_with("524288M") mock_task.set_accelerator_limit.assert_called_once_with("8") - + def 
test_empty_args_and_entrypoint(self): """Test component with no args.""" app = specs.AppDef( @@ -503,9 +532,9 @@ def test_empty_args_and_entrypoint(self): ) ], ) - + component = component_from_app(app) - + # Verify component is created successfully self.assertEqual(component._torchx_role.entrypoint, "/app/start.sh") self.assertEqual(component._torchx_role.args, []) diff --git a/torchx/pipelines/kfp/test/version_test.py b/torchx/pipelines/kfp/test/version_test.py index e33ef535e..a2cd58dec 100644 --- a/torchx/pipelines/kfp/test/version_test.py +++ b/torchx/pipelines/kfp/test/version_test.py @@ -9,8 +9,8 @@ import importlib import unittest -from unittest.mock import patch import warnings +from unittest.mock import patch class VersionTest(unittest.TestCase):