Skip to content

Commit 969cc35

Browse files
authored
Update async job streaming stop condition and test cases (#404)
1 parent 2c62255 commit 969cc35

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

runpod/endpoint/asyncio/asyncio_runner.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" Module for running endpoints asynchronously. """
1+
"""Module for running endpoints asynchronously."""
22

33
# pylint: disable=too-few-public-methods,R0801
44

@@ -89,9 +89,14 @@ async def stream(self) -> Any:
8989
while True:
9090
await asyncio.sleep(1)
9191
stream_partial = await self._fetch_job(source="stream")
92-
if stream_partial["status"] not in FINAL_STATES:
92+
if (
93+
stream_partial["status"] not in FINAL_STATES
94+
or len(stream_partial.get("stream", [])) > 0
95+
):
9396
for chunk in stream_partial.get("stream", []):
9497
yield chunk["output"]
98+
elif stream_partial["status"] in FINAL_STATES:
99+
break
95100

96101
async def cancel(self) -> dict:
97102
"""Cancels current job

tests/test_endpoint/test_asyncio_runner.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" Unit tests for the asyncio_runner module. """
1+
"""Unit tests for the asyncio_runner module."""
22

33
# pylint: disable=too-few-public-methods
44

@@ -114,8 +114,6 @@ async def json_side_effect():
114114
outputs = []
115115
async for stream_output in job.stream():
116116
outputs.append(stream_output)
117-
if not responses: # Break the loop when responses are exhausted
118-
break
119117

120118
assert outputs == ["OUTPUT1", "OUTPUT2"]
121119

0 commit comments

Comments
 (0)