Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ build_exceptions! {
TaskScheduleAndAfterConflict(2615),
/// Task when condition not met
TaskWhenConditionNotMet(2616),
/// Task Running when modifying after
TaskRunningWhenModifyingAfter(2617),
}

// Search and External Service Errors [1901-1903, 1910]
Expand Down
6 changes: 6 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod task;
pub mod task_dependent_ident;
pub mod task_ident;
pub mod task_message_ident;
pub mod task_state_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
pub mod user_defined_file_format_ident;
Expand Down Expand Up @@ -89,13 +91,17 @@ pub use role_info::RoleInfo;
pub use role_info::RoleInfoSerdeError;
pub use stage_file_ident::StageFileIdent;
pub use stage_file_path::StageFilePath;
pub use task::DependentType;
pub use task::ScheduleOptions;
pub use task::ScheduleType;
pub use task::State;
pub use task::Status;
pub use task::Task;
pub use task::TaskDependentKey;
pub use task::TaskDependentValue;
pub use task::TaskMessage;
pub use task::TaskRun;
pub use task::TaskState;
pub use task::WarehouseOptions;
pub use task_ident::TaskIdent;
pub use task_ident::TaskIdentRaw;
Expand Down
61 changes: 61 additions & 0 deletions src/meta/app/src/principal/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::BTreeSet;

use chrono::DateTime;
use chrono::Utc;
Expand Down Expand Up @@ -172,3 +173,63 @@ impl TaskMessage {
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskState {
pub is_succeeded: bool,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DependentType {
After = 0,
Before = 1,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskDependentKey {
pub ty: DependentType,
pub source: String,
}

impl TaskDependentKey {
pub fn new(ty: DependentType, source: String) -> Self {
Self { ty, source }
}
}

#[derive(Debug, Clone, PartialEq, Default)]
pub struct TaskDependentValue(pub BTreeSet<String>);

mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KeyError;

use crate::principal::DependentType;
use crate::principal::TaskDependentKey;

impl kvapi::KeyCodec for TaskDependentKey {
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
match self.ty {
DependentType::After => b.push_str("After"),
DependentType::Before => b.push_str("Before"),
}
.push_str(self.source.as_str())
}

fn decode_key(parser: &mut kvapi::KeyParser) -> Result<Self, kvapi::KeyError> {
let ty = match parser.next_str()?.as_str() {
"After" => DependentType::After,
"Before" => DependentType::Before,
str => {
return Err(KeyError::InvalidId {
s: str.to_string(),
reason: "Invalid Dependent Type".to_string(),
})
}
};
let source = parser.next_str()?;

Ok(Self { ty, source })
}
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_dependent_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskDependentIdent = TIdent<TaskDependentResource, TaskDependentKey>;

pub use kvapi_impl::TaskDependentResource;

use crate::principal::TaskDependentKey;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::TaskDependentValue;
use crate::principal::task_dependent_ident::TaskDependentIdent;
use crate::tenant_key::resource::TenantResource;

pub struct TaskDependentResource;
impl TenantResource for TaskDependentResource {
const PREFIX: &'static str = "__fd_task_dependents";
const TYPE: &'static str = "TaskDependentIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskDependentValue;
}

impl kvapi::Value for TaskDependentValue {
type KeyType = TaskDependentIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
45 changes: 45 additions & 0 deletions src/meta/app/src/principal/task_state_ident.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::tenant_key::ident::TIdent;

pub type TaskStateIdent = TIdent<Resource>;

pub type TaskStateIdentRaw = TIdent<Resource>;

pub use kvapi_impl::Resource;

mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;

use crate::principal::task::TaskState;
use crate::principal::task_state_ident::TaskStateIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_task_states";
const TYPE: &'static str = "TaskStateIdent";
const HAS_TENANT: bool = true;
type ValueType = TaskState;
}

impl kvapi::Value for TaskState {
type KeyType = TaskStateIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
}
46 changes: 46 additions & 0 deletions src/meta/proto-conv/src/task_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;

use chrono::DateTime;
use chrono::Utc;
use databend_common_meta_app::principal as mt;
Expand Down Expand Up @@ -195,3 +197,47 @@ impl FromToProto for mt::TaskMessage {
})
}
}

impl FromToProto for mt::TaskDependentValue {
type PB = pb::TaskDependentValue;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
where Self: Sized {
Ok(Self(BTreeSet::from_iter(p.names)))
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
Ok(pb::TaskDependentValue {
ver: VER,
min_reader_ver: MIN_READER_VER,
names: Vec::from_iter(self.0.iter().cloned()),
})
}
}

impl FromToProto for mt::TaskState {
type PB = pb::TaskState;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
where Self: Sized {
Ok(Self {
is_succeeded: p.is_succeeded,
})
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
Ok(pb::TaskState {
ver: VER,
min_reader_ver: MIN_READER_VER,
is_succeeded: self.is_succeeded,
})
}
}
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(138, "2025-07-23: Add: TableStatistics add index size"),
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
(141, "2025-07-27: Add: TaskState and TaskDependent"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,4 @@ mod v137_add_grant_object_connection;
mod v138_table_statistics;
mod v139_add_grant_ownership_object_sequence;
mod v140_task_message;
mod v141_task_state;
50 changes: 50 additions & 0 deletions src/meta/proto-conv/tests/it/v141_task_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;

use databend_common_meta_app::principal as mt;
use fastrace::func_name;

use crate::common;

#[test]
fn test_decode_v141_task_state() -> anyhow::Result<()> {
let task_state_v141 = vec![8, 1, 160, 6, 141, 1, 168, 6, 24];

let want = || mt::TaskState { is_succeeded: true };
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), task_state_v141.as_slice(), 141, want())?;

Ok(())
}

#[test]
fn test_decode_v141_task_dependent() -> anyhow::Result<()> {
let task_dependent_value_v141 = vec![10, 1, 97, 10, 1, 98, 160, 6, 141, 1, 168, 6, 24];
let want = || mt::TaskDependentValue(BTreeSet::from([s("a"), s("b")]));
common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(
func_name!(),
task_dependent_value_v141.as_slice(),
141,
want(),
)?;

Ok(())
}

fn s(ss: impl ToString) -> String {
ss.to_string()
}
26 changes: 26 additions & 0 deletions src/meta/protos/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,29 @@ message Task {
optional string error_integration = 20;
string owner_user = 21;
}

message TaskState {
uint64 ver = 100;
uint64 min_reader_ver = 101;

bool is_succeeded = 1;
}

message TaskDependentKey {
uint64 ver = 100;
uint64 min_reader_ver = 101;

enum DependentType {
After = 0;
Before = 1;
}
DependentType ty = 1;
string source = 2;
}

message TaskDependentValue {
uint64 ver = 100;
uint64 min_reader_ver = 101;

repeated string names = 1;
}
Loading