Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
name = "log_history"
target = ""
create = "CREATE TABLE IF NOT EXISTS system_history.log_history (timestamp TIMESTAMP NULL, path STRING NULL, target STRING NULL, log_level STRING NULL, cluster_id STRING NULL, node_id STRING NULL, warehouse_id STRING NULL, query_id STRING NULL, message STRING NULL, fields VARIANT NULL, batch_number Int64) CLUSTER BY LINEAR(timestamp, query_id)"
transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE"
transform = "settings (timezone='Etc/UTC') COPY INTO system_history.log_history FROM (SELECT timestamp, path, target, log_level, cluster_id,node_id, warehouse_id, query_id, message, fields, {batch_number} FROM @{stage_name}) file_format = (TYPE = PARQUET) PURGE = TRUE MAX_FILES = 5000"
delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hours(NOW(), {retention_hours})"

[[tables]]
Expand Down
61 changes: 49 additions & 12 deletions src/query/service/src/history_tables/global_history_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -46,6 +47,7 @@ use futures_util::future::join_all;
use futures_util::TryStreamExt;
use log::error;
use log::info;
use log::warn;
use opendal::raw::normalize_root;
use parking_lot::Mutex;
use rand::random;
Expand Down Expand Up @@ -161,30 +163,46 @@ impl GlobalHistoryLog {
let meta_key = format!("{}/history_log_transform", self.tenant_id).clone();
let log = GlobalHistoryLog::instance();
let handle = spawn(async move {
let mut consecutive_error = 0;
let mut persistent_error_cnt = 0;
let mut temp_error_cnt = 0;
loop {
match log.transform(&table_clone, &meta_key).await {
Ok(acquired_lock) => {
if acquired_lock {
consecutive_error = 0;
persistent_error_cnt = 0;
temp_error_cnt = 0;
}
sleep(sleep_time).await;
}
Err(e) => {
error!(
"[HISTORY-TABLES] {} log transform failed due to {}, retry {}",
table_clone.name, e, consecutive_error
);
consecutive_error += 1;
if consecutive_error > 3 {
if is_temp_error(&e) {
// If the error is temporary, we will retry with exponential backoff
// The max backoff time is 10 minutes
let backoff_second =
min(2u64.saturating_pow(temp_error_cnt), 10 * 60);
temp_error_cnt += 1;
warn!(
"[HISTORY-TABLES] {} log transform failed with temporary error {}, count {}, next retry in {} seconds",
table_clone.name, e, temp_error_cnt, backoff_second
);
sleep(Duration::from_secs(backoff_second)).await;
} else {
error!(
"[HISTORY-TABLES] {} log transform failed too many times, exit",
table_clone.name
"[HISTORY-TABLES] {} log transform failed with persistent error {}, retry count {}",
table_clone.name, e, persistent_error_cnt
);
break;
persistent_error_cnt += 1;
if persistent_error_cnt > 3 {
error!(
"[HISTORY-TABLES] {} log transform failed too many times, giving up",
table_clone.name
);
return;
}
sleep(sleep_time).await;
}
}
}
sleep(sleep_time).await;
}
});
handles.push(handle);
Expand Down Expand Up @@ -433,3 +451,22 @@ pub async fn setup_operator(params: &Option<StorageParams>) -> Result<()> {
GlobalLogger::instance().set_operator(op).await;
Ok(())
}

/// Reports whether `e` should be treated as a temporary (retryable) error.
///
/// An error is considered temporary when any of the following holds:
/// - its code is one of the codes grouped below as storage / I/O failures;
/// - its code is `ErrorCode::UNRESOLVABLE_CONFLICT`;
/// - its code is `ErrorCode::INTERNAL` and its message contains the text
///   "acquire semaphore failed".
///
/// Returns `true` for a temporary error and `false` otherwise.
fn is_temp_error(e: &ErrorCode) -> bool {
    let err_code = e.code();
    let err_message = e.message();

    // Codes grouped as storage and I/O failures; every one of them is retried.
    let retryable_storage_codes = [
        ErrorCode::STORAGE_NOT_FOUND,
        ErrorCode::STORAGE_PERMISSION_DENIED,
        ErrorCode::STORAGE_UNAVAILABLE,
        ErrorCode::STORAGE_UNSUPPORTED,
        ErrorCode::STORAGE_INSECURE,
        ErrorCode::INVALID_OPERATION,
        ErrorCode::STORAGE_OTHER,
    ];
    if retryable_storage_codes.contains(&err_code) {
        return true;
    }

    // A transaction conflict is retried as well.
    if err_code == ErrorCode::UNRESOLVABLE_CONFLICT {
        return true;
    }

    // A failed semaphore acquisition only carries the generic internal code,
    // so it is recognised by its message text.
    err_code == ErrorCode::INTERNAL && err_message.contains("acquire semaphore failed")
}
2 changes: 1 addition & 1 deletion src/query/service/src/history_tables/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl HistoryMetaHandle {
Duration::from_secs(3),
))
.await
.map_err(|_e| "acquire semaphore failed from GlobalHistoryLog")?;
.map_err(|e| format!("acquire semaphore failed from GlobalHistoryLog {}", e))?;
if interval == 0 {
return Ok(Some(acquired_guard));
}
Expand Down