Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -46,6 +46,7 @@
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
import org.opensearch.plugin.insights.core.utils.ExporterReaderUtils;
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.GroupingType;
Expand Down Expand Up @@ -99,7 +100,7 @@ public class TopQueriesService {
/**
* The internal thread-safe store that holds the top n queries insight data
*/
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;
private final IndexedPriorityQueue<String, SearchQueryRecord> topQueriesStore;

/**
* The AtomicReference of a snapshot of the current window top queries for getters to consume
Expand Down Expand Up @@ -128,6 +129,8 @@ public class TopQueriesService {

private final QueryGrouper queryGrouper;

private final AtomicLong insertSequence = new AtomicLong();

TopQueriesService(
final Client client,
final MetricType metricType,
Expand All @@ -144,7 +147,7 @@ public class TopQueriesService {
this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE;
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesStore = new IndexedPriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
queryGrouper = new MinMaxHeapQueryGrouper(
Expand Down Expand Up @@ -411,7 +414,7 @@ void consumeRecords(final List<SearchQueryRecord> records) {
// add records in current window, if there are any, to the top n store
addToTopNStore(recordsInThisWindow);
// update the current window snapshot for getters to consume
final List<SearchQueryRecord> newSnapShot = new ArrayList<>(topQueriesStore);
final List<SearchQueryRecord> newSnapShot = new ArrayList<>(topQueriesStore.getAllValues());
newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot.set(newSnapShot);
}
Expand All @@ -422,7 +425,10 @@ private void addToTopNStore(final List<SearchQueryRecord> records) {
queryGrouper.add(record);
}
} else {
topQueriesStore.addAll(records);
for (SearchQueryRecord record : records) {
String uniqueKey = String.valueOf(insertSequence.getAndIncrement());
topQueriesStore.insert(uniqueKey, record);
}
// evict head elements until the priority queue is back to its fixed top-N size
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
Expand All @@ -442,7 +448,7 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
final List<SearchQueryRecord> history = new ArrayList<>();
// rotate the current window to history store only if the data belongs to the last window
if (windowStart == newWindowStart - windowSize.getMillis()) {
history.addAll(topQueriesStore);
history.addAll(topQueriesStore.getAllValues());
}
topQueriesHistorySnapshot.set(history);
topQueriesStore.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.collect.Tuple;
import org.opensearch.plugin.insights.core.utils.IndexedPriorityQueue;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.GroupingType;
Expand Down Expand Up @@ -58,14 +58,14 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
/**
* Min heap that keeps track of the Top N query groups; it is the topQueriesStore instance passed in from TopQueriesService
*/
private final PriorityBlockingQueue<SearchQueryRecord> minHeapTopQueriesStore;
private final IndexedPriorityQueue<String, SearchQueryRecord> minHeapTopQueriesStore;
/**
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
* and the records are rearranged accordingly.
*/
private final PriorityBlockingQueue<SearchQueryRecord> maxHeapQueryStore;
private final IndexedPriorityQueue<String, SearchQueryRecord> maxHeapQueryStore;

/**
* Top N size based on the configuration set
Expand All @@ -84,7 +84,7 @@ public MinMaxHeapQueryGrouper(
final MetricType metricType,
final GroupingType groupingType,
final AggregationType aggregationType,
final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore,
final IndexedPriorityQueue<String, SearchQueryRecord> topQueriesStore,
final int topNSize
) {
this.groupingType = groupingType;
Expand All @@ -94,7 +94,7 @@ public MinMaxHeapQueryGrouper(
this.minHeapTopQueriesStore = topQueriesStore;
this.topNSize = topNSize;
this.maxGroups = QueryInsightsSettings.DEFAULT_GROUPS_EXCLUDING_TOPN_LIMIT;
this.maxHeapQueryStore = new PriorityBlockingQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
this.maxHeapQueryStore = new IndexedPriorityQueue<>(maxGroups, (a, b) -> SearchQueryRecord.compare(b, a, metricType));
}

/**
Expand Down Expand Up @@ -137,9 +137,9 @@ public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) {
aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1();
boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2();
if (isPresentInMinPQ) {
minHeapTopQueriesStore.remove(aggregateSearchQueryRecord);
minHeapTopQueriesStore.remove(groupId);
} else {
maxHeapQueryStore.remove(aggregateSearchQueryRecord);
maxHeapQueryStore.remove(groupId);
}
addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId);
}
Expand Down Expand Up @@ -207,7 +207,7 @@ public void updateTopNSize(final int newSize) {
}

private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) {
minHeapTopQueriesStore.add(searchQueryRecord);
minHeapTopQueriesStore.insert(searchQueryRecord.getGroupingId(), searchQueryRecord);
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));
overflow();
}
Expand All @@ -223,17 +223,21 @@ private void addAndPromote(
if (maxHeapQueryStore.isEmpty()) {
return;
}
if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) {
SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll();
addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId());
IndexedPriorityQueue.Entry<String, SearchQueryRecord> maxPeek = maxHeapQueryStore.peek();
IndexedPriorityQueue.Entry<String, SearchQueryRecord> minPeek = minHeapTopQueriesStore.peek();
if (maxPeek != null && minPeek != null && SearchQueryRecord.compare(maxPeek.value, minPeek.value, metricType) > 0) {
IndexedPriorityQueue.Entry<String, SearchQueryRecord> entryMovedFromMaxToMin = maxHeapQueryStore.pollEntry();
addToMinPQ(entryMovedFromMaxToMin.value, entryMovedFromMaxToMin.key);
}
}

private void overflow() {
if (minHeapTopQueriesStore.size() > topNSize) {
SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll();
maxHeapQueryStore.add(recordMovedFromMinToMax);
groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false));
IndexedPriorityQueue.Entry<String, SearchQueryRecord> movedEntry = minHeapTopQueriesStore.pollEntry();
if (movedEntry != null) {
maxHeapQueryStore.insert(movedEntry.key, movedEntry.value);
groupIdToAggSearchQueryRecord.put(movedEntry.key, new Tuple<>(movedEntry.value, false));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.utils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * A binary heap ordered by a {@link Comparator} over the values, with a key-to-position index so
 * that an arbitrary entry can be removed by key without scanning the heap.
 *
 * <p>The head of the queue is the entry whose value is least according to the comparator. Keys are
 * unique: inserting a key that is already present is rejected.
 *
 * <p>Every public method runs under the read or write lock of a single
 * {@link ReentrantReadWriteLock}, so individual calls are atomic with respect to each other.
 * Sequences of calls (for example {@code size()} followed by {@code poll()}) are not atomic as a
 * whole.
 *
 * @param <K> type of the unique key identifying an entry
 * @param <V> type of the value the heap is ordered by
 */
public class IndexedPriorityQueue<K, V> {

    private static final Logger logger = LogManager.getLogger(IndexedPriorityQueue.class);

    /**
     * A key/value pair stored in the queue.
     *
     * <p>NOTE(review): {@code value} is publicly writable; the queue does not re-sort when it is
     * reassigned, so callers should treat it as read-only.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class Entry<K, V> {
        public final K key;
        public V value;

        Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Array-backed binary heap; children of slot {@code i} live at {@code 2i+1} and {@code 2i+2}. */
    private final List<Entry<K, V>> heap;
    /** Current heap slot of every key that is in {@link #heap}. */
    private final Map<K, Integer> indexMap;
    private final Comparator<? super V> comparator;

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock read = lock.readLock();
    private final Lock write = lock.writeLock();

    /**
     * Creates an empty queue.
     *
     * @param initialCapacity initial capacity of the backing list and index map
     * @param comparator ordering applied to the values; the least value is the head
     * @throws NullPointerException if {@code comparator} is null
     * @throws IllegalArgumentException if {@code initialCapacity} is negative
     */
    public IndexedPriorityQueue(int initialCapacity, Comparator<? super V> comparator) {
        // Fail fast: a null comparator would otherwise only surface on the first comparison,
        // after the heap has already been modified.
        if (comparator == null) {
            throw new NullPointerException("comparator must not be null");
        }
        this.comparator = comparator;
        this.heap = new ArrayList<>(initialCapacity);
        this.indexMap = new HashMap<>(initialCapacity);
    }

    /**
     * Adds a new entry.
     *
     * @param key unique key of the entry
     * @param value value used for ordering
     * @return {@code true} if the entry was added, {@code false} if {@code key} was already present
     */
    public boolean insert(K key, V value) {
        write.lock();
        try {
            if (indexMap.containsKey(key)) {
                logger.debug("Key {} already exists in queue, skipping insert", key);
                return false;
            }
            heap.add(new Entry<>(key, value));
            // siftUp records the entry's resting slot in indexMap and reports it back,
            // so the log line shows where the key actually ended up.
            int finalIdx = siftUp(heap.size() - 1);
            logger.debug("Successfully inserted key {} at index {}", key, finalIdx);
            return true;
        } finally {
            write.unlock();
        }
    }

    /**
     * Removes the entry stored under {@code key}, wherever it sits in the heap.
     *
     * @param key key of the entry to remove
     * @return {@code true} if an entry was removed, {@code false} if the key was not present
     */
    public boolean remove(K key) {
        write.lock();
        try {
            Integer removedIdx = indexMap.remove(key);
            if (removedIdx == null) {
                logger.debug("Key {} not found in queue, nothing to remove", key);
                return false;
            }

            int idx = removedIdx;
            // Detach the tail before sifting so the sift routines only ever see live entries.
            Entry<K, V> tail = heap.remove(heap.size() - 1);
            if (idx < heap.size()) {
                // The removed entry was not the tail: the tail fills the hole and is then moved
                // to a valid position. At most one of the two sifts actually moves it.
                heap.set(idx, tail);
                indexMap.put(tail.key, idx);
                if (siftDown(idx) == idx) {
                    siftUp(idx);
                }
            }
            logger.debug("Successfully removed key {} from index {}", key, removedIdx);
            return true;
        } finally {
            write.unlock();
        }
    }

    /**
     * Removes the head of the queue and returns its value.
     *
     * @return the value of the removed head, or {@code null} if the queue is empty
     */
    public V poll() {
        Entry<K, V> e = pollEntry();
        return e == null ? null : e.value;
    }

    /**
     * Removes the head of the queue and returns it as a key/value entry.
     *
     * @return the removed head entry, or {@code null} if the queue is empty
     */
    public Entry<K, V> pollEntry() {
        write.lock();
        try {
            if (heap.isEmpty()) {
                return null;
            }
            Entry<K, V> head = heap.get(0);
            // The write lock is reentrant, so delegating to remove() while holding it is safe.
            remove(head.key);
            return head;
        } finally {
            write.unlock();
        }
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @return the live head entry (not a copy), or {@code null} if the queue is empty
     */
    public Entry<K, V> peek() {
        read.lock();
        try {
            return heap.isEmpty() ? null : heap.get(0);
        } finally {
            read.unlock();
        }
    }

    /**
     * @return the number of entries currently in the queue
     */
    public int size() {
        read.lock();
        try {
            return heap.size();
        } finally {
            read.unlock();
        }
    }

    /**
     * @return {@code true} if the queue holds no entries
     */
    public boolean isEmpty() {
        read.lock();
        try {
            return heap.isEmpty();
        } finally {
            read.unlock();
        }
    }

    /**
     * Copies all values into a new list.
     *
     * @return a new mutable list of the values in heap (array) order, which is not sorted order
     */
    public List<V> getAllValues() {
        read.lock();
        try {
            List<V> values = new ArrayList<>(heap.size());
            for (Entry<K, V> entry : heap) {
                values.add(entry.value);
            }
            return values;
        } finally {
            read.unlock();
        }
    }

    /**
     * Removes every entry from the queue.
     */
    public void clear() {
        write.lock();
        try {
            heap.clear();
            indexMap.clear();
        } finally {
            write.unlock();
        }
    }

    /**
     * Moves the entry at {@code idx} towards the root while it is strictly smaller than its parent.
     * Must be called with the write lock held.
     *
     * @return the slot the entry ends up in
     */
    private int siftUp(int idx) {
        Entry<K, V> item = heap.get(idx);
        while (idx > 0) {
            int parentIdx = (idx - 1) >>> 1;
            Entry<K, V> parent = heap.get(parentIdx);
            if (comparator.compare(item.value, parent.value) >= 0) {
                break;
            }
            heap.set(idx, parent);
            indexMap.put(parent.key, idx);
            idx = parentIdx;
        }
        heap.set(idx, item);
        indexMap.put(item.key, idx);
        return idx;
    }

    /**
     * Moves the entry at {@code idx} towards the leaves while it is strictly greater than its
     * smaller child. Must be called with the write lock held.
     *
     * @return the slot the entry ends up in
     */
    private int siftDown(int idx) {
        int size = heap.size();
        int half = size >>> 1; // slots at or beyond this point have no children
        Entry<K, V> item = heap.get(idx);
        while (idx < half) {
            int smallest = (idx << 1) + 1;
            int right = smallest + 1;
            if (right < size && comparator.compare(heap.get(right).value, heap.get(smallest).value) < 0) {
                smallest = right;
            }

            Entry<K, V> smallestItem = heap.get(smallest);
            if (comparator.compare(item.value, smallestItem.value) <= 0) {
                break;
            }

            heap.set(idx, smallestItem);
            indexMap.put(smallestItem.key, idx);
            idx = smallest;
        }
        heap.set(idx, item);
        indexMap.put(item.key, idx);
        return idx;
    }
}
Loading