diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py
index 54ce157c97..f0dbf440c8 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py
@@ -113,6 +113,42 @@ def wrapper(wrapped, instance, args, kwargs):
     return _with_chat_telemetry
 
 
+def _with_responses_telemetry_wrapper(func):
+    """
+    Decorator that turns ``func`` into a factory of Responses API wrappers bound to telemetry.
+
+    The factory takes the tracer and six metric parameters and returns a
+    ``wrapper(wrapped, instance, args, kwargs)`` that forwards all eleven values to ``func``.
+    """
+    def _with_responses_telemetry(
+        tracer,
+        token_counter,
+        choice_counter,
+        duration_histogram,
+        exception_counter,
+        streaming_time_to_first_token,
+        streaming_time_to_generate,
+    ):
+        def wrapper(wrapped, instance, args, kwargs):
+            return func(
+                tracer,
+                token_counter,
+                choice_counter,
+                duration_histogram,
+                exception_counter,
+                streaming_time_to_first_token,
+                streaming_time_to_generate,
+                wrapped,
+                instance,
+                args,
+                kwargs,
+            )
+
+        return wrapper
+
+    return _with_responses_telemetry
+
+
 def _with_tracer_wrapper(func):
     def _with_tracer(tracer):
         def wrapper(wrapped, instance, args, kwargs):
diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py
index b93b06f3d8..3ee15b28ad 100644
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/__init__.py
@@ -302,12 +302,28 @@ def _instrument(self, **kwargs):
         self._try_wrap(
"openai.resources.responses", "Responses.create", - responses_get_or_create_wrapper(tracer), + responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + ), ) self._try_wrap( "openai.resources.responses", "Responses.retrieve", - responses_get_or_create_wrapper(tracer), + responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + ), ) self._try_wrap( "openai.resources.responses", @@ -317,18 +333,60 @@ def _instrument(self, **kwargs): self._try_wrap( "openai.resources.responses", "AsyncResponses.create", - async_responses_get_or_create_wrapper(tracer), + async_responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + ), ) self._try_wrap( "openai.resources.responses", "AsyncResponses.retrieve", - async_responses_get_or_create_wrapper(tracer), + async_responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + ), ) self._try_wrap( "openai.resources.responses", "AsyncResponses.cancel", async_responses_cancel_wrapper(tracer), ) + self._try_wrap( + "openai.resources.responses", + "Responses.parse", + responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + ), + ) + self._try_wrap( + "openai.resources.responses", + "AsyncResponses.parse", + async_responses_get_or_create_wrapper( + tracer, + tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + 
streaming_time_to_first_token, + streaming_time_to_generate, + ), + ) def _uninstrument(self, **kwargs): unwrap("openai.resources.chat.completions", "Completions.create") @@ -354,5 +412,7 @@ def _uninstrument(self, **kwargs): unwrap("openai.resources.responses", "AsyncResponses.create") unwrap("openai.resources.responses", "AsyncResponses.retrieve") unwrap("openai.resources.responses", "AsyncResponses.cancel") + unwrap("openai.resources.responses", "Responses.parse") + unwrap("openai.resources.responses", "AsyncResponses.parse") except ImportError: pass diff --git a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py index 24113d1060..00d3cfcaf2 100644 --- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py +++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py @@ -1,8 +1,11 @@ import json -import pydantic +import logging import re +import threading import time +import pydantic + from openai import AsyncStream, Stream # Conditional imports for backward compatibility @@ -35,31 +38,36 @@ ResponseOutputMessageParam = Dict[str, Any] RESPONSES_AVAILABLE = False -from openai._legacy_response import LegacyAPIResponse -from opentelemetry import context as context_api +from typing import Any, Optional, Union + from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.semconv_ai import SpanAttributes -from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( GEN_AI_COMPLETION, GEN_AI_PROMPT, - GEN_AI_USAGE_INPUT_TOKENS, - GEN_AI_USAGE_OUTPUT_TOKENS, - GEN_AI_RESPONSE_ID, GEN_AI_REQUEST_MODEL, + GEN_AI_RESPONSE_ID, GEN_AI_RESPONSE_MODEL, GEN_AI_SYSTEM, + 
GEN_AI_USAGE_INPUT_TOKENS, + GEN_AI_USAGE_OUTPUT_TOKENS, ) -from opentelemetry.trace import SpanKind, Span, StatusCode, Tracer -from typing import Any, Optional, Union +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE +from opentelemetry.semconv_ai import SpanAttributes +from opentelemetry.trace import Span, SpanKind, Status, StatusCode, Tracer from typing_extensions import NotRequired +from wrapt import ObjectProxy +from openai._legacy_response import LegacyAPIResponse +from opentelemetry import context as context_api +from opentelemetry import trace from opentelemetry.instrumentation.openai.shared import ( + _get_openai_base_url, _set_span_attribute, + metric_shared_attributes, model_as_dict, ) - from opentelemetry.instrumentation.openai.utils import ( + _with_responses_telemetry_wrapper, _with_tracer_wrapper, dont_throw, should_send_prompts, @@ -67,6 +75,8 @@ SPAN_NAME = "openai.response" +logger = logging.getLogger(__name__) + def prepare_input_param(input_param: ResponseInputItemParam) -> ResponseInputItemParam: """ @@ -88,6 +98,10 @@ def prepare_input_param(input_param: ResponseInputItemParam) -> ResponseInputIte def process_input(inp: ResponseInputParam) -> ResponseInputParam: + """ + Process input parameters for OpenAI Responses API. + Ensures list inputs have proper type annotations for each item. 
+    """
     if not isinstance(inp, list):
         return inp
     return [prepare_input_param(item) for item in inp]
@@ -115,7 +129,7 @@ class ResponseOutputMessageParamWithoutId(ResponseOutputMessageParam):
 
 
 class TracedData(pydantic.BaseModel):
-    start_time: float  # time.time_ns()
+    start_time: int  # nanoseconds since Unix epoch for span creation
     response_id: str
     # actually Union[str, list[Union[ResponseInputItemParam, ResponseOutputMessageParamWithoutId]]],
     # but this only works properly in Python 3.10+ / newer pydantic
@@ -142,12 +156,20 @@ class TracedData(pydantic.BaseModel):
 
 
 def parse_response(response: Union[LegacyAPIResponse, Response]) -> Response:
+    """
+    Parse OpenAI response objects.
+    Handles both legacy and modern response formats.
+    """
     if isinstance(response, LegacyAPIResponse):
         return response.parse()
     return response
 
 
 def get_tools_from_kwargs(kwargs: dict) -> list[ToolParam]:
+    """
+    Extract tool parameters from request kwargs.
+    Converts function tool definitions to proper type instances.
+    """
     tools_input = kwargs.get("tools", [])
     tools = []
 
@@ -164,6 +186,10 @@ def get_tools_from_kwargs(kwargs: dict) -> list[ToolParam]:
 def process_content_block(
     block: dict[str, Any],
 ) -> dict[str, Any]:
+    """
+    Process content blocks from response output.
+    Normalizes different content types to a standard format.
+    """
     # TODO: keep the original type once backend supports it
     if block.get("type") in ["text", "input_text", "output_text"]:
         return {"type": "text", "text": block.get("text")}
@@ -184,8 +210,338 @@ def process_content_block(
     return block
 
 
+class ResponseStreamBase:
+    """Base class to ensure async iteration methods are visible"""
+    def __aiter__(self):
+        """Return self as the async iterator."""
+        return self
+
+    async def __anext__(self):
+        """Async iteration - must be overridden"""
+        raise NotImplementedError
+
+
+class ResponseStream(ResponseStreamBase, ObjectProxy):
+    """
+    Stream wrapper for OpenAI Responses API streaming responses.
+    Handles span lifecycle and data accumulation for streaming responses.
+    Aligned with ChatStream pattern for consistency.
+
+    Every attribute assigned on ``self`` is declared at class level: wrapt's
+    ``ObjectProxy`` forwards any other attribute write to the wrapped stream,
+    which would leak the tracing state onto the OpenAI stream object.
+    """
+    _span = None
+    _instance = None
+    _token_counter = None
+    _choice_counter = None
+    _duration_histogram = None
+    _streaming_time_to_first_token = None
+    _streaming_time_to_generate = None
+    _start_time = None
+    _request_kwargs = None
+    _traced_data = None
+    _first_token = True
+    _time_of_first_token = None
+    _cleanup_lock = None
+    _cleanup_completed = False
+    _error_recorded = None
+    _text_deltas = None
+
+    def __init__(
+        self,
+        span,
+        response,
+        traced_data,
+        instance=None,
+        token_counter=None,
+        choice_counter=None,
+        duration_histogram=None,
+        streaming_time_to_first_token=None,
+        streaming_time_to_generate=None,
+        start_time=None,
+        request_kwargs=None,
+    ):
+        super().__init__(response)
+        self._span = span
+        self._instance = instance
+        self._traced_data = traced_data
+        self._token_counter = token_counter
+        self._choice_counter = choice_counter
+        self._duration_histogram = duration_histogram
+        self._streaming_time_to_first_token = streaming_time_to_first_token
+        self._streaming_time_to_generate = streaming_time_to_generate
+        self._start_time = start_time
+        self._request_kwargs = request_kwargs or {}
+
+        self._first_token = True
+        self._time_of_first_token = self._start_time
+        self._cleanup_lock = threading.Lock()
+        self._cleanup_completed = False
+        self._error_recorded = threading.Event()
+        self._text_deltas = []
+
+        if self._traced_data.response_id:
+            responses[self._traced_data.response_id] = self._traced_data
+
+    def __del__(self):
+        """Cleanup when garbage collected."""
+        self._ensure_cleanup()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # Close the span even when the wrapped stream's __exit__ raises.
+        try:
+            return self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)
+        finally:
+            self._cleanup_quietly("__exit__")
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        # Close the span even when the wrapped stream's __aexit__ raises.
+        try:
+            return await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+        finally:
+            self._cleanup_quietly("__aexit__")
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            chunk = self.__wrapped__.__next__()
+        except StopIteration:
+            self._process_complete_response()
+            raise
+        except Exception as e:
+            self._record_stream_error(e)
+            raise
+        self._process_chunk(chunk)
+        return chunk
+
+    async def __anext__(self):
+        try:
+            chunk = await self.__wrapped__.__anext__()
+        except StopAsyncIteration:
+            self._process_complete_response()
+            raise
+        except Exception as e:
+            self._record_stream_error(e)
+            raise
+        self._process_chunk(chunk)
+        return chunk
+
+    def _cleanup_quietly(self, where):
+        """Run ``_ensure_cleanup`` and log, rather than raise, any failure."""
+        try:
+            self._ensure_cleanup()
+        except Exception as exc:
+            logger.debug("Error during ResponseStream cleanup in %s: %s", where, exc)
+
+    def _record_stream_error(self, error):
+        """Mark the span as failed with ``error`` and release the stream's resources."""
+        if self._span and self._span.is_recording():
+            self._span.record_exception(error)
+            self._span.set_status(Status(StatusCode.ERROR, str(error)))
+            self._error_recorded.set()
+        self._ensure_cleanup()
+
+    def _process_chunk(self, chunk):
+        """
+        Process a streaming chunk and update TracedData.
+        Accumulates response data and records streaming metrics.
+        Failures are logged and never reach the code iterating the stream.
+        """
+        try:
+            self._record_chunk_arrival()
+        except Exception as e:
+            logger.debug("Error recording response chunk metrics: %s", e)
+        try:
+            self._accumulate_event(chunk)
+        except Exception as e:
+            logger.debug("Error processing response chunk: %s", e)
+
+    def _record_chunk_arrival(self):
+        """Emit the per-chunk span event and, once, the time-to-first-token metric."""
+        if self._span and self._span.is_recording():
+            self._span.add_event(name=f"{SpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}")
+        if not self._first_token:
+            return
+        # Stamp the first chunk even without a TTFT histogram so that
+        # time-to-generate is measured from the first token, not the request.
+        self._first_token = False
+        self._time_of_first_token = time.time()
+        if self._streaming_time_to_first_token and self._start_time is not None:
+            elapsed = self._time_of_first_token - self._start_time
+            self._streaming_time_to_first_token.record(elapsed, attributes=self._shared_attributes())
+
+    def _accumulate_event(self, chunk):
+        """Fold one stream event into the traced data, keyed on the event class name."""
+        # Chunks are already event objects, not Response objects
+        event_type = type(chunk).__name__
+        data = self._traced_data
+
+        if event_type == "ResponseCreatedEvent":
+            # This event should have the response_id
+            if hasattr(chunk, 'response') and hasattr(chunk.response, 'id'):
+                data.response_id = chunk.response.id
+                responses[chunk.response.id] = data
+        elif event_type == "ResponseTextDeltaEvent":
+            if delta := getattr(chunk, 'delta', None):
+                self._text_deltas.append(delta)
+        elif event_type == "ResponseOutputItemAddedEvent":
+            # The new item is exposed as ``item`` (as on the Done event below);
+            # ``output_item`` is kept only as a fallback for the old lookup.
+            added = getattr(chunk, 'item', None)
+            if added is None:
+                added = getattr(chunk, 'output_item', None)
+            if added is not None:
+                if data.output_blocks is None:
+                    data.output_blocks = {}
+                if hasattr(added, 'id'):
+                    data.output_blocks[added.id] = added
+        elif event_type == "ResponseInProgressEvent":
+            # Status update - might contain usage or other metadata
+            if hasattr(chunk, 'response'):
+                if hasattr(chunk.response, 'usage'):
+                    data.usage = chunk.response.usage
+                if hasattr(chunk.response, 'model'):
+                    data.response_model = chunk.response.model
+        elif event_type == "ResponseCompletedEvent":
+            # Final event with complete response data
+            if hasattr(chunk, 'response'):
+                if hasattr(chunk.response, 'usage'):
+                    data.usage = chunk.response.usage
+                if hasattr(chunk.response, 'output_text'):
+                    # Override with final complete text if available
+                    data.output_text = chunk.response.output_text
+        elif event_type == "ResponseTextDoneEvent":
+            if text := getattr(chunk, 'text', None):
+                data.output_text = text
+        elif event_type == "ResponseContentPartDoneEvent":
+            part = getattr(chunk, 'part', None)
+            if hasattr(part, 'text') and not data.output_text:
+                data.output_text = part.text
+        elif event_type == "ResponseOutputItemDoneEvent":
+            if hasattr(chunk, 'item'):
+                if data.output_blocks is None:
+                    data.output_blocks = {}
+                if hasattr(chunk.item, 'id'):
+                    data.output_blocks[chunk.item.id] = chunk.item
+                # ``content`` can be present but None, hence the ``or []``.
+                for content_item in getattr(chunk.item, 'content', None) or []:
+                    if getattr(content_item, 'text', None) and not data.output_text:
+                        data.output_text = content_item.text
+
+        # Update global dict with latest data
+        if data.response_id:
+            responses[data.response_id] = data
+
+    def _shared_attributes(self):
+        """Get shared attributes for metrics."""
+        return metric_shared_attributes(
+            response_model=self._traced_data.response_model
+            or self._traced_data.request_model
+            or "",
+            operation="response",
+            server_address=_get_openai_base_url(self._instance),
+            is_streaming=True,
+        )
+
+    @dont_throw
+    def _process_complete_response(self):
+        """
+        Process the complete streaming response.
+        Sets final span attributes, records metrics, and ends the span.
+        Runs at most once, so a repeated ``StopIteration`` cannot double-count metrics.
+        """
+        with self._cleanup_lock:
+            if self._cleanup_completed:
+                return
+            data = self._traced_data
+            if self._text_deltas and not data.output_text:
+                data.output_text = "".join(self._text_deltas)
+            if self._span and self._span.is_recording():
+                set_data_attributes(data, self._span)
+            # A metrics failure must not keep the span from being ended below.
+            try:
+                self._record_final_metrics()
+            except Exception as e:
+                logger.debug("Error recording ResponseStream metrics: %s", e)
+            self._end_span(Status(StatusCode.OK))
+            self._forget_traced_data()
+            self._cleanup_completed = True
+        logger.debug("ResponseStream span closed successfully")
+
+    def _record_final_metrics(self):
+        """Record token, choice, duration and time-to-generate metrics."""
+        data = self._traced_data
+        attrs = self._shared_attributes()
+        now = time.time()
+        if self._token_counter and data.usage:
+            for token_type in ("input", "output"):
+                if count := getattr(data.usage, f"{token_type}_tokens", None):
+                    self._token_counter.record(
+                        count, attributes={**attrs, SpanAttributes.LLM_TOKEN_TYPE: token_type}
+                    )
+        if self._choice_counter and data.output_blocks:
+            self._choice_counter.add(len(data.output_blocks), attributes=attrs)
+        if self._duration_histogram and self._start_time:
+            self._duration_histogram.record(now - self._start_time, attributes=attrs)
+        if self._streaming_time_to_generate and self._time_of_first_token:
+            generating = now - self._time_of_first_token
+            self._streaming_time_to_generate.record(generating, attributes=attrs)
+
+    def _end_span(self, status):
+        """End the span if still recording; apply ``status`` unless an error was recorded."""
+        if not (self._span and self._span.is_recording()):
+            return False
+        if not self._error_recorded.is_set():
+            self._span.set_status(status)
+        self._span.end()
+        return True
+
+    def _forget_traced_data(self):
+        """Drop this stream's entry from the module-level ``responses`` registry."""
+        if self._traced_data.response_id:
+            responses.pop(self._traced_data.response_id, None)
+
+    @dont_throw
+    def _ensure_cleanup(self):
+        """
+        Ensure proper cleanup of streaming response.
+        Thread-safe cleanup to prevent double-closing of spans.
+        """
+        with self._cleanup_lock:
+            if self._cleanup_completed:
+                logger.debug("ResponseStream cleanup already completed, skipping")
+                return
+            try:
+                logger.debug("Starting ResponseStream cleanup")
+                if self._end_span(Status(StatusCode.OK)):
+                    logger.debug("ResponseStream span closed in cleanup")
+                self._forget_traced_data()
+                logger.debug("ResponseStream cleanup completed successfully")
+            except Exception as e:
+                logger.debug("Error during ResponseStream cleanup: %s", str(e))
+                try:
+                    self._end_span(Status(StatusCode.ERROR, "Cleanup failed"))
+                except Exception as span_error:
+                    logger.debug("Could not end ResponseStream span: %s", span_error)
+            finally:
+                self._cleanup_completed = True
+
+
 @dont_throw
 def set_data_attributes(traced_response: TracedData, span: Span):
+    """
+    Set OpenTelemetry span attributes from traced response data.
+    Includes model info, usage stats, prompts, and completions.
+ """ _set_span_attribute(span, GEN_AI_SYSTEM, "openai") _set_span_attribute(span, GEN_AI_REQUEST_MODEL, traced_response.request_model) _set_span_attribute(span, GEN_AI_RESPONSE_ID, traced_response.response_id) @@ -283,7 +639,7 @@ def set_data_attributes(traced_response: TracedData, span: Span): ) _set_span_attribute(span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "user") prompt_index += 1 - else: + elif traced_response.input: for block in traced_response.input: block_dict = model_as_dict(block) if block_dict.get("type", "message") == "message": @@ -346,6 +702,18 @@ def set_data_attributes(traced_response: TracedData, span: Span): json.dumps(call_content), ) prompt_index += 1 + elif block_dict.get("type") == "function_call_output": + _set_span_attribute( + span, f"{GEN_AI_PROMPT}.{prompt_index}.role", "tool" + ) + output_content = block_dict.get("output", "") + call_id = block_dict.get("call_id", "") + _set_span_attribute( + span, + f"{GEN_AI_PROMPT}.{prompt_index}.content", + json.dumps({"call_id": call_id, "output": output_content}), + ) + prompt_index += 1 # TODO: handle other block types _set_span_attribute(span, f"{GEN_AI_COMPLETION}.0.role", "assistant") @@ -353,120 +721,159 @@ def set_data_attributes(traced_response: TracedData, span: Span): _set_span_attribute( span, f"{GEN_AI_COMPLETION}.0.content", traced_response.output_text ) + else: + logger.debug("No output_text to set as completion content") tool_call_index = 0 - for block in traced_response.output_blocks.values(): - block_dict = model_as_dict(block) - if block_dict.get("type") == "message": - # either a refusal or handled in output_text above - continue - if block_dict.get("type") == "function_call": - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", - block_dict.get("id"), - ) - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", - block_dict.get("name"), - ) - _set_span_attribute( - span, - 
f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.arguments", - block_dict.get("arguments"), - ) - tool_call_index += 1 - elif block_dict.get("type") == "file_search_call": - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", - block_dict.get("id"), - ) - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", - "file_search_call", - ) - tool_call_index += 1 - elif block_dict.get("type") == "web_search_call": - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", - block_dict.get("id"), - ) - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", - "web_search_call", - ) - tool_call_index += 1 - elif block_dict.get("type") == "computer_call": - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", - block_dict.get("call_id"), - ) - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", - "computer_call", - ) - _set_span_attribute( - span, - f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.arguments", - json.dumps(block_dict.get("action")), - ) - tool_call_index += 1 - elif block_dict.get("type") == "reasoning": - reasoning_summary = block_dict.get("summary") - if reasoning_summary is not None and reasoning_summary != []: - if isinstance(reasoning_summary, (dict, list)): - reasoning_value = json.dumps(reasoning_summary) - else: - reasoning_value = reasoning_summary + if traced_response.output_blocks: + for block in traced_response.output_blocks.values(): + block_dict = model_as_dict(block) + if block_dict.get("type") == "message": + # either a refusal or handled in output_text above + continue + if block_dict.get("type") == "function_call": + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", + block_dict.get("id"), + ) + _set_span_attribute( + span, + 
f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", + block_dict.get("name"), + ) + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.arguments", + block_dict.get("arguments"), + ) + tool_call_index += 1 + elif block_dict.get("type") == "file_search_call": + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", + block_dict.get("id"), + ) + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", + "file_search_call", + ) + tool_call_index += 1 + elif block_dict.get("type") == "web_search_call": + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", + block_dict.get("id"), + ) + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", + "web_search_call", + ) + tool_call_index += 1 + elif block_dict.get("type") == "computer_call": + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.id", + block_dict.get("call_id"), + ) + _set_span_attribute( + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.name", + "computer_call", + ) _set_span_attribute( - span, f"{GEN_AI_COMPLETION}.0.reasoning", reasoning_value + span, + f"{GEN_AI_COMPLETION}.0.tool_calls.{tool_call_index}.arguments", + json.dumps(block_dict.get("action")), ) - # TODO: handle other block types, in particular other calls + tool_call_index += 1 + elif block_dict.get("type") == "reasoning": + reasoning_summary = block_dict.get("summary") + if reasoning_summary is not None and reasoning_summary != []: + if isinstance(reasoning_summary, (dict, list)): + reasoning_value = json.dumps(reasoning_summary) + else: + reasoning_value = reasoning_summary + _set_span_attribute( + span, f"{GEN_AI_COMPLETION}.0.reasoning", reasoning_value + ) + # TODO: handle other block types, in particular other calls @dont_throw -@_with_tracer_wrapper -def responses_get_or_create_wrapper(tracer: Tracer, wrapped, 
instance, args, kwargs): +@_with_responses_telemetry_wrapper +def responses_get_or_create_wrapper( + tracer: Tracer, + token_counter, + choice_counter, + duration_histogram, + exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + wrapped, + instance, + args, + kwargs, +): + """ + Wrapper for OpenAI responses.create method. + Handles both streaming and non-streaming responses with full telemetry. + """ if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return wrapped(*args, **kwargs) - start_time = time.time_ns() - try: - response = wrapped(*args, **kwargs) - if isinstance(response, Stream): - return response - except Exception as e: - response_id = kwargs.get("response_id") - existing_data = {} - if response_id and response_id in responses: - existing_data = responses[response_id].model_dump() + span = tracer.start_span( + SPAN_NAME, + kind=SpanKind.CLIENT, + ) + + with trace.use_span(span, end_on_exit=False): try: + start_time = time.time() + response = wrapped(*args, **kwargs) + end_time = time.time() + except Exception as e: + end_time = time.time() + duration = end_time - start_time if "start_time" in locals() else 0 + + if duration > 0 and duration_histogram: + duration_histogram.record(duration, attributes={"error.type": e.__class__.__name__}) + if exception_counter: + exception_counter.add(1, attributes={"error.type": e.__class__.__name__}) + + span.set_attribute(ERROR_TYPE, e.__class__.__name__) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + + raise + + if isinstance(response, Stream): + response_id = kwargs.get("response_id") + existing_data = {} + if response_id and response_id in responses: + existing_data = responses[response_id].model_dump() + + input_data = process_input( + kwargs.get("input", existing_data.get("input", [])) + ) + traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=int(start_time * 1e9), # Convert seconds to 
nanoseconds response_id=response_id or "", - input=process_input( - kwargs.get("input", existing_data.get("input", [])) - ), + input=input_data, instructions=kwargs.get( "instructions", existing_data.get("instructions") ), tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []), output_blocks=existing_data.get("output_blocks", {}), usage=existing_data.get("usage"), - output_text=kwargs.get( - "output_text", existing_data.get("output_text", "") - ), + output_text=existing_data.get("output_text", None), request_model=kwargs.get( "model", existing_data.get("request_model", "") ), response_model=existing_data.get("response_model", ""), - # Reasoning attributes request_reasoning_summary=( kwargs.get("reasoning", {}).get( "summary", existing_data.get("request_reasoning_summary") @@ -479,30 +886,24 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa ), response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"), ) - except Exception: - traced_data = None - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - start_time=( - start_time if traced_data is None else int(traced_data.start_time) - ), - ) - span.set_attribute(ERROR_TYPE, e.__class__.__name__) - span.record_exception(e) - span.set_status(StatusCode.ERROR, str(e)) - if traced_data: - set_data_attributes(traced_data, span) - span.end() - raise + return ResponseStream( + span, + response, + traced_data, + instance, + token_counter, + choice_counter, + duration_histogram, + streaming_time_to_first_token, + streaming_time_to_generate, + start_time, + kwargs, + ) parsed_response = parse_response(response) existing_data = responses.get(parsed_response.id) - if existing_data is None: - existing_data = {} - else: - existing_data = existing_data.model_dump() + existing_data = {} if existing_data is None else existing_data.model_dump() request_tools = get_tools_from_kwargs(kwargs) @@ -512,13 +913,14 @@ def responses_get_or_create_wrapper(tracer: Tracer, 
wrapped, instance, args, kwa parsed_response_output_text = None if hasattr(parsed_response, "output_text"): parsed_response_output_text = parsed_response.output_text - else: - try: - parsed_response_output_text = parsed_response.output[0].content[0].text - except Exception: - pass + elif hasattr(parsed_response, "output") and len(parsed_response.output) > 0: + first_output = parsed_response.output[0] + if hasattr(first_output, "content") and len(first_output.content) > 0: + first_content = first_output.content[0] + if hasattr(first_content, "text"): + parsed_response_output_text = first_content.text traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=existing_data.get("start_time", int(start_time * 1e9)), response_id=parsed_response.id, input=process_input(existing_data.get("input", kwargs.get("input"))), instructions=existing_data.get("instructions", kwargs.get("instructions")), @@ -547,11 +949,6 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa return response if parsed_response.status == "completed": - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - start_time=int(traced_data.start_time), - ) set_data_attributes(traced_data, span) span.end() @@ -559,40 +956,76 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa @dont_throw -@_with_tracer_wrapper +@_with_responses_telemetry_wrapper async def async_responses_get_or_create_wrapper( - tracer: Tracer, wrapped, instance, args, kwargs + tracer: Tracer, + token_counter, + choice_counter, + duration_histogram, + exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, + wrapped, + instance, + args, + kwargs, ): + """ + Async wrapper for OpenAI responses.create method. + Handles both streaming and non-streaming async responses with full telemetry. 
+ """ if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await wrapped(*args, **kwargs) - start_time = time.time_ns() - try: - response = await wrapped(*args, **kwargs) - if isinstance(response, (Stream, AsyncStream)): - return response - except Exception as e: - response_id = kwargs.get("response_id") - existing_data = {} - if response_id and response_id in responses: - existing_data = responses[response_id].model_dump() + span = tracer.start_span( + SPAN_NAME, + kind=SpanKind.CLIENT, + ) + + with trace.use_span(span, end_on_exit=False): try: + start_time = time.time() + response = await wrapped(*args, **kwargs) + end_time = time.time() + except Exception as e: + end_time = time.time() + duration = end_time - start_time if "start_time" in locals() else 0 + + if duration > 0 and duration_histogram: + duration_histogram.record(duration, attributes={"error.type": e.__class__.__name__}) + if exception_counter: + exception_counter.add(1, attributes={"error.type": e.__class__.__name__}) + + span.set_attribute(ERROR_TYPE, e.__class__.__name__) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + + raise + + if isinstance(response, (Stream, AsyncStream)): + response_id = kwargs.get("response_id") + existing_data = {} + if response_id and response_id in responses: + existing_data = responses[response_id].model_dump() + + input_data = process_input( + kwargs.get("input", existing_data.get("input", [])) + ) + traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=int(start_time * 1e9), # Convert seconds to nanoseconds response_id=response_id or "", - input=process_input( - kwargs.get("input", existing_data.get("input", [])) - ), + input=input_data, instructions=kwargs.get( "instructions", existing_data.get("instructions", "") ), tools=get_tools_from_kwargs(kwargs) or existing_data.get("tools", []), output_blocks=existing_data.get("output_blocks", {}), 
usage=existing_data.get("usage"), - output_text=kwargs.get("output_text", existing_data.get("output_text")), + output_text=existing_data.get("output_text", None), request_model=kwargs.get("model", existing_data.get("request_model")), response_model=existing_data.get("response_model"), - # Reasoning attributes request_reasoning_summary=( kwargs.get("reasoning", {}).get( "summary", existing_data.get("request_reasoning_summary") @@ -605,30 +1038,24 @@ async def async_responses_get_or_create_wrapper( ), response_reasoning_effort=kwargs.get("reasoning", {}).get("effort"), ) - except Exception: - traced_data = None - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - start_time=( - start_time if traced_data is None else int(traced_data.start_time) - ), - ) - span.set_attribute(ERROR_TYPE, e.__class__.__name__) - span.record_exception(e) - span.set_status(StatusCode.ERROR, str(e)) - if traced_data: - set_data_attributes(traced_data, span) - span.end() - raise + return ResponseStream( + span, + response, + traced_data, + instance, + token_counter, + choice_counter, + duration_histogram, + streaming_time_to_first_token, + streaming_time_to_generate, + start_time, + kwargs, + ) parsed_response = parse_response(response) existing_data = responses.get(parsed_response.id) - if existing_data is None: - existing_data = {} - else: - existing_data = existing_data.model_dump() + existing_data = {} if existing_data is None else existing_data.model_dump() request_tools = get_tools_from_kwargs(kwargs) @@ -638,14 +1065,15 @@ async def async_responses_get_or_create_wrapper( parsed_response_output_text = None if hasattr(parsed_response, "output_text"): parsed_response_output_text = parsed_response.output_text - else: - try: - parsed_response_output_text = parsed_response.output[0].content[0].text - except Exception: - pass + elif hasattr(parsed_response, "output") and len(parsed_response.output) > 0: + first_output = parsed_response.output[0] + if hasattr(first_output, 
"content") and len(first_output.content) > 0: + first_content = first_output.content[0] + if hasattr(first_content, "text"): + parsed_response_output_text = first_content.text traced_data = TracedData( - start_time=existing_data.get("start_time", start_time), + start_time=existing_data.get("start_time", int(start_time * 1e9)), response_id=parsed_response.id, input=process_input(existing_data.get("input", kwargs.get("input"))), instructions=existing_data.get("instructions", kwargs.get("instructions")), @@ -674,11 +1102,6 @@ async def async_responses_get_or_create_wrapper( return response if parsed_response.status == "completed": - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - start_time=int(traced_data.start_time), - ) set_data_attributes(traced_data, span) span.end() @@ -688,6 +1111,10 @@ async def async_responses_get_or_create_wrapper( @dont_throw @_with_tracer_wrapper def responses_cancel_wrapper(tracer: Tracer, wrapped, instance, args, kwargs): + """ + Wrapper for OpenAI responses.cancel method. + Creates a span for cancelled responses and handles cleanup. + """ if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return wrapped(*args, **kwargs) @@ -700,7 +1127,7 @@ def responses_cancel_wrapper(tracer: Tracer, wrapped, instance, args, kwargs): span = tracer.start_span( SPAN_NAME, kind=SpanKind.CLIENT, - start_time=existing_data.start_time, + start_time=existing_data.start_time, # Already in nanoseconds record_exception=True, ) span.record_exception(Exception("Response cancelled")) @@ -714,6 +1141,10 @@ def responses_cancel_wrapper(tracer: Tracer, wrapped, instance, args, kwargs): async def async_responses_cancel_wrapper( tracer: Tracer, wrapped, instance, args, kwargs ): + """ + Async wrapper for OpenAI responses.cancel method. + Creates a span for cancelled async responses and handles cleanup. 
+ """ if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return await wrapped(*args, **kwargs) @@ -726,13 +1157,10 @@ async def async_responses_cancel_wrapper( span = tracer.start_span( SPAN_NAME, kind=SpanKind.CLIENT, - start_time=existing_data.start_time, + start_time=existing_data.start_time, # Already in nanoseconds record_exception=True, ) span.record_exception(Exception("Response cancelled")) set_data_attributes(existing_data, span) span.end() return response - - -# TODO: build streaming responses diff --git a/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py b/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py new file mode 100644 index 0000000000..204889cf3c --- /dev/null +++ b/packages/opentelemetry-instrumentation-openai/tests/traces/test_responses_streaming.py @@ -0,0 +1,226 @@ +"""Tests for OpenAI Responses API streaming instrumentation.""" + +import pytest +from openai import OpenAI +from opentelemetry import trace +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +@pytest.mark.vcr +def test_issue_3395_streaming_spans(instrument_legacy, span_exporter: InMemorySpanExporter, openai_client: OpenAI): + """ + Test that reproduces the exact issue from #3395. + + Issue: OpenAI responses API does not emit spans when using stream=True. 
+ + The original code that failed: + ```python + response = client.responses.create( + model="gpt-4o-mini", + input=[{"role": "user", "content": "ping"}], + stream=True + ) + for chunk in response: + print(chunk) + ``` + """ + # Clear any existing spans + span_exporter.clear() + + # Create a streaming response + response = openai_client.responses.create( + model="gpt-4o-mini", + input=[{"role": "user", "content": "ping"}], + stream=True + ) + + # Consume the stream - exact pattern from the issue + chunks_received = [] + for chunk in response: + chunks_received.append(chunk) + + # Verify that spans were created (this was the bug - no spans were created) + spans = span_exporter.get_finished_spans() + + # Verify that we do have a span (the responses.create span) + assert len(spans) > 0, "No spans were created for streaming response - issue #3395 not fixed!" + + # Verify the attributes + response_span = None + for span in spans: + if span.name == "openai.response": + response_span = span + break + + assert response_span is not None, "No 'openai.response' span found" + + # Verify key attributes are present + assert response_span.attributes.get("gen_ai.system") == "openai" + assert response_span.attributes.get("gen_ai.request.model") == "gpt-4o-mini" + + # Verify the input was captured + prompt_content = response_span.attributes.get("gen_ai.prompt.0.content") + if isinstance(prompt_content, str): + assert "ping" in prompt_content or prompt_content == "ping" + + # Verify we actually received chunks + assert len(chunks_received) > 0, "No chunks were received from the stream" + + +@pytest.mark.vcr +def test_responses_streaming(instrument_legacy, span_exporter: InMemorySpanExporter, openai_client: OpenAI): + """Test that streaming responses generate proper spans.""" + + # Create a streaming response + response_stream = openai_client.responses.create( + model="gpt-4.1-nano", + input="What is the capital of France?", + stream=True + ) + + # Consume the stream + full_text = "" 
+ for chunk in response_stream: + if chunk.output: + for output_item in chunk.output: + if hasattr(output_item, 'content'): + for content_item in output_item.content: + if hasattr(content_item, 'text'): + full_text += content_item.text + + # Check that span was created + spans = span_exporter.get_finished_spans() + assert len(spans) == 1, f"Expected 1 span, got {len(spans)}" + + span = spans[0] + assert span.name == "openai.response" + assert span.attributes["gen_ai.system"] == "openai" + assert span.attributes["gen_ai.request.model"] == "gpt-4.1-nano" + + # Check that prompt was captured + assert span.attributes.get("gen_ai.prompt.0.content") == "What is the capital of France?" + assert span.attributes.get("gen_ai.prompt.0.role") == "user" + + # Check that completion was captured + assert "gen_ai.completion.0.content" in span.attributes + assert span.attributes["gen_ai.completion.0.role"] == "assistant" + + # Basic content check - should mention Paris + assert full_text, "No content was streamed" + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_responses_streaming_async(instrument_legacy, span_exporter: InMemorySpanExporter, openai_client: OpenAI): + """Test that async streaming responses generate proper spans.""" + + # Create async client + from openai import AsyncOpenAI + async_client = AsyncOpenAI() + + # Create a streaming response + response_stream = await async_client.responses.create( + model="gpt-4.1-nano", + input="What is 2+2?", + stream=True + ) + + # Consume the stream + full_text = "" + async for chunk in response_stream: + if chunk.output: + for output_item in chunk.output: + if hasattr(output_item, 'content'): + for content_item in output_item.content: + if hasattr(content_item, 'text'): + full_text += content_item.text + + # Check that span was created + spans = span_exporter.get_finished_spans() + assert len(spans) == 1, f"Expected 1 span, got {len(spans)}" + + span = spans[0] + assert span.name == "openai.response" + assert 
span.attributes["gen_ai.system"] == "openai" + assert span.attributes["gen_ai.request.model"] == "gpt-4.1-nano" + + # Check that prompt was captured + assert span.attributes.get("gen_ai.prompt.0.content") == "What is 2+2?" + assert span.attributes.get("gen_ai.prompt.0.role") == "user" + + # Check that completion was captured + assert "gen_ai.completion.0.content" in span.attributes + assert span.attributes["gen_ai.completion.0.role"] == "assistant" + + # Basic content check + assert full_text, "No content was streamed" + + +@pytest.mark.vcr +def test_responses_streaming_with_context_manager( + instrument_legacy, span_exporter: InMemorySpanExporter, openai_client: OpenAI +): + """Test streaming responses with context manager usage.""" + + full_text = "" + with openai_client.responses.create( + model="gpt-4.1-nano", + input="Count to 5", + stream=True + ) as response_stream: + for chunk in response_stream: + if chunk.output: + for output_item in chunk.output: + if hasattr(output_item, 'content'): + for content_item in output_item.content: + if hasattr(content_item, 'text'): + full_text += content_item.text + + # Check that span was created after context manager exits + spans = span_exporter.get_finished_spans() + assert len(spans) == 1, f"Expected 1 span, got {len(spans)}" + + span = spans[0] + assert span.name == "openai.response" + assert span.attributes["gen_ai.system"] == "openai" + + # Verify content was captured + assert full_text, "No content was streamed" + assert "gen_ai.completion.0.content" in span.attributes + + +@pytest.mark.vcr +def test_responses_streaming_with_tracer_context( + instrument_legacy, span_exporter: InMemorySpanExporter, openai_client: OpenAI +): + """ + Test streaming with tracer context (similar to issue #3395 example). + Ensures spans are properly nested when using tracer.start_as_current_span. 
+ """ + tracer = trace.get_tracer(__name__) + + # Clear any existing spans + span_exporter.clear() + + # Similar to the issue example with tracer.start_as_current_span + with tracer.start_as_current_span("example-span"): + response = openai_client.responses.create( + model="gpt-4o-mini", + input=[{"role": "user", "content": "ping"}], + stream=True + ) + + chunks_received = [] + for chunk in response: + chunks_received.append(chunk) + + # Get all spans + spans = span_exporter.get_finished_spans() + + # We should have both the example-span and the openai.response span + span_names = [span.name for span in spans] + assert "openai.response" in span_names, "Missing openai.response span" + assert "example-span" in span_names, "Missing example-span" + + # Verify we got chunks + assert len(chunks_received) > 0, "No chunks received" \ No newline at end of file