Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"mcp>=1.2.0",
"mcpcat-api==0.1.4",
"pydantic>=2.0.0",
"requests>=2.31.0",
]

[project.urls]
Expand Down
49 changes: 41 additions & 8 deletions src/mcpcat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,33 @@
)


def track(server: Any, project_id: str, options: MCPCatOptions | None = None) -> Any:
def track(server: Any, project_id: str | None = None, options: MCPCatOptions | None = None) -> Any:
"""
Initialize MCPCat tracking with optional telemetry export.

Args:
server: MCP server instance to track
project_id: MCPCat project ID (optional if using telemetry-only mode)
options: Configuration options including telemetry exporters

Returns:
The server instance with tracking enabled

Raises:
ValueError: If neither project_id nor exporters are provided
TypeError: If server is not a compatible MCP server instance
"""
# Use default options if not provided
if options is None:
options = MCPCatOptions()

# Validate configuration
if not project_id and not options.exporters:
raise ValueError(
"Either project_id or exporters must be provided. "
"Use project_id for MCPCat, exporters for telemetry-only mode, or both."
)

# Validate server compatibility
if not is_compatible_server(server):
raise TypeError(
Expand All @@ -37,6 +59,15 @@ def track(server: Any, project_id: str, options: MCPCatOptions | None = None) ->
if is_fastmcp:
lowlevel_server = server._mcp_server

# Initialize telemetry if exporters configured
if options.exporters:
from mcpcat.modules.telemetry import TelemetryManager
from mcpcat.modules.event_queue import set_telemetry_manager

telemetry_manager = TelemetryManager(options.exporters)
set_telemetry_manager(telemetry_manager)
write_to_log(f"Telemetry initialized with {len(options.exporters)} exporter(s)")

# Create and store tracking data
session_id = new_session_id()
session_info = get_session_info(lowlevel_server)
Expand All @@ -53,13 +84,12 @@ def track(server: Any, project_id: str, options: MCPCatOptions | None = None) ->
try:
# Always initialize dynamic tracking for complete tool coverage
from mcpcat.modules.overrides.monkey_patch import apply_monkey_patches

# Initialize the dynamic tracking system by setting the flag
if not data.tracker_initialized:
data.tracker_initialized = True
from mcpcat.modules.logging import write_to_log
write_to_log(f"Dynamic tracking initialized for server {id(lowlevel_server)}")

# Apply appropriate tracking method based on server type
if is_fastmcp:
# For FastMCP servers, use monkey-patching for tool tracking
Expand All @@ -70,12 +100,15 @@ def track(server: Any, project_id: str, options: MCPCatOptions | None = None) ->
else:
# For low-level servers, use the traditional overrides (no monkey patching needed)
override_lowlevel_mcp_server(lowlevel_server, data)

write_to_log(f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}")


if project_id:
write_to_log(f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}")
else:
write_to_log(f"MCPCat initialized in telemetry-only mode for session {session_id}")

except Exception as e:
write_to_log(f"Error initializing MCPCat: {e}")

return server

__all__ = [
Expand Down
103 changes: 79 additions & 24 deletions src/mcpcat/modules/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from typing import Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from .telemetry import TelemetryManager

from mcpcat_api import ApiClient, Configuration, EventsApi
from mcpcat.modules.constants import EVENT_ID_PREFIX, MCPCAT_API_URL
Expand All @@ -30,21 +33,21 @@ def __init__(self, api_client=None):
self.max_retries = 3
self.max_queue_size = 10000 # Prevent unbounded growth
self.concurrency = 5 # Max parallel requests

# Allow injection of api_client for testing
if api_client is None:
config = Configuration(host=MCPCAT_API_URL)
api_client_instance = ApiClient(configuration=config)
self.api_client = EventsApi(api_client=api_client_instance)
else:
self.api_client = api_client

self._shutdown = False
self._shutdown_event = threading.Event()

# Thread pool for processing events
self.executor = ThreadPoolExecutor(max_workers=self.concurrency)

# Start worker thread
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
self.worker_thread.start()
Expand All @@ -60,15 +63,17 @@ def add(self, event: UnredactedEvent) -> None:
self.queue.put_nowait(event)
except queue.Full:
# Queue is full, drop the new event
write_to_log(f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}")
write_to_log(
f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}"
)

def _worker(self) -> None:
"""Worker thread that processes events from the queue."""
while not self._shutdown_event.is_set():
try:
# Wait for an event with timeout
event = self.queue.get(timeout=0.1)

# Submit event processing to thread pool
# The executor will queue it if all workers are busy
try:
Expand All @@ -79,8 +84,10 @@ def _worker(self) -> None:
try:
self.queue.put_nowait(event)
except queue.Full:
write_to_log(f"Could not requeue event {event.id or 'unknown'} - queue full")

write_to_log(
f"Could not requeue event {event.id or 'unknown'} - queue full"
)

except queue.Empty:
continue
except Exception as e:
Expand All @@ -100,12 +107,30 @@ def _process_event(self, event: UnredactedEvent) -> None:
event = redacted_event
event.redaction_fn = None # Clear the function to avoid reprocessing
except Exception as error:
write_to_log(f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}")
write_to_log(
f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}"
)
return # Skip this event if redaction fails

if event:
event.id = event.id or generate_prefixed_ksuid("evt")
self._send_event(event)

# Send to MCPCat API only if project_id exists
if event.project_id:
self._send_event(event)

# Export to telemetry backends if configured
if _telemetry_manager:
try:
_telemetry_manager.export(event)
except Exception as e:
write_to_log(f"Telemetry export submission failed: {e}")

if not event.project_id and not _telemetry_manager:
# Warn if we have neither MCPCat nor telemetry configured
write_to_log(
"Warning: Event has no project_id and no telemetry exporters configured"
)

def _send_event(self, event: Event, retries: int = 0) -> None:
"""Send event to API."""
Expand All @@ -126,7 +151,9 @@ def _send_event(self, event: Event, retries: int = 0) -> None:
time.sleep(2**retries)
self._send_event(event, retries + 1)
else:
write_to_log(f"Failed to send event {event.id} after {self.max_retries} retries")
write_to_log(
f"Failed to send event {event.id} after {self.max_retries} retries"
)

def get_stats(self) -> dict[str, Any]:
"""Get queue stats for monitoring."""
Expand All @@ -146,7 +173,9 @@ def destroy(self) -> None:
if self.queue.qsize() > 0:
# If there are events in queue, wait 5 seconds
wait_time = 5.0
write_to_log(f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s")
write_to_log(
f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s"
)
else:
# If queue is empty, just wait 1 second for in-flight requests
wait_time = 1.0
Expand All @@ -164,22 +193,41 @@ def destroy(self) -> None:
write_to_log(f"Shutdown complete. {remaining} events were not processed.")


# Global telemetry manager instance (optional)
_telemetry_manager: Optional["TelemetryManager"] = None


def set_telemetry_manager(manager: Optional["TelemetryManager"]) -> None:
    """
    Install or clear the process-wide telemetry manager.

    Args:
        manager: TelemetryManager instance, or None to disable telemetry
    """
    global _telemetry_manager
    _telemetry_manager = manager
    if not manager:
        # Nothing to announce when telemetry is being disabled.
        return
    write_to_log(
        f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)"
    )


# Global event queue instance
event_queue = EventQueue()


def _shutdown_handler(signum, frame):
    """Signal handler: drain the event queue, then hard-exit the process."""
    write_to_log("Received shutdown signal, gracefully shutting down...")

    # Restore default handlers first so a repeated signal cannot re-enter
    # this handler while shutdown is in progress.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal.SIG_DFL)

    # Flush/stop the global queue before the process goes away.
    event_queue.destroy()

    # Hard exit after the graceful drain above.
    os._exit(0)

Expand All @@ -202,28 +250,35 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
"""Publish an event to the queue."""
if not event.duration:
if event.timestamp:
event.duration = int((datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp()) * 1000)
event.duration = int(
(datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp())
* 1000
)
else:
event.duration = None


data = get_server_tracking_data(server)
if not data:
write_to_log("Warning: Server tracking data not found. Event will not be published.")
write_to_log(
"Warning: Server tracking data not found. Event will not be published."
)
return

session_info = get_session_info(server, data)

# Create full event with all required fields
# Merge event data with session info
# Merge event data with session info
event_data = event.model_dump(exclude_none=True)
session_data = session_info.model_dump(exclude_none=True)


# Merge data, ensuring project_id from data takes precedence
merged_data = {**event_data, **session_data}

merged_data["project_id"] = (
data.project_id
) # Override with tracking data's project_id

full_event = UnredactedEvent(
**merged_data,
project_id=data.project_id,
redaction_fn=data.options.redact_sensitive_information,
)

Expand Down
22 changes: 22 additions & 0 deletions src/mcpcat/modules/exporters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Base exporter interface for telemetry exporters."""

from abc import ABC, abstractmethod
from ...types import Event


class Exporter(ABC):
    """Common interface implemented by every telemetry exporter."""

    @abstractmethod
    def export(self, event: Event) -> None:
        """
        Deliver a single event to the backing telemetry system.

        Args:
            event: The MCPCat event to export

        Note:
            Implementations must swallow their own errors; an exporter
            failure must never propagate into the main MCP server.
        """
        ...
Loading