|
| 1 | +from time import sleep |
| 2 | + |
| 3 | +from codeflare_sdk import Cluster, ClusterConfiguration |
| 4 | +from codeflare_sdk.ray.rayjobs import RayJob |
| 5 | +from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus |
| 6 | + |
| 7 | +import pytest |
| 8 | + |
| 9 | +from support import * |
| 10 | + |
| 11 | +# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on Kind Cluster |
| 12 | + |
| 13 | + |
@pytest.mark.kind
class TestRayJobExistingClusterKind:
    """E2E test: create a Ray cluster on a Kind cluster, then submit a RayJob
    against that existing cluster (the job must not own/tear down the cluster).
    """

    def setup_method(self):
        # pytest invokes this automatically before each test method
        # (classic xunit-style setup) — tests must not call it again.
        initialize_kubernetes_client(self)

    def teardown_method(self):
        # pytest invokes this automatically after each test method.
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_ray_cluster_sdk_kind(self):
        # NOTE: the explicit self.setup_method() call was removed — pytest has
        # already run it, and calling it again re-initialized the Kubernetes
        # client redundantly.
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_kind(accelerator="cpu")

    @pytest.mark.nvidia_gpu
    def test_rayjob_ray_cluster_sdk_kind_nvidia_gpu(self):
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_kind(
            accelerator="gpu", number_of_gpus=1
        )

    def run_rayjob_against_existing_cluster_kind(
        self, accelerator, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
    ):
        """Create a Ray cluster, wait for it to become ready, submit a RayJob
        against it, and tear the cluster down afterwards.

        Args:
            accelerator: "cpu" or "gpu"; used to select the job variant and
                its environment variables.
            gpu_resource_name: Kubernetes extended-resource name for GPUs.
            number_of_gpus: GPUs requested per worker (0 for CPU-only runs).
        """
        cluster_name = "existing-cluster"
        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                num_workers=1,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests="500m",
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.apply()
        cluster.status()
        cluster.wait_ready()
        cluster.status()
        cluster.details()

        print(f"✅ Ray cluster '{cluster_name}' is ready")

        # Test RayJob submission against the existing cluster.
        self.assert_rayjob_submit_against_existing_cluster(
            cluster, accelerator, number_of_gpus
        )

        # Cleanup — manually tear down the cluster, since the job was created
        # with shutdown_after_job_finishes=False and won't do it.
        print("🧹 Cleaning up Ray cluster")
        cluster.down()

    def assert_rayjob_submit_against_existing_cluster(
        self, cluster, accelerator, number_of_gpus
    ):
        """Submit a RayJob targeting *cluster* and wait for it to complete.

        Raises:
            AssertionError: if submission does not return the job name, or the
                job ends in FAILED state.
            TimeoutError: if the job does not finish within the monitor timeout.
        """
        cluster_name = cluster.config.name
        job_name = f"mnist-rayjob-{accelerator}"

        print(f"🚀 Testing RayJob submission against existing cluster '{cluster_name}'")

        # Create RayJob targeting the existing cluster.
        rayjob = RayJob(
            job_name=job_name,
            cluster_name=cluster_name,
            namespace=self.namespace,
            entrypoint="python mnist.py",
            runtime_env={
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(ACCELERATOR=accelerator),
            },
            shutdown_after_job_finishes=False,  # Don't shutdown the existing cluster
        )

        # Submit the job.
        submission_result = rayjob.submit()
        assert (
            submission_result == job_name
        ), f"Job submission failed, expected {job_name}, got {submission_result}"
        print(f"✅ Successfully submitted RayJob '{job_name}' against existing cluster")

        # Monitor the job status until completion.
        self.monitor_rayjob_completion(rayjob, timeout=900)

        print(f"✅ RayJob '{job_name}' completed successfully against existing cluster!")

    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
        """
        Monitor a RayJob until it completes or fails.

        Args:
            rayjob: The RayJob instance to monitor
            timeout: Maximum time to wait in seconds (default: 15 minutes)

        Raises:
            AssertionError: if the job reaches FAILED state.
            TimeoutError: if the job is still not terminal after *timeout* seconds.
        """
        print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")

        elapsed_time = 0
        check_interval = 10  # Check every 10 seconds

        while elapsed_time < timeout:
            status, ready = rayjob.status(print_to_console=True)

            # Check if job has completed (either successfully or failed).
            if status == CodeflareRayJobStatus.COMPLETE:
                print(f"✅ RayJob '{rayjob.name}' completed successfully!")
                return
            elif status == CodeflareRayJobStatus.FAILED:
                raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
            elif status == CodeflareRayJobStatus.RUNNING:
                print(f"🏃 RayJob '{rayjob.name}' is still running...")
            elif status == CodeflareRayJobStatus.UNKNOWN:
                print(f"❓ RayJob '{rayjob.name}' status is unknown")

            # Wait before next check.
            sleep(check_interval)
            elapsed_time += check_interval

        # If we reach here, the job has timed out.
        final_status, _ = rayjob.status(print_to_console=True)
        raise TimeoutError(
            f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
            f"Final status: {final_status}"
        )