Skip to content

Commit c4c0ba7

Browse files
authored
fix(setup): restore check for setup up-to-date before evaluation (#672)
1 parent 331dcbc commit c4c0ba7

File tree

6 files changed

+48
-19
lines changed

6 files changed

+48
-19
lines changed

examples/code_embedding/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def code_embedding_flow(
6767
location=chunk["location"],
6868
code=chunk["text"],
6969
embedding=chunk["embedding"],
70+
start=chunk["start"],
7071
)
7172

7273
code_embeddings.export(

src/execution/live_updater.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,16 @@ impl SharedAckFn {
6060
}
6161

6262
async fn update_source(
63-
flow_ctx: Arc<FlowContext>,
63+
flow: Arc<builder::AnalyzedFlow>,
6464
plan: Arc<plan::ExecutionPlan>,
65+
execution_ctx: Arc<tokio::sync::OwnedRwLockReadGuard<crate::lib_context::ExecutionContext>>,
6566
source_update_stats: Arc<stats::UpdateStats>,
6667
source_idx: usize,
6768
pool: PgPool,
6869
options: FlowLiveUpdaterOptions,
6970
) -> Result<()> {
70-
let execution_ctx = flow_ctx.execution_ctx.read().await;
7171
let source_context = execution_ctx
72-
.get_source_indexing_context(&flow_ctx.flow, source_idx, &pool)
72+
.get_source_indexing_context(&flow, source_idx, &pool)
7373
.await?;
7474

7575
let import_op = &plan.import_ops[source_idx];
@@ -97,15 +97,9 @@ async fn update_source(
9797
delta
9898
};
9999
if options.print_stats {
100-
println!(
101-
"{}.{}: {}",
102-
flow_ctx.flow.flow_instance.name, import_op.name, delta
103-
);
100+
println!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
104101
} else {
105-
trace!(
106-
"{}.{}: {}",
107-
flow_ctx.flow.flow_instance.name, import_op.name, delta
108-
);
102+
trace!("{}.{}: {}", flow.flow_instance.name, import_op.name, delta);
109103
}
110104
};
111105

@@ -219,14 +213,16 @@ impl FlowLiveUpdater {
219213
options: FlowLiveUpdaterOptions,
220214
) -> Result<Self> {
221215
let plan = flow_ctx.flow.get_execution_plan().await?;
216+
let execution_ctx = Arc::new(flow_ctx.use_owned_execution_ctx().await?);
222217

223218
let mut tasks = JoinSet::new();
224219
let sources_update_stats = (0..plan.import_ops.len())
225220
.map(|source_idx| {
226221
let source_update_stats = Arc::new(stats::UpdateStats::default());
227222
tasks.spawn(update_source(
228-
flow_ctx.clone(),
223+
flow_ctx.flow.clone(),
229224
plan.clone(),
225+
execution_ctx.clone(),
230226
source_update_stats.clone(),
231227
source_idx,
232228
pool.clone(),

src/lib_context.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::builder::AnalyzedFlow;
44
use crate::execution::source_indexer::SourceIndexingContext;
55
use crate::service::error::ApiError;
66
use crate::settings;
7+
use crate::setup::ObjectSetupStatus;
78
use axum::http::StatusCode;
89
use sqlx::PgPool;
910
use sqlx::postgres::PgConnectOptions;
@@ -67,21 +68,52 @@ impl ExecutionContext {
6768
}
6869
pub struct FlowContext {
6970
pub flow: Arc<AnalyzedFlow>,
70-
pub execution_ctx: tokio::sync::RwLock<ExecutionContext>,
71+
execution_ctx: Arc<tokio::sync::RwLock<ExecutionContext>>,
7172
}
7273

7374
impl FlowContext {
7475
pub async fn new(
7576
flow: Arc<AnalyzedFlow>,
7677
existing_flow_ss: Option<&setup::FlowSetupState<setup::ExistingMode>>,
7778
) -> Result<Self> {
78-
let execution_ctx =
79-
tokio::sync::RwLock::new(ExecutionContext::new(&flow, existing_flow_ss).await?);
79+
let execution_ctx = Arc::new(tokio::sync::RwLock::new(
80+
ExecutionContext::new(&flow, existing_flow_ss).await?,
81+
));
8082
Ok(Self {
8183
flow,
8284
execution_ctx,
8385
})
8486
}
87+
88+
pub async fn use_execution_ctx(
89+
&self,
90+
) -> Result<tokio::sync::RwLockReadGuard<ExecutionContext>> {
91+
let execution_ctx = self.execution_ctx.read().await;
92+
if !execution_ctx.flow_setup_status.is_up_to_date() {
93+
api_bail!(
94+
"Flow setup is not up-to-date. Please run `cocoindex setup` to update the setup."
95+
);
96+
}
97+
Ok(execution_ctx)
98+
}
99+
100+
pub async fn use_owned_execution_ctx(
101+
&self,
102+
) -> Result<tokio::sync::OwnedRwLockReadGuard<ExecutionContext>> {
103+
let execution_ctx = self.execution_ctx.clone().read_owned().await;
104+
if !execution_ctx.flow_setup_status.is_up_to_date() {
105+
api_bail!(
106+
"Flow setup is not up-to-date. Please run `cocoindex setup` to update the setup."
107+
);
108+
}
109+
Ok(execution_ctx)
110+
}
111+
112+
pub async fn get_execution_ctx_for_setup(
113+
&self,
114+
) -> tokio::sync::RwLockReadGuard<ExecutionContext> {
115+
self.execution_ctx.read().await
116+
}
85117
}
86118

87119
static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

src/py/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl Flow {
202202
.block_on(async {
203203
let exec_plan = self.0.flow.get_execution_plan().await?;
204204
let lib_context = get_lib_context()?;
205-
let execution_ctx = self.0.execution_ctx.read().await;
205+
let execution_ctx = self.0.use_execution_ctx().await?;
206206
execution::dumper::evaluate_and_dump(
207207
&exec_plan,
208208
&execution_ctx.setup_execution_context,

src/service/flows.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ pub async fn evaluate_data(
189189
) -> Result<Json<EvaluateDataResponse>, ApiError> {
190190
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
191191
let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
192-
let execution_ctx = flow_ctx.execution_ctx.read().await;
192+
let execution_ctx = flow_ctx.use_execution_ctx().await?;
193193
let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
194194
&source_row_key_ctx.as_context(),
195195
&execution_ctx.setup_execution_context,
@@ -239,7 +239,7 @@ pub async fn get_row_indexing_status(
239239
let flow_ctx = lib_context.get_flow_context(&flow_name)?;
240240
let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
241241

242-
let execution_ctx = flow_ctx.execution_ctx.read().await;
242+
let execution_ctx = flow_ctx.use_execution_ctx().await?;
243243
let indexing_status = indexing_status::get_source_row_indexing_status(
244244
&source_row_key_ctx.as_context(),
245245
&execution_ctx.setup_execution_context,

src/setup/driver.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ pub async fn sync_setup(
366366
let mut flow_setup_status = BTreeMap::new();
367367
for (flow_name, flow_context) in flows {
368368
let existing_state = all_setup_state.flows.get(flow_name);
369-
let execution_ctx = flow_context.execution_ctx.read().await;
369+
let execution_ctx = flow_context.get_execution_ctx_for_setup().await;
370370
flow_setup_status.insert(
371371
flow_name.clone(),
372372
check_flow_setup_status(

0 commit comments

Comments
 (0)