From 3af82047ad7aa439db4364443bc251449c10d7f8 Mon Sep 17 00:00:00 2001
From: Kashish Hora
Date: Tue, 19 Aug 2025 17:20:02 -0400
Subject: [PATCH 1/5] feat: add telemetry export support with multiple backends

- Add support for OpenTelemetry (OTLP), Datadog, and Sentry exporters
- Enable telemetry-only mode (project_id now optional)
- Implement distributed tracing with W3C trace context propagation
- Add TelemetryManager for coordinating multiple exporters
- Support parallel export to multiple telemetry backends
- Add requests dependency for HTTP-based exporters
---
 pyproject.toml                                |   1 +
 src/mcpcat/__init__.py                        |  55 +-
 src/mcpcat/modules/event_queue.py             |  66 ++-
 src/mcpcat/modules/exporters/__init__.py      |  22 +
 src/mcpcat/modules/exporters/datadog.py       | 224 ++++++++
 src/mcpcat/modules/exporters/otlp.py          | 239 +++++++++
 src/mcpcat/modules/exporters/sentry.py        | 485 ++++++++++++++++++
 src/mcpcat/modules/exporters/trace_context.py |  77 +++
 src/mcpcat/modules/telemetry.py               | 100 ++++
 src/mcpcat/types.py                           |  43 +-
 10 files changed, 1282 insertions(+), 30 deletions(-)
 create mode 100644 src/mcpcat/modules/exporters/__init__.py
 create mode 100644 src/mcpcat/modules/exporters/datadog.py
 create mode 100644 src/mcpcat/modules/exporters/otlp.py
 create mode 100644 src/mcpcat/modules/exporters/sentry.py
 create mode 100644 src/mcpcat/modules/exporters/trace_context.py
 create mode 100644 src/mcpcat/modules/telemetry.py

diff --git a/pyproject.toml b/pyproject.toml
index bcd75eb..fc7aac6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,7 @@ dependencies = [
     "mcp>=1.2.0",
     "mcpcat-api==0.1.4",
     "pydantic>=2.0.0",
+    "requests>=2.31.0",
 ]
 
 [project.urls]
diff --git a/src/mcpcat/__init__.py b/src/mcpcat/__init__.py
index 08c58d1..b58fbd6 100644
--- a/src/mcpcat/__init__.py
+++ b/src/mcpcat/__init__.py
@@ -21,11 +21,33 @@
 )
 
 
-def track(server: Any, project_id: str, options: MCPCatOptions | None = None) -> Any:
+def track(server: Any, project_id: str | None = None, options: MCPCatOptions | None = None) -> Any:
+    """
+    Initialize MCPCat tracking with optional telemetry export.
+
+    Args:
+        server: MCP server instance to track
+        project_id: MCPCat project ID (optional if using telemetry-only mode)
+        options: Configuration options including telemetry exporters
+
+    Returns:
+        The server instance with tracking enabled
+
+    Raises:
+        ValueError: If neither project_id nor exporters are provided
+        TypeError: If server is not a compatible MCP server instance
+    """
     # Use default options if not provided
     if options is None:
         options = MCPCatOptions()
 
+    # Validate configuration
+    if not project_id and not options.exporters:
+        raise ValueError(
+            "Either project_id or exporters must be provided. "
+            "Use project_id for MCPCat, exporters for telemetry-only mode, or both."
+        )
+
     # Validate server compatibility
     if not is_compatible_server(server):
         raise TypeError(
@@ -37,12 +59,25 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
     if is_fastmcp:
         lowlevel_server = server._mcp_server
 
+    # Initialize telemetry if exporters configured
+    if options.exporters:
+        from mcpcat.modules.telemetry import TelemetryManager
+        from mcpcat.modules.event_queue import event_queue, set_telemetry_manager
+
+        # Share the event queue's executor for consistency
+        telemetry_manager = TelemetryManager(
+            options.exporters,
+            event_queue.executor
+        )
+        set_telemetry_manager(telemetry_manager)
+        write_to_log(f"Telemetry initialized with {len(options.exporters)} exporter(s)")
+
     # Create and store tracking data
     session_id = new_session_id()
     session_info = get_session_info(lowlevel_server)
     data = MCPCatData(
         session_id=session_id,
-        project_id=project_id,
+        project_id=project_id or "",  # Use empty string if None for compatibility
         last_activity=datetime.now(timezone.utc),
         session_info=session_info,
         identified_sessions=dict(),
@@ -53,13 +88,12 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
     try:
         # Always initialize dynamic tracking for complete tool coverage
         from mcpcat.modules.overrides.monkey_patch import apply_monkey_patches
-        
+
         # Initialize the dynamic tracking system by setting the flag
         if not data.tracker_initialized:
             data.tracker_initialized = True
-            from mcpcat.modules.logging import write_to_log
             write_to_log(f"Dynamic tracking initialized for server {id(lowlevel_server)}")
-        
+
         # Apply appropriate tracking method based on server type
         if is_fastmcp:
             # For FastMCP servers, use monkey-patching for tool tracking
@@ -70,12 +104,15 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
         else:
             # For low-level servers, use the traditional overrides (no monkey patching needed)
             override_lowlevel_mcp_server(lowlevel_server, data)
-        
-        write_to_log(f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}")
-        
+
+        if project_id:
+            write_to_log(f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}")
+        else:
+            write_to_log(f"MCPCat initialized in telemetry-only mode for session {session_id}")
+
     except Exception as e:
         write_to_log(f"Error initializing MCPCat: {e}")
-    
+
     return server
 
 __all__ = [
diff --git a/src/mcpcat/modules/event_queue.py b/src/mcpcat/modules/event_queue.py
index b8ef5c5..57286d1 100644
--- a/src/mcpcat/modules/event_queue.py
+++ b/src/mcpcat/modules/event_queue.py
@@ -8,7 +8,10 @@
 import time
 from datetime import datetime, timezone
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import Any, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .telemetry import TelemetryManager
 
 from mcpcat_api import ApiClient, Configuration, EventsApi
 from mcpcat.modules.constants import EVENT_ID_PREFIX, MCPCAT_API_URL
@@ -30,7 +33,7 @@ def __init__(self, api_client=None):
         self.max_retries = 3
         self.max_queue_size = 10000  # Prevent unbounded growth
         self.concurrency = 5  # Max parallel requests
-        
+
         # Allow injection of api_client for testing
         if api_client is None:
             config = Configuration(host=MCPCAT_API_URL)
@@ -38,13 +41,13 @@ def __init__(self, api_client=None):
             self.api_client = EventsApi(api_client=api_client_instance)
         else:
             self.api_client = api_client
-        
+
         self._shutdown = False
         self._shutdown_event = threading.Event()
-        
+
         # Thread pool for processing events
         self.executor = ThreadPoolExecutor(max_workers=self.concurrency)
-        
+
         # Start worker thread
         self.worker_thread = threading.Thread(target=self._worker, daemon=True)
         self.worker_thread.start()
@@ -68,7 +71,7 @@ def _worker(self) -> None:
         try:
             # Wait for an event with timeout
             event = self.queue.get(timeout=0.1)
-            
+
             # Submit event processing to thread pool
             # The executor will queue it if all workers are busy
             try:
@@ -80,7 +83,7 @@ def _worker(self) -> None:
                     self.queue.put_nowait(event)
                 except queue.Full:
                     write_to_log(f"Could not requeue event {event.id or 'unknown'} - queue full")
-            
+
         except queue.Empty:
             continue
         except Exception as e:
@@ -105,7 +108,20 @@ def _process_event(self, event: UnredactedEvent) -> None:
 
         if event:
             event.id = event.id or generate_prefixed_ksuid("evt")
-            self._send_event(event)
+
+            # Export to telemetry backends if configured (non-blocking)
+            if _telemetry_manager:
+                try:
+                    _telemetry_manager.export(event)
+                except Exception as e:
+                    write_to_log(f"Telemetry export submission failed: {e}")
+
+            # Send to MCPCat API only if project_id exists
+            if event.project_id:
+                self._send_event(event)
+            elif not _telemetry_manager:
+                # Only warn if we have neither MCPCat nor telemetry configured
+                write_to_log("Warning: Event has no project_id and no telemetry exporters configured")
 
     def _send_event(self, event: Event, retries: int = 0) -> None:
         """Send event to API."""
@@ -164,22 +180,39 @@ def destroy(self) -> None:
             write_to_log(f"Shutdown complete. {remaining} events were not processed.")
 
 
+# Global telemetry manager instance (optional)
+_telemetry_manager: Optional['TelemetryManager'] = None
+
+
+def set_telemetry_manager(manager: Optional['TelemetryManager']) -> None:
+    """
+    Set the global telemetry manager instance.
+
+    Args:
+        manager: TelemetryManager instance or None to disable telemetry
+    """
+    global _telemetry_manager
+    _telemetry_manager = manager
+    if manager:
+        write_to_log(f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)")
+
+
 # Global event queue instance
 event_queue = EventQueue()
 
 
 def _shutdown_handler(signum, frame):
     """Handle shutdown signals."""
-    
+
     write_to_log("Received shutdown signal, gracefully shutting down...")
-    
+
     # Reset signal handlers to default behavior to avoid recursive calls
     signal.signal(signal.SIGINT, signal.SIG_DFL)
     signal.signal(signal.SIGTERM, signal.SIG_DFL)
-    
+
     # Perform graceful shutdown
     event_queue.destroy()
-    
+
     # Force exit after graceful shutdown
     os._exit(0)
 
@@ -215,15 +248,16 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
         session_info = get_session_info(server, data)
 
     # Create full event with all required fields
-    # Merge event data with session info 
+    # Merge event data with session info
    event_data = event.model_dump(exclude_none=True)
     session_data = session_info.model_dump(exclude_none=True)
-    
+
+    # Merge data, ensuring project_id from data takes precedence
     merged_data = {**event_data, **session_data}
-    
+    merged_data['project_id'] = data.project_id  # Override with tracking data's project_id
+
     full_event = UnredactedEvent(
         **merged_data,
-        project_id=data.project_id,
         redaction_fn=data.options.redact_sensitive_information,
     )
 
diff --git a/src/mcpcat/modules/exporters/__init__.py b/src/mcpcat/modules/exporters/__init__.py
new file mode 100644
index 0000000..01eb78d
--- /dev/null
+++ b/src/mcpcat/modules/exporters/__init__.py
@@ -0,0 +1,22 @@
+"""Base exporter interface for telemetry exporters."""
+
+from abc import ABC, abstractmethod
+from ...types import Event
+
+
+class Exporter(ABC):
+    """Abstract base class for telemetry exporters."""
+
+    @abstractmethod
+    def export(self, event: Event) -> None:
+        """
+        Export an event to the telemetry backend.
+
+        Args:
+            event: The MCPCat event to export
+
+        Note:
+            This method should handle all errors internally and never
+            raise exceptions that could affect the main MCP server.
+        """
+        pass
\ No newline at end of file
diff --git a/src/mcpcat/modules/exporters/datadog.py b/src/mcpcat/modules/exporters/datadog.py
new file mode 100644
index 0000000..d48ce79
--- /dev/null
+++ b/src/mcpcat/modules/exporters/datadog.py
@@ -0,0 +1,224 @@
+"""Datadog exporter for MCPCat telemetry."""
+
+import json
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
+from typing import Dict, List, Any, Optional
+import requests
+
+from ...types import Event, DatadogExporterConfig
+from ...modules.logging import write_to_log
+from . import Exporter
+from .trace_context import trace_context
+
+
+class DatadogExporter(Exporter):
+    """Exports MCPCat events to Datadog logs and metrics."""
+
+    def __init__(self, config: DatadogExporterConfig):
+        """
+        Initialize Datadog exporter.
+
+        Args:
+            config: Datadog exporter configuration
+        """
+        self.config = config
+        self.api_key = config["api_key"]
+        self.site = config["site"]
+        self.service = config["service"]
+        self.env = config.get("env", "production")
+
+        # Build API endpoints based on site
+        site_clean = self.site.replace("https://", "").replace("http://", "").rstrip("/")
+        self.logs_url = f"https://http-intake.logs.{site_clean}/api/v2/logs"
+        self.metrics_url = f"https://api.{site_clean}/api/v1/series"
+
+        # Create session for connection pooling
+        self.session = requests.Session()
+
+        write_to_log(f"Datadog exporter initialized for service: {self.service}")
+
+    def export(self, event: Event) -> None:
+        """
+        Export an event to Datadog logs and metrics.
+
+        Args:
+            event: MCPCat event to export
+        """
+        write_to_log("DatadogExporter: Sending event immediately to Datadog")
+
+        # Convert event to log and metrics
+        log = self.event_to_log(event)
+        metrics = self.event_to_metrics(event)
+
+        # Debug: Log the metrics payload
+        write_to_log(f"DatadogExporter: Metrics URL: {self.metrics_url}")
+        write_to_log(f"DatadogExporter: Metrics payload: {json.dumps({'series': metrics})}")
+
+        # Use ThreadPoolExecutor to send both requests in parallel
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            # Submit both requests
+            logs_future = executor.submit(self._send_logs, [log])
+            metrics_future = executor.submit(self._send_metrics, metrics)
+
+            # Wait for both to complete (results will be logged in the methods)
+            logs_future.result()
+            metrics_future.result()
+
+    def _send_logs(self, logs: List[Dict[str, Any]]) -> None:
+        """Send logs to Datadog."""
+        try:
+            response = self.session.post(
+                self.logs_url,
+                headers={
+                    "DD-API-KEY": self.api_key,
+                    "Content-Type": "application/json"
+                },
+                json=logs,
+                timeout=10
+            )
+
+            if not response.ok:
+                error_body = response.text
+                write_to_log(f"Datadog logs failed - Status: {response.status_code}, Body: {error_body}")
+            else:
+                write_to_log(f"Datadog logs success - Status: {response.status_code}")
+        except Exception as err:
+            write_to_log(f"Datadog logs network error: {err}")
+
+    def _send_metrics(self, metrics: List[Dict[str, Any]]) -> None:
+        """Send metrics to Datadog."""
+        try:
+            response = self.session.post(
+                self.metrics_url,
+                headers={
+                    "DD-API-KEY": self.api_key,
+                    "Content-Type": "application/json"
+                },
+                json={"series": metrics},
+                timeout=10
+            )
+
+            if not response.ok:
+                error_body = response.text
+                write_to_log(f"Datadog metrics failed - Status: {response.status_code}, Body: {error_body}")
+            else:
+                response_body = response.text
+                write_to_log(f"Datadog metrics success - Status: {response.status_code}, Body: {response_body}")
+        except Exception as err:
+            write_to_log(f"Datadog metrics network error: {err}")
+
+    def event_to_log(self, event: Event) -> Dict[str, Any]:
+        """
+        Convert MCPCat event to Datadog log format.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Datadog log dictionary
+        """
+        tags: List[str] = []
+
+        # Add basic tags
+        if self.env:
+            tags.append(f"env:{self.env}")
+        if event.event_type:
+            tags.append(f"event_type:{event.event_type.replace('/', '.')}")
+        if event.resource_name:
+            tags.append(f"resource:{event.resource_name}")
+        if event.is_error:
+            tags.append("error:true")
+
+        # Get timestamp in milliseconds
+        timestamp_ms = int(event.timestamp.timestamp() * 1000) if event.timestamp else int(datetime.now().timestamp() * 1000)
+
+        log: Dict[str, Any] = {
+            "message": f"{event.event_type or 'unknown'} - {event.resource_name or 'unknown'}",
+            "service": self.service,
+            "ddsource": "mcpcat",
+            "ddtags": ",".join(tags),
+            "timestamp": timestamp_ms,
+            "status": "error" if event.is_error else "info",
+            "dd": {
+                "trace_id": trace_context.get_trace_id(event.session_id),
+                "span_id": trace_context.generate_span_id()
+            },
+            "mcp": {
+                "session_id": event.session_id,
+                "event_id": event.id,
+                "event_type": event.event_type,
+                "resource": event.resource_name,
+                "duration_ms": event.duration,
+                "user_intent": event.user_intent,
+                "actor_id": event.identify_actor_given_id,
+                "actor_name": event.identify_actor_name,
+                "client_name": event.client_name,
+                "client_version": event.client_version,
+                "server_name": event.server_name,
+                "server_version": event.server_version,
+                "is_error": event.is_error,
+                "error": event.error
+            }
+        }
+
+        # Add error at root level if it exists
+        if event.is_error and event.error:
+            log["error"] = {
+                "message": event.error if isinstance(event.error, str) else json.dumps(event.error)
+            }
+
+        return log
+
+    def event_to_metrics(self, event: Event) -> List[Dict[str, Any]]:
+        """
+        Convert MCPCat event to Datadog metrics.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            List of Datadog metric dictionaries
+        """
+        metrics: List[Dict[str, Any]] = []
+
+        # Get timestamp in seconds (Unix timestamp)
+        timestamp = int(event.timestamp.timestamp()) if event.timestamp else int(datetime.now().timestamp())
+
+        tags: List[str] = [f"service:{self.service}"]
+
+        # Add optional tags
+        if self.env:
+            tags.append(f"env:{self.env}")
+        if event.event_type:
+            tags.append(f"event_type:{event.event_type.replace('/', '.')}")
+        if event.resource_name:
+            tags.append(f"resource:{event.resource_name}")
+
+        # Event count metric
+        metrics.append({
+            "metric": "mcp.events.count",
+            "type": "count",
+            "points": [[timestamp, 1]],
+            "tags": tags
+        })
+
+        # Duration metric (only if duration exists)
+        if event.duration is not None:
+            metrics.append({
+                "metric": "mcp.event.duration",
+                "type": "gauge",
+                "points": [[timestamp, event.duration]],
+                "tags": tags
+            })
+
+        # Error count metric
+        if event.is_error:
+            metrics.append({
+                "metric": "mcp.errors.count",
+                "type": "count",
+                "points": [[timestamp, 1]],
+                "tags": tags
+            })
+
+        return metrics
\ No newline at end of file
diff --git a/src/mcpcat/modules/exporters/otlp.py b/src/mcpcat/modules/exporters/otlp.py
new file mode 100644
index 0000000..dd40575
--- /dev/null
+++ b/src/mcpcat/modules/exporters/otlp.py
@@ -0,0 +1,239 @@
+"""OpenTelemetry Protocol (OTLP) exporter for MCPCat telemetry."""
+
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+import requests
+
+from ...types import Event, OTLPExporterConfig
+from ...modules.logging import write_to_log
+from . import Exporter
+from .trace_context import trace_context
+
+
+class OTLPExporter(Exporter):
+    """Exports MCPCat events to OpenTelemetry collectors via OTLP."""
+
+    def __init__(self, config: OTLPExporterConfig):
+        """
+        Initialize OTLP exporter.
+
+        Args:
+            config: OTLP exporter configuration
+        """
+        self.protocol = config.get("protocol", "http/protobuf")
+
+        # Set default endpoint based on protocol
+        if "endpoint" in config:
+            self.endpoint = config["endpoint"]
+        else:
+            if self.protocol == "grpc":
+                self.endpoint = "http://localhost:4317"
+            else:
+                self.endpoint = "http://localhost:4318/v1/traces"
+
+        # Set up headers
+        self.headers = {
+            "Content-Type": "application/json",  # Using JSON for easier debugging
+        }
+        if "headers" in config:
+            self.headers.update(config["headers"])
+
+        # Create session for connection pooling
+        self.session = requests.Session()
+        self.session.headers.update(self.headers)
+
+        write_to_log(f"OTLP exporter initialized with endpoint: {self.endpoint}")
+
+    def export(self, event: Event) -> None:
+        """
+        Export an event to OTLP collector.
+
+        Args:
+            event: MCPCat event to export
+        """
+        try:
+            # Convert event to OTLP span format
+            span = self._convert_to_otlp_span(event)
+
+            # Create OTLP JSON format request
+            otlp_request = {
+                "resourceSpans": [
+                    {
+                        "resource": {
+                            "attributes": self._get_resource_attributes(event)
+                        },
+                        "scopeSpans": [
+                            {
+                                "scope": {
+                                    "name": "mcpcat",
+                                    "version": event.mcpcat_version or "0.1.0"
+                                },
+                                "spans": [span]
+                            }
+                        ]
+                    }
+                ]
+            }
+
+            # Send to OTLP collector
+            response = self.session.post(
+                self.endpoint,
+                json=otlp_request,
+                timeout=5
+            )
+            response.raise_for_status()
+
+            write_to_log(f"Successfully exported event to OTLP: {event.id}")
+
+        except requests.exceptions.RequestException as e:
+            raise Exception(f"OTLP export failed: {e}")
+        except Exception as e:
+            raise Exception(f"OTLP export error: {e}")
+
+    def _convert_to_otlp_span(self, event: Event) -> Dict[str, Any]:
+        """
+        Convert MCPCat event to OTLP span format.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            OTLP span dictionary
+        """
+        # Convert timestamp to nanoseconds
+        if event.timestamp:
+            start_time_nanos = int(event.timestamp.timestamp() * 1_000_000_000)
+        else:
+            start_time_nanos = int(datetime.now().timestamp() * 1_000_000_000)
+
+        # Calculate end time based on duration
+        end_time_nanos = start_time_nanos
+        if event.duration:
+            end_time_nanos += event.duration * 1_000_000  # duration is in ms
+
+        return {
+            "traceId": trace_context.get_trace_id(event.session_id),
+            "spanId": trace_context.generate_span_id(),
+            "name": event.event_type or "mcp.event",
+            "kind": 2,  # SPAN_KIND_SERVER
+            "startTimeUnixNano": str(start_time_nanos),
+            "endTimeUnixNano": str(end_time_nanos),
+            "attributes": self._get_span_attributes(event),
+            "status": {
+                "code": 2 if getattr(event, 'is_error', False) else 1  # ERROR : OK
+            }
+        }
+
+    def _get_resource_attributes(self, event: Event) -> List[Dict[str, Any]]:
+        """
+        Get resource-level attributes for OTLP.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            List of attribute key-value pairs
+        """
+        attributes = []
+
+        if event.server_name:
+            attributes.append({
+                "key": "service.name",
+                "value": {"stringValue": event.server_name}
+            })
+
+        if event.server_version:
+            attributes.append({
+                "key": "service.version",
+                "value": {"stringValue": event.server_version}
+            })
+
+        # Add SDK information
+        attributes.append({
+            "key": "telemetry.sdk.name",
+            "value": {"stringValue": "mcpcat-python"}
+        })
+
+        if event.mcpcat_version:
+            attributes.append({
+                "key": "telemetry.sdk.version",
+                "value": {"stringValue": event.mcpcat_version}
+            })
+
+        return attributes
+
+    def _get_span_attributes(self, event: Event) -> List[Dict[str, Any]]:
+        """
+        Get span-level attributes for OTLP.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            List of attribute key-value pairs
+        """
+        attributes = []
+
+        # Add MCP-specific attributes
+        if event.event_type:
+            attributes.append({
+                "key": "mcp.event_type",
+                "value": {"stringValue": event.event_type}
+            })
+
+        if event.session_id:
+            attributes.append({
+                "key": "mcp.session_id",
+                "value": {"stringValue": event.session_id}
+            })
+
+        if event.project_id:
+            attributes.append({
+                "key": "mcp.project_id",
+                "value": {"stringValue": event.project_id}
+            })
+
+        # Add resource name (for tools, prompts, resources)
+        if event.resource_name:
+            attributes.append({
+                "key": "mcp.resource_name",
+                "value": {"stringValue": event.resource_name}
+            })
+
+        # Add user intent if available
+        if event.user_intent:
+            attributes.append({
+                "key": "mcp.user_intent",
+                "value": {"stringValue": event.user_intent}
+            })
+
+        # Add actor information
+        if event.identify_actor_given_id:
+            attributes.append({
+                "key": "mcp.actor_id",
+                "value": {"stringValue": event.identify_actor_given_id}
+            })
+
+        if event.identify_actor_name:
+            attributes.append({
+                "key": "mcp.actor_name",
+                "value": {"stringValue": event.identify_actor_name}
+            })
+
+        # Add client information
+        if event.client_name:
+            attributes.append({
+                "key": "mcp.client_name",
+                "value": {"stringValue": event.client_name}
+            })
+
+        if event.client_version:
+            attributes.append({
+                "key": "mcp.client_version",
+                "value": {"stringValue": event.client_version}
+            })
+
+        # Filter out empty attributes
+        return [attr for attr in attributes if attr["value"].get("stringValue")]
+
diff --git a/src/mcpcat/modules/exporters/sentry.py b/src/mcpcat/modules/exporters/sentry.py
new file mode 100644
index 0000000..15ad2c0
--- /dev/null
+++ b/src/mcpcat/modules/exporters/sentry.py
@@ -0,0 +1,485 @@
+"""Sentry exporter for MCPCat telemetry."""
+
+import json
+import re
+from datetime import datetime, timezone
+from typing import Dict, Any, Optional, List
+import requests
+
+from ...types import Event, SentryExporterConfig
+from ...modules.logging import write_to_log
+from . import Exporter
+from .trace_context import trace_context
+
+
+class SentryExporter(Exporter):
+    """Exports MCPCat events to Sentry as logs, transactions, and error events."""
+
+    def __init__(self, config: SentryExporterConfig):
+        """
+        Initialize Sentry exporter.
+
+        Args:
+            config: Sentry exporter configuration
+        """
+        self.config = config
+        self.dsn = config["dsn"]
+        self.environment = config.get("environment", "production")
+        self.release = config.get("release")
+        self.enable_tracing = config.get("enable_tracing", False)
+
+        # Parse DSN
+        self.parsed_dsn = self.parse_dsn(self.dsn)
+
+        # Build envelope endpoint
+        protocol = self.parsed_dsn["protocol"]
+        host = self.parsed_dsn["host"]
+        port = f":{self.parsed_dsn['port']}" if self.parsed_dsn.get("port") else ""
+        path = self.parsed_dsn.get("path", "")
+        project_id = self.parsed_dsn["project_id"]
+
+        self.endpoint = f"{protocol}://{host}{port}{path}/api/{project_id}/envelope/"
+
+        # Build auth header
+        self.auth_header = f"Sentry sentry_version=7, sentry_client=mcpcat/1.0.0, sentry_key={self.parsed_dsn['public_key']}"
+
+        # Create session for connection pooling
+        self.session = requests.Session()
+
+        write_to_log(f"SentryExporter: Initialized with endpoint {self.endpoint}")
+
+    def parse_dsn(self, dsn: str) -> Dict[str, str]:
+        """
+        Parse Sentry DSN to extract components.
+
+        Args:
+            dsn: Sentry DSN string
+
+        Returns:
+            Dictionary with DSN components
+
+        Raises:
+            ValueError: If DSN is invalid
+        """
+        # DSN format: protocol://publicKey@host[:port]/path/projectId
+        regex = r'^(https?):\/\/([a-f0-9]+)@([\w.-]+)(:\d+)?(\/.*)?\/(\d+)$'
+        match = re.match(regex, dsn)
+
+        if not match:
+            raise ValueError(f"Invalid Sentry DSN: {dsn}")
+
+        return {
+            "protocol": match.group(1),
+            "public_key": match.group(2),
+            "host": match.group(3),
+            "port": match.group(4)[1:] if match.group(4) else None,  # Remove leading ':'
+            "path": match.group(5) or "",
+            "project_id": match.group(6)
+        }
+
+    def export(self, event: Event) -> None:
+        """
+        Export an event to Sentry.
+
+        Args:
+            event: MCPCat event to export
+        """
+        try:
+            # ALWAYS send log
+            log = self.event_to_log(event)
+            log_envelope = self.create_log_envelope(log)
+
+            write_to_log(f"SentryExporter: Sending log for event {event.id} to Sentry")
+
+            log_response = self.session.post(
+                self.endpoint,
+                headers={
+                    "X-Sentry-Auth": self.auth_header,
+                    "Content-Type": "application/x-sentry-envelope"
+                },
+                data=log_envelope,
+                timeout=10
+            )
+
+            if not log_response.ok:
+                error_body = log_response.text
+                write_to_log(f"Sentry log export failed - Status: {log_response.status_code}, Body: {error_body}")
+            else:
+                write_to_log(f"Sentry log export success - Event: {event.id}")
+
+            # OPTIONALLY send transaction for performance monitoring
+            if self.enable_tracing:
+                transaction = self.event_to_transaction(event)
+                transaction_envelope = self.create_transaction_envelope(transaction)
+
+                write_to_log(f"SentryExporter: Sending transaction {transaction['event_id']} to Sentry")
+
+                transaction_response = self.session.post(
+                    self.endpoint,
+                    headers={
+                        "X-Sentry-Auth": self.auth_header,
+                        "Content-Type": "application/x-sentry-envelope"
+                    },
+                    data=transaction_envelope,
+                    timeout=10
+                )
+
+                if not transaction_response.ok:
+                    error_body = transaction_response.text
+                    write_to_log(f"Sentry transaction export failed - Status: {transaction_response.status_code}, Body: {error_body}")
+                else:
+                    write_to_log(f"Sentry transaction export success - Event: {event.id}")
+
+            # ALWAYS send error event for Issue creation if this is an error
+            if event.is_error:
+                # Use transaction if available for better context
+                transaction = self.event_to_transaction(event) if self.enable_tracing else None
+                error_event = self.event_to_error_event(event, transaction)
+                error_envelope = self.create_error_envelope(error_event)
+
+                write_to_log(f"SentryExporter: Sending error event {error_event['event_id']} to Sentry for Issue creation")
+
+                error_response = self.session.post(
+                    self.endpoint,
+                    headers={
+                        "X-Sentry-Auth": self.auth_header,
+                        "Content-Type": "application/x-sentry-envelope"
+                    },
+                    data=error_envelope,
+                    timeout=10
+                )
+
+                if not error_response.ok:
+                    error_body = error_response.text
+                    write_to_log(f"Sentry error export failed - Status: {error_response.status_code}, Body: {error_body}")
+                else:
+                    write_to_log(f"Sentry error export success - Event: {event.id}")
+
+        except Exception as error:
+            write_to_log(f"Sentry export error: {error}")
+
+    def event_to_log(self, event: Event) -> Dict[str, Any]:
+        """
+        Convert MCPCat event to Sentry log format.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Sentry log dictionary
+        """
+        timestamp = event.timestamp.timestamp() if event.timestamp else datetime.now().timestamp()
+        trace_id = trace_context.get_trace_id(event.session_id)
+
+        # Build message
+        message = f"MCP {event.event_type or 'event'}: {event.resource_name}" if event.resource_name else f"MCP {event.event_type or 'event'}"
+
+        return {
+            "timestamp": timestamp,
+            "trace_id": trace_id,
+            "level": "error" if event.is_error else "info",
+            "body": message,
+            "attributes": self.build_log_attributes(event)
+        }
+
+    def build_log_attributes(self, event: Event) -> Dict[str, Dict[str, Any]]:
+        """
+        Build log attributes for Sentry.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Attributes dictionary
+        """
+        attributes: Dict[str, Dict[str, Any]] = {}
+
+        if event.event_type:
+            attributes["eventType"] = {"value": event.event_type, "type": "string"}
+        if event.resource_name:
+            attributes["resourceName"] = {"value": event.resource_name, "type": "string"}
+        if event.server_name:
+            attributes["serverName"] = {"value": event.server_name, "type": "string"}
+        if event.client_name:
+            attributes["clientName"] = {"value": event.client_name, "type": "string"}
+        if event.session_id:
+            attributes["sessionId"] = {"value": event.session_id, "type": "string"}
+        if event.project_id:
+            attributes["projectId"] = {"value": event.project_id, "type": "string"}
+        if event.duration is not None:
+            attributes["duration_ms"] = {"value": event.duration, "type": "double"}
+        if event.identify_actor_given_id:
+            attributes["actorId"] = {"value": event.identify_actor_given_id, "type": "string"}
+        if event.identify_actor_name:
+            attributes["actorName"] = {"value": event.identify_actor_name, "type": "string"}
+        if event.user_intent:
+            attributes["userIntent"] = {"value": event.user_intent, "type": "string"}
+        if event.server_version:
+            attributes["serverVersion"] = {"value": event.server_version, "type": "string"}
+        if event.client_version:
+            attributes["clientVersion"] = {"value": event.client_version, "type": "string"}
+        if event.is_error is not None:
+            attributes["isError"] = {"value": event.is_error, "type": "boolean"}
+
+        return attributes
+
+    def create_log_envelope(self, log: Dict[str, Any]) -> str:
+        """
+        Create Sentry log envelope.
+
+        Args:
+            log: Log dictionary
+
+        Returns:
+            Envelope string
+        """
+        # Envelope header
+        envelope_header = {
+            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "sent_at": datetime.now(timezone.utc).isoformat()
+        }
+
+        # Item header with ALL MANDATORY fields
+        item_header = {
+            "type": "log",
+            "item_count": 1,  # MANDATORY - must match number of logs
+            "content_type": "application/vnd.sentry.items.log+json"  # MANDATORY - exact string
+        }
+
+        # Payload with CORRECT key
+        payload = {
+            "items": [log]  # Changed from 'logs' to 'items'
+        }
+
+        # Build envelope with TRAILING NEWLINE
+        return "\n".join([
+            json.dumps(envelope_header),
+            json.dumps(item_header),
+            json.dumps(payload)
+        ]) + "\n"  # Added required trailing newline
+
+    def event_to_transaction(self, event: Event) -> Dict[str, Any]:
+        """
+        Convert MCPCat event to Sentry transaction.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Sentry transaction dictionary
+        """
+        # Calculate timestamps
+        end_timestamp = event.timestamp.timestamp() if event.timestamp else datetime.now().timestamp()
+        start_timestamp = end_timestamp - (event.duration / 1000) if event.duration else end_timestamp
+
+        trace_id = trace_context.get_trace_id(event.session_id)
+        span_id = trace_context.generate_span_id()
+
+        # Build transaction name
+        transaction_name = f"{event.event_type or 'mcp'} - {event.resource_name}" if event.resource_name else (event.event_type or "mcp.event")
+
+        return {
+            "type": "transaction",
+            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "timestamp": end_timestamp,
+            "start_timestamp": start_timestamp,
+            "transaction": transaction_name,
+            "contexts": {
+                "trace": {
+                    "trace_id": trace_id,
+                    "span_id": span_id,
+                    "op": event.event_type or "mcp.event",
+                    "status": "internal_error" if event.is_error else "ok"
+                }
+            },
+            "tags": self.build_tags(event),
+            "extra": self.build_extra(event)
+        }
+
+    def build_tags(self, event: Event) -> Dict[str, str]:
+        """
+        Build tags for Sentry transaction/error.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Tags dictionary
+        """
+        tags: Dict[str, str] = {}
+
+        if self.environment:
+            tags["environment"] = self.environment
+        if self.release:
+            tags["release"] = self.release
+        if event.event_type:
+            tags["event_type"] = event.event_type
+        if event.resource_name:
+            tags["resource"] = event.resource_name
+        if event.server_name:
+            tags["server_name"] = event.server_name
+        if event.client_name:
+            tags["client_name"] = event.client_name
+        if event.identify_actor_given_id:
+            tags["actor_id"] = event.identify_actor_given_id
+
+        return tags
+
+    def build_extra(self, event: Event) -> Dict[str, Any]:
+        """
+        Build extra data for Sentry transaction/error.
+
+        Args:
+            event: MCPCat event
+
+        Returns:
+            Extra data dictionary
+        """
+        extra: Dict[str, Any] = {}
+
+        if event.session_id:
+            extra["session_id"] = event.session_id
+        if event.project_id:
+            extra["project_id"] = event.project_id
+        if event.user_intent:
+            extra["user_intent"] = event.user_intent
+        if event.identify_actor_name:
+            extra["actor_name"] = event.identify_actor_name
+        if event.server_version:
+            extra["server_version"] = event.server_version
+        if event.client_version:
+            extra["client_version"] = event.client_version
+        if event.duration is not None:
+            extra["duration_ms"] = event.duration
+        if event.error:
+            extra["error"] = event.error
+
+        return extra
+
+    def event_to_error_event(self, event: Event, transaction: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+        """
+        Convert MCPCat event to Sentry error event.
+
+        Args:
+            event: MCPCat event
+            transaction: Optional transaction for context
+
+        Returns:
+            Sentry error event dictionary
+        """
+        # Extract error message
+        error_message = "Unknown error"
+        error_type = "ToolCallError"
+
+        if event.error:
+            if isinstance(event.error, str):
+                error_message = event.error
+            elif isinstance(event.error, dict):
+                if "message" in event.error:
+                    error_message = str(event.error["message"])
+                elif "error" in event.error:
+                    error_message = str(event.error["error"])
+                else:
+                    error_message = json.dumps(event.error)
+                if "type" in event.error:
+                    error_type = str(event.error["type"])
+
+        # Use same trace context as the transaction for correlation (if available)
+        trace_id = transaction["contexts"]["trace"]["trace_id"] if transaction else trace_context.get_trace_id(event.session_id)
+        span_id = trace_context.generate_span_id()
+
+        timestamp = transaction["timestamp"] if transaction else (event.timestamp.timestamp() if event.timestamp else datetime.now().timestamp())
+
+        return {
+            "type": "event",
+            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "timestamp": timestamp,
+            "level": "error",
+            "exception": {
+                "values": [{
+                    "type": error_type,
+                    "value": error_message,
+                    "mechanism": {
+                        "type": "mcp_tool_call",
+                        "handled": False
+                    }
+                }]
+            },
+            "contexts": {
+                "trace": {
+                    "trace_id": trace_id,  # Same trace ID as transaction/log for correlation
+                    "span_id": span_id,
+                    "parent_span_id": transaction["contexts"]["trace"]["span_id"] if transaction else None,  # Link to transaction span if available
+                    "op": transaction["contexts"]["trace"]["op"] if transaction else (event.event_type or "mcp.event")
+                },
+                "mcp": {
+                    "resource_name": event.resource_name,
+                    "session_id": event.session_id,
+                    "event_type": event.event_type,
+                    "user_intent": event.user_intent
+                }
+            },
+            "tags": self.build_tags(event),
+            "extra": self.build_extra(event),
+            "transaction": transaction["transaction"] if transaction else (
+                f"{event.event_type or 'mcp'} - {event.resource_name}" if event.resource_name else (event.event_type or "mcp.event")
+            )
+        }
+
+    def create_transaction_envelope(self, transaction: Dict[str, Any]) -> str:
+        """
+        Create Sentry transaction envelope.
+
+        Args:
+            transaction: Transaction dictionary
+
+        Returns:
+            Envelope string
+        """
+        # Envelope header
+        envelope_header = {
+            "event_id": transaction["event_id"],
+            "sent_at": datetime.now(timezone.utc).isoformat()
+        }
+
+        # Item header for transaction
+        item_header = {
+            "type": "transaction"
+        }
+
+        # Build envelope (newline-separated JSON)
+        return "\n".join([
+            json.dumps(envelope_header),
+            json.dumps(item_header),
+            json.dumps(transaction)
+        ])
+
+    def create_error_envelope(self, error_event: Dict[str, Any]) -> str:
+        """
+        Create Sentry error event envelope.
+
+        Args:
+            error_event: Error event dictionary
+
+        Returns:
+            Envelope string
+        """
+        # Envelope header
+        envelope_header = {
+            "event_id": error_event["event_id"],
+            "sent_at": datetime.now(timezone.utc).isoformat()
+        }
+
+        # Item header for error event
+        item_header = {
+            "type": "event",
+            "content_type": "application/json"
+        }
+
+        # Build envelope (newline-separated JSON)
+        return "\n".join([
+            json.dumps(envelope_header),
+            json.dumps(item_header),
+            json.dumps(error_event)
+        ])
+
diff --git a/src/mcpcat/modules/exporters/trace_context.py b/src/mcpcat/modules/exporters/trace_context.py
new file mode 100644
index 0000000..4ba0a69
--- /dev/null
+++ b/src/mcpcat/modules/exporters/trace_context.py
@@ -0,0 +1,77 @@
+"""
+Shared trace context management for all exporters.
+Maintains one trace ID per session for proper observability tool correlation.
+"""
+
+import random
+from typing import Dict, Optional
+
+
+class TraceContext:
+    """Manages trace and span ID generation for all exporters."""
+
+    def __init__(self):
+        self.session_traces: Dict[str, str] = {}
+
+    def get_trace_id(self, session_id: Optional[str] = None) -> str:
+        """
+        Get or create a trace ID for a session.
+        Returns the same trace ID for all events in a session.
+
+        Args:
+            session_id: Optional session identifier
+
+        Returns:
+            32-character hex trace ID
+        """
+        if not session_id:
+            # No session, return random trace ID
+            return self.random_hex(32)
+
+        if session_id not in self.session_traces:
+            # First event in session, create new trace ID
+            self.session_traces[session_id] = self.random_hex(32)
+
+        return self.session_traces[session_id]
+
+    def generate_span_id(self) -> str:
+        """
+        Generate a random span ID.
+        Always returns a new random ID for uniqueness.
+
+        Returns:
+            16-character hex span ID
+        """
+        return self.random_hex(16)
+
+    def random_hex(self, length: int) -> str:
+        """
+        Generate random hex string of specified length.
+        Uses random.choices for performance (same approach as OpenTelemetry).
+
+        Args:
+            length: Length of hex string to generate
+
+        Returns:
+            Random hex string
+        """
+        return "".join(random.choices("0123456789abcdef", k=length))
+
+    def clear_old_sessions(self, max_sessions: int = 1000) -> None:
+        """
+        Optional: Clear old sessions to prevent memory leaks.
+        Can be called periodically for long-running servers.
+
+        Args:
+            max_sessions: Maximum number of sessions to keep
+        """
+        if len(self.session_traces) > max_sessions:
+            # Simple strategy: clear oldest half when limit exceeded
+            # In production, might want LRU cache or timestamp-based clearing
+            to_remove = len(self.session_traces) - (max_sessions // 2)
+            for key in list(self.session_traces.keys())[:to_remove]:
+                del self.session_traces[key]
+
+
+# Export singleton instance
+trace_context = TraceContext()
diff --git a/src/mcpcat/modules/telemetry.py b/src/mcpcat/modules/telemetry.py
new file mode 100644
index 0000000..f7bf9d9
--- /dev/null
+++ b/src/mcpcat/modules/telemetry.py
@@ -0,0 +1,100 @@
+"""Telemetry manager for exporting events to observability platforms."""
+
+from concurrent.futures import ThreadPoolExecutor
+from typing import Dict, Optional
+from ..types import Event, ExporterConfig, OTLPExporterConfig, DatadogExporterConfig, SentryExporterConfig
+from .exporters import Exporter
+from .logging import write_to_log
+
+
+class TelemetryManager:
+    """Manages telemetry exporters and coordinates event export."""
+
+    def __init__(self, exporter_configs: dict[str, ExporterConfig], executor: ThreadPoolExecutor):
+        """
+        Initialize the telemetry manager with configured exporters.
+
+        Args:
+            exporter_configs: Dictionary of exporter configurations
+            executor: Thread pool executor for async exports
+        """
+        self.exporters: Dict[str, Exporter] = {}
+        self.executor = executor
+        self._initialize_exporters(exporter_configs)
+
+    def _initialize_exporters(self, configs: dict[str, ExporterConfig]) -> None:
+        """Initialize all configured exporters."""
+        for name, config in configs.items():
+            try:
+                exporter = self._create_exporter(name, config)
+                if exporter:
+                    self.exporters[name] = exporter
+                    write_to_log(f"Initialized telemetry exporter: {name} (type: {config['type']})")
+            except Exception as e:
+                write_to_log(f"Failed to initialize exporter {name}: {e}")
+
+    def _create_exporter(self, name: str, config: ExporterConfig) -> Optional[Exporter]:
+        """
+        Factory method to create appropriate exporter based on type.
+
+        Args:
+            name: Name of the exporter
+            config: Exporter configuration
+
+        Returns:
+            Exporter instance or None if type is unknown
+        """
+        exporter_type = config.get("type")
+
+        if exporter_type == "otlp":
+            from .exporters.otlp import OTLPExporter
+            return OTLPExporter(config)
+        elif exporter_type == "datadog":
+            from .exporters.datadog import DatadogExporter
+            return DatadogExporter(config)
+        elif exporter_type == "sentry":
+            from .exporters.sentry import SentryExporter
+            return SentryExporter(config)
+        else:
+            write_to_log(f"Unknown exporter type: {exporter_type}")
+            return None
+
+    def export(self, event: Event) -> None:
+        """
+        Export event to all configured exporters (non-blocking).
+
+        Args:
+            event: Event to export
+        """
+        if not self.exporters:
+            return
+
+        # Submit export tasks to thread pool for each exporter
+        for name, exporter in self.exporters.items():
+            self.executor.submit(self._safe_export, name, exporter, event)
+
+    def _safe_export(self, name: str, exporter: Exporter, event: Event) -> None:
+        """
+        Safely export an event, catching and logging any errors.
+
+        Args:
+            name: Name of the exporter
+            exporter: Exporter instance
+            event: Event to export
+        """
+        try:
+            exporter.export(event)
+            write_to_log(f"Successfully exported event {event.id} to {name}")
+        except Exception as e:
+            # Log error but don't propagate - telemetry should never crash the main app
+            write_to_log(f"Telemetry export failed for {name}: {e}")
+
+    def get_exporter_count(self) -> int:
+        """Get the number of active exporters."""
+        return len(self.exporters)
+
+    def destroy(self) -> None:
+        """Clean up any resources used by exporters."""
+        # Currently no cleanup needed, but this is here for future use
+        # (e.g., if we add connection pooling or background threads)
+        pass
\ No newline at end of file
diff --git a/src/mcpcat/types.py b/src/mcpcat/types.py
index e36dbf5..04e743d 100644
--- a/src/mcpcat/types.py
+++ b/src/mcpcat/types.py
@@ -2,9 +2,9 @@
 
 from collections.abc import Awaitable, Callable
 from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
 from enum import Enum
-from typing import Any, Dict, Optional, Set, TypedDict
+from typing import Any, Dict, Optional, Set, TypedDict, Literal, Union
 
 from mcpcat_api import PublishEventRequest
 from pydantic import BaseModel
@@ -13,8 +13,6 @@
 # Type alias for redaction function
 RedactionFunction = Callable[[str], str | Awaitable[str]]
 
-
-
 @dataclass
 class UserIdentity():
     """User identification data."""
@@ -67,6 +65,40 @@ class ToolRegistration:
     tracked: bool = False
     wrapped: bool = False
 
+
+# Telemetry Exporter Configuration Types
+
+class OTLPExporterConfig(TypedDict, total=False):
+    """Configuration for OpenTelemetry Protocol (OTLP) exporter."""
+    type: Literal["otlp"]
+    endpoint: str  # Optional, defaults to http://localhost:4318/v1/traces
+    protocol: Literal["http/protobuf", "grpc"]  # Optional, defaults to http/protobuf
+    headers: dict[str, str]  # Optional custom headers
+    compression: Literal["gzip", "none"]  # Optional compression
+
+
+class DatadogExporterConfig(TypedDict):
+    """Configuration for Datadog exporter."""
+    type: Literal["datadog"]
+    api_key: str  # Required - Datadog API key
+    site: str  # Required - Datadog site (e.g., datadoghq.com, datadoghq.eu)
+    service: str  # Required - Service name for Datadog
+    env: Optional[str]  # Optional environment name
+
+
+class SentryExporterConfig(TypedDict):
+    """Configuration for Sentry exporter."""
+    type: Literal["sentry"]
+    dsn: str  # Required - Sentry DSN
+    environment: Optional[str]  # Optional environment name
+    release: Optional[str]  # Optional release version
+    enable_tracing: Optional[bool]  # Optional, defaults to False
+
+
+# Union type for all exporter configurations
+ExporterConfig = Union[OTLPExporterConfig, DatadogExporterConfig, SentryExporterConfig]
+
+
 @dataclass
 class MCPCatOptions:
     """Configuration options for MCPCat."""
@@ -75,6 +107,7 @@ class MCPCatOptions:
     enable_tool_call_context: bool = True
     identify: IdentifyFunction | None = None
     redact_sensitive_information: RedactionFunction | None = None
+    exporters: dict[str, ExporterConfig] | None = None
 
 @dataclass
 class MCPCatData:
@@ -85,7 +118,7 @@ class MCPCatData:
     last_activity: datetime
     identified_sessions: dict[str, UserIdentity]
     options: MCPCatOptions
-    
+
     # Dynamic tracking fields (initialized on demand)
     tool_registry: Dict[str, ToolRegistration] = field(default_factory=dict)
     wrapped_tools: Set[str] = field(default_factory=set)

From 7ef65e67eb2dd733214c6ca74ae4bd964e67981c Mon Sep 17 00:00:00 2001
From: Kashish Hora
Date: Wed, 20 Aug 2025 14:09:25 -0400
Subject: [PATCH 2/5] make traces use deterministic IDs

---
 src/mcpcat/modules/exporters/datadog.py       |  4 +-
 src/mcpcat/modules/exporters/otlp.py          |  2 +-
 src/mcpcat/modules/exporters/sentry.py        | 14 ++--
 src/mcpcat/modules/exporters/trace_context.py | 67 ++++++++++---------
 4 files changed, 47 insertions(+), 40 deletions(-)

diff --git a/src/mcpcat/modules/exporters/datadog.py b/src/mcpcat/modules/exporters/datadog.py
index d48ce79..bccd909 100644
--- a/src/mcpcat/modules/exporters/datadog.py
+++ b/src/mcpcat/modules/exporters/datadog.py
@@ -141,8 +141,8 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
             "timestamp": timestamp_ms,
             "status": "error" if event.is_error else "info",
             "dd": {
-                "trace_id": trace_context.get_trace_id(event.session_id),
-                "span_id": trace_context.generate_span_id()
+                "trace_id": trace_context.get_datadog_trace_id(event.session_id),
+                "span_id": trace_context.get_datadog_span_id(event.id)
             },
             "mcp": {
                 "session_id": event.session_id,
diff --git a/src/mcpcat/modules/exporters/otlp.py b/src/mcpcat/modules/exporters/otlp.py
index dd40575..165b1f0 100644
--- a/src/mcpcat/modules/exporters/otlp.py
+++ b/src/mcpcat/modules/exporters/otlp.py
@@ -114,7 +114,7 @@ def _convert_to_otlp_span(self, event: Event) -> Dict[str, Any]:
 
         return {
             "traceId": trace_context.get_trace_id(event.session_id),
-            "spanId": trace_context.generate_span_id(),
+            "spanId": trace_context.get_span_id(event.id),
             "name": event.event_type or "mcp.event",
             "kind": 2,  # SPAN_KIND_SERVER
             "startTimeUnixNano": str(start_time_nanos),
diff --git a/src/mcpcat/modules/exporters/sentry.py b/src/mcpcat/modules/exporters/sentry.py
index 15ad2c0..59debb8 100644
--- a/src/mcpcat/modules/exporters/sentry.py
+++ b/src/mcpcat/modules/exporters/sentry.py
@@ -170,6 +170,9 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
         """
         timestamp = event.timestamp.timestamp() if event.timestamp else datetime.now().timestamp()
         trace_id = trace_context.get_trace_id(event.session_id)
+
+        # Generate deterministic event_id for Sentry
+        event_id = trace_context.get_span_id(event.id) + trace_context.get_span_id(event.id)
 
         # Build message
         message = f"MCP {event.event_type or 'event'}: {event.resource_name}" if event.resource_name else f"MCP {event.event_type or 'event'}"
@@ -177,6 +180,7 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
         return {
             "timestamp": timestamp,
             "trace_id": trace_id,
+            "event_id": event_id,
             "level": "error" if event.is_error else "info",
             "body": message,
             "attributes": self.build_log_attributes(event)
@@ -235,7 +239,7 @@ def create_log_envelope(self, log: Dict[str, Any]) -> str:
         """
         # Envelope header
         envelope_header = {
-            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "event_id": log["event_id"],
             "sent_at": datetime.now(timezone.utc).isoformat()
         }
 
@@ -273,14 +277,14 @@ def event_to_transaction(self, event: Event) -> Dict[str, Any]:
         start_timestamp = end_timestamp - (event.duration / 1000) if event.duration else end_timestamp
 
         trace_id = trace_context.get_trace_id(event.session_id)
-        span_id = trace_context.generate_span_id()
+        span_id = trace_context.get_span_id(event.id)
 
         # Build transaction name
         transaction_name = f"{event.event_type or 'mcp'} - {event.resource_name}" if event.resource_name else (event.event_type or "mcp.event")
 
         return {
             "type": "transaction",
-            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "event_id": trace_context.get_span_id(event.id) + trace_context.get_span_id(),
             "timestamp": end_timestamp,
             "start_timestamp": start_timestamp,
             "transaction": transaction_name,
@@ -386,13 +390,13 @@ def event_to_error_event(self, event: Event, transaction: Optional[Dict[str, Any
 
         # Use same trace context as the transaction for correlation (if available)
         trace_id = transaction["contexts"]["trace"]["trace_id"] if transaction else trace_context.get_trace_id(event.session_id)
-        span_id = trace_context.generate_span_id()
+        span_id = trace_context.get_span_id(event.id)
 
         timestamp = transaction["timestamp"] if transaction else (event.timestamp.timestamp() if event.timestamp else datetime.now().timestamp())
 
         return {
             "type": "event",
-            "event_id": trace_context.generate_span_id() + trace_context.generate_span_id(),
+            "event_id": trace_context.get_span_id(event.id) + trace_context.get_span_id(),
             "timestamp": timestamp,
             "level": "error",
             "exception": {
diff --git a/src/mcpcat/modules/exporters/trace_context.py b/src/mcpcat/modules/exporters/trace_context.py
index 4ba0a69..1a23b1c 100644
--- a/src/mcpcat/modules/exporters/trace_context.py
+++ b/src/mcpcat/modules/exporters/trace_context.py
@@ -3,16 +3,14 @@
 Maintains one trace ID per session for proper observability tool correlation.
 """
 
-import random
-from typing import Dict, Optional
+import hashlib
+import secrets
+from typing import Optional
 
 
 class TraceContext:
     """Manages trace and span ID generation for all exporters."""
 
-    def __init__(self):
-        self.session_traces: Dict[str, str] = {}
-
     def get_trace_id(self, session_id: Optional[str] = None) -> str:
         """
         Get or create a trace ID for a session.
@@ -26,52 +24,57 @@ def get_trace_id(self, session_id: Optional[str] = None) -> str:
         """
         if not session_id:
             # No session, return random trace ID
-            return self.random_hex(32)
-
-        if session_id not in self.session_traces:
-            # First event in session, create new trace ID
-            self.session_traces[session_id] = self.random_hex(32)
+            return secrets.token_hex(16)
 
-        return self.session_traces[session_id]
+        # Hash session ID to get deterministic trace ID
+        return hashlib.sha256(session_id.encode()).hexdigest()[:32]
 
-    def generate_span_id(self) -> str:
+    def get_span_id(self, event_id: Optional[str] = None) -> str:
         """
-        Generate a random span ID.
-        Always returns a new random ID for uniqueness.
+        Generate a span ID from event ID.
+        Returns deterministic span ID based on event ID.
+
+        Args:
+            event_id: Optional event identifier
 
         Returns:
             16-character hex span ID
         """
-        return self.random_hex(16)
+        if not event_id:
+            # No event ID, return random span ID
+            return secrets.token_hex(8)
 
-    def random_hex(self, length: int) -> str:
+        # Hash event ID to get deterministic span ID
+        return hashlib.sha256(event_id.encode()).hexdigest()[:16]
+
+    def get_datadog_trace_id(self, session_id: Optional[str] = None) -> str:
         """
-        Generate random hex string of specified length.
-        Uses random.choices for performance (same approach as OpenTelemetry).
+        Get Datadog-compatible numeric trace ID.
 
         Args:
-            length: Length of hex string to generate
+            session_id: Optional session identifier
 
         Returns:
-            Random hex string
+            Numeric string trace ID for Datadog
         """
-        return "".join(random.choices("0123456789abcdef", k=length))
+        hex_id = self.get_trace_id(session_id)
+        # Take last 16 chars (64 bits) and convert to decimal
+        return str(int(hex_id[16:32], 16))
 
-    def clear_old_sessions(self, max_sessions: int = 1000) -> None:
+    def get_datadog_span_id(self, event_id: Optional[str] = None) -> str:
         """
-        Optional: Clear old sessions to prevent memory leaks.
-        Can be called periodically for long-running servers.
+        Get Datadog-compatible numeric span ID.
 
         Args:
-            max_sessions: Maximum number of sessions to keep
+            event_id: Optional event identifier
+
+        Returns:
+            Numeric string span ID for Datadog
         """
-        if len(self.session_traces) > max_sessions:
-            # Simple strategy: clear oldest half when limit exceeded
-            # In production, might want LRU cache or timestamp-based clearing
-            to_remove = len(self.session_traces) - (max_sessions // 2)
-            for key in list(self.session_traces.keys())[:to_remove]:
-                del self.session_traces[key]
+        hex_id = self.get_span_id(event_id)
+        # Convert full 16 chars to decimal
+        return str(int(hex_id, 16))
 
 
 # Export singleton instance
-trace_context = TraceContext()
+trace_context = TraceContext()
\ No newline at end of file

From 387a69ff5fd436846b760191fb96685513fac172 Mon Sep 17 00:00:00 2001
From: Kashish Hora
Date: Wed, 20 Aug 2025 14:18:56 -0400
Subject: [PATCH 3/5] remove threading, fix some PR comments

---
 src/mcpcat/__init__.py                  |  10 +--
 src/mcpcat/modules/exporters/datadog.py | 111 ++++++++++++++----------
 src/mcpcat/modules/exporters/otlp.py    |   4 +-
 src/mcpcat/modules/telemetry.py         |  11 +--
 src/mcpcat/types.py                     |   2 +-
 5 files changed, 76 insertions(+), 62 deletions(-)

diff --git a/src/mcpcat/__init__.py b/src/mcpcat/__init__.py
index b58fbd6..d05054a 100644
--- a/src/mcpcat/__init__.py
+++ b/src/mcpcat/__init__.py
@@ -62,13 +62,9 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
     # Initialize telemetry if exporters configured
     if options.exporters:
         from mcpcat.modules.telemetry import TelemetryManager
-        from mcpcat.modules.event_queue import event_queue, set_telemetry_manager
+        from mcpcat.modules.event_queue import set_telemetry_manager
 
-        # Share the event queue's executor for consistency
-        telemetry_manager = TelemetryManager(
-            options.exporters,
-            event_queue.executor
-        )
+        telemetry_manager = TelemetryManager(options.exporters)
         set_telemetry_manager(telemetry_manager)
         write_to_log(f"Telemetry initialized with {len(options.exporters)} exporter(s)")
 
@@ -77,7 +73,7 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
     session_info = get_session_info(lowlevel_server)
     data = MCPCatData(
         session_id=session_id,
-        project_id=project_id or "",  # Use empty string if None for compatibility
+        project_id=project_id,
         last_activity=datetime.now(timezone.utc),
         session_info=session_info,
         identified_sessions=dict(),
diff --git a/src/mcpcat/modules/exporters/datadog.py b/src/mcpcat/modules/exporters/datadog.py
index bccd909..f6670f3 100644
--- a/src/mcpcat/modules/exporters/datadog.py
+++ b/src/mcpcat/modules/exporters/datadog.py
@@ -1,9 +1,8 @@
 """Datadog exporter for MCPCat telemetry."""
 
 import json
-from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
-from typing import Dict, List, Any, Optional
+from typing import Dict, List, Any
 import requests
 
 from ...types import Event, DatadogExporterConfig
@@ -29,7 +28,9 @@ def __init__(self, config: DatadogExporterConfig):
         self.env = config.get("env", "production")
 
         # Build API endpoints based on site
-        site_clean = self.site.replace("https://", "").replace("http://", "").rstrip("/")
+        site_clean = (
+            self.site.replace("https://", "").replace("http://", "").rstrip("/")
+        )
         self.logs_url = f"https://http-intake.logs.{site_clean}/api/v2/logs"
         self.metrics_url = f"https://api.{site_clean}/api/v1/series"
 
@@ -53,17 +54,13 @@ def export(self, event: Event) -> None:
 
         # Debug: Log the metrics payload
         write_to_log(f"DatadogExporter: Metrics URL: {self.metrics_url}")
-        write_to_log(f"DatadogExporter: Metrics payload: {json.dumps({'series': metrics})}")
+        write_to_log(
+            f"DatadogExporter: Metrics payload: {json.dumps({'series': metrics})}"
+        )
 
-        # Use ThreadPoolExecutor to send both requests in parallel
-        with ThreadPoolExecutor(max_workers=2) as executor:
-            # Submit both requests
-            logs_future = executor.submit(self._send_logs, [log])
-            metrics_future = executor.submit(self._send_metrics, metrics)
-
-            # Wait for both to complete (results will be logged in the methods)
-            logs_future.result()
-            metrics_future.result()
+        # Send logs and metrics synchronously
+        self._send_logs([log])
+        self._send_metrics(metrics)
 
     def _send_logs(self, logs: List[Dict[str, Any]]) -> None:
         """Send logs to Datadog."""
@@ -72,15 +69,17 @@ def _send_logs(self, logs: List[Dict[str, Any]]) -> None:
                 self.logs_url,
                 headers={
                     "DD-API-KEY": self.api_key,
-                    "Content-Type": "application/json"
+                    "Content-Type": "application/json",
                 },
                 json=logs,
-                timeout=10
+                timeout=10,
             )
 
             if not response.ok:
                 error_body = response.text
-                write_to_log(f"Datadog logs failed - Status: {response.status_code}, Body: {error_body}")
+                write_to_log(
+                    f"Datadog logs failed - Status: {response.status_code}, Body: {error_body}"
+                )
             else:
                 write_to_log(f"Datadog logs success - Status: {response.status_code}")
         except Exception as err:
@@ -93,18 +92,22 @@ def _send_metrics(self, metrics: List[Dict[str, Any]]) -> None:
                 self.metrics_url,
                 headers={
                     "DD-API-KEY": self.api_key,
-                    "Content-Type": "application/json"
+                    "Content-Type": "application/json",
                 },
                 json={"series": metrics},
-                timeout=10
+                timeout=10,
             )
 
             if not response.ok:
                 error_body = response.text
-                write_to_log(f"Datadog metrics failed - Status: {response.status_code}, Body: {error_body}")
+                write_to_log(
+                    f"Datadog metrics failed - Status: {response.status_code}, Body: {error_body}"
+                )
             else:
                 response_body = response.text
-                write_to_log(f"Datadog metrics success - Status: {response.status_code}, Body: {response_body}")
+                write_to_log(
+                    f"Datadog metrics success - Status: {response.status_code}, Body: {response_body}"
+                )
         except Exception as err:
             write_to_log(f"Datadog metrics network error: {err}")
 
@@ -131,7 +134,11 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
             tags.append("error:true")
 
         # Get timestamp in milliseconds
-        timestamp_ms = int(event.timestamp.timestamp() * 1000) if event.timestamp else int(datetime.now().timestamp() * 1000)
+        timestamp_ms = (
+            int(event.timestamp.timestamp() * 1000)
+            if event.timestamp
+            else int(datetime.now().timestamp() * 1000)
+        )
 
         log: Dict[str, Any] = {
             "message": f"{event.event_type or 'unknown'} - {event.resource_name or 'unknown'}",
@@ -142,7 +149,7 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
             "status": "error" if event.is_error else "info",
             "dd": {
                 "trace_id": trace_context.get_datadog_trace_id(event.session_id),
-                "span_id": trace_context.get_datadog_span_id(event.id)
+                "span_id": trace_context.get_datadog_span_id(event.id),
             },
             "mcp": {
                 "session_id": event.session_id,
@@ -158,14 +165,18 @@ def event_to_log(self, event: Event) -> Dict[str, Any]:
                 "server_name": event.server_name,
                 "server_version": event.server_version,
                 "is_error": event.is_error,
-                "error": event.error
-            }
+                "error": event.error,
+            },
         }
 
         # Add error at root level if it exists
         if event.is_error and event.error:
             log["error"] = {
-                "message": event.error if isinstance(event.error, str) else json.dumps(event.error)
+                "message": (
+                    event.error
+                    if isinstance(event.error, str)
+                    else json.dumps(event.error)
+                )
             }
 
         return log
@@ -183,7 +194,11 @@ def event_to_metrics(self, event: Event) -> List[Dict[str, Any]]:
         metrics: List[Dict[str, Any]] = []
 
         # Get timestamp in seconds (Unix timestamp)
-        timestamp = int(event.timestamp.timestamp()) if event.timestamp else int(datetime.now().timestamp())
+        timestamp = (
+            int(event.timestamp.timestamp())
+            if event.timestamp
+            else int(datetime.now().timestamp())
+        )
 
         tags: List[str] = [f"service:{self.service}"]
 
@@ -196,29 +211,35 @@ def event_to_metrics(self, event: Event) -> List[Dict[str, Any]]:
             tags.append(f"resource:{event.resource_name}")
 
         # Event count metric
-        metrics.append({
-            "metric": "mcp.events.count",
-            "type": "count",
-            "points": [[timestamp, 1]],
-            "tags": tags
-        })
+        metrics.append(
+            {
+                "metric": "mcp.events.count",
+                "type": "count",
+                "points": [[timestamp, 1]],
+                "tags": tags,
+            }
+        )
 
         # Duration metric (only if duration exists)
         if event.duration is not None:
-            metrics.append({
-                "metric": "mcp.event.duration",
-                "type": "gauge",
-                "points": [[timestamp, event.duration]],
-                "tags": tags
-            })
+            metrics.append(
+                {
+                    "metric": "mcp.event.duration",
+                    "type": "gauge",
+                    "points": [[timestamp, event.duration]],
+                    "tags": tags,
+                }
+            )
 
         # Error count metric
         if event.is_error:
-            metrics.append({
-                "metric": "mcp.errors.count",
-                "type": "count",
-                "points": [[timestamp, 1]],
-                "tags": tags
-            })
+            metrics.append(
+                {
+                    "metric": "mcp.errors.count",
+                    "type": "count",
+                    "points": [[timestamp, 1]],
+                    "tags": tags,
+                }
+            )
 
-        return metrics
\ No newline at end of file
+        return metrics
diff --git a/src/mcpcat/modules/exporters/otlp.py b/src/mcpcat/modules/exporters/otlp.py
index 165b1f0..f33a827 100644
--- a/src/mcpcat/modules/exporters/otlp.py
+++ b/src/mcpcat/modules/exporters/otlp.py
@@ -87,9 +87,9 @@ def export(self, event: Event) -> None:
             write_to_log(f"Successfully exported event to OTLP: {event.id}")
 
         except requests.exceptions.RequestException as e:
-            raise Exception(f"OTLP export failed: {e}")
+            write_to_log(f"OTLP export failed: {e}")
         except Exception as e:
-            raise Exception(f"OTLP export error: {e}")
+            write_to_log(f"OTLP export error: {e}")
 
     def _convert_to_otlp_span(self, event: Event) -> Dict[str, Any]:
         """
diff --git a/src/mcpcat/modules/telemetry.py b/src/mcpcat/modules/telemetry.py
index f7bf9d9..1c00a43 100644
--- a/src/mcpcat/modules/telemetry.py
+++ b/src/mcpcat/modules/telemetry.py
@@ -1,6 +1,5 @@
 """Telemetry manager for exporting events to observability platforms."""
 
-from concurrent.futures import ThreadPoolExecutor
 from typing import Dict, Optional
 from ..types import Event, ExporterConfig, OTLPExporterConfig, DatadogExporterConfig, SentryExporterConfig
 from .exporters import Exporter
@@ -10,16 +9,14 @@
 class TelemetryManager:
     """Manages telemetry exporters and coordinates event export."""
 
-    def __init__(self, exporter_configs: dict[str, ExporterConfig], executor: ThreadPoolExecutor):
+    def __init__(self, exporter_configs: dict[str, ExporterConfig]):
         """
         Initialize the telemetry manager with configured exporters.
 
         Args:
             exporter_configs: Dictionary of exporter configurations
-            executor: Thread pool executor for async exports
         """
         self.exporters: Dict[str, Exporter] = {}
-        self.executor = executor
         self._initialize_exporters(exporter_configs)
 
     def _initialize_exporters(self, configs: dict[str, ExporterConfig]) -> None:
@@ -61,7 +58,7 @@ def _create_exporter(self, name: str, config: ExporterConfig) -> Optional[Export
 
     def export(self, event: Event) -> None:
         """
-        Export event to all configured exporters (non-blocking).
+        Export event to all configured exporters.
 
         Args:
             event: Event to export
@@ -69,9 +66,9 @@ def export(self, event: Event) -> None:
         if not self.exporters:
             return
 
-        # Submit export tasks to thread pool for each exporter
+        # Export to each exporter synchronously
         for name, exporter in self.exporters.items():
-            self.executor.submit(self._safe_export, name, exporter, event)
+            self._safe_export(name, exporter, event)
 
     def _safe_export(self, name: str, exporter: Exporter, event: Event) -> None:
         """
diff --git a/src/mcpcat/types.py b/src/mcpcat/types.py
index 04e743d..19addb9 100644
--- a/src/mcpcat/types.py
+++ b/src/mcpcat/types.py
@@ -112,7 +112,7 @@ class MCPCatOptions:
 @dataclass
 class MCPCatData:
     """Internal data structure for tracking."""
-    project_id: str
+    project_id: str | None
     session_id: str
     session_info: SessionInfo
     last_activity: datetime

From c34a86c7a103e6a750d0424295319b250c94b4b6 Mon Sep 17 00:00:00 2001
From: Kashish Hora
Date: Wed, 20 Aug 2025 14:38:19 -0400
Subject: [PATCH 4/5] fix: send MCPCat events before telemetry export

Send each event to the MCPCat API before handing it to the telemetry
exporters; now that exports run synchronously, this keeps exporter
latency from delaying delivery to MCPCat.
---
 src/mcpcat/modules/event_queue.py | 55 +++++++++++++++++++++----------
 1 file changed, 38 insertions(+), 17 deletions(-)

diff --git a/src/mcpcat/modules/event_queue.py b/src/mcpcat/modules/event_queue.py
index 57286d1..f0fbd50 100644
--- a/src/mcpcat/modules/event_queue.py
+++ b/src/mcpcat/modules/event_queue.py
@@ -63,7 +63,9 @@ def add(self, event: UnredactedEvent) -> None:
             self.queue.put_nowait(event)
         except queue.Full:
             # Queue is full, drop the new event
-            write_to_log(f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}")
+            write_to_log(
+                f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}"
+            )
 
     def _worker(self) -> None:
         """Worker thread that processes events from the queue."""
@@ -82,7 +84,9 @@ def _worker(self) -> None:
                     try:
                         self.queue.put_nowait(event)
                     except queue.Full:
-                        write_to_log(f"Could not requeue event {event.id or 'unknown'} - queue full")
+                        write_to_log(
+                            f"Could not requeue event {event.id or 'unknown'} - queue full"
+                        )
 
             except queue.Empty:
                 continue
@@ -103,25 +107,30 @@ def _process_event(self, event: UnredactedEvent) -> None:
                 event = redacted_event
                 event.redaction_fn = None  # Clear the function to avoid reprocessing
             except Exception as error:
-                write_to_log(f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}")
+                write_to_log(
+                    f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}"
+                )
                 return  # Skip this event if redaction fails
 
         if event:
             event.id = event.id or generate_prefixed_ksuid("evt")
 
-            # Export to telemetry backends if configured (non-blocking)
+            # Send to MCPCat API only if project_id exists
+            if event.project_id:
+                self._send_event(event)
+
+            # Export to telemetry backends if configured
             if _telemetry_manager:
                 try:
                     _telemetry_manager.export(event)
                 except Exception as e:
                     write_to_log(f"Telemetry export submission failed: {e}")
 
-            # Send to MCPCat API only if project_id exists
-            if event.project_id:
-                self._send_event(event)
             elif not _telemetry_manager:
                 # Only warn if we have neither MCPCat nor telemetry configured
-                write_to_log("Warning: Event has no project_id and no telemetry exporters configured")
+                write_to_log(
+                    "Warning: Event has no project_id and no telemetry exporters configured"
+                )
 
     def _send_event(self, event: Event, retries: int = 0) -> None:
         """Send event to API."""
@@ -142,7 +151,9 @@ def _send_event(self, event: Event, retries: int = 0) -> None:
                 time.sleep(2**retries)
                self._send_event(event, retries + 1)
            else:
-                write_to_log(f"Failed to send event {event.id} after {self.max_retries} retries")
+                write_to_log(
+                    f"Failed to send event {event.id} after {self.max_retries} retries"
+                )
 
     def get_stats(self) -> dict[str, Any]:
         """Get queue stats for monitoring."""
@@ -162,7 +173,9 @@ def destroy(self) -> None:
             if self.queue.qsize() > 0:
                 # If there are events in queue, wait 5 seconds
                 wait_time = 5.0
-                write_to_log(f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s")
+                write_to_log(
+                    f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s"
+                )
             else:
                 # If queue is empty, just wait 1 second for in-flight requests
                 wait_time = 1.0
@@ -181,10 +194,10 @@ def destroy(self) -> None:
 
 # Global telemetry manager instance (optional)
-_telemetry_manager: Optional['TelemetryManager'] = None
+_telemetry_manager: Optional["TelemetryManager"] = None
 
 
-def set_telemetry_manager(manager: Optional['TelemetryManager']) -> None:
+def set_telemetry_manager(manager: Optional["TelemetryManager"]) -> None:
     """
     Set the global telemetry manager instance.
 
@@ -194,7 +207,9 @@ def set_telemetry_manager(manager: Optional["TelemetryManager"]) -> None:
     global _telemetry_manager
     _telemetry_manager = manager
     if manager:
-        write_to_log(f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)")
+        write_to_log(
+            f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)"
+        )
 
 
 # Global event queue instance
@@ -235,14 +250,18 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
     """Publish an event to the queue."""
     if not event.duration:
         if event.timestamp:
-            event.duration = int((datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp()) * 1000)
+            event.duration = int(
+                (datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp())
+                * 1000
+            )
         else:
             event.duration = None
-
     data = get_server_tracking_data(server)
     if not data:
-        write_to_log("Warning: Server tracking data not found. Event will not be published.")
+        write_to_log(
+            "Warning: Server tracking data not found. Event will not be published."
+        )
         return
 
     session_info = get_session_info(server, data)
@@ -254,7 +273,9 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
 
     # Merge data, ensuring project_id from data takes precedence
     merged_data = {**event_data, **session_data}
-    merged_data['project_id'] = data.project_id  # Override with tracking data's project_id
+    merged_data["project_id"] = (
+        data.project_id
+    )  # Override with tracking data's project_id
 
     full_event = UnredactedEvent(
         **merged_data,

From f8d14ad7045c3bcd494484ae674fbf25d34deac8 Mon Sep 17 00:00:00 2001
From: Kashish Hora
Date: Wed, 20 Aug 2025 14:40:42 -0400
Subject: [PATCH 5/5] fix: only warn when project_id and telemetry are both
 missing

The previous elif fired whenever no telemetry manager was set, even if
the event had a project_id; warn only when neither MCPCat nor any
telemetry exporter is configured.
---
 src/mcpcat/modules/event_queue.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/mcpcat/modules/event_queue.py b/src/mcpcat/modules/event_queue.py
index f0fbd50..dcf2414 100644
--- a/src/mcpcat/modules/event_queue.py
+++ b/src/mcpcat/modules/event_queue.py
@@ -126,8 +126,8 @@ def _process_event(self, event: UnredactedEvent) -> None:
             except Exception as e:
                 write_to_log(f"Telemetry export submission failed: {e}")
 
-            elif not _telemetry_manager:
-                # Only warn if we have neither MCPCat nor telemetry configured
+            if not event.project_id and not _telemetry_manager:
+                # Warn if we have neither MCPCat nor telemetry configured
                 write_to_log(
                     "Warning: Event has no project_id and no telemetry exporters configured"
                 )
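
For reviewers, a minimal usage sketch of the telemetry-only mode this series enables. The track() call and the MCPCatOptions.exporters field match the diffs above; the "datadog" exporter key and its config fields are assumptions inferred from the DatadogExporterConfig accesses in this series rather than a documented schema, and FastMCP is only an example host server:

    from mcp.server.fastmcp import FastMCP

    from mcpcat import MCPCatOptions, track

    mcp = FastMCP("demo-server")

    # Telemetry-only mode: no project_id, so events go to the configured
    # exporters and are never sent to the MCPCat API. The "datadog" key and
    # the fields below are assumed, not a confirmed schema.
    options = MCPCatOptions(
        exporters={
            "datadog": {
                "api_key": "<DD_API_KEY>",  # hypothetical placeholder
                "site": "datadoghq.com",
                "service": "my-mcp-server",
                "env": "production",
            },
        },
    )

    # Raises ValueError if neither project_id nor exporters are provided.
    track(mcp, options=options)

With both a project_id and exporters configured, each event is sent to the MCPCat API first and then handed to every exporter in turn, per patches 3 and 4.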