From 3d6971b39d0315b1b338749af7af6f7757fa5dca Mon Sep 17 00:00:00 2001 From: tdstein Date: Tue, 6 May 2025 14:41:08 -0400 Subject: [PATCH] feat: improve task polling with exponential backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements an improved task polling mechanism with configurable exponential backoff to reduce the number of API calls for long-running tasks. This helps to minimize resource consumption on the Connect server while still providing responsive feedback. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/posit/connect/tasks.py | 27 ++++++++++- tests/posit/connect/test_tasks.py | 78 +++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/src/posit/connect/tasks.py b/src/posit/connect/tasks.py index ac6b2931..5b96e28d 100644 --- a/src/posit/connect/tasks.py +++ b/src/posit/connect/tasks.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time + from typing_extensions import overload from . import resources @@ -95,17 +97,40 @@ def update(self, *args, **kwargs) -> None: result = response.json() super().update(**result) - def wait_for(self) -> None: + def wait_for(self, *, initial_wait: int = 1, max_wait: int = 10, backoff: float = 1.5) -> None: """Wait for the task to finish. + Parameters + ---------- + initial_wait : int, default 1 + Initial wait time in seconds. First API request will use this as the wait parameter. + max_wait : int, default 10 + Maximum wait time in seconds between polling requests. + backoff : float, default 1.5 + Backoff multiplier for increasing wait times. + Examples -------- >>> task.wait_for() None + + Notes + ----- + This method implements an exponential backoff strategy to reduce the number of API calls + while waiting for long-running tasks. The first request uses the initial_wait value, + and subsequent requests increase the wait time by the backoff factor, up to max_wait. To disable exponential backoff, set backoff to 1.0. """ + wait_time = initial_wait + while not self.is_finished: self.update() + # Wait client-side + time.sleep(wait_time) + + # Calculate next wait time with backoff + wait_time = min(wait_time * backoff, max_wait) + class Tasks(resources.Resources): @overload diff --git a/tests/posit/connect/test_tasks.py b/tests/posit/connect/test_tasks.py index 5de1d2cf..27bc4e2a 100644 --- a/tests/posit/connect/test_tasks.py +++ b/tests/posit/connect/test_tasks.py @@ -134,6 +134,84 @@ def test(self): assert mock_tasks_get[0].call_count == 1 assert mock_tasks_get[1].call_count == 1 + @responses.activate + @mock.patch("time.sleep", autospec=True) + def test_exponential_backoff(self, mock_sleep): + uid = "jXhOhdm5OOSkGhJw" + + # behavior + mock_tasks_get = [ + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True}, + ), + ] + + # setup + c = connect.Client("https://connect.example", "12345") + task = c.tasks.get(uid) + assert not task.is_finished + + # invoke + task.wait_for(initial_wait=1, max_wait=5, backoff=2.0) + + # assert + assert task.is_finished + assert mock_tasks_get[0].call_count == 1 + assert mock_tasks_get[1].call_count == 1 + + # Verify sleep calls + mock_sleep.assert_has_calls([mock.call(1), mock.call(2), mock.call(4)], any_order=False) + + @responses.activate + @mock.patch("time.sleep", autospec=True) + def test_no_backoff(self, mock_sleep): + uid = "jXhOhdm5OOSkGhJw" + + # behavior + mock_tasks_get = [ + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True}, + ), + ] + + # setup + c = connect.Client("https://connect.example", "12345") + task = c.tasks.get(uid) + assert not task.is_finished + + # invoke + task.wait_for(initial_wait=2, max_wait=5, backoff=1.0) + + # assert + assert task.is_finished + assert mock_tasks_get[0].call_count == 1 + assert mock_tasks_get[1].call_count == 1 + + # Verify sleep calls + mock_sleep.assert_has_calls([mock.call(2), mock.call(2)], any_order=False) + class TestTasksGet: @responses.activate