Skip to content

Commit eafab1e

Browse files
add validation for cluster & keeper configs
1 parent 831ad62 commit eafab1e

File tree

3 files changed

+229
-0
lines changed

3 files changed

+229
-0
lines changed

apps/framework-cli/src/framework/core/plan_validator.rs

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,59 @@ fn validate_cluster_references(project: &Project, plan: &InfraPlan) -> Result<()
7070
Ok(())
7171
}
7272

73+
/// Validates that replicated engines either have keeper path/replica name OR a cluster defined
74+
fn validate_replicated_engine_args(plan: &InfraPlan) -> Result<(), ValidationError> {
75+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
76+
77+
for table in plan.target_infra_map.tables.values() {
78+
let needs_args = match &table.engine {
79+
Some(ClickhouseEngine::ReplicatedMergeTree {
80+
keeper_path,
81+
replica_name,
82+
}) => keeper_path.is_none() && replica_name.is_none(),
83+
Some(ClickhouseEngine::ReplicatedReplacingMergeTree {
84+
keeper_path,
85+
replica_name,
86+
..
87+
}) => keeper_path.is_none() && replica_name.is_none(),
88+
Some(ClickhouseEngine::ReplicatedAggregatingMergeTree {
89+
keeper_path,
90+
replica_name,
91+
}) => keeper_path.is_none() && replica_name.is_none(),
92+
Some(ClickhouseEngine::ReplicatedSummingMergeTree {
93+
keeper_path,
94+
replica_name,
95+
..
96+
}) => keeper_path.is_none() && replica_name.is_none(),
97+
_ => false,
98+
};
99+
100+
// If engine args are missing AND no cluster is defined, that's an error
101+
if needs_args && table.cluster_name.is_none() {
102+
return Err(ValidationError::TableValidation(format!(
103+
"Table '{}' uses a replicated engine but neither cluster nor keeper path/replica name are specified.\n\
104+
\n\
105+
You must either:\n\
106+
1. Specify a cluster in the table config: cluster = \"prod_cluster\"\n\
107+
(and define it in moose.config.toml)\n\
108+
2. Or provide explicit keeper path and replica name in the engine config\n",
109+
table.name
110+
)));
111+
}
112+
}
113+
114+
Ok(())
115+
}
116+
73117
pub fn validate(project: &Project, plan: &InfraPlan) -> Result<(), ValidationError> {
74118
stream::validate_changes(project, &plan.changes.streaming_engine_changes)?;
75119

76120
// Validate cluster references
77121
validate_cluster_references(project, plan)?;
78122

123+
// Validate replicated engine args
124+
validate_replicated_engine_args(plan)?;
125+
79126
// Check for validation errors in OLAP changes
80127
for change in &plan.changes.olap_changes {
81128
if let OlapChange::Table(TableChange::ValidationError { message, .. }) = change {
@@ -306,4 +353,177 @@ mod tests {
306353
_ => panic!("Expected ClusterValidation error"),
307354
}
308355
}
356+
357+
// Helper to create a table with a specific engine
358+
fn create_table_with_engine(
359+
name: &str,
360+
cluster_name: Option<String>,
361+
engine: Option<crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine>,
362+
) -> Table {
363+
Table {
364+
name: name.to_string(),
365+
columns: vec![Column {
366+
name: "id".to_string(),
367+
data_type: ColumnType::String,
368+
required: true,
369+
unique: false,
370+
primary_key: true,
371+
default: None,
372+
annotations: vec![],
373+
comment: None,
374+
ttl: None,
375+
}],
376+
order_by: OrderBy::Fields(vec!["id".to_string()]),
377+
partition_by: None,
378+
sample_by: None,
379+
engine,
380+
version: Some(Version::from_string("1.0.0".to_string())),
381+
source_primitive: PrimitiveSignature {
382+
name: name.to_string(),
383+
primitive_type: PrimitiveTypes::DataModel,
384+
},
385+
metadata: None,
386+
life_cycle: LifeCycle::FullyManaged,
387+
engine_params_hash: None,
388+
table_settings: None,
389+
indexes: vec![],
390+
database: None,
391+
table_ttl_setting: None,
392+
cluster_name,
393+
}
394+
}
395+
396+
#[test]
397+
fn test_replicated_engine_without_args_or_cluster_fails() {
398+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
399+
400+
let project = create_test_project(None);
401+
let table = create_table_with_engine(
402+
"test_table",
403+
None,
404+
Some(ClickhouseEngine::ReplicatedMergeTree {
405+
keeper_path: None,
406+
replica_name: None,
407+
}),
408+
);
409+
let plan = create_test_plan(vec![table]);
410+
411+
let result = validate(&project, &plan);
412+
413+
assert!(result.is_err());
414+
match result {
415+
Err(ValidationError::TableValidation(msg)) => {
416+
assert!(msg.contains("test_table"));
417+
assert!(msg.contains("replicated engine"));
418+
assert!(msg.contains("cluster"));
419+
}
420+
_ => panic!("Expected TableValidation error"),
421+
}
422+
}
423+
424+
#[test]
425+
fn test_replicated_engine_with_cluster_but_no_args_succeeds() {
426+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
427+
428+
let project = create_test_project(Some(vec![ClusterConfig {
429+
name: "prod_cluster".to_string(),
430+
}]));
431+
let table = create_table_with_engine(
432+
"test_table",
433+
Some("prod_cluster".to_string()),
434+
Some(ClickhouseEngine::ReplicatedMergeTree {
435+
keeper_path: None,
436+
replica_name: None,
437+
}),
438+
);
439+
let plan = create_test_plan(vec![table]);
440+
441+
let result = validate(&project, &plan);
442+
443+
assert!(result.is_ok());
444+
}
445+
446+
#[test]
447+
fn test_replicated_engine_with_args_but_no_cluster_succeeds() {
448+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
449+
450+
let project = create_test_project(None);
451+
let table = create_table_with_engine(
452+
"test_table",
453+
None,
454+
Some(ClickhouseEngine::ReplicatedMergeTree {
455+
keeper_path: Some("/clickhouse/tables/{database}/{table}".to_string()),
456+
replica_name: Some("{replica}".to_string()),
457+
}),
458+
);
459+
let plan = create_test_plan(vec![table]);
460+
461+
let result = validate(&project, &plan);
462+
463+
assert!(result.is_ok());
464+
}
465+
466+
#[test]
467+
fn test_replicated_engine_with_both_args_and_cluster_succeeds() {
468+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
469+
470+
let project = create_test_project(Some(vec![ClusterConfig {
471+
name: "prod_cluster".to_string(),
472+
}]));
473+
let table = create_table_with_engine(
474+
"test_table",
475+
Some("prod_cluster".to_string()),
476+
Some(ClickhouseEngine::ReplicatedMergeTree {
477+
keeper_path: Some("/clickhouse/tables/{database}/{table}".to_string()),
478+
replica_name: Some("{replica}".to_string()),
479+
}),
480+
);
481+
let plan = create_test_plan(vec![table]);
482+
483+
let result = validate(&project, &plan);
484+
485+
assert!(result.is_ok());
486+
}
487+
488+
#[test]
489+
fn test_replicated_replacing_merge_tree_without_args_or_cluster_fails() {
490+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
491+
492+
let project = create_test_project(None);
493+
let table = create_table_with_engine(
494+
"test_table",
495+
None,
496+
Some(ClickhouseEngine::ReplicatedReplacingMergeTree {
497+
keeper_path: None,
498+
replica_name: None,
499+
ver: Some("version".to_string()),
500+
is_deleted: None,
501+
}),
502+
);
503+
let plan = create_test_plan(vec![table]);
504+
505+
let result = validate(&project, &plan);
506+
507+
assert!(result.is_err());
508+
match result {
509+
Err(ValidationError::TableValidation(msg)) => {
510+
assert!(msg.contains("test_table"));
511+
assert!(msg.contains("replicated engine"));
512+
}
513+
_ => panic!("Expected TableValidation error"),
514+
}
515+
}
516+
517+
#[test]
518+
fn test_non_replicated_engine_without_cluster_succeeds() {
519+
use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
520+
521+
let project = create_test_project(None);
522+
let table = create_table_with_engine("test_table", None, Some(ClickhouseEngine::MergeTree));
523+
let plan = create_test_plan(vec![table]);
524+
525+
let result = validate(&project, &plan);
526+
527+
assert!(result.is_ok());
528+
}
309529
}

packages/py-moose-lib/moose_lib/dmv2/olap_table.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ class OlapConfig(BaseModel):
121121
life_cycle: Determines how changes in code will propagate to the resources.
122122
settings: Optional table-level settings that can be modified with ALTER TABLE MODIFY SETTING.
123123
These are alterable settings that can be changed without recreating the table.
124+
cluster: Optional cluster name for ON CLUSTER support in ClickHouse.
125+
Use this to enable replicated tables across ClickHouse clusters.
126+
The cluster must be defined in moose.config.toml (dev environment only).
127+
Example: cluster="prod_cluster"
124128
"""
125129
order_by_fields: list[str] = []
126130
order_by_expression: Optional[str] = None
@@ -133,6 +137,8 @@ class OlapConfig(BaseModel):
133137
settings: Optional[dict[str, str]] = None
134138
# Optional table-level TTL expression (without leading 'TTL')
135139
ttl: Optional[str] = None
140+
# Optional cluster name for ON CLUSTER support in ClickHouse
141+
cluster: Optional[str] = None
136142

137143
# Optional secondary/data-skipping indexes
138144
class TableIndex(BaseModel):

packages/py-moose-lib/moose_lib/internal.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ class TableConfig(BaseModel):
198198
metadata: Optional metadata for the table.
199199
life_cycle: Lifecycle management setting for the table.
200200
table_settings: Optional table-level settings that can be modified with ALTER TABLE MODIFY SETTING.
201+
cluster: Optional cluster name for ON CLUSTER support in ClickHouse.
201202
"""
202203
model_config = model_config
203204

@@ -214,6 +215,7 @@ class TableConfig(BaseModel):
214215
indexes: list[OlapConfig.TableIndex] = []
215216
ttl: Optional[str] = None
216217
database: Optional[str] = None
218+
cluster: Optional[str] = None
217219

218220

219221
class TopicConfig(BaseModel):
@@ -696,6 +698,7 @@ def to_infra_map() -> dict:
696698
indexes=table.config.indexes,
697699
ttl=table.config.ttl,
698700
database=table.config.database,
701+
cluster=table.config.cluster,
699702
)
700703

701704
for name, stream in get_streams().items():

0 commit comments

Comments
 (0)