Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/codeflare_sdk/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

from .rayjobs import (
RayJob,
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
)

from .cluster import (
Expand Down
1 change: 1 addition & 0 deletions src/codeflare_sdk/ray/rayjobs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .rayjob import RayJob
from .status import RayJobDeploymentStatus, CodeflareRayJobStatus, RayJobInfo
116 changes: 116 additions & 0 deletions src/codeflare_sdk/ray/rayjobs/pretty_print.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2025 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This sub-module exists primarily to be used internally by the RayJob object
(in the rayjob sub-module) for pretty-printing job status and details.
"""

from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from typing import Tuple, Optional

from .status import RayJobDeploymentStatus, RayJobInfo


def print_job_status(job_info: RayJobInfo):
    """
    Render the details of a RayJob to the console as a rich panel.

    The header row and status text are chosen from ``job_info.status``;
    the job id, raw status value, RayCluster name and namespace are always
    listed. The start time and the failed-attempt count are only listed
    when they carry information (truthy start time, count above zero).

    Args:
        job_info: The job details to display.
    """
    status_display, header_color = _get_status_display(job_info.status)
    table = _create_info_table(header_color, job_info.name, status_display)

    # Rows that are shown for every job
    rows = [
        f"[bold]Job ID:[/bold] {job_info.job_id}",
        f"[bold]Status:[/bold] {job_info.status.value}",
        f"[bold]RayCluster:[/bold] {job_info.cluster_name}",
        f"[bold]Namespace:[/bold] {job_info.namespace}",
    ]

    # Optional rows: timing first, then failure count
    if job_info.start_time:
        rows.append(f"[bold]Started:[/bold] {job_info.start_time}")
    if job_info.failed_attempts > 0:
        rows.append(f"[bold]Failed Attempts:[/bold] {job_info.failed_attempts}")

    for row in rows:
        table.add_row(row)

    _print_table_in_panel(table)


def print_no_job_found(job_name: str, namespace: str):
    """
    Render a "no RayJob found" panel to the console.

    Args:
        job_name: Name of the job that was looked up.
        namespace: Namespace in which the job was looked up.
    """
    header = "[white on red][bold]Name"
    not_found = "[bold red]No RayJob found"
    table = _create_info_table(header, job_name, not_found)

    # Each entry is the cell tuple for one row; empty tuples are spacer rows
    # placed around the hint.
    body_rows = (
        (),
        ("Please run rayjob.submit() to submit a job.",),
        (),
        (f"[bold]Namespace:[/bold] {namespace}",),
    )
    for cells in body_rows:
        table.add_row(*cells)

    _print_table_in_panel(table)


def _get_status_display(status: RayJobDeploymentStatus) -> Tuple[str, str]:
    """
    Look up how a deployment status should be rendered.

    Args:
        status: The deployment status of the job.

    Returns:
        Tuple of (status_display, header_color). Statuses without a
        dedicated entry are rendered as "Unknown" with a red header.
    """
    known_displays = {
        RayJobDeploymentStatus.COMPLETE: (
            "Complete :white_heavy_check_mark:",
            "[white on green][bold]Name",
        ),
        RayJobDeploymentStatus.RUNNING: (
            "Running :gear:",
            "[white on blue][bold]Name",
        ),
        RayJobDeploymentStatus.FAILED: (
            "Failed :x:",
            "[white on red][bold]Name",
        ),
        RayJobDeploymentStatus.SUSPENDED: (
            "Suspended :pause_button:",
            "[white on yellow][bold]Name",
        ),
    }

    try:
        return known_displays[status]
    except KeyError:
        # Anything not listed above falls back to the generic display
        return ("Unknown :question:", "[white on red][bold]Name")


def _create_info_table(header_color: str, name: str, status_display: str) -> Table:
    """
    Build the borderless, header-less table shared by the status printers.

    Args:
        header_color: Markup string placed alone in the first row.
        name: Job name; shown bold and underlined in the second row.
        status_display: Status text shown next to the name.

    Returns:
        Table holding the header row, the name/status row and an empty
        separator row, ready for callers to append detail rows.
    """
    name_cell = "[bold underline]" + name
    initial_rows = (
        (header_color,),
        (name_cell, status_display),
        (),  # Empty separator row
    )

    info_table = Table(box=None, show_header=False)
    for cells in initial_rows:
        info_table.add_row(*cells)
    return info_table


def _print_table_in_panel(table: Table):
    """
    Print ``table`` to the console inside the standard RayJob status panel.

    The table is fitted into a panel, and that panel is the single row of a
    borderless outer table carrying the "CodeFlare RayJob Status" title.

    Args:
        table: The already populated info table to display.
    """
    title = "[bold] :package: CodeFlare RayJob Status :package:"
    outer = Table(box=None, title=title)
    outer.add_row(Panel.fit(table))
    Console().print(outer)
95 changes: 93 additions & 2 deletions src/codeflare_sdk/ray/rayjobs/rayjob.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
# Copyright 2025 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
RayJob client for submitting and managing Ray jobs using the odh-kuberay-client.
"""

import logging
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Tuple
from odh_kuberay_client.kuberay_job_api import RayjobApi

from .status import (
RayJobDeploymentStatus,
CodeflareRayJobStatus,
RayJobInfo,
)
from . import pretty_print

# Set up logging
logger = logging.getLogger(__name__)

Expand All @@ -15,7 +36,7 @@ class RayJob:
A client for managing Ray jobs using the KubeRay operator.

This class provides a simplified interface for submitting and managing
Ray jobs in a Kubernetes cluster with the KubeRay operator installed.
RayJob CRs (using the KubeRay RayJob python client).
"""

def __init__(
Expand Down Expand Up @@ -109,3 +130,73 @@ def _build_rayjob_cr(
rayjob_cr["spec"]["runtimeEnvYAML"] = str(runtime_env)

return rayjob_cr

def status(
    self, print_to_console: bool = True
) -> Tuple[CodeflareRayJobStatus, bool]:
    """
    Get the status of the Ray job.

    The status is fetched through ``self._api.get_job_status`` for this
    job's name and namespace. A falsy response is treated as "no job
    found". Otherwise the ``jobDeploymentStatus`` field is converted to a
    ``RayJobDeploymentStatus`` (values that are not members of the enum
    become ``UNKNOWN``) and then mapped to the CodeFlare status.

    Args:
        print_to_console (bool): Whether to print formatted status to console (default: True)

    Returns:
        Tuple of (CodeflareRayJobStatus, ready: bool) where ready indicates job completion.
        When no status data is available the result is
        ``(CodeflareRayJobStatus.UNKNOWN, False)``.
    """
    status_data = self._api.get_job_status(
        name=self.name, k8s_namespace=self.namespace
    )

    if not status_data:
        if print_to_console:
            pretty_print.print_no_job_found(self.name, self.namespace)
        return CodeflareRayJobStatus.UNKNOWN, False

    # Map deployment status to our enums; unrecognised values are UNKNOWN
    try:
        deployment_status = RayJobDeploymentStatus(
            status_data.get("jobDeploymentStatus", "Unknown")
        )
    except ValueError:
        deployment_status = RayJobDeploymentStatus.UNKNOWN

    if print_to_console:
        # The info object is only needed for display, so build it here.
        job_info = RayJobInfo(
            name=self.name,
            job_id=status_data.get("jobId", ""),
            status=deployment_status,
            namespace=self.namespace,
            cluster_name=self.cluster_name,
            start_time=status_data.get("startTime"),
            end_time=status_data.get("endTime"),
            # `.get(key, 0)` only covers a missing key. A key present with
            # a null value would yield None and break the `> 0` comparison
            # done when printing, so coalesce to 0 explicitly.
            failed_attempts=status_data.get("failed") or 0,
            succeeded_attempts=status_data.get("succeeded") or 0,
        )
        pretty_print.print_job_status(job_info)

    # Map to CodeFlare status and determine readiness
    return self._map_to_codeflare_status(deployment_status)

def _map_to_codeflare_status(
    self, deployment_status: RayJobDeploymentStatus
) -> Tuple[CodeflareRayJobStatus, bool]:
    """
    Translate a deployment status into the CodeFlare status plus readiness.

    Args:
        deployment_status: The deployment status to translate.

    Returns:
        Tuple of (CodeflareRayJobStatus, ready: bool). ``ready`` is True
        only for a completed job; statuses without a direct counterpart
        translate to ``(CodeflareRayJobStatus.UNKNOWN, False)``.
    """
    translation = {
        RayJobDeploymentStatus.COMPLETE: CodeflareRayJobStatus.COMPLETE,
        RayJobDeploymentStatus.RUNNING: CodeflareRayJobStatus.RUNNING,
        RayJobDeploymentStatus.FAILED: CodeflareRayJobStatus.FAILED,
        RayJobDeploymentStatus.SUSPENDED: CodeflareRayJobStatus.SUSPENDED,
    }
    codeflare_status = translation.get(
        deployment_status, CodeflareRayJobStatus.UNKNOWN
    )

    # COMPLETE is the only state that counts as ready
    ready = codeflare_status is CodeflareRayJobStatus.COMPLETE
    return codeflare_status, ready
64 changes: 64 additions & 0 deletions src/codeflare_sdk/ray/rayjobs/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2025 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The status sub-module defines Enums containing information for Ray job
deployment states and CodeFlare job states, as well as
dataclasses to store information for Ray jobs.
"""

from dataclasses import dataclass
from enum import Enum
from typing import Optional


class RayJobDeploymentStatus(Enum):
    """
    Deployment states of a Ray job (from the KubeRay RayJob API).

    Each member's value is the status string itself, so a raw status string
    can be converted with ``RayJobDeploymentStatus(value)``; a string that
    matches no member raises ``ValueError``.
    """

    COMPLETE = "Complete"
    RUNNING = "Running"
    FAILED = "Failed"
    SUSPENDED = "Suspended"
    # Catch-all member for states that have no dedicated entry above
    UNKNOWN = "Unknown"


class CodeflareRayJobStatus(Enum):
    """
    Reportable states of a CodeFlare Ray job.

    Members carry plain integer values (1-5); compare against the members
    themselves rather than the numbers.
    """

    COMPLETE = 1
    RUNNING = 2
    FAILED = 3
    SUSPENDED = 4
    # Reported when the job state cannot be determined
    UNKNOWN = 5


@dataclass
class RayJobInfo:
    """
    Container for the details of a single Ray job.

    The first five fields are required; timing and attempt counters are
    optional and default to "not available" / zero.
    """

    # Identity and placement of the job
    name: str
    job_id: str
    status: RayJobDeploymentStatus
    namespace: str
    cluster_name: str

    # Timing information; None when not available
    start_time: Optional[str] = None
    end_time: Optional[str] = None

    # Attempt counters
    failed_attempts: int = 0
    succeeded_attempts: int = 0
Loading
Loading