Commit 4f54d65

Add placement affinity on the number of existing shards
1 parent 70990e0 commit 4f54d65

File tree

1 file changed: +76 -52 lines changed

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs

Lines changed: 76 additions & 52 deletions

@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::cmp::Reverse;
+use std::cmp::{Ordering, Reverse};
 use std::collections::BTreeMap;
 use std::collections::btree_map::Entry;

-use itertools::Itertools;
 use quickwit_proto::indexing::CpuCapacity;

 use super::scheduling_logic_model::*;
@@ -229,21 +228,61 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
 // If this algorithm fails to place all remaining shards, we inflate
 // the node capacities by 20% in the scheduling problem and start from the beginning.

+#[derive(Debug, PartialEq, Eq, Ord)]
+struct PlacementCandidate {
+    indexer_ord: IndexerOrd,
+    current_num_shards: u32,
+    available_capacity: CpuCapacity,
+    affinity: u32,
+}
+
+impl PartialOrd for PlacementCandidate {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // Higher affinity is better
+        match self.affinity.cmp(&other.affinity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with shards already assigned first
+        match self.current_num_shards.cmp(&other.current_num_shards) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // If tie, pick the node with the highest available capacity
+        match self.available_capacity.cmp(&other.available_capacity) {
+            Ordering::Equal => {}
+            ordering => return Some(ordering.reverse()),
+        }
+        // Final tie-breaker: indexer ID for deterministic ordering
+        Some(self.indexer_ord.cmp(&other.indexer_ord).reverse())
+    }
+}
+
 fn attempt_place_unassigned_shards(
     unassigned_shards: &[Source],
     problem: &SchedulingProblem,
     partial_solution: &SchedulingSolution,
 ) -> Result<SchedulingSolution, NotEnoughCapacity> {
     let mut solution = partial_solution.clone();
     for source in unassigned_shards {
-        let indexers_with_most_available_capacity =
-            compute_indexer_available_capacity(problem, &solution)
-                .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
-        place_unassigned_shards_single_source(
-            source,
-            indexers_with_most_available_capacity,
-            &mut solution,
-        )?;
+        let mut placements: Vec<PlacementCandidate> = solution
+            .indexer_assignments
+            .iter()
+            .map(|indexer_assignment: &IndexerAssignment| {
+                let available_capacity = indexer_assignment.indexer_available_capacity(problem);
+                assert!(available_capacity >= 0i32);
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards = indexer_assignment.num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity: 0,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord: indexer_assignment.indexer_ord,
+                }
+            })
+            .collect();
+        placements.sort();
+        place_unassigned_shards_single_source(source, &placements, &mut solution)?;
     }
     assert_place_unassigned_shards_post_condition(problem, &solution);
     Ok(solution)
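
The comparator above orders candidates best-first: higher affinity wins, then more shards of the source already assigned to the indexer, then more available capacity, with the indexer ordinal as a deterministic tie-breaker. The following standalone sketch (not part of the commit) illustrates that ordering with simplified types: IndexerOrd is assumed to be a plain usize and CPU capacity is modeled as raw millis.

use std::cmp::Ordering;

#[derive(Debug)]
struct Candidate {
    indexer_ord: usize,
    current_num_shards: u32,
    available_capacity_millis: u32,
    affinity: u32,
}

// Best candidate first: highest affinity, then most shards of this source
// already assigned, then most available capacity, then lowest indexer ord.
fn best_first(a: &Candidate, b: &Candidate) -> Ordering {
    b.affinity
        .cmp(&a.affinity)
        .then(b.current_num_shards.cmp(&a.current_num_shards))
        .then(b.available_capacity_millis.cmp(&a.available_capacity_millis))
        .then(a.indexer_ord.cmp(&b.indexer_ord))
}

// Arguments: indexer_ord, current_num_shards, available_capacity_millis, affinity.
fn cand(ord: usize, shards: u32, millis: u32, affinity: u32) -> Candidate {
    Candidate {
        indexer_ord: ord,
        current_num_shards: shards,
        available_capacity_millis: millis,
        affinity,
    }
}

fn main() {
    let mut candidates = vec![cand(0, 0, 4_000, 0), cand(1, 2, 1_000, 0), cand(2, 0, 1_000, 1)];
    candidates.sort_by(best_first);
    // Indexer 2 wins on affinity; indexer 1 beats indexer 0 because it already
    // hosts shards of the source, despite having less available capacity.
    let order: Vec<usize> = candidates.iter().map(|c| c.indexer_ord).collect();
    assert_eq!(order, vec![2, 1, 0]);
}

Preferring the indexer that already hosts shards of a source, even when another node has more headroom, is the "placement affinity on the number of existing shards" named in the commit title, layered on top of the existing affinity and capacity criteria.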
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
         Reverse(load)
     });
     for source in &unassigned_shards {
-        // List of indexer with a non-null affinity and some available capacity, sorted by
-        // (affinity, available capacity) in that order.
-        let indexers_with_affinity_and_available_capacity = source
+        let mut placements: Vec<PlacementCandidate> = source
             .affinities
             .iter()
             .filter(|&(_, &affinity)| affinity != 0u32)
-            .map(|(&indexer_ord, affinity)| {
+            .map(|(&indexer_ord, &affinity)| {
                 let available_capacity =
                     solution.indexer_assignments[indexer_ord].indexer_available_capacity(problem);
-                let capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
-                (indexer_ord, affinity, capacity)
-            })
-            .sorted_by_key(|(indexer_ord, affinity, capacity)| {
-                Reverse((*affinity, *capacity, *indexer_ord))
+                let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32);
+                let current_num_shards =
+                    solution.indexer_assignments[indexer_ord].num_shards(source.source_ord);
+                PlacementCandidate {
+                    affinity,
+                    current_num_shards,
+                    available_capacity,
+                    indexer_ord,
+                }
             })
-            .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity));
-        let _ = place_unassigned_shards_single_source(
-            source,
-            indexers_with_affinity_and_available_capacity,
-            solution,
-        );
+            .collect();
+        placements.sort();
+        let _ = place_unassigned_shards_single_source(source, &placements, solution);
     }
 }
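
In this affinity pass, only indexers with a non-zero affinity for the source become candidates; every other indexer is left to the capacity-based pass in attempt_place_unassigned_shards, which builds candidates with affinity: 0 over all indexers. A minimal sketch of that filtering step, assuming affinities maps indexer ordinals to affinity counts as the iteration suggests (the helper names below are hypothetical, not the commit's):

use std::collections::BTreeMap;

type IndexerOrd = usize;

#[derive(Debug)]
struct Candidate {
    indexer_ord: IndexerOrd,
    current_num_shards: u32,
    available_capacity_millis: u32,
    affinity: u32,
}

// Build the affinity-pass candidate list: indexers with zero affinity for the
// source are skipped here and only considered by the later capacity-based pass.
fn affinity_candidates(
    affinities: &BTreeMap<IndexerOrd, u32>,
    available_capacity_millis: &[u32],
    current_num_shards: &[u32],
) -> Vec<Candidate> {
    affinities
        .iter()
        .filter(|&(_, &affinity)| affinity != 0)
        .map(|(&indexer_ord, &affinity)| Candidate {
            indexer_ord,
            current_num_shards: current_num_shards[indexer_ord],
            available_capacity_millis: available_capacity_millis[indexer_ord],
            affinity,
        })
        .collect()
}

fn main() {
    let affinities = BTreeMap::from([(0usize, 0u32), (1, 3), (2, 1)]);
    let candidates = affinity_candidates(&affinities, &[4_000, 1_000, 2_000], &[0, 1, 0]);
    // Indexer 0 is skipped entirely: its affinity for the source is zero.
    assert_eq!(candidates.len(), 2);
}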

@@ -348,22 +386,27 @@ struct NotEnoughCapacity;
 /// amongst the node with their given node capacity.
 fn place_unassigned_shards_single_source(
     source: &Source,
-    mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
+    sorted_candidates: &[PlacementCandidate],
     solution: &mut SchedulingSolution,
 ) -> Result<(), NotEnoughCapacity> {
     let mut num_shards = source.num_shards;
-    while num_shards > 0 {
-        let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
-            return Err(NotEnoughCapacity);
-        };
+    for PlacementCandidate {
+        indexer_ord,
+        available_capacity,
+        ..
+    } in sorted_candidates
+    {
         let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
         let num_shards_to_place = num_placable_shards.min(num_shards);
         // Update the solution, the shard load, and the number of shards to place.
-        solution.indexer_assignments[indexer_ord]
+        solution.indexer_assignments[*indexer_ord]
             .add_shards(source.source_ord, num_shards_to_place);
         num_shards -= num_shards_to_place;
+        if num_shards == 0 {
+            return Ok(());
+        }
     }
-    Ok(())
+    Err(NotEnoughCapacity)
 }

 /// Compute the sources/shards that have not been assigned to any indexer yet.
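
With the iterator argument replaced by a slice of pre-sorted candidates, the placement loop now walks the candidates best-first, places as many shards on each as its remaining capacity allows, and only reports NotEnoughCapacity after exhausting every candidate. A self-contained sketch of that greedy loop, with simplified types that are not the commit's:

// Simplified stand-in for the commit's error type.
#[derive(Debug)]
struct NotEnoughCapacity;

// Greedily place `num_shards` shards of `load_per_shard` mcpu each onto
// candidates given as (indexer_ord, available_cpu_millis), already sorted
// best-first. Returns the per-indexer placements, or an error if capacity
// runs out before every shard is placed.
fn place_greedily(
    mut num_shards: u32,
    load_per_shard: u32,
    sorted_candidates: &[(usize, u32)],
) -> Result<Vec<(usize, u32)>, NotEnoughCapacity> {
    let mut placements = Vec::new();
    for &(indexer_ord, available_millis) in sorted_candidates {
        let num_placable_shards = available_millis / load_per_shard;
        let num_shards_to_place = num_placable_shards.min(num_shards);
        if num_shards_to_place > 0 {
            placements.push((indexer_ord, num_shards_to_place));
        }
        num_shards -= num_shards_to_place;
        if num_shards == 0 {
            return Ok(placements);
        }
    }
    Err(NotEnoughCapacity)
}

fn main() {
    // Five 1000-mcpu shards over candidates with 3000 and 4000 mcpu of headroom:
    // three shards land on the first candidate, two on the second.
    let placements = place_greedily(5, 1_000, &[(1, 3_000), (0, 4_000)]).expect("enough capacity");
    assert_eq!(placements, vec![(1, 3), (0, 2)]);
    // With a single 2000-mcpu candidate, the same request fails.
    assert!(place_greedily(5, 1_000, &[(0, 2_000)]).is_err());
}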
@@ -392,30 +435,11 @@ fn compute_unassigned_sources(
     unassigned_sources.into_values().collect()
 }

-/// Builds a BinaryHeap with the different indexer capacities.
-///
-/// Panics if one of the indexer is over-assigned.
-fn compute_indexer_available_capacity<'a>(
-    problem: &'a SchedulingProblem,
-    solution: &'a SchedulingSolution,
-) -> impl Iterator<Item = (IndexerOrd, CpuCapacity)> + 'a {
-    solution
-        .indexer_assignments
-        .iter()
-        .map(|indexer_assignment| {
-            let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem);
-            assert!(available_capacity >= 0i32);
-            (
-                indexer_assignment.indexer_ord,
-                CpuCapacity::from_cpu_millis(available_capacity as u32),
-            )
-        })
-}
-
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU32;

+    use itertools::Itertools;
     use proptest::prelude::*;
     use quickwit_proto::indexing::mcpu;