Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 79 additions & 55 deletions src/query/service/tests/it/sessions/queue_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
Expand All @@ -34,6 +36,8 @@ use databend_common_grpc::RpcClientConf;
use databend_common_meta_store::MetaStore;
use databend_common_meta_store::MetaStoreProvider;
use databend_common_sql::Planner;
use databend_common_tracing::init_logging;
use databend_common_tracing::Config;
use databend_common_version::BUILD_INFO;
use databend_query::interpreters::InterpreterFactory;
use databend_query::sessions::QueryEntry;
Expand Down Expand Up @@ -220,72 +224,92 @@ async fn test_serial_acquire() -> Result<()> {
}

#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_acquire() -> Result<()> {
for is_global in [true, false] {
let metastore = create_meta_store().await?;
let test_count = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
% 5) as usize
+ 5;
async fn test_concurrent_acquire_local() -> Result<()> {
do_test_concurrent_acquire(false).await
}

let ctx = format!("count={test_count}");
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrent_acquire_global() -> Result<()> {
do_test_concurrent_acquire(true).await
}

let barrier = Arc::new(tokio::sync::Barrier::new(test_count));
let queue = QueueManager::<TestData>::create(2, metastore, is_global);
let mut join_handles = Vec::with_capacity(test_count);
fn setup_test() {
static INIT: Once = Once::new();
INIT.call_once(|| {
let mut config = Config::new_testing();
config.file.level = "DEBUG".to_string();

let instant = Instant::now();
for index in 0..test_count {
join_handles.push({
let queue = queue.clone();
let barrier = barrier.clone();
databend_common_base::runtime::spawn(async move {
barrier.wait().await;
let guards = init_logging("query_unittests", &config, BTreeMap::new());
Box::leak(Box::new(guards));
});
}

// Time based semaphore is sensitive to time accuracy.
// Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired.
// Thus, we have to make the gap between timestamp large enough.
tokio::time::sleep(Duration::from_millis(300 * index as u64)).await;
async fn do_test_concurrent_acquire(is_global: bool) -> Result<()> {
setup_test();

let _guard = queue
.acquire(TestData::new(
String::from("test_concurrent_acquire"),
format!("TestData{}", index),
))
.await?;
let metastore = create_meta_store().await?;
let test_count = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
% 5) as usize
+ 5;

tokio::time::sleep(Duration::from_secs(1)).await;
Result::<()>::Ok(())
})
})
}
let ctx = format!("count={test_count}");

for join_handle in join_handles {
let _ = join_handle.await;
}
let barrier = Arc::new(tokio::sync::Barrier::new(test_count));
let queue = QueueManager::<TestData>::create(2, metastore, is_global);
let mut join_handles = Vec::with_capacity(test_count);

let elapsed = instant.elapsed();
let total = Duration::from_secs((test_count) as u64);
assert!(
elapsed >= total / 2,
"{ctx}: expect: elapsed: {:?} >= {:?}, ",
elapsed,
total / 2,
);
let delta = Duration::from_millis(300) * test_count as u32;
assert!(
elapsed < total + delta,
"{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ",
elapsed,
total,
delta,
);
let instant = Instant::now();
for index in 0..test_count {
join_handles.push({
let queue = queue.clone();
let barrier = barrier.clone();
databend_common_base::runtime::spawn(async move {
barrier.wait().await;

// Time based semaphore is sensitive to time accuracy.
// Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired.
// Thus, we have to make the gap between timestamp large enough.
tokio::time::sleep(Duration::from_millis(300 * index as u64)).await;

let _guard = queue
.acquire(TestData::new(
String::from("test_concurrent_acquire"),
format!("TestData{}", index),
))
.await?;

tokio::time::sleep(Duration::from_secs(1)).await;
Result::<()>::Ok(())
})
})
}

assert_eq!(queue.length(), 0);
for join_handle in join_handles {
let _ = join_handle.await;
}

let elapsed = instant.elapsed();
let total = Duration::from_secs((test_count) as u64);
assert!(
elapsed >= total / 2,
"{ctx}: expect: elapsed: {:?} >= {:?}, ",
elapsed,
total / 2,
);
let delta = Duration::from_millis(300) * test_count as u32;
assert!(
elapsed < total + delta,
"{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ",
elapsed,
total,
delta,
);

assert_eq!(queue.length(), 0);

Ok(())
}

Expand Down