@@ -635,9 +635,13 @@ async fn search_partial_hits_phase_with_scroll(
635
635
/// metadata count.
636
636
///
637
637
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
638
- pub fn is_metadata_count_request ( request : & SearchRequest ) -> bool {
638
+ pub fn is_metadata_count_request ( request : & SearchRequest , split : & SplitMetadata ) -> bool {
639
639
let query_ast: QueryAst = serde_json:: from_str ( & request. query_ast ) . unwrap ( ) ;
640
- is_metadata_count_request_with_ast ( & query_ast, request)
640
+
641
+ let start_time = split. time_range . as_ref ( ) . map ( |x| x. start ( ) ) . copied ( ) ;
642
+ let end_time = split. time_range . as_ref ( ) . map ( |x| x. end ( ) ) . copied ( ) ;
643
+
644
+ is_metadata_count_request_with_ast ( & query_ast, request, start_time, end_time)
641
645
}
642
646
643
647
/// Check if the request is a count request without any filters, so we can just return the split
@@ -646,42 +650,47 @@ pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
646
650
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
647
651
///
648
652
/// The passed query_ast should match the serialized on in request.
649
- pub fn is_metadata_count_request_with_ast ( query_ast : & QueryAst , request : & SearchRequest ) -> bool {
653
+ pub fn is_metadata_count_request_with_ast (
654
+ query_ast : & QueryAst ,
655
+ request : & SearchRequest ,
656
+ split_start_timestamp : Option < i64 > ,
657
+ split_end_timestamp : Option < i64 > ,
658
+ ) -> bool {
650
659
if query_ast != & QueryAst :: MatchAll {
651
660
return false ;
652
661
}
653
662
if request. max_hits != 0 {
654
663
return false ;
655
664
}
656
665
657
- // If the start and end timestamp encompass the whole split, it is still a count query.
658
- // We remove this currently on the leaf level, but not yet on the root level.
659
- // There's a small advantage when we would do this on the root level, since we have the
660
- // counts available on the split. On the leaf it is currently required to open the split
661
- // to get the count.
662
- if request. start_timestamp . is_some ( ) || request. end_timestamp . is_some ( ) {
663
- return false ;
666
+ match ( request. start_timestamp , split_start_timestamp) {
667
+ ( Some ( request_start) , Some ( split_start) ) if split_start >= request_start => { }
668
+ ( Some ( _) , _) => return false ,
669
+ ( None , _) => { }
664
670
}
671
+ match ( request. end_timestamp , split_end_timestamp) {
672
+ ( Some ( request_end) , Some ( split_end) ) if split_end < request_end => { }
673
+ ( Some ( _) , _) => return false ,
674
+ ( None , _) => { }
675
+ }
676
+
665
677
if request. aggregation_request . is_some ( ) || !request. snippet_fields . is_empty ( ) {
666
678
return false ;
667
679
}
668
680
true
669
681
}
670
682
671
683
/// Get a leaf search response that returns the num_docs of the split
672
- pub fn get_count_from_metadata ( split_metadatas : & [ SplitMetadata ] ) -> Vec < LeafSearchResponse > {
673
- split_metadatas
674
- . iter ( )
675
- . map ( |metadata| LeafSearchResponse {
676
- num_hits : metadata. num_docs as u64 ,
677
- partial_hits : Vec :: new ( ) ,
678
- failed_splits : Vec :: new ( ) ,
679
- num_attempted_splits : 1 ,
680
- num_successful_splits : 1 ,
681
- intermediate_aggregation_result : None ,
682
- resource_stats : None ,
683
- } )
684
- . collect ( )
684
+ pub fn get_count_from_metadata ( metadata : & SplitMetadata ) -> LeafSearchResponse {
685
+ LeafSearchResponse {
686
+ num_hits : metadata. num_docs as u64 ,
687
+ partial_hits : Vec :: new ( ) ,
688
+ failed_splits : Vec :: new ( ) ,
689
+ num_attempted_splits : 1 ,
690
+ num_successful_splits : 1 ,
691
+ intermediate_aggregation_result : None ,
692
+ resource_stats : None ,
693
+ }
685
694
}
686
695
687
696
/// Returns true if the query is particularly memory intensive.
@@ -729,26 +738,31 @@ pub(crate) async fn search_partial_hits_phase(
729
738
split_metadatas : & [ SplitMetadata ] ,
730
739
cluster_client : & ClusterClient ,
731
740
) -> crate :: Result < LeafSearchResponse > {
732
- let leaf_search_responses: Vec < LeafSearchResponse > =
733
- if is_metadata_count_request ( search_request) {
734
- get_count_from_metadata ( split_metadatas)
741
+ let mut leaf_search_responses: Vec < LeafSearchResponse > =
742
+ Vec :: with_capacity ( split_metadatas. len ( ) ) ;
743
+ let mut leaf_search_jobs = Vec :: new ( ) ;
744
+ for split in split_metadatas {
745
+ if is_metadata_count_request ( search_request, split) {
746
+ leaf_search_responses. push ( get_count_from_metadata ( split) ) ;
735
747
} else {
736
- let jobs: Vec < SearchJob > = split_metadatas. iter ( ) . map ( SearchJob :: from) . collect ( ) ;
737
- let assigned_leaf_search_jobs = cluster_client
738
- . search_job_placer
739
- . assign_jobs ( jobs, & HashSet :: default ( ) )
740
- . await ?;
741
- let mut leaf_request_tasks = Vec :: new ( ) ;
742
- for ( client, client_jobs) in assigned_leaf_search_jobs {
743
- let leaf_request = jobs_to_leaf_request (
744
- search_request,
745
- indexes_metas_for_leaf_search,
746
- client_jobs,
747
- ) ?;
748
- leaf_request_tasks. push ( cluster_client. leaf_search ( leaf_request, client. clone ( ) ) ) ;
749
- }
750
- try_join_all ( leaf_request_tasks) . await ?
751
- } ;
748
+ leaf_search_jobs. push ( SearchJob :: from ( split) ) ;
749
+ }
750
+ }
751
+
752
+ if !leaf_search_jobs. is_empty ( ) {
753
+ let assigned_leaf_search_jobs = cluster_client
754
+ . search_job_placer
755
+ . assign_jobs ( leaf_search_jobs, & HashSet :: default ( ) )
756
+ . await ?;
757
+ let mut leaf_request_tasks = Vec :: new ( ) ;
758
+ for ( client, client_jobs) in assigned_leaf_search_jobs {
759
+ let leaf_request =
760
+ jobs_to_leaf_request ( search_request, indexes_metas_for_leaf_search, client_jobs) ?;
761
+ leaf_request_tasks. push ( cluster_client. leaf_search ( leaf_request, client. clone ( ) ) ) ;
762
+ }
763
+ let executed_leaf_search_responses = try_join_all ( leaf_request_tasks) . await ?;
764
+ leaf_search_responses. extend ( executed_leaf_search_responses) ;
765
+ }
752
766
753
767
// Creates a collector which merges responses into one
754
768
let merge_collector =
0 commit comments