From e657f4b4ea48c83d96764417fe1f2709f3982e6d Mon Sep 17 00:00:00 2001 From: tdstein Date: Tue, 6 May 2025 17:35:11 -0400 Subject: [PATCH 1/3] feat: improve task polling Implements an improved task polling mechanism with configurable wait time. This helps to minimize resource consumption on the Connect server while still providing responsive feedback. --- src/posit/connect/tasks.py | 26 ++++----------------- tests/posit/connect/test_tasks.py | 39 +++++++------------------------ 2 files changed, 12 insertions(+), 53 deletions(-) diff --git a/src/posit/connect/tasks.py b/src/posit/connect/tasks.py index 5b96e28d..aaa6f0fd 100644 --- a/src/posit/connect/tasks.py +++ b/src/posit/connect/tasks.py @@ -97,39 +97,21 @@ def update(self, *args, **kwargs) -> None: result = response.json() super().update(**result) - def wait_for(self, *, initial_wait: int = 1, max_wait: int = 10, backoff: float = 1.5) -> None: + def wait_for(self, *, wait: int = 1) -> None: """Wait for the task to finish. Parameters ---------- - initial_wait : int, default 1 - Initial wait time in seconds. First API request will use this as the wait parameter. - max_wait : int, default 10 - Maximum wait time in seconds between polling requests. - backoff : float, default 1.5 - Backoff multiplier for increasing wait times. + wait : int, default 1 + Wait time in seconds between polling requests. Examples -------- >>> task.wait_for() None - - Notes - ----- - This method implements an exponential backoff strategy to reduce the number of API calls - while waiting for long-running tasks. The first request uses the initial_wait value, - and subsequent requests increase the wait time by the backoff factor, up to max_wait. To disable exponential backoff, set backoff to 1.0. """ - wait_time = initial_wait - while not self.is_finished: - self.update() - - # Wait client-side - time.sleep(wait_time) - - # Calculate next wait time with backoff - wait_time = min(wait_time * backoff, max_wait) + self.update(wait=wait) class Tasks(resources.Resources): diff --git a/tests/posit/connect/test_tasks.py b/tests/posit/connect/test_tasks.py index 27bc4e2a..cb2c41fa 100644 --- a/tests/posit/connect/test_tasks.py +++ b/tests/posit/connect/test_tasks.py @@ -118,6 +118,7 @@ def test(self): responses.get( f"https://connect.example/__api__/v1/tasks/{uid}", json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True}, + match=[matchers.query_param_matcher({"wait": 1})], ), ] @@ -127,7 +128,7 @@ def test(self): assert not task.is_finished # invoke - task.wait_for() + task.wait_for(wait=1) # assert assert task.is_finished @@ -135,20 +136,11 @@ def test(self): assert mock_tasks_get[1].call_count == 1 @responses.activate - @mock.patch("time.sleep", autospec=True) - def test_exponential_backoff(self, mock_sleep): + def test_with_custom_wait(self): uid = "jXhOhdm5OOSkGhJw" # behavior mock_tasks_get = [ - responses.get( - f"https://connect.example/__api__/v1/tasks/{uid}", - json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, - ), - responses.get( - f"https://connect.example/__api__/v1/tasks/{uid}", - json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, - ), responses.get( f"https://connect.example/__api__/v1/tasks/{uid}", json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, @@ -156,6 +148,7 @@ def test_exponential_backoff(self, mock_sleep): responses.get( f"https://connect.example/__api__/v1/tasks/{uid}", json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True}, + match=[matchers.query_param_matcher({"wait": 5})], ), ] @@ -165,31 +158,19 @@ def test_exponential_backoff(self, mock_sleep): assert not task.is_finished # invoke - task.wait_for(initial_wait=1, max_wait=5, backoff=2.0) + task.wait_for(wait=5) # assert assert task.is_finished assert mock_tasks_get[0].call_count == 1 assert mock_tasks_get[1].call_count == 1 - # Verify sleep calls - mock_sleep.assert_has_calls([mock.call(1), mock.call(2), mock.call(4)], any_order=False) - @responses.activate - @mock.patch("time.sleep", autospec=True) - def test_no_backoff(self, mock_sleep): + def test_immediate_completion(self): uid = "jXhOhdm5OOSkGhJw" # behavior mock_tasks_get = [ - responses.get( - f"https://connect.example/__api__/v1/tasks/{uid}", - json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, - ), - responses.get( - f"https://connect.example/__api__/v1/tasks/{uid}", - json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, - ), responses.get( f"https://connect.example/__api__/v1/tasks/{uid}", json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": True}, @@ -199,18 +180,14 @@ def test_no_backoff(self, mock_sleep): # setup c = connect.Client("https://connect.example", "12345") task = c.tasks.get(uid) - assert not task.is_finished + assert task.is_finished # invoke - task.wait_for(initial_wait=2, max_wait=5, backoff=1.0) + task.wait_for(wait=1) # assert assert task.is_finished assert mock_tasks_get[0].call_count == 1 - assert mock_tasks_get[1].call_count == 1 - - # Verify sleep calls - mock_sleep.assert_has_calls([mock.call(2), mock.call(2)], any_order=False) class TestTasksGet: From bdf0399e80f8563618df9e6f9373897dfc9d670c Mon Sep 17 00:00:00 2001 From: tdstein Date: Tue, 6 May 2025 17:38:19 -0400 Subject: [PATCH 2/3] chore: fix linting errors --- src/posit/connect/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/posit/connect/tasks.py b/src/posit/connect/tasks.py index aaa6f0fd..684ddaff 100644 --- a/src/posit/connect/tasks.py +++ b/src/posit/connect/tasks.py @@ -2,8 +2,6 @@ from __future__ import annotations -import time - from typing_extensions import overload from . import resources From c3c26b12014f27a2708fda7f4c09e87910d5749e Mon Sep 17 00:00:00 2001 From: tdstein Date: Wed, 7 May 2025 18:24:17 -0400 Subject: [PATCH 3/3] feat: adds maximum attempts parameter to wait_for --- src/posit/connect/tasks.py | 36 +++++++++++++++++++++++++++++-- tests/posit/connect/test_tasks.py | 26 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/posit/connect/tasks.py b/src/posit/connect/tasks.py index 684ddaff..417c2464 100644 --- a/src/posit/connect/tasks.py +++ b/src/posit/connect/tasks.py @@ -95,21 +95,53 @@ def update(self, *args, **kwargs) -> None: result = response.json() super().update(**result) - def wait_for(self, *, wait: int = 1) -> None: + def wait_for(self, *, wait: int = 1, max_attempts: int | None = None) -> None: """Wait for the task to finish. Parameters ---------- wait : int, default 1 - Wait time in seconds between polling requests. + Maximum wait time in seconds between polling requests. + max_attempts : int | None, default None + Maximum number of polling attempts. If None, polling will continue indefinitely. + + Raises + ------ + TimeoutError + If the task does not finish within the maximum attempts. + + Notes + ----- + If the task finishes before the wait time or maximum attempts are reached, the function will return immediately. For example, if the wait time is set to 5 seconds and the task finishes in 2 seconds, the function will return after 2 seconds. + + If the task does not finished after the maximum attempts, a TimeoutError will be raised. By default, the maximum attempts is None, which means the function will wait indefinitely until the task finishes. Examples -------- >>> task.wait_for() None + + Waiting for a task to finish with a custom wait time. + + >>> task.wait_for(wait=5) + None + + Waiting for a task with a maximum number of attempts. + + >>> task.wait_for(max_attempts=3) + None """ + attempts = 0 while not self.is_finished: + if max_attempts is not None and attempts >= max_attempts: + break self.update(wait=wait) + attempts += 1 + + if not self.is_finished: + raise TimeoutError( + f"Task {self['id']} did not finish within the specified wait time or maximum attempts." + ) class Tasks(resources.Resources): diff --git a/tests/posit/connect/test_tasks.py b/tests/posit/connect/test_tasks.py index cb2c41fa..b5d5593e 100644 --- a/tests/posit/connect/test_tasks.py +++ b/tests/posit/connect/test_tasks.py @@ -1,5 +1,6 @@ from unittest import mock +import pytest import responses from responses import BaseResponse, matchers @@ -189,6 +190,31 @@ def test_immediate_completion(self): assert task.is_finished assert mock_tasks_get[0].call_count == 1 + @responses.activate + def test_maximum_attempts(self): + uid = "jXhOhdm5OOSkGhJw" + + # behavior + mock_tasks_get = [ + responses.get( + f"https://connect.example/__api__/v1/tasks/{uid}", + json={**load_mock_dict(f"v1/tasks/{uid}.json"), "finished": False}, + ), + ] + + # setup + c = connect.Client("https://connect.example", "12345") + task = c.tasks.get(uid) + assert not task.is_finished + + # invoke and assert + with pytest.raises(TimeoutError): + task.wait_for(wait=1, max_attempts=1) + + # assert + assert not task.is_finished + assert mock_tasks_get[0].call_count == 2 # 1 for initial check, 1 for timeout check + class TestTasksGet: @responses.activate