-
Notifications
You must be signed in to change notification settings - Fork 24
wip: first attempt to workflow client #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Pull Request Test Coverage Report for Build 18064673242Details
💛 - Coveralls |
|
||
[project.optional-dependencies] | ||
server = ["starlette>=0.39.0", "uvicorn>=0.32.0"] | ||
client = ["httpx>=0.28.1"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should bound this to <1
. Looks like they're working on some breaking changes
src/workflows/client/client.py
Outdated
**kwargs: Any number of keyword arguments that would be passed on as additional keyword arguments to the workflow. | ||
Returns: | ||
str: ID of the handler running the workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/workflows/client/client.py
Outdated
|
||
return response.json()["handler_id"] | ||
|
||
async def _stream_events_sse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason to support both of these. I think for python, the ndjson is a lot simpler
src/workflows/client/client.py
Outdated
if line.strip(): # Skip empty lines | ||
try: | ||
event = json.loads(line.replace("\n", "")) | ||
yield event.get("value", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this shouldn't just emit the value
. Ignoring the qualified_name
is throwing out valueable information, otherwise the caller doesn't know exactly what type this is supposed to be. Especially with workflows, where there's lots of meta programming based on classes (e.g. there's often events without even any values).
We could also perhaps have an option to plumb it through the JsonSerializer in case they happen to have the same python class around? Not sure what that would really look like though.
src/workflows/client/client.py
Outdated
async def stream_events( | ||
self, | ||
handler_id: str, | ||
event_callback: Callable[[dict[str, Any]], Any] | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why isn't this just an async generator too? That seems easier than having a callback
""" | ||
if isinstance(event, Event): | ||
try: | ||
event = event.model_dump_json() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this won't work. We need to use that JsonSerializer
, otherwise the server will just get this back of json values, and not know what it's supposed to deserialize to. I think rather than requiring a dict[str, Any]
, we should have something simple like a TypedDict or yet another pydantic model to represent the serialized state. (the thing with the qualified_name
)
if response.status_code == 202: | ||
return | ||
|
||
return response.json()["result"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhaotai is also updating this API to return the full handler, so maybe don't throw out the rest of that info
IMO, I think we'll need a low level basic API client like this either way, so this is great, but I kind of think we should treat it as private, and rather expose an "easy to use" interface that looks more like the handler response from a |
Also worth considering potentially generating a client for the lower API integration (with something like https://github.com/openapi-generators/openapi-python-client). The main advantage there is probably keeping us honest as types change while we develop the API. We have the OpenAPI spec easily exportable already. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some small questions around handler parameters. @AstraBert how do you feel about the generated client? I know I suggested it, but wondering if its helping or hurting (its astonishing how much code it generates!)
src/workflows/client/client.py
Outdated
else: | ||
raise ValueError("Response was not properly generated") | ||
|
||
async def get_workflow_result(self, handler: Handler) -> Any: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this just be a handler id? as input, and return a Handler as a result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it should, but I thought it would be easier and more future-proof passing the handler (maybe in the future we want to use more parameters from it). But it can be easily reverted to handler id, I also think it may be a little bit bloaty to put the entire handler there
src/workflows/client/client.py
Outdated
""" | ||
Stream events using newline-delimited JSON format | ||
""" | ||
url = f"/events/{handler.handler_id}?sse=false" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: url encode the handler in case its some garbage string that has slashes/question marks in it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not know it, I'll sanitize it later today along with reverting to pass onlu the handler id
src/workflows/client/client.py
Outdated
|
||
async def get_workflow_events( | ||
self, | ||
handler: Handler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this just be a string id?
@adrianlyjak personally not a super fan of this last iteration, I feel like we could do more with less code (previous iteration :)) |
Reverted back to the initial implementation with some adjustments, @adrianlyjak :) |
return response.json().get("status", "") == "healthy" | ||
return False | ||
|
||
async def ping(self) -> float: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not sure I see value in having this predefined in the client
Returns: | ||
dict[str, Any]: JSON representation of the handler running the workflow | ||
""" | ||
if isinstance(start_event, StartEvent): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: nice to have some shared functions to consolidate this duplication
context = context.to_dict() | ||
except Exception as e: | ||
raise ValueError(f"Impossible to serialize the context because of: {e}") | ||
request_body = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think we should build out some shared TypedDict
models between client/server to lightly type things and make type discrepancies obvious
workflow_name: str, | ||
start_event: Union[StartEvent, dict[str, Any], None] = None, | ||
context: Union[Context, dict[str, Any], None] = None, | ||
**kwargs: Any, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've never actually found a use case for these kwargs. I'd recommend hiding them from the interface to avoid breaking changes, and if we do need to introduce them, use Unpack[TypedDict]
types so that the exact set of supported options is obvious and documented
self, | ||
workflow_name: str, | ||
start_event: Union[StartEvent, dict[str, Any], None] = None, | ||
context: Union[Context, dict[str, Any], None] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this and run
are missing the new handler_id
option
|
||
async def get_workflow_events( | ||
self, | ||
handler_id: str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs the new include_internal option
""" | ||
if isinstance(event, Event): | ||
try: | ||
event = event.model_dump_json() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the wrong way to serialize. These need to be the JsonSerializer
serialized values, otherwise the server will not know what type to deserialize to
if response.status_code == 202: | ||
return | ||
|
||
return response.json()["result"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should expose the full response TypedDict (already defined for the server), and return that. That way you can get the current status, etc.
logger = getLogger(__name__) | ||
|
||
|
||
class WorkflowClient: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a /handlers
method added here. Should we include that? Related to the /result
api, it returns a list of the same data model
Attempting the creation of a workflow client