Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions benchmark/oairequester.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# HTTP header names used by the requester.
REQUEST_ID_HEADER = "apim-request-id"
# Response header whose value is parsed into stats.deployment_utilization.
UTILIZATION_HEADER = "azure-openai-deployment-utilization"
# Response header read after an HTTP 429; its value is parsed as a retry
# delay in milliseconds.
RETRY_AFTER_MS_HEADER = "retry-after-ms"
# Response header read after an HTTP 429 when the millisecond header is
# absent; its value is parsed as a retry delay in seconds.
RETRY_AFTER_S_HEADER = "retry-after"
# NOTE(review): presumably the upper bound, in seconds, on time spent
# retrying a request; the code that uses it is not visible here — confirm.
MAX_RETRY_SECONDS = 60.0

# NOTE(review): presumably a request header used to tag traffic sent by this
# tool; the code that uses it is not visible here — confirm.
TELEMETRY_USER_AGENT_HEADER = "x-ms-useragent"
Expand Down Expand Up @@ -92,19 +93,15 @@ async def _call(self, session:aiohttp.ClientSession, body: dict, stats: RequestS
response = await session.post(self.url, headers=headers, json=body)
stats.response_status_code = response.status
# capture utilization in all cases, if found
self._read_utilization(response, stats)
self._read_utilization_header(response, stats)
if response.status != 429:
break
if self.backoff and RETRY_AFTER_MS_HEADER in response.headers:
try:
retry_after_str = response.headers[RETRY_AFTER_MS_HEADER]
retry_after_ms = float(retry_after_str)
logging.debug(f"retry-after sleeping for {retry_after_ms}ms")
await asyncio.sleep(retry_after_ms/1000.0)
except ValueError as e:
logging.warning(f"unable to parse retry-after header value: {UTILIZATION_HEADER}={retry_after_str}: {e}")
# fallback to backoff
break

retry_after_ms = self._read_retry_after_headers(response)

if self.backoff and retry_after_ms:
logging.debug(f"retry-after sleeping for {retry_after_ms}ms")
await asyncio.sleep(retry_after_ms/1000.0)
else:
# fallback to backoff
break
Expand All @@ -129,7 +126,7 @@ async def _handle_response(self, response: aiohttp.ClientResponse, stats: Reques
stats.generated_tokens += 1
stats.response_end_time = time.time()

def _read_utilization(self, response: aiohttp.ClientResponse, stats: RequestStats):
def _read_utilization_header(self, response: aiohttp.ClientResponse, stats: RequestStats):
if UTILIZATION_HEADER in response.headers:
util_str = response.headers[UTILIZATION_HEADER]
if len(util_str) == 0:
Expand All @@ -141,5 +138,23 @@ def _read_utilization(self, response: aiohttp.ClientResponse, stats: RequestStat
stats.deployment_utilization = float(util_str[:-1])
except ValueError as e:
logging.warning(f"unable to parse utilization header value: {UTILIZATION_HEADER}={util_str}: {e}")


def _read_retry_after_headers(self, response: aiohttp.ClientResponse):
    """Return the server-requested retry delay in milliseconds, if any.

    The headers are consulted in priority order: RETRY_AFTER_MS_HEADER
    (value in milliseconds) first, then RETRY_AFTER_S_HEADER (value in
    seconds, converted to milliseconds). A header whose value cannot be
    used is logged and skipped, so a malformed millisecond header no longer
    hides a valid seconds header.

    A value is unusable when it does not parse as a float (this includes
    the HTTP-date form of Retry-After, which is not supported here) or when
    it is negative, NaN, or infinite -- such values must never reach
    asyncio.sleep().

    :param response: the HTTP response whose headers are inspected.
    :return: the delay in milliseconds as a float, or None when no usable
        retry-after header is present.
    """
    import math  # local import: keeps this block self-contained

    # (header name, factor converting the header's unit to milliseconds)
    candidates = (
        (RETRY_AFTER_MS_HEADER, 1.0),
        (RETRY_AFTER_S_HEADER, 1000.0),
    )
    for header_name, to_ms in candidates:
        if header_name not in response.headers:
            continue
        raw_value = response.headers[header_name]
        try:
            delay = float(raw_value)
        except ValueError as e:
            logging.warning(f"unable to parse {header_name} header value: {header_name}={raw_value}: {e}")
            continue
        # float() accepts "nan", "inf" and negative numbers; none of them is
        # a meaningful delay, and an infinite one would sleep forever.
        if not math.isfinite(delay) or delay < 0:
            logging.warning(f"ignoring out-of-range {header_name} header value: {header_name}={raw_value}")
            continue
        return delay * to_ms

    return None