From 60779b738623f757baa1d57f966042f3f86e037a Mon Sep 17 00:00:00 2001 From: sushaanttb Date: Wed, 28 Feb 2024 19:00:42 +0530 Subject: [PATCH] Added handling considering retry-after header --- benchmark/oairequester.py | 41 ++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/benchmark/oairequester.py b/benchmark/oairequester.py index b480a61f..859e5ca6 100644 --- a/benchmark/oairequester.py +++ b/benchmark/oairequester.py @@ -14,6 +14,7 @@ REQUEST_ID_HEADER = "apim-request-id" UTILIZATION_HEADER = "azure-openai-deployment-utilization" RETRY_AFTER_MS_HEADER = "retry-after-ms" +RETRY_AFTER_S_HEADER = "retry-after" MAX_RETRY_SECONDS = 60.0 TELEMETRY_USER_AGENT_HEADER = "x-ms-useragent" @@ -92,19 +93,15 @@ async def _call(self, session:aiohttp.ClientSession, body: dict, stats: RequestS response = await session.post(self.url, headers=headers, json=body) stats.response_status_code = response.status # capture utilization in all cases, if found - self._read_utilization(response, stats) + self._read_utilization_header(response, stats) if response.status != 429: break - if self.backoff and RETRY_AFTER_MS_HEADER in response.headers: - try: - retry_after_str = response.headers[RETRY_AFTER_MS_HEADER] - retry_after_ms = float(retry_after_str) - logging.debug(f"retry-after sleeping for {retry_after_ms}ms") - await asyncio.sleep(retry_after_ms/1000.0) - except ValueError as e: - logging.warning(f"unable to parse retry-after header value: {UTILIZATION_HEADER}={retry_after_str}: {e}") - # fallback to backoff - break + + retry_after_ms = self._read_retry_after_headers(response) + + if self.backoff and retry_after_ms: + logging.debug(f"retry-after sleeping for {retry_after_ms}ms") + await asyncio.sleep(retry_after_ms/1000.0) else: # fallback to backoff break @@ -129,7 +126,7 @@ async def _handle_response(self, response: aiohttp.ClientResponse, stats: Reques stats.generated_tokens += 1 stats.response_end_time = time.time() - def _read_utilization(self, response: aiohttp.ClientResponse, stats: RequestStats): + def _read_utilization_header(self, response: aiohttp.ClientResponse, stats: RequestStats): if UTILIZATION_HEADER in response.headers: util_str = response.headers[UTILIZATION_HEADER] if len(util_str) == 0: @@ -141,5 +138,23 @@ def _read_utilization(self, response: aiohttp.ClientResponse, stats: RequestStat stats.deployment_utilization = float(util_str[:-1]) except ValueError as e: logging.warning(f"unable to parse utilization header value: {UTILIZATION_HEADER}={util_str}: {e}") - + + def _read_retry_after_headers(self, response: aiohttp.ClientResponse): + retry_after_ms = None + if RETRY_AFTER_MS_HEADER in response.headers: + try: + retry_after_str = response.headers[RETRY_AFTER_MS_HEADER] + retry_after_ms = float(retry_after_str) + except ValueError as e: + logging.warning(f"unable to parse {RETRY_AFTER_MS_HEADER} header value: {RETRY_AFTER_MS_HEADER}={retry_after_str}: {e}") + + elif RETRY_AFTER_S_HEADER in response.headers: + try: + retry_after_str = response.headers[RETRY_AFTER_S_HEADER] + retry_after_s = float(retry_after_str) + retry_after_ms = retry_after_s * 1000 + except ValueError as e: + logging.warning(f"unable to parse {RETRY_AFTER_S_HEADER} header value: {RETRY_AFTER_S_HEADER}={retry_after_str}: {e}") + + return retry_after_ms