diff --git a/python/semantic_kernel/connectors/memory/faiss.py b/python/semantic_kernel/connectors/memory/faiss.py index 2e109c234931..434a1299ec88 100644 --- a/python/semantic_kernel/connectors/memory/faiss.py +++ b/python/semantic_kernel/connectors/memory/faiss.py @@ -2,29 +2,21 @@ import logging import sys from collections.abc import MutableMapping, Sequence -from typing import TYPE_CHECKING, Any, Generic +from typing import Any, Final, Generic import faiss import numpy as np from pydantic import Field -from semantic_kernel.connectors.memory.in_memory.in_memory_collection import ( - IN_MEMORY_SCORE_KEY, - InMemoryVectorCollection, -) +from semantic_kernel.connectors.ai.embedding_generator_base import EmbeddingGeneratorBase +from semantic_kernel.connectors.memory.in_memory import IN_MEMORY_SCORE_KEY, InMemoryCollection, InMemoryStore, TKey from semantic_kernel.data.const import DistanceFunction, IndexKind from semantic_kernel.data.record_definition import VectorStoreRecordDefinition, VectorStoreRecordVectorField from semantic_kernel.data.text_search import KernelSearchResults -from semantic_kernel.data.vector_search import VectorSearchOptions, VectorSearchResult -from semantic_kernel.data.vector_storage import TKey, TModel, VectorStore -from semantic_kernel.exceptions import ( - VectorStoreInitializationException, - VectorStoreOperationException, -) - -if TYPE_CHECKING: - from semantic_kernel.data.vector_storage import VectorStoreRecordCollection - +from semantic_kernel.data.vector_search import SearchType, VectorSearchOptions, VectorSearchResult +from semantic_kernel.data.vector_storage import TModel +from semantic_kernel.exceptions import VectorStoreInitializationException, VectorStoreOperationException +from semantic_kernel.exceptions.vector_store_exceptions import VectorStoreModelException if sys.version_info >= (3, 12): from typing import override # pragma: no cover @@ -33,8 +25,40 @@ logger = logging.getLogger(__name__) +DISTANCE_FUNCTION_MAP: Final[dict[DistanceFunction, type[faiss.Index]]] = { + DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: faiss.IndexFlatL2, + DistanceFunction.DOT_PROD: faiss.IndexFlatIP, + DistanceFunction.DEFAULT: faiss.IndexFlatL2, +} +INDEX_KIND_MAP: Final[dict[IndexKind, bool]] = { + IndexKind.FLAT: True, + IndexKind.DEFAULT: True, +} + + +def _create_index(field: VectorStoreRecordVectorField) -> faiss.Index: + """Create a Faiss index.""" + if field.index_kind not in INDEX_KIND_MAP: + raise VectorStoreInitializationException(f"Index kind {field.index_kind} is not supported.") + if field.distance_function not in DISTANCE_FUNCTION_MAP: + raise VectorStoreInitializationException(f"Distance function {field.distance_function} is not supported.") + match field.index_kind: + case IndexKind.FLAT | IndexKind.DEFAULT: + match field.distance_function: + case DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE | DistanceFunction.DEFAULT: + return faiss.IndexFlatL2(field.dimensions) + case DistanceFunction.DOT_PROD: + return faiss.IndexFlatIP(field.dimensions) + case _: + raise VectorStoreInitializationException( + f"Distance function {field.distance_function} is " + f"not supported for index kind {field.index_kind}." + ) + case _: + raise VectorStoreInitializationException(f"Index kind {field.index_kind} is not supported.") -class FaissCollection(InMemoryVectorCollection[TKey, TModel], Generic[TKey, TModel]): + +class FaissCollection(InMemoryCollection[TKey, TModel], Generic[TKey, TModel]): """Create a Faiss collection.
The Faiss Collection builds on the InMemoryCollection, @@ -46,9 +70,10 @@ class FaissCollection(InMemoryVectorCollection[TKey, TModel], Generic[TKey, TMod def __init__( self, - collection_name: str, data_model_type: type[TModel], data_model_definition: VectorStoreRecordDefinition | None = None, + collection_name: str | None = None, + embedding_generator: EmbeddingGeneratorBase | None = None, **kwargs: Any, ): """Create a Faiss Collection. @@ -67,12 +92,14 @@ def __init__( collection_name: The name of the collection. data_model_type: The type of the data model. data_model_definition: The definition of the data model. + embedding_generator: The embedding generator. kwargs: Additional arguments. """ super().__init__( data_model_type=data_model_type, data_model_definition=data_model_definition, collection_name=collection_name, + embedding_generator=embedding_generator, **kwargs, ) @@ -105,29 +132,10 @@ def _create_indexes(self, index: faiss.Index | None = None, indexes: dict[str, f self.indexes_key_map.setdefault(vector_field.name, {}) continue if vector_field.name not in self.indexes: - index = self._create_index(vector_field) - self.indexes[vector_field.name] = index + self.indexes[vector_field.name] = _create_index(vector_field) if vector_field.name not in self.indexes_key_map: self.indexes_key_map.setdefault(vector_field.name, {}) - def _create_index(self, field: VectorStoreRecordVectorField) -> faiss.Index: - """Create a Faiss index.""" - index_kind = field.index_kind or IndexKind.FLAT - distance_function = field.distance_function or DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE - match index_kind: - case IndexKind.FLAT: - match distance_function: - case DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: - return faiss.IndexFlatL2(field.dimensions) - case DistanceFunction.DOT_PROD: - return faiss.IndexFlatIP(field.dimensions) - case _: - raise VectorStoreInitializationException( - f"Distance function {distance_function} is not supported for index kind {index_kind}." - ) - case _: - raise VectorStoreInitializationException(f"Index with {index_kind} is not supported.") - @override async def create_collection( self, index: faiss.Index | None = None, indexes: dict[str, faiss.Index] | None = None, **kwargs: Any @@ -149,21 +157,21 @@ async def create_collection( @override async def _inner_upsert(self, records: Sequence[Any], **kwargs: Any) -> Sequence[TKey]: """Upsert records.""" - for vector_field in self.data_model_definition.vector_field_names: - vectors_to_add = [record.get(vector_field) for record in records] + for vector_field in self.data_model_definition.vector_fields: + vectors_to_add = [record.get(vector_field.storage_property_name or vector_field.name) for record in records] vectors = np.array(vectors_to_add, dtype=np.float32) - if not self.indexes[vector_field].is_trained: + if not self.indexes[vector_field.name].is_trained: raise VectorStoreOperationException( - f"This index (of type {type(self.indexes[vector_field])}) requires training, " + f"This index (of type {type(self.indexes[vector_field.name])}) requires training, " "which is not supported. To train the index, " - f"use .indexes[{vector_field}].train, " + f"use .indexes[{vector_field.name}].train, " "see faiss docs for more details."
) - self.indexes[vector_field].add(vectors) # type: ignore[call-arg] - start = len(self.indexes_key_map[vector_field]) + self.indexes[vector_field.name].add(vectors) # type: ignore + start = len(self.indexes_key_map[vector_field.name]) for i, record in enumerate(records): key = record[self.data_model_definition.key_field.name] - self.indexes_key_map[vector_field][key] = start + i + self.indexes_key_map[vector_field.name][key] = start + i return await super()._inner_upsert(records, **kwargs) @override @@ -189,57 +197,73 @@ async def delete_collection(self, **kwargs: Any) -> None: async def does_collection_exist(self, **kwargs: Any) -> bool: return bool(self.indexes) - async def _inner_search_vectorized( + @override + async def _inner_search( self, - vector: list[float | int], + search_type: SearchType, options: VectorSearchOptions, + values: Any | None = None, + vector: Sequence[float | int] | None = None, **kwargs: Any, ) -> KernelSearchResults[VectorSearchResult[TModel]]: - field = options.vector_field_name or self.data_model_definition.vector_field_names[0] - assert isinstance(self.data_model_definition.fields.get(field), VectorStoreRecordVectorField) # nosec - if vector and field: - return_list = [] - # since the vector index works independently of the record index, - # we will need to get all records that adhere to the filter first - filtered_records = self._get_filtered_records(options) - np_vector = np.array(vector, dtype=np.float32).reshape(1, -1) - # then do the actual vector search - distances, indexes = self.indexes[field].search(np_vector, min(options.top, self.indexes[field].ntotal)) # type: ignore[call-arg] - # we then iterate through the results, the order is the order of relevance - # (less or most distance, dependant on distance metric used) - for i, index in enumerate(indexes[0]): - key = list(self.indexes_key_map[field].keys())[index] - # if the key is not in the filtered records, we ignore it - if key not in filtered_records: - continue - filtered_records[key][IN_MEMORY_SCORE_KEY] = distances[0][i] - # so we return the list in the order of the search, with the record from the inner_storage. - return_list.append(filtered_records[key]) + """Inner search method.""" + if not vector: + vector = await self._generate_vector_from_values(values, options) + field = self.data_model_definition.try_get_vector_field(options.vector_property_name) + if not field: + raise VectorStoreModelException( + f"Vector field '{options.vector_property_name}' not found in the data model definition." + ) + + return_list = [] + filtered_records = self._get_filtered_records(options) + np_vector = np.array(vector, dtype=np.float32).reshape(1, -1) + # then do the actual vector search + distances, indexes = self.indexes[field.name].search( + np_vector, min(options.top, self.indexes[field.name].ntotal) + ) # type: ignore[call-arg] + # we then iterate through the results, which come back in order of relevance + # (least or greatest distance, depending on the distance metric used) + for i, index in enumerate(indexes[0]): + key = list(self.indexes_key_map[field.name].keys())[index] + # if the key is not in the filtered records, we ignore it + if key not in filtered_records: + continue + filtered_records[key][IN_MEMORY_SCORE_KEY] = distances[0][i] + # so we return the list in search order, with the records from the inner_storage.
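+ # note: filtering is applied after the faiss search, so fewer than options.top results may be returned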
+ return_list.append(filtered_records[key]) return KernelSearchResults( results=self._get_vector_search_results_from_results(return_list, options), total_count=len(return_list) if options and options.include_total_count else None, ) -class FaissStore(VectorStore): +class FaissStore(InMemoryStore): """Create a Faiss store.""" - @override - async def list_collection_names(self, **kwargs) -> Sequence[str]: - return list(self.vector_record_collections.keys()) + def __init__( + self, + embedding_generator: EmbeddingGeneratorBase | None = None, + **kwargs: Any, + ): + """Create a Faiss store.""" + super().__init__(embedding_generator=embedding_generator, **kwargs) @override def get_collection( self, - collection_name: str, - data_model_type: type[object], - data_model_definition=None, - **kwargs, - ) -> "VectorStoreRecordCollection": - self.vector_record_collections[collection_name] = FaissCollection( + data_model_type: type[TModel], + *, + data_model_definition: VectorStoreRecordDefinition | None = None, + collection_name: str | None = None, + embedding_generator: EmbeddingGeneratorBase | None = None, + **kwargs: Any, + ) -> FaissCollection: + """Get a Faiss collection.""" + return FaissCollection( collection_name=collection_name, data_model_type=data_model_type, data_model_definition=data_model_definition, + embedding_generator=embedding_generator or self.embedding_generator, **kwargs, ) - return self.vector_record_collections[collection_name] diff --git a/python/semantic_kernel/connectors/memory/in_memory.py b/python/semantic_kernel/connectors/memory/in_memory.py new file mode 100644 index 000000000000..9740ae0d5e02 --- /dev/null +++ b/python/semantic_kernel/connectors/memory/in_memory.py @@ -0,0 +1,331 @@ +# Copyright (c) Microsoft. All rights reserved. 
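+"""In-memory vector store and collection: the dict-backed reference implementation, which FaissCollection and FaissStore build on."""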
+ +import ast +import sys +from collections.abc import AsyncIterable, Callable, Sequence +from typing import Any, ClassVar, Final, Generic, TypeVar + +from numpy import dot +from pydantic import Field +from scipy.spatial.distance import cityblock, cosine, euclidean, hamming, sqeuclidean + +from semantic_kernel.connectors.ai.embedding_generator_base import EmbeddingGeneratorBase +from semantic_kernel.data.const import DISTANCE_FUNCTION_DIRECTION_HELPER, DistanceFunction +from semantic_kernel.data.record_definition import VectorStoreRecordDefinition +from semantic_kernel.data.text_search import KernelSearchResults +from semantic_kernel.data.vector_search import SearchType, VectorSearch, VectorSearchOptions, VectorSearchResult +from semantic_kernel.data.vector_storage import ( + GetFilteredRecordOptions, + TModel, + VectorStore, + VectorStoreRecordCollection, +) +from semantic_kernel.exceptions import VectorSearchExecutionException, VectorStoreModelValidationError +from semantic_kernel.exceptions.vector_store_exceptions import VectorStoreModelException, VectorStoreOperationException +from semantic_kernel.kernel_types import OneOrMany +from semantic_kernel.utils.feature_stage_decorator import release_candidate +from semantic_kernel.utils.list_handler import empty_generator + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + +TKey = TypeVar("TKey", bound=str | int | float) + +IN_MEMORY_SCORE_KEY: Final[str] = "in_memory_search_score" +DISTANCE_FUNCTION_MAP: Final[dict[DistanceFunction | str, Callable[..., Any]]] = { + DistanceFunction.COSINE_DISTANCE: cosine, + DistanceFunction.COSINE_SIMILARITY: cosine, + DistanceFunction.EUCLIDEAN_DISTANCE: euclidean, + DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: sqeuclidean, + DistanceFunction.MANHATTAN: cityblock, + DistanceFunction.HAMMING: hamming, + DistanceFunction.DOT_PROD: dot, + DistanceFunction.DEFAULT: cosine, +} + + +TAKey = TypeVar("TAKey", bound=str) +TAValue = TypeVar("TAValue", bound=str | int | float | list[float] | None) + + +class AttributeDict(dict[TAKey, TAValue], Generic[TAKey, TAValue]): + """A dict subclass that allows attribute access to keys.
+ + This is used to allow the filters to work either way, using: + - `lambda x: x.key == 'id'` or `lambda x: x['key'] == 'id'` + """ + + def __getattr__(self, name) -> TAValue: + """Allow attribute-style access to dict keys.""" + try: + return self[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value) -> None: + """Allow setting dict keys via attribute access.""" + self[name] = value + + def __delattr__(self, name) -> None: + """Allow deleting dict keys via attribute access.""" + try: + del self[name] + except KeyError: + raise AttributeError(name) + + +class InMemoryCollection( + VectorStoreRecordCollection[TKey, TModel], + VectorSearch[TKey, TModel], + Generic[TKey, TModel], +): + """In Memory Collection.""" + + inner_storage: dict[TKey, AttributeDict] = Field(default_factory=dict) + supported_key_types: ClassVar[set[str] | None] = {"str", "int", "float"} + supported_search_types: ClassVar[set[SearchType]] = {SearchType.VECTOR} + + def __init__( + self, + data_model_type: type[TModel], + data_model_definition: VectorStoreRecordDefinition | None = None, + collection_name: str | None = None, + embedding_generator: EmbeddingGeneratorBase | None = None, + **kwargs: Any, + ): + """Create an In Memory Collection.""" + super().__init__( + data_model_type=data_model_type, + data_model_definition=data_model_definition, + collection_name=collection_name, + embedding_generator=embedding_generator, + **kwargs, + ) + + def _validate_data_model(self): + """Check that the reserved in-memory score key is not used as a field name.""" + super()._validate_data_model() + if IN_MEMORY_SCORE_KEY in self.data_model_definition.field_names: + raise VectorStoreModelValidationError(f"Field name '{IN_MEMORY_SCORE_KEY}' is reserved for internal use.") + + @override + async def _inner_delete(self, keys: Sequence[TKey], **kwargs: Any) -> None: + for key in keys: + self.inner_storage.pop(key, None) + + @override + async def _inner_get( + self, keys: Sequence[TKey] | None = None, options: GetFilteredRecordOptions | None = None, **kwargs: Any + ) -> Any | OneOrMany[TModel] | None: + if not keys: + if options is not None: + raise NotImplementedError("Get without keys is not yet implemented.") + return None + return [self.inner_storage[key] for key in keys if key in self.inner_storage] + + @override + async def _inner_upsert(self, records: Sequence[Any], **kwargs: Any) -> Sequence[TKey]: + updated_keys = [] + for record in records: + record = AttributeDict(record) + self.inner_storage[record[self._key_field_name]] = record + updated_keys.append(record[self._key_field_name]) + return updated_keys + + def _deserialize_store_models_to_dicts(self, records: Sequence[Any], **kwargs: Any) -> Sequence[dict[str, Any]]: + return records + + def _serialize_dicts_to_store_models(self, records: Sequence[dict[str, Any]], **kwargs: Any) -> Sequence[Any]: + return records + + @override + async def create_collection(self, **kwargs: Any) -> None: + pass + + @override + async def delete_collection(self, **kwargs: Any) -> None: + self.inner_storage = {} + + @override + async def does_collection_exist(self, **kwargs: Any) -> bool: + return True + + @override + async def _inner_search( + self, + search_type: SearchType, + options: VectorSearchOptions, + values: Any | None = None, + vector: Sequence[float | int] | None = None, + **kwargs: Any, + ) -> KernelSearchResults[VectorSearchResult[TModel]]: + """Inner search method.""" + if not vector: + vector = await self._generate_vector_from_values(values, options) + return_records:
dict[TKey, float] = {} + field = self.data_model_definition.try_get_vector_field(options.vector_property_name) + if not field: + raise VectorStoreModelException( + f"Vector field '{options.vector_property_name}' not found in the data model definition." + ) + if field.distance_function not in DISTANCE_FUNCTION_MAP: + raise VectorSearchExecutionException( + f"Distance function '{field.distance_function}' is not supported. " + f"Supported functions are: {list(DISTANCE_FUNCTION_MAP.keys())}" + ) + distance_func = DISTANCE_FUNCTION_MAP[field.distance_function] + + for key, record in self._get_filtered_records(options).items(): + if vector and field is not None: + return_records[key] = self._calculate_vector_similarity( + vector, + record[field.storage_property_name or field.name], + distance_func, + invert_score=field.distance_function == DistanceFunction.COSINE_SIMILARITY, + ) + if field.distance_function == DistanceFunction.DEFAULT: + reverse_func = DISTANCE_FUNCTION_DIRECTION_HELPER[DistanceFunction.COSINE_DISTANCE] + else: + reverse_func = DISTANCE_FUNCTION_DIRECTION_HELPER[field.distance_function] + sorted_records = dict( + sorted( + return_records.items(), + key=lambda item: item[1], + reverse=reverse_func(1, 0), + ) + ) + if sorted_records: + return KernelSearchResults( + results=self._get_vector_search_results_from_results( + self._generate_return_list(sorted_records, options), options + ), + total_count=len(return_records) if options and options.include_total_count else None, + ) + return KernelSearchResults(results=empty_generator()) + + async def _generate_return_list( + self, return_records: dict[TKey, float], options: VectorSearchOptions | None + ) -> AsyncIterable[dict]: + top = 3 if not options else options.top + skip = 0 if not options else options.skip + returned = 0 + for idx, key in enumerate(return_records.keys()): + if idx >= skip: + returned += 1 + rec = self.inner_storage[key] + rec[IN_MEMORY_SCORE_KEY] = return_records[key] + yield rec + if returned >= top: + break + + def _get_filtered_records(self, options: VectorSearchOptions) -> dict[TKey, AttributeDict]: + if not options.filter: + return self.inner_storage + try: + callable_filters = [ + self._parse_and_validate_filter(filter) if isinstance(filter, str) else filter + for filter in ([options.filter] if not isinstance(options.filter, list) else options.filter) + ] + except Exception as e: + raise VectorStoreOperationException(f"Error evaluating filter: {e}") from e + filtered_records: dict[TKey, AttributeDict] = {} + for key, record in self.inner_storage.items(): + for filter in callable_filters: + if self._run_filter(filter, record): + filtered_records[key] = record + return filtered_records + + def _parse_and_validate_filter(self, filter_str: str) -> Callable: + """Parse and validate a string filter as a lambda expression, then return the callable.""" + forbidden_names = {"__import__", "open", "eval", "exec", "__builtins__"} + try: + tree = ast.parse(filter_str, mode="eval") + except SyntaxError as e: + raise VectorStoreOperationException(f"Filter string is not valid Python: {e}") from e + # Only allow lambda expressions + if not (isinstance(tree, ast.Expression) and isinstance(tree.body, ast.Lambda)): + raise VectorStoreOperationException( + "Filter string must be a lambda expression, e.g. 
'lambda x: x.key == 1'" + ) + # Walk the AST to look for forbidden names and attribute access + for node in ast.walk(tree): + if isinstance(node, ast.Name) and node.id in forbidden_names: + raise VectorStoreOperationException(f"Use of '{node.id}' is not allowed in filter expressions.") + if isinstance(node, ast.Attribute) and node.attr in forbidden_names: + raise VectorStoreOperationException(f"Use of '{node.attr}' is not allowed in filter expressions.") + try: + code = compile(tree, filename="", mode="eval") + func = eval(code, {"__builtins__": {}}, {}) # nosec + except Exception as e: + raise VectorStoreOperationException(f"Error compiling filter: {e}") from e + if not callable(func): + raise VectorStoreOperationException("Compiled filter is not callable.") + return func + + def _run_filter(self, filter: Callable, record: AttributeDict[TAKey, TAValue]) -> bool: + """Run the filter on the record, supporting attribute access.""" + try: + return filter(record) + except Exception as e: + raise VectorStoreOperationException(f"Error running filter: {e}") from e + + @override + def _lambda_parser(self, node: ast.AST) -> Any: + """Not used by InMemoryCollection, but required by the interface.""" + pass + + def _calculate_vector_similarity( + self, + search_vector: Sequence[float | int], + record_vector: Sequence[float | int], + distance_func: Callable, + invert_score: bool = False, + ) -> float: + calc = distance_func(record_vector, search_vector) + if invert_score: + return 1.0 - float(calc) + return float(calc) + + def _get_record_from_result(self, result: Any) -> Any: + return result + + def _get_score_from_result(self, result: Any) -> float | None: + return result.get(IN_MEMORY_SCORE_KEY) + + +@release_candidate +class InMemoryStore(VectorStore): + """Create an In Memory Vector Store.""" + + def __init__( + self, + embedding_generator: EmbeddingGeneratorBase | None = None, + **kwargs: Any, + ): + """Create an In Memory Vector Store.""" + super().__init__(embedding_generator=embedding_generator, **kwargs) + + @override + async def list_collection_names(self, **kwargs) -> Sequence[str]: + return [] + + @override + def get_collection( + self, + data_model_type: type[TModel], + *, + data_model_definition: VectorStoreRecordDefinition | None = None, + collection_name: str | None = None, + embedding_generator: EmbeddingGeneratorBase | None = None, + **kwargs: Any, + ) -> InMemoryCollection: + """Get a collection.""" + return InMemoryCollection( + data_model_type=data_model_type, + data_model_definition=data_model_definition, + collection_name=collection_name, + embedding_generator=embedding_generator or self.embedding_generator, + ) diff --git a/python/semantic_kernel/connectors/memory/in_memory/__init__.py b/python/semantic_kernel/connectors/memory/in_memory/__init__.py deleted file mode 100644 index 5e4f8f93e00b..000000000000 --- a/python/semantic_kernel/connectors/memory/in_memory/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved.
- -from semantic_kernel.connectors.memory.in_memory.in_memory_collection import InMemoryVectorCollection -from semantic_kernel.connectors.memory.in_memory.in_memory_store import InMemoryVectorStore - -__all__ = ["InMemoryVectorCollection", "InMemoryVectorStore"] diff --git a/python/semantic_kernel/connectors/memory/in_memory/const.py b/python/semantic_kernel/connectors/memory/in_memory/const.py deleted file mode 100644 index 24fd25d7568c..000000000000 --- a/python/semantic_kernel/connectors/memory/in_memory/const.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - - -from collections.abc import Callable -from typing import Any - -from numpy import dot -from scipy.spatial.distance import cityblock, cosine, euclidean, hamming, sqeuclidean - -from semantic_kernel.data.const import DistanceFunction - -DISTANCE_FUNCTION_MAP: dict[DistanceFunction | str, Callable[..., Any]] = { - DistanceFunction.COSINE_DISTANCE: cosine, - DistanceFunction.COSINE_SIMILARITY: cosine, - DistanceFunction.EUCLIDEAN_DISTANCE: euclidean, - DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE: sqeuclidean, - DistanceFunction.MANHATTAN: cityblock, - DistanceFunction.HAMMING: hamming, - DistanceFunction.DOT_PROD: dot, -} diff --git a/python/semantic_kernel/connectors/memory/in_memory/in_memory_collection.py b/python/semantic_kernel/connectors/memory/in_memory/in_memory_collection.py deleted file mode 100644 index adb49eee4224..000000000000 --- a/python/semantic_kernel/connectors/memory/in_memory/in_memory_collection.py +++ /dev/null @@ -1,242 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - -import sys -from collections.abc import AsyncIterable, Callable, Mapping, Sequence -from typing import Any, ClassVar, Generic - -from pydantic import Field - -from semantic_kernel.connectors.memory.in_memory.const import DISTANCE_FUNCTION_MAP -from semantic_kernel.data.const import DISTANCE_FUNCTION_DIRECTION_HELPER, DistanceFunction -from semantic_kernel.data.record_definition import ( - VectorStoreRecordDefinition, - VectorStoreRecordVectorField, -) -from semantic_kernel.data.text_search import AnyTagsEqualTo, EqualTo, FilterClauseBase, KernelSearchResults -from semantic_kernel.data.vector_search import ( - VectorizedSearchMixin, - VectorSearchOptions, - VectorSearchResult, - VectorTextSearchMixin, -) -from semantic_kernel.data.vector_storage import TKey, TModel, VectorStoreRecordCollection -from semantic_kernel.exceptions import VectorSearchExecutionException, VectorStoreModelValidationError -from semantic_kernel.kernel_types import OneOrMany -from semantic_kernel.utils.list_handler import empty_generator - -if sys.version_info >= (3, 12): - from typing import override # pragma: no cover -else: - from typing_extensions import override # pragma: no cover - - -IN_MEMORY_SCORE_KEY = "in_memory_search_score" - - -class InMemoryVectorCollection( - VectorStoreRecordCollection[TKey, TModel], - VectorTextSearchMixin[TKey, TModel], - VectorizedSearchMixin[TKey, TModel], - Generic[TKey, TModel], -): - """In Memory Collection.""" - - inner_storage: dict[TKey, dict] = Field(default_factory=dict) - supported_key_types: ClassVar[list[str] | None] = ["str", "int", "float"] - - def __init__( - self, - collection_name: str, - data_model_type: type[TModel], - data_model_definition: VectorStoreRecordDefinition | None = None, - **kwargs: Any, - ): - """Create a In Memory Collection.""" - super().__init__( - data_model_type=data_model_type, - data_model_definition=data_model_definition, - 
collection_name=collection_name, - **kwargs, - ) - - def _validate_data_model(self): - """Check if the In Memory Score key is not used.""" - super()._validate_data_model() - if IN_MEMORY_SCORE_KEY in self.data_model_definition.field_names: - raise VectorStoreModelValidationError(f"Field name '{IN_MEMORY_SCORE_KEY}' is reserved for internal use.") - - @override - async def _inner_delete(self, keys: Sequence[TKey], **kwargs: Any) -> None: - for key in keys: - self.inner_storage.pop(key, None) - - @override - async def _inner_get(self, keys: Sequence[TKey], **kwargs: Any) -> Any | OneOrMany[TModel] | None: - return [self.inner_storage[key] for key in keys if key in self.inner_storage] - - @override - async def _inner_upsert(self, records: Sequence[Any], **kwargs: Any) -> Sequence[TKey]: - updated_keys = [] - for record in records: - key = record[self._key_field_name] if isinstance(record, Mapping) else getattr(record, self._key_field_name) - self.inner_storage[key] = record - updated_keys.append(key) - return updated_keys - - def _deserialize_store_models_to_dicts(self, records: Sequence[Any], **kwargs: Any) -> Sequence[dict[str, Any]]: - return records - - def _serialize_dicts_to_store_models(self, records: Sequence[dict[str, Any]], **kwargs: Any) -> Sequence[Any]: - return records - - @override - async def create_collection(self, **kwargs: Any) -> None: - pass - - @override - async def delete_collection(self, **kwargs: Any) -> None: - self.inner_storage = {} - - @override - async def does_collection_exist(self, **kwargs: Any) -> bool: - return True - - @override - async def _inner_search( - self, - options: VectorSearchOptions | None = None, - search_text: str | None = None, - vectorizable_text: str | None = None, - vector: list[float | int] | None = None, - **kwargs: Any, - ) -> KernelSearchResults[VectorSearchResult[TModel]]: - """Inner search method.""" - if search_text: - return await self._inner_search_text(search_text, options, **kwargs) - if vector: - if not options: - raise VectorSearchExecutionException("Options must be provided for vector search.") - return await self._inner_search_vectorized(vector, options, **kwargs) - raise VectorSearchExecutionException("Search text or vector must be provided.") - - async def _inner_search_text( - self, - search_text: str, - options: VectorSearchOptions | None = None, - **kwargs: Any, - ) -> KernelSearchResults[VectorSearchResult[TModel]]: - """Inner search method.""" - return_records: dict[TKey, float] = {} - for key, record in self._get_filtered_records(options).items(): - if self._should_add_text_search(search_text, record): - return_records[key] = 1.0 - if return_records: - return KernelSearchResults( - results=self._get_vector_search_results_from_results( - self._generate_return_list(return_records, options), options - ), - total_count=len(return_records) if options and options.include_total_count else None, - ) - return KernelSearchResults(results=None) - - async def _inner_search_vectorized( - self, - vector: list[float | int], - options: VectorSearchOptions, - **kwargs: Any, - ) -> KernelSearchResults[VectorSearchResult[TModel]]: - return_records: dict[TKey, float] = {} - field = options.vector_field_name or self.data_model_definition.vector_field_names[0] - assert isinstance(self.data_model_definition.fields.get(field), VectorStoreRecordVectorField) # nosec - distance_metric = ( - self.data_model_definition.fields.get(field).distance_function # type: ignore - or DistanceFunction.COSINE_DISTANCE - ) - distance_func = 
DISTANCE_FUNCTION_MAP[distance_metric] - - for key, record in self._get_filtered_records(options).items(): - if vector and field is not None: - return_records[key] = self._calculate_vector_similarity( - vector, - record[field], - distance_func, - invert_score=distance_metric == DistanceFunction.COSINE_SIMILARITY, - ) - sorted_records = dict( - sorted( - return_records.items(), - key=lambda item: item[1], - reverse=DISTANCE_FUNCTION_DIRECTION_HELPER[distance_metric](1, 0), - ) - ) - if sorted_records: - return KernelSearchResults( - results=self._get_vector_search_results_from_results( - self._generate_return_list(sorted_records, options), options - ), - total_count=len(return_records) if options and options.include_total_count else None, - ) - return KernelSearchResults(results=empty_generator()) - - async def _generate_return_list( - self, return_records: dict[TKey, float], options: VectorSearchOptions | None - ) -> AsyncIterable[dict]: - top = 3 if not options else options.top - skip = 0 if not options else options.skip - returned = 0 - for idx, key in enumerate(return_records.keys()): - if idx >= skip: - returned += 1 - rec = self.inner_storage[key] - rec[IN_MEMORY_SCORE_KEY] = return_records[key] - yield rec - if returned >= top: - break - - def _get_filtered_records(self, options: VectorSearchOptions | None) -> dict[TKey, dict]: - if options and options.filter: - for filter in options.filter.filters: - return {key: record for key, record in self.inner_storage.items() if self._apply_filter(record, filter)} - return self.inner_storage - - def _should_add_text_search(self, search_text: str, record: dict) -> bool: - for field in self.data_model_definition.fields.values(): - if not isinstance(field, VectorStoreRecordVectorField) and search_text in record.get(field.name, ""): - return True - return False - - def _calculate_vector_similarity( - self, - search_vector: list[float | int], - record_vector: list[float | int], - distance_func: Callable, - invert_score: bool = False, - ) -> float: - calc = distance_func(record_vector, search_vector) - if invert_score: - return 1.0 - float(calc) - return float(calc) - - @staticmethod - def _apply_filter(record: dict[str, Any], filter: FilterClauseBase) -> bool: - match filter: - case EqualTo(): - value = record.get(filter.field_name) - if not value: - return False - return value.lower() == filter.value.lower() - case AnyTagsEqualTo(): - tag_list = record.get(filter.field_name) - if not tag_list: - return False - if not isinstance(tag_list, list): - tag_list = [tag_list] - return filter.value in tag_list - case _: - return True - - def _get_record_from_result(self, result: Any) -> Any: - return result - - def _get_score_from_result(self, result: Any) -> float | None: - return result.get(IN_MEMORY_SCORE_KEY) diff --git a/python/semantic_kernel/connectors/memory/in_memory/in_memory_store.py b/python/semantic_kernel/connectors/memory/in_memory/in_memory_store.py deleted file mode 100644 index 291f0694b510..000000000000 --- a/python/semantic_kernel/connectors/memory/in_memory/in_memory_store.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. 
- -import logging -import sys -from collections.abc import Sequence -from typing import Any, TypeVar - -from semantic_kernel.connectors.memory.in_memory.in_memory_collection import InMemoryVectorCollection -from semantic_kernel.data import VectorStore, VectorStoreRecordCollection, VectorStoreRecordDefinition -from semantic_kernel.utils.feature_stage_decorator import experimental - -if sys.version_info >= (3, 12): - from typing import override # pragma: no cover -else: - from typing_extensions import override # pragma: no cover - -logger: logging.Logger = logging.getLogger(__name__) - -TModel = TypeVar("TModel") - - -@experimental -class InMemoryVectorStore(VectorStore): - """Create a In Memory Vector Store.""" - - @override - async def list_collection_names(self, **kwargs) -> Sequence[str]: - return list(self.vector_record_collections.keys()) - - @override - def get_collection( - self, - collection_name: str, - data_model_type: type[TModel], - data_model_definition: VectorStoreRecordDefinition | None = None, - **kwargs: Any, - ) -> "VectorStoreRecordCollection": - if collection_name not in self.vector_record_collections: - self.vector_record_collections[collection_name] = InMemoryVectorCollection( - data_model_type=data_model_type, - data_model_definition=data_model_definition, - collection_name=collection_name, - ) - return self.vector_record_collections[collection_name] diff --git a/python/tests/unit/connectors/memory/test_faiss.py b/python/tests/unit/connectors/memory/test_faiss.py new file mode 100644 index 000000000000..155979240723 --- /dev/null +++ b/python/tests/unit/connectors/memory/test_faiss.py @@ -0,0 +1,191 @@ +# Copyright (c) Microsoft. All rights reserved. + +import faiss +from pytest import fixture, mark, raises + +from semantic_kernel.connectors.memory.faiss import FaissCollection, FaissStore +from semantic_kernel.data import ( + VectorStoreRecordDataField, + VectorStoreRecordDefinition, + VectorStoreRecordKeyField, + VectorStoreRecordVectorField, +) +from semantic_kernel.data.const import DistanceFunction +from semantic_kernel.exceptions import VectorStoreInitializationException + + +@fixture(scope="function") +def data_model_def() -> VectorStoreRecordDefinition: + return VectorStoreRecordDefinition( + fields=[ + VectorStoreRecordKeyField(name="id"), + VectorStoreRecordDataField(name="content"), + VectorStoreRecordVectorField( + name="vector", + dimensions=5, + index_kind="flat", + distance_function="dot_prod", + property_type="float", + ), + ] + ) + + +@fixture(scope="function") +def store() -> FaissStore: + return FaissStore() + + +@fixture(scope="function") +def faiss_collection(data_model_def): + return FaissCollection(data_model_type=dict, data_model_definition=data_model_def, collection_name="test") + + +async def test_store_get_collection(store, data_model_def): + collection = store.get_collection(dict, data_model_definition=data_model_def, collection_name="test") + assert collection.collection_name == "test" + assert collection.data_model_type is dict + assert collection.data_model_definition == data_model_def + assert collection.inner_storage == {} + + +@mark.parametrize( + "dist", + [ + DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE, + DistanceFunction.DOT_PROD, + ], +) +async def test_create_collection(store, data_model_def, dist): + for field in data_model_def.fields: + if field.name == "vector": + field.distance_function = dist + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_def + ) + await 
collection.create_collection() + assert collection.inner_storage == {} + assert collection.indexes + assert collection.indexes["vector"] is not None + + +async def test_create_collection_incompatible_dist(store, data_model_def): + for field in data_model_def.fields: + if field.name == "vector": + field.distance_function = "cosine_distance" + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_def + ) + with raises(VectorStoreInitializationException): + await collection.create_collection() + + +async def test_create_collection_custom(store, data_model_def): + index = faiss.IndexFlat(5) + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_def + ) + await collection.create_collection(index=index) + assert collection.inner_storage == {} + assert collection.indexes + assert collection.indexes["vector"] is not None + assert collection.indexes["vector"] == index + assert collection.indexes["vector"].is_trained is True + await collection.delete_collection() + + +async def test_create_collection_custom_untrained(store, data_model_def): + index = faiss.IndexIVFFlat(faiss.IndexFlat(5), 5, 10) + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_def + ) + with raises(VectorStoreInitializationException): + await collection.create_collection(index=index) + del index + + +async def test_create_collection_custom_dict(store, data_model_def): + index = faiss.IndexFlat(5) + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_def + ) + await collection.create_collection(indexes={"vector": index}) + assert collection.inner_storage == {} + assert collection.indexes + assert collection.indexes["vector"] is not None + assert collection.indexes["vector"] == index + await collection.delete_collection() + + +async def test_upsert(faiss_collection): + await faiss_collection.create_collection() + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + key = await faiss_collection.upsert(record) + assert key == "testid" + assert faiss_collection.inner_storage == {"testid": record} + await faiss_collection.delete_collection() + + +async def test_get(faiss_collection): + await faiss_collection.create_collection() + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await faiss_collection.upsert(record) + result = await faiss_collection.get("testid") + assert result["id"] == record["id"] + assert result["content"] == record["content"] + await faiss_collection.delete_collection() + + +async def test_get_missing(faiss_collection): + await faiss_collection.create_collection() + result = await faiss_collection.get("testid") + assert result is None + await faiss_collection.delete_collection() + + +async def test_delete(faiss_collection): + await faiss_collection.create_collection() + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await faiss_collection.upsert(record) + await faiss_collection.delete("testid") + assert faiss_collection.inner_storage == {} + await faiss_collection.delete_collection() + + +async def test_does_collection_exist(faiss_collection): + assert await faiss_collection.does_collection_exist() is False + await faiss_collection.create_collection() + assert await faiss_collection.does_collection_exist() is True + await 
faiss_collection.delete_collection() + + +async def test_delete_collection(faiss_collection): + await faiss_collection.create_collection() + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await faiss_collection.upsert(record) + assert faiss_collection.inner_storage == {"testid": record} + await faiss_collection.delete_collection() + assert faiss_collection.inner_storage == {} + + +@mark.parametrize("dist", [DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE, DistanceFunction.DOT_PROD]) +async def test_create_collection_and_search(faiss_collection, dist): + for field in faiss_collection.data_model_definition.fields: + if field.name == "vector": + field.distance_function = dist + await faiss_collection.create_collection() + record1 = {"id": "testid1", "content": "test content", "vector": [1.0, 1.0, 1.0, 1.0, 1.0]} + record2 = {"id": "testid2", "content": "test content", "vector": [-1.0, -1.0, -1.0, -1.0, -1.0]} + await faiss_collection.upsert([record1, record2]) + results = await faiss_collection.search( + vector=[0.9, 0.9, 0.9, 0.9, 0.9], + vector_property_name="vector", + include_total_count=True, + include_vectors=True, + ) + assert results.total_count == 2 + idx = 0 + async for res in results.results: + assert res.record == (record1 if idx == 0 else record2) + idx += 1 + await faiss_collection.delete_collection() diff --git a/python/tests/unit/connectors/memory/test_in_memory.py b/python/tests/unit/connectors/memory/test_in_memory.py new file mode 100644 index 000000000000..373256fade20 --- /dev/null +++ b/python/tests/unit/connectors/memory/test_in_memory.py @@ -0,0 +1,176 @@ +# Copyright (c) Microsoft. All rights reserved. + +from pytest import fixture, mark, raises + +from semantic_kernel.connectors.memory.in_memory import InMemoryCollection, InMemoryStore +from semantic_kernel.data.const import DistanceFunction +from semantic_kernel.exceptions.vector_store_exceptions import VectorStoreOperationException + + +@fixture +def collection(data_model_definition): + return InMemoryCollection(collection_name="test", data_model_type=dict, data_model_definition=data_model_definition) + + +def test_store_init(): + store = InMemoryStore() + assert store is not None + + +def test_store_get_collection(data_model_definition): + store = InMemoryStore() + collection = store.get_collection( + collection_name="test", data_model_type=dict, data_model_definition=data_model_definition + ) + assert collection.collection_name == "test" + assert collection.data_model_type is dict + assert collection.data_model_definition == data_model_definition + + +async def test_upsert(collection): + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + key = await collection.upsert(record) + assert key == "testid" + assert collection.inner_storage == {"testid": record} + + +async def test_get(collection): + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await collection.upsert(record) + result = await collection.get("testid") + assert result["id"] == record["id"] + assert result["content"] == record["content"] + + +async def test_get_missing(collection): + result = await collection.get("testid") + assert result is None + + +async def test_delete(collection): + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await collection.upsert(record) + await collection.delete("testid") + assert collection.inner_storage == {} + + +async def
test_does_collection_exist(collection): + assert await collection.does_collection_exist() is True + + +async def test_delete_collection(collection): + record = {"id": "testid", "content": "test content", "vector": [0.1, 0.2, 0.3, 0.4, 0.5]} + await collection.upsert(record) + assert collection.inner_storage == {"testid": record} + await collection.delete_collection() + assert collection.inner_storage == {} + + +async def test_create_collection(collection): + await collection.create_collection() + + +@mark.parametrize( + "distance_function", + [ + DistanceFunction.COSINE_DISTANCE, + DistanceFunction.COSINE_SIMILARITY, + DistanceFunction.EUCLIDEAN_DISTANCE, + DistanceFunction.MANHATTAN, + DistanceFunction.EUCLIDEAN_SQUARED_DISTANCE, + DistanceFunction.DOT_PROD, + DistanceFunction.HAMMING, + ], +) +async def test_vectorized_search_similar(collection, distance_function): + for field in collection.data_model_definition.fields: + if field.name == "vector": + field.distance_function = distance_function + record1 = {"id": "testid1", "content": "test content", "vector": [1.0, 1.0, 1.0, 1.0, 1.0]} + record2 = {"id": "testid2", "content": "test content", "vector": [-1.0, -1.0, -1.0, -1.0, -1.0]} + await collection.upsert([record1, record2]) + results = await collection.search( + vector=[0.9, 0.9, 0.9, 0.9, 0.9], + vector_property_name="vector", + include_total_count=True, + include_vectors=True, + ) + assert results.total_count == 2 + idx = 0 + async for res in results.results: + assert res.record == (record1 if idx == 0 else record2) + idx += 1 + + +async def test_valid_lambda_filter(collection): + record1 = {"id": "1", "vector": [1, 2, 3, 4, 5]} + record2 = {"id": "2", "vector": [5, 4, 3, 2, 1]} + await collection.upsert([record1, record2]) + # Filter to select only record with id == '1' + results = collection._get_filtered_records(type("opt", (), {"filter": "lambda x: x.id == '1'"})()) + assert len(results) == 1 + assert "1" in results + + +async def test_valid_lambda_filter_subscript_access(collection): + record1 = {"id": "1", "vector": [1, 2, 3, 4, 5]} + record2 = {"id": "2", "vector": [5, 4, 3, 2, 1]} + await collection.upsert([record1, record2]) + # Filter to select only record with id == '2' using subscript access + results = collection._get_filtered_records(type("opt", (), {"filter": "lambda x: x['id'] == '2'"})()) + assert len(results) == 1 + assert "2" in results + + +async def test_invalid_filter_not_lambda(collection): + with raises(VectorStoreOperationException, match="must be a lambda expression"): + collection._get_filtered_records(type("opt", (), {"filter": "x.id == '1'"})()) + + +async def test_invalid_filter_syntax(collection): + with raises(VectorStoreOperationException, match="not valid Python"): + collection._get_filtered_records(type("opt", (), {"filter": "lambda x: x.id == '1' and"})()) + + +async def test_malicious_filter_import(collection): + # Should not allow import statement + with raises(VectorStoreOperationException): + collection._get_filtered_records( + type("opt", (), {"filter": "lambda x: __import__('os').system('echo malicious')"})() + ) + + +async def test_malicious_filter_exec(collection): + # Should not allow exec or similar + with raises(VectorStoreOperationException): + collection._get_filtered_records(type("opt", (), {"filter": "lambda x: exec('print(1)')"})()) + + +async def test_malicious_filter_builtins(collection): + # Should not allow access to builtins + with raises(VectorStoreOperationException): + collection._get_filtered_records( + type("opt", (),
{"filter": "lambda x: __builtins__.__import__('os').system('echo malicious')"})() + ) + + +async def test_malicious_filter_open(collection): + # Should not allow open() + with raises(VectorStoreOperationException): + collection._get_filtered_records(type("opt", (), {"filter": "lambda x: open('somefile.txt', 'w')"})()) + + +async def test_malicious_filter_eval(collection): + # Should not allow eval() + with raises(VectorStoreOperationException): + collection._get_filtered_records(type("opt", (), {"filter": "lambda x: eval('2+2')"})()) + + +async def test_multiple_filters(collection): + record1 = {"id": "1", "vector": [1, 2, 3, 4, 5]} + record2 = {"id": "2", "vector": [5, 4, 3, 2, 1]} + await collection.upsert([record1, record2]) + filters = ["lambda x: x.id == '1'", "lambda x: x.vector[0] == 1"] + results = collection._get_filtered_records(type("opt", (), {"filter": filters})()) + assert len(results) == 1 + assert "1" in results