diff --git a/src/query/service/tests/it/sessions/queue_mgr.rs b/src/query/service/tests/it/sessions/queue_mgr.rs index 94a03677b7cb5..417de5f1d32ad 100644 --- a/src/query/service/tests/it/sessions/queue_mgr.rs +++ b/src/query/service/tests/it/sessions/queue_mgr.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; +use std::sync::Once; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -34,6 +36,8 @@ use databend_common_grpc::RpcClientConf; use databend_common_meta_store::MetaStore; use databend_common_meta_store::MetaStoreProvider; use databend_common_sql::Planner; +use databend_common_tracing::init_logging; +use databend_common_tracing::Config; use databend_common_version::BUILD_INFO; use databend_query::interpreters::InterpreterFactory; use databend_query::sessions::QueryEntry; @@ -220,72 +224,92 @@ async fn test_serial_acquire() -> Result<()> { } #[tokio::test(flavor = "multi_thread")] -async fn test_concurrent_acquire() -> Result<()> { - for is_global in [true, false] { - let metastore = create_meta_store().await?; - let test_count = (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - % 5) as usize - + 5; +async fn test_concurrent_acquire_local() -> Result<()> { + do_test_concurrent_acquire(false).await +} - let ctx = format!("count={test_count}"); +#[tokio::test(flavor = "multi_thread")] +async fn test_concurrent_acquire_global() -> Result<()> { + do_test_concurrent_acquire(true).await +} - let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); - let queue = QueueManager::::create(2, metastore, is_global); - let mut join_handles = Vec::with_capacity(test_count); +fn setup_test() { + static INIT: Once = Once::new(); + INIT.call_once(|| { + let mut config = Config::new_testing(); + config.file.level = "DEBUG".to_string(); - let instant = Instant::now(); - for index in 0..test_count { - join_handles.push({ - let queue = queue.clone(); - let barrier = barrier.clone(); - databend_common_base::runtime::spawn(async move { - barrier.wait().await; + let guards = init_logging("query_unittests", &config, BTreeMap::new()); + Box::leak(Box::new(guards)); + }); +} - // Time based semaphore is sensitive to time accuracy. - // Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired. - // Thus, we have to make the gap between timestamp large enough. - tokio::time::sleep(Duration::from_millis(300 * index as u64)).await; +async fn do_test_concurrent_acquire(is_global: bool) -> Result<()> { + setup_test(); - let _guard = queue - .acquire(TestData::new( - String::from("test_concurrent_acquire"), - format!("TestData{}", index), - )) - .await?; + let metastore = create_meta_store().await?; + let test_count = (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + % 5) as usize + + 5; - tokio::time::sleep(Duration::from_secs(1)).await; - Result::<()>::Ok(()) - }) - }) - } + let ctx = format!("count={test_count}"); - for join_handle in join_handles { - let _ = join_handle.await; - } + let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); + let queue = QueueManager::::create(2, metastore, is_global); + let mut join_handles = Vec::with_capacity(test_count); - let elapsed = instant.elapsed(); - let total = Duration::from_secs((test_count) as u64); - assert!( - elapsed >= total / 2, - "{ctx}: expect: elapsed: {:?} >= {:?}, ", - elapsed, - total / 2, - ); - let delta = Duration::from_millis(300) * test_count as u32; - assert!( - elapsed < total + delta, - "{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ", - elapsed, - total, - delta, - ); + let instant = Instant::now(); + for index in 0..test_count { + join_handles.push({ + let queue = queue.clone(); + let barrier = barrier.clone(); + databend_common_base::runtime::spawn(async move { + barrier.wait().await; + + // Time based semaphore is sensitive to time accuracy. + // Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired. + // Thus, we have to make the gap between timestamp large enough. + tokio::time::sleep(Duration::from_millis(300 * index as u64)).await; + + let _guard = queue + .acquire(TestData::new( + String::from("test_concurrent_acquire"), + format!("TestData{}", index), + )) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + Result::<()>::Ok(()) + }) + }) + } - assert_eq!(queue.length(), 0); + for join_handle in join_handles { + let _ = join_handle.await; } + let elapsed = instant.elapsed(); + let total = Duration::from_secs((test_count) as u64); + assert!( + elapsed >= total / 2, + "{ctx}: expect: elapsed: {:?} >= {:?}, ", + elapsed, + total / 2, + ); + let delta = Duration::from_millis(300) * test_count as u32; + assert!( + elapsed < total + delta, + "{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ", + elapsed, + total, + delta, + ); + + assert_eq!(queue.length(), 0); + Ok(()) }