import time
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor
- from typing import Any
+ from typing import Any, Optional, TYPE_CHECKING
+
+ if TYPE_CHECKING:
+     from .telemetry import TelemetryManager

from mcpcat_api import ApiClient, Configuration, EventsApi
from mcpcat.modules.constants import EVENT_ID_PREFIX, MCPCAT_API_URL
@@ -30,21 +33,21 @@ def __init__(self, api_client=None):
        self.max_retries = 3
        self.max_queue_size = 10000  # Prevent unbounded growth
        self.concurrency = 5  # Max parallel requests
-
+
        # Allow injection of api_client for testing
        if api_client is None:
            config = Configuration(host=MCPCAT_API_URL)
            api_client_instance = ApiClient(configuration=config)
            self.api_client = EventsApi(api_client=api_client_instance)
        else:
            self.api_client = api_client
-
+
        self._shutdown = False
        self._shutdown_event = threading.Event()
-
+
        # Thread pool for processing events
        self.executor = ThreadPoolExecutor(max_workers=self.concurrency)
-
+
        # Start worker thread
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
@@ -60,15 +63,17 @@ def add(self, event: UnredactedEvent) -> None:
            self.queue.put_nowait(event)
        except queue.Full:
            # Queue is full, drop the new event
-             write_to_log(f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}")
+             write_to_log(
+                 f"Event queue full, dropping event {event.id or 'unknown'} of type {event.event_type}"
+             )

    def _worker(self) -> None:
        """Worker thread that processes events from the queue."""
        while not self._shutdown_event.is_set():
            try:
                # Wait for an event with timeout
                event = self.queue.get(timeout=0.1)
-
+
                # Submit event processing to thread pool
                # The executor will queue it if all workers are busy
                try:
@@ -79,8 +84,10 @@ def _worker(self) -> None:
                    try:
                        self.queue.put_nowait(event)
                    except queue.Full:
-                         write_to_log(f"Could not requeue event {event.id or 'unknown'} - queue full")
-
+                         write_to_log(
+                             f"Could not requeue event {event.id or 'unknown'} - queue full"
+                         )
+
            except queue.Empty:
                continue
            except Exception as e:
@@ -100,12 +107,30 @@ def _process_event(self, event: UnredactedEvent) -> None:
                event = redacted_event
                event.redaction_fn = None  # Clear the function to avoid reprocessing
            except Exception as error:
-                 write_to_log(f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}")
+                 write_to_log(
+                     f"WARNING: Dropping event {event.id or 'unknown'} due to redaction failure: {error}"
+                 )
                return  # Skip this event if redaction fails

        if event:
            event.id = event.id or generate_prefixed_ksuid("evt")
-             self._send_event(event)
+
+             # Send to MCPCat API only if project_id exists
+             if event.project_id:
+                 self._send_event(event)
+
+             # Export to telemetry backends if configured
+             if _telemetry_manager:
+                 try:
+                     _telemetry_manager.export(event)
+                 except Exception as e:
+                     write_to_log(f"Telemetry export submission failed: {e}")
+
+             if not event.project_id and not _telemetry_manager:
+                 # Warn if we have neither MCPCat nor telemetry configured
+                 write_to_log(
+                     "Warning: Event has no project_id and no telemetry exporters configured"
+                 )

    def _send_event(self, event: Event, retries: int = 0) -> None:
        """Send event to API."""
@@ -126,7 +151,9 @@ def _send_event(self, event: Event, retries: int = 0) -> None:
                time.sleep(2 ** retries)
                self._send_event(event, retries + 1)
            else:
-                 write_to_log(f"Failed to send event {event.id} after {self.max_retries} retries")
+                 write_to_log(
+                     f"Failed to send event {event.id} after {self.max_retries} retries"
+                 )

    def get_stats(self) -> dict[str, Any]:
        """Get queue stats for monitoring."""
@@ -146,7 +173,9 @@ def destroy(self) -> None:
        if self.queue.qsize() > 0:
            # If there are events in queue, wait 5 seconds
            wait_time = 5.0
-             write_to_log(f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s")
+             write_to_log(
+                 f"Shutting down with {self.queue.qsize()} events in queue, waiting up to {wait_time}s"
+             )
        else:
            # If queue is empty, just wait 1 second for in-flight requests
            wait_time = 1.0
@@ -164,22 +193,41 @@ def destroy(self) -> None:
        write_to_log(f"Shutdown complete. {remaining} events were not processed.")


+ # Global telemetry manager instance (optional)
+ _telemetry_manager: Optional["TelemetryManager"] = None
+
+
+ def set_telemetry_manager(manager: Optional["TelemetryManager"]) -> None:
+     """
+     Set the global telemetry manager instance.
+
+     Args:
+         manager: TelemetryManager instance or None to disable telemetry
+     """
+     global _telemetry_manager
+     _telemetry_manager = manager
+     if manager:
+         write_to_log(
+             f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)"
+         )
+
+
# Global event queue instance
event_queue = EventQueue()


def _shutdown_handler(signum, frame):
    """Handle shutdown signals."""
-
+
    write_to_log("Received shutdown signal, gracefully shutting down...")
-
+
    # Reset signal handlers to default behavior to avoid recursive calls
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
-
+
    # Perform graceful shutdown
    event_queue.destroy()
-
+
    # Force exit after graceful shutdown
    os._exit(0)

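
Note: the new module-level hook is meant to be wired up once at startup, before events start flowing. A hedged usage sketch, reusing the TelemetryManagerSketch stand-in from the earlier note (the import path assumes this file is mcpcat/modules/event_queue.py):

# Usage sketch only; TelemetryManagerSketch is the illustrative stand-in above,
# not the real TelemetryManager from the .telemetry module.
from mcpcat.modules.event_queue import set_telemetry_manager

manager = TelemetryManagerSketch(exporters=[])  # no exporters: export() becomes a no-op
set_telemetry_manager(manager)                  # logs "Telemetry manager set with 0 exporter(s)"

set_telemetry_manager(None)                     # disables the telemetry path again
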
@@ -202,28 +250,35 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
    """Publish an event to the queue."""
    if not event.duration:
        if event.timestamp:
-             event.duration = int((datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp()) * 1000)
+             event.duration = int(
+                 (datetime.now(timezone.utc).timestamp() - event.timestamp.timestamp())
+                 * 1000
+             )
        else:
            event.duration = None

-
    data = get_server_tracking_data(server)
    if not data:
-         write_to_log("Warning: Server tracking data not found. Event will not be published.")
+         write_to_log(
+             "Warning: Server tracking data not found. Event will not be published."
+         )
        return

    session_info = get_session_info(server, data)

    # Create full event with all required fields
-     # Merge event data with session info
+     # Merge event data with session info
    event_data = event.model_dump(exclude_none=True)
    session_data = session_info.model_dump(exclude_none=True)
-
+
+     # Merge data, ensuring project_id from data takes precedence
    merged_data = {**event_data, **session_data}
-
+     merged_data["project_id"] = (
+         data.project_id
+     )  # Override with tracking data's project_id
+
    full_event = UnredactedEvent(
        **merged_data,
-         project_id=data.project_id,
        redaction_fn=data.options.redact_sensitive_information,
    )