Quickwit needs to assign indexing tasks to a set of indexer nodes.
We call the result of this decision the indexing physical plan.

This needs to be done under the following constraint:
- not exceeding the maximum load of each node. (O)

We also want to observe some interesting properties such as:
- (A) we want to avoid needlessly moving indexing tasks from one indexer to another
- (B) we want a source to be spread amongst as few nodes as possible
- (C) we want to balance the load between nodes as soon as the load is significantly (>30%) higher than the average (target) load
- (D) when we are working with the Ingest API source, we prefer to colocate indexing tasks
with the ingesters holding the data.

An indexer has:
- a maximum total load (that we will need to measure or configure).

The problem is now greatly simplified.
A solution is a sparse matrix of `(num_indexers, num_sources)` that holds the number of shards to be indexed.
The different constraints and desired properties can all be re-expressed in these terms. For instance:
- We want the dot product of the load-per-shard vector with each row to be lower than the maximum load
of each node (O)
- We want the dot product of the load-per-shard vector with each row to be close to the average load of each node (C)
- We do not want a large distance between the two solution matrices (A)
- We want that matrix as sparse as possible (B)

Note that the constraint (C) is enforced differently depending on the load:
- shards can be placed freely on nodes up to 30% of their capacity
- above this threshold, we try to assign shards to indexers so that the total load on each indexer is close to the average load

To express the affinity constraint (D) we could similarly define a matrix of `(num_indexers, num_sources)` with affinity scores and compute a distance to the solution matrix.

The actual cost function we would craft is however not linear: it is a combination of multiple distances like those described above.
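
To make this concrete, here is a minimal sketch of how the solution matrix and a few of these terms could be computed. The types and names are illustrative only (a dense `Vec<Vec<u32>>` instead of an actual sparse matrix), not the actual Quickwit code:

```rust
// Sketch: `solution[indexer][source]` is the number of shards of `source`
// assigned to `indexer`. A dense matrix is used here for clarity; the real
// solution would be sparse.

/// Dot product of the load-per-shard vector with each row: the total load
/// placed on each indexer.
fn load_per_node(solution: &[Vec<u32>], load_per_shard: &[f32]) -> Vec<f32> {
    solution
        .iter()
        .map(|row| {
            row.iter()
                .zip(load_per_shard)
                .map(|(&num_shards, &load)| num_shards as f32 * load)
                .sum()
        })
        .collect()
}

/// (O): no indexer exceeds its maximum load.
fn respects_max_load(solution: &[Vec<u32>], load_per_shard: &[f32], max_load: &[f32]) -> bool {
    load_per_node(solution, load_per_shard)
        .iter()
        .zip(max_load)
        .all(|(load, max)| load <= max)
}

/// (A): distance between two solutions, i.e. the number of shard moves.
fn distance(lhs: &[Vec<u32>], rhs: &[Vec<u32>]) -> u32 {
    lhs.iter()
        .zip(rhs)
        .flat_map(|(l, r)| l.iter().zip(r))
        .map(|(&l, &r)| l.abs_diff(r))
        .sum()
}

/// (B): sparsity, measured as the number of (indexer, source) cells in use.
fn num_cells_used(solution: &[Vec<u32>]) -> usize {
    solution.iter().flatten().filter(|&&cell| cell > 0).count()
}
```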

# The heuristic

We use the following heuristic.

While assigning shards to nodes, we try to ensure that workloads are balanced (except at very small cluster loads). This is achieved by computing a virtual capacity for each indexer: we take 120% of the total load on the entire cluster and divide it up proportionally between indexers according to their capacity. By respecting this virtual capacity when assigning shards to indexers, we make sure that all indexers end up with a load close to the average load.
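
For illustration, here is a hedged sketch of that computation (hypothetical names, not the actual implementation):

```rust
// Sketch: scale the total cluster load by 120% and split it between
// indexers proportionally to their (configured or measured) capacity.
fn virtual_capacities(total_load: f32, capacities: &[f32]) -> Vec<f32> {
    let total_capacity: f32 = capacities.iter().sum();
    let scaled_load = total_load * 1.2;
    capacities
        .iter()
        .map(|capacity| scaled_load * capacity / total_capacity)
        .collect()
}

fn main() {
    // Two indexers with capacities 4000 and 2000 and a total load of 3000:
    // 120% of 3000 = 3600, split 2:1 -> virtual capacities [2400, 1200].
    println!("{:?}", virtual_capacities(3000.0, &[4000.0, 2000.0]));
}
```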

## Phase 1: Remove extraneous shards

Starting from the existing solution, we first reduce it to make sure we do not have too many shards assigned. This happens when a source has been scaled down or deleted.
This is done by reducing the number of shards wherever needed, removing them in priority from nodes with few shards.

We call the resulting solution the "reduced solution". The reduced solution is usually not a valid solution, as some shards
may be missing from it, but it is lower than or equal to the previous solution.

## Phase 2: Enforce nodes maximum load

We then remove entire sources from nodes where the load is higher than the capacity (when the cluster load is below 30%) or the virtual capacity (when it is above 30%).
For every given node, we remove in priority the sources that contribute the smallest load on the node.

Matrix-wise, note that phases 1 and 2 create a matrix lower than or equal to the previous solution.
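
Here is a hedged sketch of the eviction on a single overloaded node (illustrative types, not the actual code):

```rust
// Sketch of phase 2 for one node: while the node's load exceeds its
// (virtual) capacity, evict entire sources, starting with the sources
// contributing the least load on this node.
// `sources` holds (source id, load of that source on this node) pairs.
fn enforce_capacity(sources: &mut Vec<(u32, f32)>, capacity: f32) {
    let mut load: f32 = sources.iter().map(|&(_, load)| load).sum();
    // Sort by decreasing load so that `pop` evicts the lightest source first.
    sources.sort_by(|a, b| b.1.total_cmp(&a.1));
    while load > capacity {
        match sources.pop() {
            Some((_source_id, evicted_load)) => load -= evicted_load,
            None => break,
        }
    }
}
```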

## Phase 3: Greedy assignment

At this point we have reached a solution that fits on the cluster, but we may have several missing shards.
We therefore use a greedy algorithm to allocate these shards. We assign the shards source by source, in decreasing order of total load.

We assign shards to indexers while trying to respect their virtual capacity. Because of the uneven size of the shards and the greedy approach, this problem might not have a solution. In that case, we iteratively grow the virtual capacity by 20% until the solution fits.

Shards for each source are placed in two steps:
- in a first iteration we assign shards that have affinity scores (D)
- in a second iteration we assign the rest of the shards starting with the node having the highest capacity
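
A hedged sketch of this per-source placement, combining the two iterations with the virtual-capacity growth (illustrative types; the affinity score stands for (D)):

```rust
struct Indexer {
    remaining_capacity: f32,
    affinity: u32, // e.g. number of this source's ingest shards held locally
}

// Sketch of phase 3 for one source. Assumes at least one indexer has a
// strictly positive capacity, so that growing by 20% eventually fits.
fn place_source(indexers: &mut [Indexer], mut num_shards: u32, load_per_shard: f32) {
    loop {
        let mut order: Vec<usize> = (0..indexers.len()).collect();
        // First iteration: indexers with an affinity for the source (D).
        order.sort_by_key(|&i| std::cmp::Reverse(indexers[i].affinity));
        for pass in 0..2 {
            if pass == 1 {
                // Second iteration: highest remaining capacity first.
                order.sort_by(|&a, &b| {
                    indexers[b].remaining_capacity.total_cmp(&indexers[a].remaining_capacity)
                });
            }
            for &i in &order {
                if pass == 0 && indexers[i].affinity == 0 {
                    continue;
                }
                while num_shards > 0 && indexers[i].remaining_capacity >= load_per_shard {
                    indexers[i].remaining_capacity -= load_per_shard;
                    num_shards -= 1;
                }
            }
        }
        if num_shards == 0 {
            return;
        }
        // The shards did not fit: grow the virtual capacities by 20% and retry.
        for indexer in indexers.iter_mut() {
            indexer.remaining_capacity *= 1.2;
        }
    }
}
```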

## Phase 4: Optimization
