Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ mcpcat.track(
)
```

### Disabling log retention

MCPcat saves debug logs by default. In production environments or when debugging is not needed, this can create unnecessary disk I/O and storage usage. To prevent this, disable debug_mode using the following environment variable:

```bash
export MCPCAT_DEBUG_MODE="false"
```

Learn more about our free and open source [telemetry integrations](https://docs.mcpcat.io/telemetry/integrations).

## Free for open source
Expand Down
22 changes: 17 additions & 5 deletions src/mcpcat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
)


def track(server: Any, project_id: str | None = None, options: MCPCatOptions | None = None) -> Any:
def track(
server: Any, project_id: str | None = None, options: MCPCatOptions | None = None
) -> Any:
"""
Initialize MCPCat tracking with optional telemetry export.

Expand Down Expand Up @@ -88,29 +90,39 @@ def track(server: Any, project_id: str | None = None, options: MCPCatOptions | N
# Initialize the dynamic tracking system by setting the flag
if not data.tracker_initialized:
data.tracker_initialized = True
write_to_log(f"Dynamic tracking initialized for server {id(lowlevel_server)}")
write_to_log(
f"Dynamic tracking initialized for server {id(lowlevel_server)}"
)

# Apply appropriate tracking method based on server type
if is_fastmcp:
# For FastMCP servers, use monkey-patching for tool tracking
apply_monkey_patches(server, data)
# Only apply minimal overrides for non-tool events (like initialize, list_tools display)
from mcpcat.modules.overrides.mcp_server import override_lowlevel_mcp_server_minimal
from mcpcat.modules.overrides.mcp_server import (
override_lowlevel_mcp_server_minimal,
)

override_lowlevel_mcp_server_minimal(lowlevel_server, data)
else:
# For low-level servers, use the traditional overrides (no monkey patching needed)
override_lowlevel_mcp_server(lowlevel_server, data)

if project_id:
write_to_log(f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}")
write_to_log(
f"MCPCat initialized with dynamic tracking for session {session_id} on project {project_id}"
)
else:
write_to_log(f"MCPCat initialized in telemetry-only mode for session {session_id}")
write_to_log(
f"MCPCat initialized in telemetry-only mode for session {session_id}"
)

except Exception as e:
write_to_log(f"Error initializing MCPCat: {e}")

return server


__all__ = [
# Main API
"track",
Expand Down
45 changes: 26 additions & 19 deletions src/mcpcat/modules/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,50 +24,54 @@ def is_fastmcp_server(server: Any) -> bool:
# A FastMCP server should have both _mcp_server and _tool_manager
return hasattr(server, "_mcp_server") and hasattr(server, "_tool_manager")


def has_required_fastmcp_attributes(server: Any) -> bool:
"""Check if a FastMCP server has all required attributes for monkey patching.

This validates that the server has all the attributes that monkey_patch.py expects.
"""
# Check for _tool_manager and its required methods
if not hasattr(server, "_tool_manager"):
return False

tool_manager = server._tool_manager
required_tool_manager_methods = ["add_tool", "call_tool", "list_tools"]
for method in required_tool_manager_methods:
if not hasattr(tool_manager, method) or not callable(getattr(tool_manager, method)):
if not hasattr(tool_manager, method) or not callable(
getattr(tool_manager, method)
):
return False

# Check for _tools dict on tool_manager (used for tracking existing tools)
if not hasattr(tool_manager, "_tools") or not isinstance(tool_manager._tools, dict):
return False

# Check for add_tool method on the server itself (used for adding get_more_tools)
if not hasattr(server, "add_tool") or not callable(server.add_tool):
return False

# Check for _mcp_server (used for event tracking and session management)
if not hasattr(server, "_mcp_server"):
return False

return True


def has_neccessary_attributes(server: Any) -> bool:
"""Check if the server has necessary attributes for compatibility."""
required_methods = ["list_tools", "call_tool"]

# Check for core methods that both FastMCP and Server implementations have
for method in required_methods:
if not hasattr(server, method):
return False

# For FastMCP servers, verify all required attributes for monkey patching
if is_fastmcp_server(server):
# Use the comprehensive FastMCP validation
if not has_required_fastmcp_attributes(server):
return False

# Additional checks for request handling
# Use dir() to avoid triggering property getters that might raise exceptions
if "request_context" not in dir(server._mcp_server):
Expand All @@ -90,7 +94,7 @@ def has_neccessary_attributes(server: Any) -> bool:
return False
if not isinstance(server.request_handlers, dict):
return False

return True


Expand All @@ -105,26 +109,29 @@ def get_mcp_compatible_error_message(error: Any) -> str:
return str(error)
return str(error)


def is_mcp_error_response(response: ServerResult) -> tuple[bool, str]:
"""Check if the response is an MCP error."""
try:
# ServerResult is a RootModel, so we need to access its root attribute
if hasattr(response, 'root'):
if hasattr(response, "root"):
result = response.root
# Check if it's a CallToolResult with an error
if hasattr(result, 'isError') and result.isError:
if hasattr(result, "isError") and result.isError:
# Extract error message from content
if hasattr(result, 'content') and result.content:
if hasattr(result, "content") and result.content:
# content is a list of TextContent/ImageContent/EmbeddedResource
for content_item in result.content:
# Check if it has a text attribute (TextContent)
if hasattr(content_item, 'text'):
if hasattr(content_item, "text"):
return True, str(content_item.text)
# Check if it has type and content attributes
elif hasattr(content_item, 'type') and hasattr(content_item, 'content'):
if content_item.type == 'text':
elif hasattr(content_item, "type") and hasattr(
content_item, "content"
):
if content_item.type == "text":
return True, str(content_item.content)

# If no text content found, stringify the first item
if result.content and len(result.content) > 0:
return True, str(result.content[0])
Expand All @@ -136,4 +143,4 @@ def is_mcp_error_response(response: ServerResult) -> tuple[bool, str]:
return False, ""
except Exception as e:
# Log unexpected errors but still return a valid response
return False, f"Error checking response: {str(e)}"
return False, f"Error checking response: {str(e)}"
2 changes: 1 addition & 1 deletion src/mcpcat/modules/context_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def add_context_parameter_to_schema(schema: dict[str, Any]) -> dict[str, Any]:
# Add context parameter
modified_schema["properties"]["context"] = {
"type": "string",
"description": "Describe why you are calling this tool and how it fits into your overall task"
"description": "Describe why you are calling this tool and how it fits into your overall task",
}

# Add to required fields
Expand Down
2 changes: 1 addition & 1 deletion src/mcpcat/modules/exporters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ def export(self, event: Event) -> None:
This method should handle all errors internally and never
raise exceptions that could affect the main MCP server.
"""
pass
pass
126 changes: 63 additions & 63 deletions src/mcpcat/modules/exporters/otlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,17 @@ def export(self, event: Event) -> None:
{
"scope": {
"name": "mcpcat",
"version": event.mcpcat_version or "0.1.0"
"version": event.mcpcat_version or "0.1.0",
},
"spans": [span]
"spans": [span],
}
]
],
}
]
}

# Send to OTLP collector
response = self.session.post(
self.endpoint,
json=otlp_request,
timeout=5
)
response = self.session.post(self.endpoint, json=otlp_request, timeout=5)
response.raise_for_status()

write_to_log(f"Successfully exported event to OTLP: {event.id}")
Expand Down Expand Up @@ -121,8 +117,8 @@ def _convert_to_otlp_span(self, event: Event) -> Dict[str, Any]:
"endTimeUnixNano": str(end_time_nanos),
"attributes": self._get_span_attributes(event),
"status": {
"code": 2 if getattr(event, 'is_error', False) else 1 # ERROR : OK
}
"code": 2 if getattr(event, "is_error", False) else 1 # ERROR : OK
},
}

def _get_resource_attributes(self, event: Event) -> List[Dict[str, Any]]:
Expand All @@ -138,28 +134,30 @@ def _get_resource_attributes(self, event: Event) -> List[Dict[str, Any]]:
attributes = []

if event.server_name:
attributes.append({
"key": "service.name",
"value": {"stringValue": event.server_name}
})
attributes.append(
{"key": "service.name", "value": {"stringValue": event.server_name}}
)

if event.server_version:
attributes.append({
"key": "service.version",
"value": {"stringValue": event.server_version}
})
attributes.append(
{
"key": "service.version",
"value": {"stringValue": event.server_version},
}
)

# Add SDK information
attributes.append({
"key": "telemetry.sdk.name",
"value": {"stringValue": "mcpcat-python"}
})
attributes.append(
{"key": "telemetry.sdk.name", "value": {"stringValue": "mcpcat-python"}}
)

if event.mcpcat_version:
attributes.append({
"key": "telemetry.sdk.version",
"value": {"stringValue": event.mcpcat_version}
})
attributes.append(
{
"key": "telemetry.sdk.version",
"value": {"stringValue": event.mcpcat_version},
}
)

return attributes

Expand All @@ -177,63 +175,65 @@ def _get_span_attributes(self, event: Event) -> List[Dict[str, Any]]:

# Add MCP-specific attributes
if event.event_type:
attributes.append({
"key": "mcp.event_type",
"value": {"stringValue": event.event_type}
})
attributes.append(
{"key": "mcp.event_type", "value": {"stringValue": event.event_type}}
)

if event.session_id:
attributes.append({
"key": "mcp.session_id",
"value": {"stringValue": event.session_id}
})
attributes.append(
{"key": "mcp.session_id", "value": {"stringValue": event.session_id}}
)

if event.project_id:
attributes.append({
"key": "mcp.project_id",
"value": {"stringValue": event.project_id}
})
attributes.append(
{"key": "mcp.project_id", "value": {"stringValue": event.project_id}}
)

# Add resource name (for tools, prompts, resources)
if event.resource_name:
attributes.append({
"key": "mcp.resource_name",
"value": {"stringValue": event.resource_name}
})
attributes.append(
{
"key": "mcp.resource_name",
"value": {"stringValue": event.resource_name},
}
)

# Add user intent if available
if event.user_intent:
attributes.append({
"key": "mcp.user_intent",
"value": {"stringValue": event.user_intent}
})
attributes.append(
{"key": "mcp.user_intent", "value": {"stringValue": event.user_intent}}
)

# Add actor information
if event.identify_actor_given_id:
attributes.append({
"key": "mcp.actor_id",
"value": {"stringValue": event.identify_actor_given_id}
})
attributes.append(
{
"key": "mcp.actor_id",
"value": {"stringValue": event.identify_actor_given_id},
}
)

if event.identify_actor_name:
attributes.append({
"key": "mcp.actor_name",
"value": {"stringValue": event.identify_actor_name}
})
attributes.append(
{
"key": "mcp.actor_name",
"value": {"stringValue": event.identify_actor_name},
}
)

# Add client information
if event.client_name:
attributes.append({
"key": "mcp.client_name",
"value": {"stringValue": event.client_name}
})
attributes.append(
{"key": "mcp.client_name", "value": {"stringValue": event.client_name}}
)

if event.client_version:
attributes.append({
"key": "mcp.client_version",
"value": {"stringValue": event.client_version}
})
attributes.append(
{
"key": "mcp.client_version",
"value": {"stringValue": event.client_version},
}
)

# Filter out empty attributes
return [attr for attr in attributes if attr["value"].get("stringValue")]

Loading