Skip to content

Commit 1d73f9f

Browse files
committed
Merge remote-tracking branch 'origin/meiravg_bufferlimit_bm' into meiravg_bufferlimit_bm
2 parents c09129a + 282ea6a commit 1d73f9f

File tree

8 files changed

+64
-1653
lines changed

8 files changed

+64
-1653
lines changed

src/VecSim/algorithms/hnsw/hnsw_tiered.h

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
112112
// ownership (we do it right after we update the HNSW global data and receive the new state).
113113
template <bool releaseFlatGuard>
114114
void insertVectorToHNSW(HNSWIndex<DataType, DistType> *hnsw_index, labelType label,
115-
const void *blob);
115+
DataType *blob);
116116

117117
#ifdef BUILD_TESTS
118118
#include "VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h"
@@ -366,7 +366,7 @@ void TieredHNSWIndex<DataType, DistType>::updateInsertJobInternalId(idType prev_
366366
template <typename DataType, typename DistType>
367367
template <bool releaseFlatGuard>
368368
void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
369-
HNSWIndex<DataType, DistType> *hnsw_index, labelType label, const void *blob) {
369+
HNSWIndex<DataType, DistType> *hnsw_index, labelType label, DataType *blob) {
370370
// Acquire the index data lock, so we know what is the exact index size at this time. Acquire
371371
// the main r/w lock before to avoid deadlocks.
372372
AddVectorCtx state = {0};
@@ -430,6 +430,9 @@ void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
430430
/******************** Job's callbacks **********************************/
431431
template <typename DataType, typename DistType>
432432
void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) {
433+
// Note: this method has not yet been tested with overwriting scenarios, where the job may
434+
// have been invalidated before it is executed (TODO in the future).
435+
HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex();
433436
// Note that accessing the job fields should occur with flat index guard held (here and later).
434437
this->flatIndexGuard.lock_shared();
435438
if (job->id == INVALID_JOB_ID) {
@@ -438,7 +441,6 @@ void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) {
438441
return;
439442
}
440443

441-
HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex();
442444
// Copy the vector blob from the flat buffer, so we can release the flat lock while we are
443445
// indexing the vector into HNSW index.
444446
DataType blob_copy[this->frontendIndex->getDim()];
@@ -593,35 +595,18 @@ size_t TieredHNSWIndex<DataType, DistType>::indexLabelCount() const {
593595
template <typename DataType, typename DistType>
594596
int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType label,
595597
void *auxiliaryCtx) {
596-
int ret = 1;
597-
auto hnsw_index = this->getHNSWIndex();
598-
if (this->getWriteMode() == VecSim_WriteInPlace) {
599-
this->mainIndexGuard.lock();
600-
// Internally, we may overwrite (delete the previous vector stored under this label), and
601-
// may need to increase the capacity when we append the new vector afterwards.
602-
ret = hnsw_index->addVector(blob, label);
603-
this->mainIndexGuard.unlock();
598+
599+
if (this->getWriteMode() == VecSim_WriteInPlace ||
600+
this->frontendIndex->indexSize() >= this->flatBufferLimit) {
601+
auto hnsw_index = this->getHNSWIndex();
602+
// Insert vector directly to HNSW (since flat buffer guard was not held, no need to release
603+
// it internally).
604+
this->insertVectorToHNSW<false>(hnsw_index, label, (DataType *)blob);
604605
this->UpdateIndexMemory(this->memoryCtx, this->getAllocationSize());
605-
return ret;
606-
}
607-
if (this->frontendIndex->indexSize() >= this->flatBufferLimit) {
608-
// Handle overwrite situation.
609-
if (!this->backendIndex->isMultiValue()) {
610-
// This will do nothing (and return 0) if this label doesn't exist. Otherwise, it may
611-
// remove vector from the flat buffer and/or the HNSW index.
612-
ret -= this->deleteVector(label);
613-
}
614-
if (this->frontendIndex->indexSize() >= this->flatBufferLimit) {
615-
// We didn't remove a vector from flat buffer due to overwrite, insert the new vector
616-
// directly to HNSW. Since flat buffer guard was not held, no need to release it
617-
// internally.
618-
this->insertVectorToHNSW<false>(hnsw_index, label, blob);
619-
this->UpdateIndexMemory(this->memoryCtx, this->getAllocationSize());
620-
return ret;
621-
}
622-
// Otherwise, we fall back to the "regular" insertion into the flat buffer
623-
// (since it is not full anymore after removing the previous vector stored under the label).
606+
return 1;
624607
}
608+
609+
/* Note: this currently doesn't support overwriting (it assumes that the label doesn't exist)! */
625610
this->flatIndexGuard.lock();
626611
idType new_flat_id = this->frontendIndex->indexSize();
627612
if (this->frontendIndex->isLabelExists(label) && !this->frontendIndex->isMultiValue()) {
@@ -713,7 +698,9 @@ int TieredHNSWIndex<DataType, DistType>::deleteVector(labelType label) {
713698
// Note that we may remove the same vector that has been removed from the flat index, if it was
714699
// being ingested at that time.
715700
if (this->getWriteMode() == VecSim_WriteAsync) {
701+
this->mainIndexGuard.lock_shared();
716702
num_deleted_vectors += this->deleteLabelFromHNSW(label);
703+
this->mainIndexGuard.unlock_shared();
717704
// Apply ready swap jobs if number of deleted vectors reached the threshold
718705
// (under exclusive lock of the main index guard).
719706
this->executeReadySwapJobs();

src/VecSim/algorithms/hnsw/hnsw_tiered_tests_friends.h

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,10 @@ INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_alternateInsertDeleteAsync_Test)
1616
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_swapJobBasic_Test)
1717
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_swapJobBasic2_Test)
1818
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_deleteVectorsAndSwapSync_Test)
19-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_BatchIterator_Test)
20-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_BatchIteratorAdvanced_Test)
21-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_BatchIteratorSize1_Test)
22-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_BatchIteratorReset_Test)
23-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_BatchIteratorWithOverlaps_Test)
24-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_parallelBatchIteratorSearch_Test)
25-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_testInfo_Test)
26-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_testInfoIterator_Test)
2719
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_writeInPlaceMode_Test)
28-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_switchWriteModes_Test)
20+
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_burstThreadsScenario_Test)
2921
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_bufferLimit_Test)
3022
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_bufferLimitAsync_Test)
31-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_RangeSearch_Test)
32-
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTest_parallelRangeSearch_Test)
3323

3424
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTestBasic_insertJobAsync_Test)
3525
INDEX_TEST_FRIEND_CLASS(HNSWTieredIndexTestBasic_insertJobAsyncMulti_Test)

src/VecSim/vec_sim.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ extern "C" void VecSim_SetLogCallbackFunction(logCallbackFunction callback) {
2222
VecSimIndex::setLogCallbackFunction(callback);
2323
}
2424

25-
extern "C" void VecSim_SetWriteMode(VecSimWriteMode mode) { VecSimIndex::setWriteMode(mode); }
25+
extern "C" void VecSim_SetInPlaceWriteMode(VecSimWriteMode mode) {
26+
VecSimIndex::setWriteMode(mode);
27+
}
2628

2729
static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam,
2830
VecSimQueryParams *qparams,

src/VecSim/vec_sim.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,9 @@ void VecSim_SetLogCallbackFunction(logCallbackFunction callback);
209209
* @brief Allow 3rd party to set the write mode for tiered index - async insert/delete using
210210
* background jobs, or insert/delete inplace.
211211
*
212-
* @param mode VecSimWriteMode the mode in which we add/remove vectors (async or in-place).
212+
* @param mode VecSimWriteMode the mode to write in (async or in-place).
213213
*/
214-
void VecSim_SetWriteMode(VecSimWriteMode mode);
214+
void VecSim_SetInPlaceWriteMode(VecSimWriteMode mode);
215215

216216
#ifdef __cplusplus
217217
}

src/VecSim/vec_sim_index.h

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
133133
delete[] buf;
134134
}
135135
}
136+
136137
#ifdef BUILD_TESTS
137138
// Set new log context to be sent to the log callback.
138139
// Returns the previous logctx.
@@ -142,92 +143,4 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
142143
return prev_logCtx;
143144
}
144145
#endif
145-
146-
void addCommonInfoToIterator(VecSimInfoIterator *infoIterator, const CommonInfo &info) const {
147-
infoIterator->addInfoField(VecSim_InfoField{
148-
.fieldName = VecSimCommonStrings::TYPE_STRING,
149-
.fieldType = INFOFIELD_STRING,
150-
.fieldValue = {FieldValue{.stringValue = VecSimType_ToString(info.type)}}});
151-
infoIterator->addInfoField(
152-
VecSim_InfoField{.fieldName = VecSimCommonStrings::DIMENSION_STRING,
153-
.fieldType = INFOFIELD_UINT64,
154-
.fieldValue = {FieldValue{.uintegerValue = info.dim}}});
155-
infoIterator->addInfoField(VecSim_InfoField{
156-
.fieldName = VecSimCommonStrings::METRIC_STRING,
157-
.fieldType = INFOFIELD_STRING,
158-
.fieldValue = {FieldValue{.stringValue = VecSimMetric_ToString(info.metric)}}});
159-
infoIterator->addInfoField(
160-
VecSim_InfoField{.fieldName = VecSimCommonStrings::IS_MULTI_STRING,
161-
.fieldType = INFOFIELD_UINT64,
162-
.fieldValue = {FieldValue{.uintegerValue = info.isMulti}}});
163-
infoIterator->addInfoField(
164-
VecSim_InfoField{.fieldName = VecSimCommonStrings::INDEX_SIZE_STRING,
165-
.fieldType = INFOFIELD_UINT64,
166-
.fieldValue = {FieldValue{.uintegerValue = info.indexSize}}});
167-
infoIterator->addInfoField(
168-
VecSim_InfoField{.fieldName = VecSimCommonStrings::INDEX_LABEL_COUNT_STRING,
169-
.fieldType = INFOFIELD_UINT64,
170-
.fieldValue = {FieldValue{.uintegerValue = info.indexLabelCount}}});
171-
infoIterator->addInfoField(
172-
VecSim_InfoField{.fieldName = VecSimCommonStrings::BLOCK_SIZE_STRING,
173-
.fieldType = INFOFIELD_UINT64,
174-
.fieldValue = {FieldValue{.uintegerValue = info.blockSize}}});
175-
infoIterator->addInfoField(
176-
VecSim_InfoField{.fieldName = VecSimCommonStrings::MEMORY_STRING,
177-
.fieldType = INFOFIELD_UINT64,
178-
.fieldValue = {FieldValue{.uintegerValue = info.memory}}});
179-
infoIterator->addInfoField(VecSim_InfoField{
180-
.fieldName = VecSimCommonStrings::SEARCH_MODE_STRING,
181-
.fieldType = INFOFIELD_STRING,
182-
.fieldValue = {FieldValue{.stringValue = VecSimSearchMode_ToString(info.last_mode)}}});
183-
}
184-
const void *processBlob(const void *original_blob, void *processed_blob) const {
185-
// if the metric is cosine, we need to normalize
186-
if (this->metric == VecSimMetric_Cosine) {
187-
// copy original blob to the output blob
188-
memcpy(processed_blob, original_blob, this->data_size);
189-
// normalize the copy in place
190-
normalize_func(processed_blob, this->dim);
191-
192-
return processed_blob;
193-
}
194-
195-
// Else no process is needed, return the original blob
196-
return original_blob;
197-
}
198-
199-
protected:
200-
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
201-
char processed_blob[this->data_size];
202-
const void *vector_to_add = processBlob(blob, processed_blob);
203-
204-
return this->addVector(vector_to_add, label, auxiliaryCtx);
205-
}
206-
207-
virtual VecSimQueryResult_List topKQueryWrapper(const void *queryBlob, size_t k,
208-
VecSimQueryParams *queryParams) override {
209-
char processed_blob[this->data_size];
210-
const void *query_to_send = processBlob(queryBlob, processed_blob);
211-
212-
return this->topKQuery(query_to_send, k, queryParams);
213-
}
214-
215-
virtual VecSimQueryResult_List rangeQueryWrapper(const void *queryBlob, double radius,
216-
VecSimQueryParams *queryParams,
217-
VecSimQueryResult_Order order) override {
218-
char processed_blob[this->data_size];
219-
const void *query_to_send = processBlob(queryBlob, processed_blob);
220-
221-
return this->rangeQuery(query_to_send, radius, queryParams, order);
222-
}
223-
224-
virtual VecSimBatchIterator *
225-
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
226-
char processed_blob[this->data_size];
227-
const void *query_to_send = processBlob(queryBlob, processed_blob);
228-
229-
return this->newBatchIterator(query_to_send, queryParams);
230-
}
231-
232-
233146
};

src/VecSim/vec_sim_interface.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ struct VecSimIndexInterface : public VecsimBaseObject {
234234
* @brief Allow 3rd party to set the write mode for tiered index - async insert/delete using
235235
* background jobs, or insert/delete inplace.
236236
*
237-
* @param mode VecSimWriteMode the mode in which we add/remove vectors (async or in-place).
237+
* @param mode VecSimWriteMode the mode to write in (async or in-place).
238238
*/
239239
static VecSimWriteMode asyncWriteMode;
240240
inline static void setWriteMode(VecSimWriteMode mode) {

src/VecSim/vec_sim_tiered_index.h

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -66,66 +66,11 @@ class VecSimTieredIndex : public VecSimIndexInterface {
6666
VecSimQueryResult_List topKQuery(const void *queryBlob, size_t k,
6767
VecSimQueryParams *queryParams) override;
6868

69-
virtual inline int64_t getAllocationSize() const override {
70-
return this->allocator->getAllocationSize() + this->backendIndex->getAllocationSize() +
71-
this->frontendIndex->getAllocationSize();
72-
}
73-
74-
virtual VecSimIndexInfo info() const override;
75-
virtual VecSimInfoIterator *infoIterator() const override;
76-
77-
VecSimQueryResult_List rangeQuery(const void *queryBlob, double radius,
78-
VecSimQueryParams *queryParams,
79-
VecSimQueryResult_Order order) override;
80-
81-
bool preferAdHocSearch(size_t subsetSize, size_t k, bool initial_check) override {
82-
// For now, decide according to the bigger index.
83-
return this->backendIndex->indexSize() > this->frontendIndex->indexSize()
84-
? this->backendIndex->preferAdHocSearch(subsetSize, k, initial_check)
85-
: this->frontendIndex->preferAdHocSearch(subsetSize, k, initial_check);
86-
}
69+
static VecSimWriteMode getWriteMode() { return asyncWriteMode; }
8770

88-
// Return the current state of the global write mode (async/in-place).
89-
static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; }
9071
#ifdef BUILD_TESTS
9172
inline VecSimIndexAbstract<DistType> *getFlatbufferIndex() { return this->frontendIndex; }
9273
#endif
93-
private:
94-
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
95-
// Will be used only if a processing stage is needed
96-
char processed_blob[this->backendIndex->getDataSize()];
97-
const void *vector_to_add = this->backendIndex->processBlob(blob, processed_blob);
98-
return this->addVector(vector_to_add, label, auxiliaryCtx);
99-
}
100-
101-
virtual VecSimQueryResult_List topKQueryWrapper(const void *queryBlob, size_t k,
102-
VecSimQueryParams *queryParams) override {
103-
// Will be used only if a processing stage is needed
104-
char processed_blob[this->backendIndex->getDataSize()];
105-
const void *query_to_send = this->backendIndex->processBlob(queryBlob, processed_blob);
106-
return this->topKQuery(query_to_send, k, queryParams);
107-
}
108-
109-
virtual VecSimQueryResult_List rangeQueryWrapper(const void *queryBlob, double radius,
110-
VecSimQueryParams *queryParams,
111-
VecSimQueryResult_Order order) override {
112-
// Will be used only if a processing stage is needed
113-
char processed_blob[this->backendIndex->getDataSize()];
114-
const void *query_to_send = this->backendIndex->processBlob(queryBlob, processed_blob);
115-
116-
return this->rangeQuery(query_to_send, radius, queryParams, order);
117-
}
118-
119-
virtual VecSimBatchIterator *
120-
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
121-
// Will be used only if a processing stage is needed
122-
char processed_blob[this->backendIndex->getDataSize()];
123-
const void *query_to_send = this->backendIndex->processBlob(queryBlob, processed_blob);
124-
125-
return this->newBatchIterator(query_to_send, queryParams);
126-
}
127-
128-
12974
};
13075

13176
template <typename DataType, typename DistType>

0 commit comments

Comments
 (0)