Commit 4ab05f3

fix: long-running jobs crash with SIGTERM (#370)
1 parent df77102 commit 4ab05f3

File tree

1 file changed: +16 -36


runpod/serverless/modules/rp_scale.py

Lines changed: 16 additions & 36 deletions
@@ -82,34 +82,15 @@ def handle_shutdown(self, signum, frame):
 
     async def run(self):
         # Create an async session that will be closed when the worker is killed.
-
         async with AsyncClientSession() as session:
             # Create tasks for getting and running jobs.
             jobtake_task = asyncio.create_task(self.get_jobs(session))
             jobrun_task = asyncio.create_task(self.run_jobs(session))
 
             tasks = [jobtake_task, jobrun_task]
 
-            try:
-                # Concurrently run both tasks and wait for both to finish.
-                await asyncio.gather(*tasks)
-            except asyncio.CancelledError:  # worker is killed
-                log.debug("Worker tasks cancelled.")
-                self.kill_worker()
-            finally:
-                # Handle the task cancellation gracefully
-                for task in tasks:
-                    if not task.done():
-                        task.cancel()
-                await asyncio.gather(*tasks, return_exceptions=True)
-                await self.cleanup()  # Ensure resources are cleaned up
-
-    async def cleanup(self):
-        # Perform any necessary cleanup here, such as closing connections
-        log.debug("Cleaning up resources before shutdown.")
-        # TODO: stop heartbeat or close any open connections
-        await asyncio.sleep(0)  # Give a chance for other tasks to run (optional)
-        log.debug("Cleanup complete.")
+            # Concurrently run both tasks and wait for both to finish.
+            await asyncio.gather(*tasks)
 
     def is_alive(self):
         """
@@ -121,6 +102,7 @@ def kill_worker(self):
         """
         Whether to kill the worker.
         """
+        log.info("Kill worker.")
         self._shutdown_event.set()
 
     async def get_jobs(self, session: ClientSession):
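
The only functional change in `kill_worker` is the `log.info("Kill worker.")` breadcrumb, which makes shutdown requests visible in worker logs. Worth noting: `asyncio.Event.set()` is an idempotent latch, so repeated signals (or repeated `kill_worker` calls) are harmless. A standalone sketch of the semantics the worker loops rely on (the inlined event check is an assumption about `is_alive()`, which the diff does not show):

import asyncio


async def main():
    shutdown = asyncio.Event()

    async def loop():
        iterations = 0
        while not shutdown.is_set():  # assumed is_alive() semantics, inlined
            iterations += 1
            await asyncio.sleep(0)
        return iterations

    task = asyncio.create_task(loop())
    await asyncio.sleep(0.01)
    shutdown.set()  # kill_worker(): latches the event, raises nothing
    shutdown.set()  # calling it again is a no-op
    print("loop exited cleanly after", await task, "iterations")


asyncio.run(main())
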
@@ -142,42 +124,40 @@ async def get_jobs(self, session: ClientSession):
             jobs_needed = self.current_concurrency - job_progress.get_job_count()
             if jobs_needed <= 0:
                 log.debug("JobScaler.get_jobs | Queue is full. Retrying soon.")
-                await asyncio.sleep(0.1)  # don't go rapidly
+                await asyncio.sleep(1)  # don't go rapidly
                 continue
 
             try:
                 # Keep the connection to the blocking call up to 30 seconds
                 acquired_jobs = await asyncio.wait_for(
                     get_job(session, jobs_needed), timeout=30
                 )
+
+                if not acquired_jobs:
+                    log.debug("JobScaler.get_jobs | No jobs acquired.")
+                    continue
+
+                for job in acquired_jobs:
+                    await job_list.add_job(job)
+
+                log.info(f"Jobs in queue: {job_list.get_job_count()}")
+
             except TooManyRequests:
                 log.debug(f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds.")
                 await asyncio.sleep(5)  # debounce for 5 seconds
-                continue
             except asyncio.CancelledError:
                 log.debug("JobScaler.get_jobs | Request was cancelled.")
-                continue
             except TimeoutError:
                 log.debug("JobScaler.get_jobs | Job acquisition timed out. Retrying.")
-                continue
             except TypeError as error:
                 log.debug(f"JobScaler.get_jobs | Unexpected error: {error}.")
-                continue
             except Exception as error:
                 log.error(
                     f"Failed to get job. | Error Type: {type(error).__name__} | Error Message: {str(error)}"
                 )
-                continue
-
-            if not acquired_jobs:
-                log.debug("JobScaler.get_jobs | No jobs acquired.")
+            finally:
+                # Yield control back to the event loop
                 await asyncio.sleep(0)
-                continue
-
-            for job in acquired_jobs:
-                await job_list.add_job(job)
-
-            log.info(f"Jobs in queue: {job_list.get_job_count()}")
 
     async def run_jobs(self, session: ClientSession):
         """
