From c57bad2674343a304117fc715333d25b5f0cbbbd Mon Sep 17 00:00:00 2001 From: Paulo Sousa Date: Wed, 23 Jul 2025 12:32:11 +0100 Subject: [PATCH] perf: fix multiprocessing timing measurement - Move vector-to-bytes conversion outside timing measurements - Track actual worker start times for accurate parallel timing - Refactor worker function for compatibility with newer Python versions --- engine/base_client/search.py | 58 +++++++++++++++-------------- engine/clients/vectorsets/search.py | 3 +- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index c50b3391..f63047b1 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -75,8 +75,12 @@ def search_all( search_one = functools.partial(self.__class__._search_one, top=top) # Convert queries to a list for potential reuse - queries_list = list(queries) - + # Also, converts query vectors to bytes beforehand, preparing them for sending to client without affecting search time measurements + queries_list = [] + for query in queries: + query.vector = np.array(query.vector).astype(np.float32).tobytes() + queries_list.append(query) + # Handle MAX_QUERIES environment variable if MAX_QUERIES > 0: queries_list = queries_list[:MAX_QUERIES] @@ -114,12 +118,12 @@ def cycling_query_generator(queries, total_count): total_query_count = len(used_queries) if parallel == 1: - # Single-threaded execution - start = time.perf_counter() - # Create a progress bar with the correct total pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries") + # Single-threaded execution + start = time.perf_counter() + # Process queries with progress updates results = [] for query in used_queries: @@ -148,42 +152,32 @@ def cycling_query_generator(queries, total_count): # For lists, we can use the chunked_iterable function query_chunks = list(chunked_iterable(used_queries, chunk_size)) - # Function to be executed by each worker process - def worker_function(chunk, result_queue): - self.__class__.init_client( - self.host, - distance, - self.connection_params, - self.search_params, - ) - self.setup_search() - results = process_chunk(chunk, search_one) - result_queue.put(results) - # Create a queue to collect results result_queue = Queue() # Create worker processes processes = [] for chunk in query_chunks: - process = Process(target=worker_function, args=(chunk, result_queue)) + process = Process(target=worker_function, args=(self, distance, search_one, chunk, result_queue)) processes.append(process) - # Start measuring time for the critical work - start = time.perf_counter() - # Start worker processes for process in processes: process.start() # Collect results from all worker processes results = [] + min_start_time = time.perf_counter() for _ in processes: - chunk_results = result_queue.get() + proc_start_time, chunk_results = result_queue.get() results.extend(chunk_results) + + # Update min_start_time if necessary + if proc_start_time < min_start_time: + min_start_time = proc_start_time # Stop measuring time for the critical work - total_time = time.perf_counter() - start + total_time = time.perf_counter() - min_start_time # Wait for all worker processes to finish for process in processes: @@ -226,13 +220,21 @@ def chunked_iterable(iterable, size): while chunk := list(islice(it, size)): yield chunk +# Function to be executed by each worker process +def worker_function(self, distance, search_one, chunk, result_queue): + self.init_client( + self.host, + distance, + self.connection_params, + self.search_params, + ) + self.setup_search() + + start_time = time.perf_counter() + results = process_chunk(chunk, search_one) + result_queue.put((start_time, results)) def process_chunk(chunk, search_one): """Process a chunk of queries using the search_one function.""" # No progress bar in worker processes to avoid cluttering the output return [search_one(query) for query in chunk] - - -def process_chunk_wrapper(chunk, search_one): - """Wrapper to process a chunk of queries.""" - return process_chunk(chunk, search_one) diff --git a/engine/clients/vectorsets/search.py b/engine/clients/vectorsets/search.py index 836a128d..6f45e1fc 100644 --- a/engine/clients/vectorsets/search.py +++ b/engine/clients/vectorsets/search.py @@ -1,7 +1,6 @@ import random from typing import List, Tuple -import numpy as np from redis import Redis, RedisCluster @@ -42,7 +41,7 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic @classmethod def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]: ef = cls.search_params["search_params"]["ef"] - response = cls.client.execute_command("VSIM", "idx", "FP32", np.array(vector).astype(np.float32).tobytes(), "WITHSCORES", "COUNT", top, "EF", ef) + response = cls.client.execute_command("VSIM", "idx", "FP32", vector, "WITHSCORES", "COUNT", top, "EF", ef) # decode responses # every even cell is id, every odd is the score # scores needs to be 1 - scores since on vector sets 1 is identical, 0 is opposite vector