Python: Update InMemory and Faiss #12117

Open · wants to merge 1 commit into base: feature-python-vector-stores-preb
180 changes: 102 additions & 78 deletions python/semantic_kernel/connectors/memory/faiss.py
@@ -2,29 +2,21 @@
import logging
import sys
from collections.abc import MutableMapping, Sequence
from typing import TYPE_CHECKING, Any, Generic
from typing import Any, Final, Generic

import faiss
import numpy as np
from pydantic import Field

from semantic_kernel.connectors.memory.in_memory.in_memory_collection import (
    IN_MEMORY_SCORE_KEY,
    InMemoryVectorCollection,
)
from semantic_kernel.connectors.ai.embedding_generator_base import EmbeddingGeneratorBase
from semantic_kernel.connectors.memory.in_memory import IN_MEMORY_SCORE_KEY, InMemoryCollection, InMemoryStore, TKey
from semantic_kernel.data.const import DistanceFunction, IndexKind
from semantic_kernel.data.record_definition import VectorStoreRecordDefinition, VectorStoreRecordVectorField
from semantic_kernel.data.text_search import KernelSearchResults
from semantic_kernel.data.vector_search import VectorSearchOptions, VectorSearchResult
from semantic_kernel.data.vector_storage import TKey, TModel, VectorStore
from semantic_kernel.exceptions import (
    VectorStoreInitializationException,
    VectorStoreOperationException,
)

if TYPE_CHECKING:
    from semantic_kernel.data.vector_storage import VectorStoreRecordCollection

from semantic_kernel.data.vector_search import SearchType, VectorSearchOptions, VectorSearchResult
from semantic_kernel.data.vector_storage import TModel
from semantic_kernel.exceptions import VectorStoreInitializationException, VectorStoreOperationException
from semantic_kernel.exceptions.vector_store_exceptions import VectorStoreModelException

if sys.version_info >= (3, 12):
    from typing import override  # pragma: no cover
@@ -33,8 +25,40 @@

logger = logging.getLogger(__name__)

DISTANCE_FUNCTION_MAP: Final[dict[DistanceFunction, type[faiss.Index]]] = {
    DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: faiss.IndexFlatL2,
    DistanceFunction.DOT_PROD: faiss.IndexFlatIP,
    DistanceFunction.DEFAULT: faiss.IndexFlatL2,
}
INDEX_KIND_MAP: Final[dict[IndexKind, bool]] = {
    IndexKind.FLAT: True,
    IndexKind.DEFAULT: True,
}


def _create_index(field: VectorStoreRecordVectorField) -> faiss.Index:
    """Create a Faiss index."""
    if field.index_kind not in INDEX_KIND_MAP:
        raise VectorStoreInitializationException(f"Index kind {field.index_kind} is not supported.")
    if field.distance_function not in DISTANCE_FUNCTION_MAP:
        raise VectorStoreInitializationException(f"Distance function {field.distance_function} is not supported.")
    match field.index_kind:
        case IndexKind.FLAT | IndexKind.DEFAULT:
            match field.distance_function:
                case DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE | DistanceFunction.DEFAULT:
                    return faiss.IndexFlatL2(field.dimensions)
                case DistanceFunction.DOT_PROD:
                    return faiss.IndexFlatIP(field.dimensions)
                case _:
                    raise VectorStoreInitializationException(
                        f"Distance function {field.distance_function} is "
                        f"not supported for index kind {field.index_kind}."
                    )
        case _:
            raise VectorStoreInitializationException(f"Index with {field.index_kind} is not supported.")

class FaissCollection(InMemoryVectorCollection[TKey, TModel], Generic[TKey, TModel]):

class FaissCollection(InMemoryCollection[TKey, TModel], Generic[TKey, TModel]):
    """Create a Faiss collection.

    The Faiss Collection builds on the InMemoryVectorCollection,
Expand All @@ -46,9 +70,10 @@ class FaissCollection(InMemoryVectorCollection[TKey, TModel], Generic[TKey, TMod

    def __init__(
        self,
        collection_name: str,
        data_model_type: type[TModel],
        data_model_definition: VectorStoreRecordDefinition | None = None,
        collection_name: str | None = None,
        embedding_generator: EmbeddingGeneratorBase | None = None,
        **kwargs: Any,
    ):
        """Create a Faiss Collection.
@@ -67,12 +92,14 @@ def __init__(
            collection_name: The name of the collection.
            data_model_type: The type of the data model.
            data_model_definition: The definition of the data model.
            embedding_generator: The embedding generator.
            kwargs: Additional arguments.
        """
        super().__init__(
            data_model_type=data_model_type,
            data_model_definition=data_model_definition,
            collection_name=collection_name,
            embedding_generator=embedding_generator,
            **kwargs,
        )

@@ -105,29 +132,10 @@ def _create_indexes(self, index: faiss.Index | None = None, indexes: dict[str, f
                self.indexes_key_map.setdefault(vector_field.name, {})
                continue
            if vector_field.name not in self.indexes:
                index = self._create_index(vector_field)
                self.indexes[vector_field.name] = index
                self.indexes[vector_field.name] = _create_index(vector_field)
            if vector_field.name not in self.indexes_key_map:
                self.indexes_key_map.setdefault(vector_field.name, {})

    def _create_index(self, field: VectorStoreRecordVectorField) -> faiss.Index:
        """Create a Faiss index."""
        index_kind = field.index_kind or IndexKind.FLAT
        distance_function = field.distance_function or DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE
        match index_kind:
            case IndexKind.FLAT:
                match distance_function:
                    case DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE:
                        return faiss.IndexFlatL2(field.dimensions)
                    case DistanceFunction.DOT_PROD:
                        return faiss.IndexFlatIP(field.dimensions)
                    case _:
                        raise VectorStoreInitializationException(
                            f"Distance function {distance_function} is not supported for index kind {index_kind}."
                        )
            case _:
                raise VectorStoreInitializationException(f"Index with {index_kind} is not supported.")

    @override
    async def create_collection(
        self, index: faiss.Index | None = None, indexes: dict[str, faiss.Index] | None = None, **kwargs: Any
@@ -149,21 +157,21 @@ async def create_collection(
    @override
    async def _inner_upsert(self, records: Sequence[Any], **kwargs: Any) -> Sequence[TKey]:
        """Upsert records."""
        for vector_field in self.data_model_definition.vector_field_names:
            vectors_to_add = [record.get(vector_field) for record in records]
        for vector_field in self.data_model_definition.vector_fields:
            vectors_to_add = [record.get(vector_field.storage_property_name or vector_field.name) for record in records]
            vectors = np.array(vectors_to_add, dtype=np.float32)
            if not self.indexes[vector_field].is_trained:
            if not self.indexes[vector_field.name].is_trained:
                raise VectorStoreOperationException(
                    f"This index (of type {type(self.indexes[vector_field])}) requires training, "
                    f"This index (of type {type(self.indexes[vector_field.name])}) requires training, "
                    "which is not supported. To train the index, "
                    f"use <collection>.indexes[{vector_field}].train, "
                    f"use <collection>.indexes[{vector_field.name}].train, "
                    "see faiss docs for more details."
                )
            self.indexes[vector_field].add(vectors)  # type: ignore[call-arg]
            start = len(self.indexes_key_map[vector_field])
            self.indexes[vector_field.name].add(vectors)  # type: ignore
            start = len(self.indexes_key_map[vector_field.name])
            for i, record in enumerate(records):
                key = record[self.data_model_definition.key_field.name]
                self.indexes_key_map[vector_field][key] = start + i
                self.indexes_key_map[vector_field.name][key] = start + i
        return await super()._inner_upsert(records, **kwargs)
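
Reviewer note: the `start` offset bookkeeping above is the crux of the upsert path, since faiss assigns positions sequentially as vectors are added. A minimal, self-contained sketch of that pattern (plain faiss/numpy; keys and dimensions illustrative):

```python
import faiss
import numpy as np

index = faiss.IndexFlatL2(4)          # 4-dimensional flat L2 index
key_map: dict[str, int] = {}          # record key -> position in the index

for batch in (["a", "b"], ["c"]):     # two upsert batches
    vectors = np.random.rand(len(batch), 4).astype(np.float32)
    start = len(key_map)              # equals index.ntotal before the add
    index.add(vectors)                # positions are assigned sequentially
    for i, key in enumerate(batch):
        key_map[key] = start + i

assert index.ntotal == 3 and key_map == {"a": 0, "b": 1, "c": 2}
```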

    @override
@@ -189,57 +197,73 @@ async def delete_collection(self, **kwargs: Any) -> None:
    async def does_collection_exist(self, **kwargs: Any) -> bool:
        return bool(self.indexes)

    async def _inner_search_vectorized(
    @override
    async def _inner_search(
        self,
        vector: list[float | int],
        search_type: SearchType,
        options: VectorSearchOptions,
        values: Any | None = None,
        vector: Sequence[float | int] | None = None,
        **kwargs: Any,
    ) -> KernelSearchResults[VectorSearchResult[TModel]]:
        field = options.vector_field_name or self.data_model_definition.vector_field_names[0]
        assert isinstance(self.data_model_definition.fields.get(field), VectorStoreRecordVectorField)  # nosec
        if vector and field:
            return_list = []
            # since the vector index works independently of the record index,
            # we will need to get all records that adhere to the filter first
            filtered_records = self._get_filtered_records(options)
            np_vector = np.array(vector, dtype=np.float32).reshape(1, -1)
            # then do the actual vector search
            distances, indexes = self.indexes[field].search(np_vector, min(options.top, self.indexes[field].ntotal))  # type: ignore[call-arg]
            # we then iterate through the results, the order is the order of relevance
            # (least or most distance, dependent on the distance metric used)
            for i, index in enumerate(indexes[0]):
                key = list(self.indexes_key_map[field].keys())[index]
                # if the key is not in the filtered records, we ignore it
                if key not in filtered_records:
                    continue
                filtered_records[key][IN_MEMORY_SCORE_KEY] = distances[0][i]
                # so we return the list in the order of the search, with the record from the inner_storage.
                return_list.append(filtered_records[key])
        """Inner search method."""
        if not vector:
            vector = await self._generate_vector_from_values(values, options)
        field = self.data_model_definition.try_get_vector_field(options.vector_property_name)
        if not field:
            raise VectorStoreModelException(
                f"Vector field '{options.vector_property_name}' not found in the data model definition."
            )

        return_list = []
        filtered_records = self._get_filtered_records(options)
        np_vector = np.array(vector, dtype=np.float32).reshape(1, -1)
        # then do the actual vector search
        distances, indexes = self.indexes[field.name].search(
            np_vector, min(options.top, self.indexes[field.name].ntotal)
        )  # type: ignore[call-arg]
        # we then iterate through the results, the order is the order of relevance
        # (least or most distance, dependent on the distance metric used)
        for i, index in enumerate(indexes[0]):
            key = list(self.indexes_key_map[field.name].keys())[index]
            # if the key is not in the filtered records, we ignore it
            if key not in filtered_records:
                continue
            filtered_records[key][IN_MEMORY_SCORE_KEY] = distances[0][i]
            # so we return the list in the order of the search, with the record from the inner_storage.
            return_list.append(filtered_records[key])
        return KernelSearchResults(
            results=self._get_vector_search_results_from_results(return_list, options),
            total_count=len(return_list) if options and options.include_total_count else None,
        )
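
Reviewer note: the position-to-key mapping is the subtle part of the search path, since faiss returns row positions rather than record keys. A small self-contained sketch of that mapping (plain faiss/numpy; keys illustrative):

```python
import faiss
import numpy as np

index = faiss.IndexFlatL2(4)
key_map = {"a": 0, "b": 1, "c": 2}                    # record key -> index position
index.add(np.eye(4, dtype=np.float32)[:3])            # three orthogonal unit vectors

query = np.eye(4, dtype=np.float32)[1].reshape(1, -1)
distances, positions = index.search(query, min(2, index.ntotal))
keys = [list(key_map.keys())[pos] for pos in positions[0]]
assert keys[0] == "b"                                 # exact match comes back first
```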


class FaissStore(VectorStore):
class FaissStore(InMemoryStore):
    """Create a Faiss store."""

    @override
    async def list_collection_names(self, **kwargs) -> Sequence[str]:
        return list(self.vector_record_collections.keys())
    def __init__(
        self,
        embedding_generator: EmbeddingGeneratorBase | None = None,
        **kwargs: Any,
    ):
        """Create a Faiss store."""
        super().__init__(embedding_generator=embedding_generator, **kwargs)

    @override
    def get_collection(
        self,
        collection_name: str,
        data_model_type: type[object],
        data_model_definition=None,
        **kwargs,
    ) -> "VectorStoreRecordCollection":
        self.vector_record_collections[collection_name] = FaissCollection(
        data_model_type: type[TModel],
        *,
        data_model_definition: VectorStoreRecordDefinition | None = None,
        collection_name: str | None = None,
        embedding_generator: EmbeddingGeneratorBase | None = None,
        **kwargs: Any,
    ) -> FaissCollection:
        """Get a Faiss collection."""
        return FaissCollection(
            collection_name=collection_name,
            data_model_type=data_model_type,
            data_model_definition=data_model_definition,
            embedding_generator=embedding_generator or self.embedding_generator,
            **kwargs,
        )
        return self.vector_record_collections[collection_name]
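
Reviewer note: with this change the store no longer caches collections; `get_collection` builds a fresh `FaissCollection` that falls back to the store's embedding generator. A hedged usage sketch (`ArticleModel` is a hypothetical data model defined elsewhere):

```python
from semantic_kernel.connectors.memory.faiss import FaissStore

store = FaissStore()                  # optionally: FaissStore(embedding_generator=...)
collection = store.get_collection(
    data_model_type=ArticleModel,     # hypothetical model type with vector fields
    collection_name="articles",
)
# Nothing is cached on the store; each call returns a new FaissCollection
# wired with `embedding_generator or self.embedding_generator`.
```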