Skip to content

Conversation

AstraBert
Copy link
Member

Attempting the creation of a workflow client

@coveralls
Copy link

coveralls commented Sep 23, 2025

Pull Request Test Coverage Report for Build 18064673242

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage remained the same at 90.271%

Totals Coverage Status
Change from base Build 18041425758: 0.0%
Covered Lines: 1531
Relevant Lines: 1696

💛 - Coveralls


[project.optional-dependencies]
server = ["starlette>=0.39.0", "uvicorn>=0.32.0"]
client = ["httpx>=0.28.1"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should bound this to <1. Looks like they're working on some breaking changes

**kwargs: Any number of keyword arguments that would be passed on as additional keyword arguments to the workflow.
Returns:
str: ID of the handler running the workflow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe return a typed dict for the workflow handler dict that @zhaotai is adding? #110


return response.json()["handler_id"]

async def _stream_events_sse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to support both of these. I think for python, the ndjson is a lot simpler

if line.strip(): # Skip empty lines
try:
event = json.loads(line.replace("\n", ""))
yield event.get("value", {})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't just emit the value. Ignoring the qualified_name is throwing out valueable information, otherwise the caller doesn't know exactly what type this is supposed to be. Especially with workflows, where there's lots of meta programming based on classes (e.g. there's often events without even any values).

We could also perhaps have an option to plumb it through the JsonSerializer in case they happen to have the same python class around? Not sure what that would really look like though.

async def stream_events(
self,
handler_id: str,
event_callback: Callable[[dict[str, Any]], Any] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this just an async generator too? That seems easier than having a callback

"""
if isinstance(event, Event):
try:
event = event.model_dump_json()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't work. We need to use that JsonSerializer, otherwise the server will just get this back of json values, and not know what it's supposed to deserialize to. I think rather than requiring a dict[str, Any], we should have something simple like a TypedDict or yet another pydantic model to represent the serialized state. (the thing with the qualified_name)

if response.status_code == 202:
return

return response.json()["result"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhaotai is also updating this API to return the full handler, so maybe don't throw out the rest of that info

@adrianlyjak
Copy link
Contributor

IMO, I think we'll need a low level basic API client like this either way, so this is great, but I kind of think we should treat it as private, and rather expose an "easy to use" interface that looks more like the handler response from a workflow.run. That also maybe frees us up to make more internal changes as long as we keep the outer interface more stable

@adrianlyjak
Copy link
Contributor

Also worth considering potentially generating a client for the lower API integration (with something like https://github.com/openapi-generators/openapi-python-client). The main advantage there is probably keeping us honest as types change while we develop the API.

We have the OpenAPI spec easily exportable already.

@AstraBert AstraBert marked this pull request as ready for review September 25, 2025 14:53
Copy link
Contributor

@adrianlyjak adrianlyjak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some small questions around handler parameters. @AstraBert how do you feel about the generated client? I know I suggested it, but wondering if its helping or hurting (its astonishing how much code it generates!)

else:
raise ValueError("Response was not properly generated")

async def get_workflow_result(self, handler: Handler) -> Any:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this just be a handler id? as input, and return a Handler as a result?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should, but I thought it would be easier and more future-proof passing the handler (maybe in the future we want to use more parameters from it). But it can be easily reverted to handler id, I also think it may be a little bit bloaty to put the entire handler there

"""
Stream events using newline-delimited JSON format
"""
url = f"/events/{handler.handler_id}?sse=false"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: url encode the handler in case its some garbage string that has slashes/question marks in it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not know it, I'll sanitize it later today along with reverting to pass onlu the handler id


async def get_workflow_events(
self,
handler: Handler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this just be a string id?

@AstraBert
Copy link
Member Author

@adrianlyjak personally not a super fan of this last iteration, I feel like we could do more with less code (previous iteration :))

@AstraBert
Copy link
Member Author

Reverted back to the initial implementation with some adjustments, @adrianlyjak :)

return response.json().get("status", "") == "healthy"
return False

async def ping(self) -> float:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not sure I see value in having this predefined in the client

Returns:
dict[str, Any]: JSON representation of the handler running the workflow
"""
if isinstance(start_event, StartEvent):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: nice to have some shared functions to consolidate this duplication

context = context.to_dict()
except Exception as e:
raise ValueError(f"Impossible to serialize the context because of: {e}")
request_body = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we should build out some shared TypedDict models between client/server to lightly type things and make type discrepancies obvious

workflow_name: str,
start_event: Union[StartEvent, dict[str, Any], None] = None,
context: Union[Context, dict[str, Any], None] = None,
**kwargs: Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never actually found a use case for these kwargs. I'd recommend hiding them from the interface to avoid breaking changes, and if we do need to introduce them, use Unpack[TypedDict] types so that the exact set of supported options is obvious and documented

self,
workflow_name: str,
start_event: Union[StartEvent, dict[str, Any], None] = None,
context: Union[Context, dict[str, Any], None] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and run are missing the new handler_id option


async def get_workflow_events(
self,
handler_id: str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs the new include_internal option

"""
if isinstance(event, Event):
try:
event = event.model_dump_json()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the wrong way to serialize. These need to be the JsonSerializer serialized values, otherwise the server will not know what type to deserialize to

if response.status_code == 202:
return

return response.json()["result"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should expose the full response TypedDict (already defined for the server), and return that. That way you can get the current status, etc.

logger = getLogger(__name__)


class WorkflowClient:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a /handlers method added here. Should we include that? Related to the /result api, it returns a list of the same data model

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants