Skip to content

Commit 360a8eb

Browse files
committed
feat: add infra diagnostics tool to the Moose Dev MCP
refactor
1 parent 8f46497 commit 360a8eb

File tree

11 files changed

+2102
-1
lines changed

11 files changed

+2102
-1
lines changed

apps/framework-cli/src/mcp/server.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use rmcp::{
1414
use std::sync::Arc;
1515

1616
use super::embedded_docs;
17-
use super::tools::{create_error_result, get_source, infra_map, logs, query_olap, sample_stream};
17+
use super::tools::{
18+
create_error_result, get_source, infra_issues, infra_map, logs, query_olap, sample_stream,
19+
};
1820
use crate::cli::processing_coordinator::ProcessingCoordinator;
1921
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
2022
use crate::infrastructure::redis::redis_client::RedisClient;
@@ -84,6 +86,7 @@ impl ServerHandler for MooseMcpHandler {
8486
tools: vec![
8587
logs::tool_definition(),
8688
infra_map::tool_definition(),
89+
infra_issues::tool_definition(),
8790
query_olap::tool_definition(),
8891
sample_stream::tool_definition(),
8992
get_source::tool_definition(),
@@ -108,6 +111,12 @@ impl ServerHandler for MooseMcpHandler {
108111
self.redis_client.clone(),
109112
)
110113
.await),
114+
"diagnose_infrastructure" => Ok(infra_issues::handle_call(
115+
param.arguments.as_ref(),
116+
self.redis_client.clone(),
117+
&self.clickhouse_config,
118+
)
119+
.await),
111120
"query_olap" => Ok(query_olap::handle_call(
112121
&self.clickhouse_config,
113122
param.arguments.as_ref(),
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//! Diagnostic provider for checking system-wide errors
2+
3+
use log::debug;
4+
use serde_json::{json, Map, Value};
5+
6+
use super::{Component, DiagnoseError, DiagnosticProvider, Issue, Severity};
7+
use crate::infrastructure::olap::clickhouse::client::ClickHouseClient;
8+
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
9+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
10+
11+
/// Query timeout for diagnostic checks (30 seconds)
12+
const DIAGNOSTIC_QUERY_TIMEOUT_SECS: u64 = 30;
13+
14+
/// Diagnostic provider for checking system-wide errors
15+
pub struct ErrorStatsDiagnostic;
16+
17+
#[async_trait::async_trait]
18+
impl DiagnosticProvider for ErrorStatsDiagnostic {
19+
fn name(&self) -> &str {
20+
"ErrorStatsDiagnostic"
21+
}
22+
23+
fn applicable_to(&self, _component: &Component, _engine: Option<&ClickhouseEngine>) -> bool {
24+
// Error stats are system-wide, check only once (for first component)
25+
true
26+
}
27+
28+
async fn diagnose(
29+
&self,
30+
component: &Component,
31+
_engine: Option<&ClickhouseEngine>,
32+
config: &ClickHouseConfig,
33+
_since: Option<&str>,
34+
) -> Result<Vec<Issue>, DiagnoseError> {
35+
let client = ClickHouseClient::new(config)
36+
.map_err(|e| DiagnoseError::ClickHouseConnection(format!("{}", e)))?;
37+
38+
// Get recent errors with significant counts
39+
let query = "SELECT
40+
name,
41+
value,
42+
last_error_time,
43+
last_error_message
44+
FROM system.errors
45+
WHERE value > 0
46+
ORDER BY value DESC
47+
LIMIT 10
48+
FORMAT JSON";
49+
50+
debug!("Executing errors query: {}", query);
51+
52+
let result = tokio::time::timeout(
53+
std::time::Duration::from_secs(DIAGNOSTIC_QUERY_TIMEOUT_SECS),
54+
client.execute_sql(query),
55+
)
56+
.await
57+
.map_err(|_| DiagnoseError::QueryTimeout(DIAGNOSTIC_QUERY_TIMEOUT_SECS))?
58+
.map_err(|e| DiagnoseError::QueryFailed(format!("{}", e)))?;
59+
60+
let json_response: Value = serde_json::from_str(&result)
61+
.map_err(|e| DiagnoseError::ParseError(format!("{}", e)))?;
62+
63+
let data = json_response
64+
.get("data")
65+
.and_then(|v| v.as_array())
66+
.ok_or_else(|| {
67+
DiagnoseError::ParseError("Missing 'data' field in response".to_string())
68+
})?;
69+
70+
let mut issues = Vec::new();
71+
72+
for row in data {
73+
let name = row
74+
.get("name")
75+
.and_then(|v| v.as_str())
76+
.unwrap_or("UNKNOWN");
77+
let value = row.get("value").and_then(|v| v.as_u64()).unwrap_or(0);
78+
let last_error_message = row
79+
.get("last_error_message")
80+
.and_then(|v| v.as_str())
81+
.unwrap_or("");
82+
83+
// Skip if no occurrences
84+
if value == 0 {
85+
continue;
86+
}
87+
88+
let severity = if value > 100 {
89+
Severity::Error
90+
} else if value > 10 {
91+
Severity::Warning
92+
} else {
93+
Severity::Info
94+
};
95+
96+
let mut details = Map::new();
97+
details.insert("error_name".to_string(), json!(name));
98+
details.insert("occurrence_count".to_string(), json!(value));
99+
details.insert(
100+
"last_error_time".to_string(),
101+
row.get("last_error_time").cloned().unwrap_or(json!("")),
102+
);
103+
if !last_error_message.is_empty() {
104+
details.insert("last_error_message".to_string(), json!(last_error_message));
105+
}
106+
107+
issues.push(Issue {
108+
severity,
109+
source: "system.errors".to_string(),
110+
component: component.clone(),
111+
error_type: "system_error".to_string(),
112+
message: format!("Error '{}' occurred {} times. Last: {}", name, value, last_error_message),
113+
details,
114+
suggested_action: "Review error pattern and recent query logs. Check ClickHouse server logs for more details.".to_string(),
115+
related_queries: vec![
116+
"SELECT * FROM system.errors WHERE value > 0 ORDER BY value DESC".to_string(),
117+
format!("SELECT * FROM system.query_log WHERE exception LIKE '%{}%' ORDER BY event_time DESC LIMIT 10", name),
118+
],
119+
});
120+
}
121+
122+
Ok(issues)
123+
}
124+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
//! Diagnostic provider for checking merge failures from system.metrics
2+
3+
use log::debug;
4+
use serde_json::{json, Map, Value};
5+
6+
use super::{Component, DiagnoseError, DiagnosticProvider, Issue, Severity};
7+
use crate::infrastructure::olap::clickhouse::client::ClickHouseClient;
8+
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
9+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
10+
11+
/// Query timeout for diagnostic checks (30 seconds)
12+
const DIAGNOSTIC_QUERY_TIMEOUT_SECS: u64 = 30;
13+
14+
/// Diagnostic provider for checking merge failures from system.metrics
15+
pub struct MergeFailureDiagnostic;
16+
17+
#[async_trait::async_trait]
18+
impl DiagnosticProvider for MergeFailureDiagnostic {
19+
fn name(&self) -> &str {
20+
"merge_failures"
21+
}
22+
23+
fn applicable_to(&self, _component: &Component, _engine: Option<&ClickhouseEngine>) -> bool {
24+
// Merge failures are system-wide but we report per-table for context
25+
true
26+
}
27+
28+
async fn diagnose(
29+
&self,
30+
component: &Component,
31+
_engine: Option<&ClickhouseEngine>,
32+
config: &ClickHouseConfig,
33+
_since: Option<&str>,
34+
) -> Result<Vec<Issue>, DiagnoseError> {
35+
let client = ClickHouseClient::new(config)
36+
.map_err(|e| DiagnoseError::ClickHouseConnection(format!("{}", e)))?;
37+
38+
let mut issues = Vec::new();
39+
40+
// Check system.metrics for background merge failures
41+
// Note: This is a system-wide metric, not per-table, but we report it per-table for context
42+
let metrics_query =
43+
"SELECT value FROM system.metrics WHERE metric = 'FailedBackgroundMerges' FORMAT JSON";
44+
45+
debug!("Executing merge failure metrics query: {}", metrics_query);
46+
47+
let result = tokio::time::timeout(
48+
std::time::Duration::from_secs(DIAGNOSTIC_QUERY_TIMEOUT_SECS),
49+
client.execute_sql(metrics_query),
50+
)
51+
.await
52+
.map_err(|_| DiagnoseError::QueryTimeout(DIAGNOSTIC_QUERY_TIMEOUT_SECS))?
53+
.map_err(|e| DiagnoseError::QueryFailed(format!("{}", e)))?;
54+
55+
let json_response: Value = serde_json::from_str(&result)
56+
.map_err(|e| DiagnoseError::ParseError(format!("{}", e)))?;
57+
58+
let failed_merges = json_response
59+
.get("data")
60+
.and_then(|v| v.as_array())
61+
.and_then(|arr| arr.first())
62+
.and_then(|row| row.get("value"))
63+
.and_then(|v| v.as_u64())
64+
.unwrap_or(0);
65+
66+
if failed_merges > 0 {
67+
let severity = if failed_merges > 10 {
68+
Severity::Error
69+
} else {
70+
Severity::Warning
71+
};
72+
73+
let mut details = Map::new();
74+
details.insert("failed_merges".to_string(), json!(failed_merges));
75+
76+
issues.push(Issue {
77+
severity,
78+
source: "system.metrics".to_string(),
79+
component: component.clone(),
80+
error_type: "merge_failures".to_string(),
81+
message: format!(
82+
"Background merge failures detected: {} failed merges currently in system. This may affect table maintenance.",
83+
failed_merges
84+
),
85+
details,
86+
suggested_action: "Check system.errors for merge failure details. Review disk space and memory availability. Consider increasing merge-related settings if failures persist.".to_string(),
87+
related_queries: vec![
88+
"SELECT * FROM system.errors WHERE name LIKE '%Merge%' ORDER BY last_error_time DESC LIMIT 10".to_string(),
89+
"SELECT * FROM system.metrics WHERE metric LIKE '%Merge%'".to_string(),
90+
format!(
91+
"SELECT * FROM system.merges WHERE database = '{}' AND table = '{}'",
92+
config.db_name, component.name
93+
),
94+
],
95+
});
96+
}
97+
98+
Ok(issues)
99+
}
100+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
//! Diagnostic provider for checking stuck background merges
2+
3+
use log::debug;
4+
use serde_json::{json, Map, Value};
5+
6+
use super::{Component, DiagnoseError, DiagnosticProvider, Issue, Severity};
7+
use crate::infrastructure::olap::clickhouse::client::ClickHouseClient;
8+
use crate::infrastructure::olap::clickhouse::config::ClickHouseConfig;
9+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
10+
11+
/// Query timeout for diagnostic checks (30 seconds)
12+
const DIAGNOSTIC_QUERY_TIMEOUT_SECS: u64 = 30;
13+
14+
/// Diagnostic provider for checking stuck background merges
15+
pub struct MergeDiagnostic;
16+
17+
#[async_trait::async_trait]
18+
impl DiagnosticProvider for MergeDiagnostic {
19+
fn name(&self) -> &str {
20+
"MergeDiagnostic"
21+
}
22+
23+
fn applicable_to(&self, _component: &Component, _engine: Option<&ClickhouseEngine>) -> bool {
24+
true
25+
}
26+
27+
async fn diagnose(
28+
&self,
29+
component: &Component,
30+
_engine: Option<&ClickhouseEngine>,
31+
config: &ClickHouseConfig,
32+
_since: Option<&str>,
33+
) -> Result<Vec<Issue>, DiagnoseError> {
34+
let client = ClickHouseClient::new(config)
35+
.map_err(|e| DiagnoseError::ClickHouseConnection(format!("{}", e)))?;
36+
37+
// Check for long-running merges
38+
let query = format!(
39+
"SELECT
40+
elapsed,
41+
progress,
42+
num_parts,
43+
result_part_name,
44+
total_size_bytes_compressed
45+
FROM system.merges
46+
WHERE database = '{}' AND table = '{}'
47+
AND elapsed > 300
48+
ORDER BY elapsed DESC
49+
FORMAT JSON",
50+
config.db_name, component.name
51+
);
52+
53+
debug!("Executing merges query: {}", query);
54+
55+
let result = tokio::time::timeout(
56+
std::time::Duration::from_secs(DIAGNOSTIC_QUERY_TIMEOUT_SECS),
57+
client.execute_sql(&query),
58+
)
59+
.await
60+
.map_err(|_| DiagnoseError::QueryTimeout(DIAGNOSTIC_QUERY_TIMEOUT_SECS))?
61+
.map_err(|e| DiagnoseError::QueryFailed(format!("{}", e)))?;
62+
63+
let json_response: Value = serde_json::from_str(&result)
64+
.map_err(|e| DiagnoseError::ParseError(format!("{}", e)))?;
65+
66+
let data = json_response
67+
.get("data")
68+
.and_then(|v| v.as_array())
69+
.ok_or_else(|| {
70+
DiagnoseError::ParseError("Missing 'data' field in response".to_string())
71+
})?;
72+
73+
let mut issues = Vec::new();
74+
75+
for row in data {
76+
let elapsed = row.get("elapsed").and_then(|v| v.as_f64()).unwrap_or(0.0);
77+
let progress = row.get("progress").and_then(|v| v.as_f64()).unwrap_or(0.0);
78+
79+
let severity = if elapsed > 1800.0 {
80+
// 30 minutes
81+
Severity::Error
82+
} else {
83+
Severity::Warning
84+
};
85+
86+
let mut details = Map::new();
87+
details.insert("elapsed_seconds".to_string(), json!(elapsed));
88+
details.insert("progress".to_string(), json!(progress));
89+
details.insert(
90+
"num_parts".to_string(),
91+
row.get("num_parts").cloned().unwrap_or(json!(0)),
92+
);
93+
details.insert(
94+
"result_part_name".to_string(),
95+
row.get("result_part_name").cloned().unwrap_or(json!("")),
96+
);
97+
98+
issues.push(Issue {
99+
severity,
100+
source: "system.merges".to_string(),
101+
component: component.clone(),
102+
error_type: "slow_merge".to_string(),
103+
message: format!(
104+
"Background merge running for {:.1} seconds ({:.1}% complete)",
105+
elapsed, progress * 100.0
106+
),
107+
details,
108+
suggested_action: "Monitor merge progress. If stuck, check server resources (CPU, disk I/O, memory). Consider stopping merge with SYSTEM STOP MERGES if necessary.".to_string(),
109+
related_queries: vec![
110+
format!(
111+
"SELECT * FROM system.merges WHERE database = '{}' AND table = '{}'",
112+
config.db_name, component.name
113+
),
114+
format!("SYSTEM STOP MERGES {}.{}", config.db_name, component.name),
115+
],
116+
});
117+
}
118+
119+
Ok(issues)
120+
}
121+
}

0 commit comments

Comments
 (0)