Skip to content

Commit 8e87354

Browse files
WIP on pgvector improvements
1 parent 9ad5b1a commit 8e87354

File tree

4 files changed

+129
-6
lines changed

4 files changed

+129
-6
lines changed

engine/clients/pgvector/configure.py

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,80 @@ def recreate(self, dataset: Dataset, collection_params):
3939
self.conn.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN")
4040

4141
try:
42-
hnsw_distance_type = self.DISTANCE_MAPPING[dataset.config.distance]
42+
distance_type = self.DISTANCE_MAPPING[dataset.config.distance]
4343
except KeyError:
4444
raise IncompatibilityError(
4545
f"Unsupported distance metric: {dataset.config.distance}"
4646
)
4747

48-
self.conn.execute(
49-
f"CREATE INDEX on items USING hnsw(embedding {hnsw_distance_type}) WITH (m = {collection_params['hnsw_config']['m']}, ef_construction = {collection_params['hnsw_config']['ef_construct']})"
50-
)
48+
# Check if we should create HNSW index or use FLAT (no index for full scan)
49+
if "hnsw_config" in collection_params:
50+
# Auto-detect core count and set parallel workers for faster index builds (pgvector 0.7.0+)
51+
max_parallel_workers = collection_params['hnsw_config'].get('max_parallel_workers', 'auto')
52+
53+
if max_parallel_workers == 'auto':
54+
# Try to get actual CPU core count from PostgreSQL
55+
try:
56+
# Get max_worker_processes setting as baseline
57+
worker_result = self.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_worker_processes'").fetchone()
58+
available_workers = int(worker_result[0]) if worker_result else 8
59+
60+
# Try to get actual CPU cores if available (PostgreSQL 13+)
61+
try:
62+
cpu_cores_result = self.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_parallel_workers'").fetchone()
63+
if cpu_cores_result:
64+
available_workers = min(available_workers, int(cpu_cores_result[0]))
65+
except:
66+
pass # Fallback to max_worker_processes
67+
68+
# Use AWS recommendation: total cores - 2 (but at least 1)
69+
max_parallel_workers = max(1, available_workers - 2)
70+
print(f"Auto-detected {available_workers} worker processes, using {max_parallel_workers} parallel workers")
71+
72+
except Exception as e:
73+
print(f"Failed to auto-detect workers, using default of 4: {e}")
74+
max_parallel_workers = 8
75+
76+
if max_parallel_workers > 0:
77+
self.conn.execute(f"SET max_parallel_workers = {max_parallel_workers}")
78+
self.conn.execute(f"SET max_parallel_workers_per_gather = {max_parallel_workers}")
79+
self.conn.execute(f"SET max_parallel_maintenance_workers = {max_parallel_workers}")
80+
81+
# Create HNSW index with optimized parameters
82+
self.conn.execute(
83+
f"CREATE INDEX on items USING hnsw(embedding {distance_type}) WITH (m = {collection_params['hnsw_config']['m']}, ef_construction = {collection_params['hnsw_config']['ef_construct']})"
84+
)
85+
elif "flat_config" in collection_params:
86+
# For FLAT, configure parallel workers for faster query execution during full scans
87+
max_parallel_workers = collection_params['flat_config'].get('max_parallel_workers', 'auto')
88+
89+
if max_parallel_workers == 'auto':
90+
# Try to get actual CPU core count from PostgreSQL
91+
try:
92+
# Get max_worker_processes setting as baseline
93+
worker_result = self.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_worker_processes'").fetchone()
94+
available_workers = int(worker_result[0]) if worker_result else 8
95+
96+
# Try to get actual CPU cores if available (PostgreSQL 13+)
97+
try:
98+
cpu_cores_result = self.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_parallel_workers'").fetchone()
99+
if cpu_cores_result:
100+
available_workers = min(available_workers, int(cpu_cores_result[0]))
101+
except:
102+
pass # Fallback to max_worker_processes
103+
104+
# Use AWS recommendation: total cores - 2 (but at least 1)
105+
max_parallel_workers = max(1, available_workers - 2)
106+
print(f"Auto-detected {available_workers} worker processes, using {max_parallel_workers} parallel workers for FLAT queries")
107+
108+
except Exception as e:
109+
print(f"Failed to auto-detect workers for FLAT, using default of 8: {e}")
110+
max_parallel_workers = 8
111+
112+
if max_parallel_workers > 0:
113+
self.conn.execute(f"SET max_parallel_workers = {max_parallel_workers}")
114+
self.conn.execute(f"SET max_parallel_workers_per_gather = {max_parallel_workers}")
115+
# For FLAT, we don't create any index - PostgreSQL will do a full table scan with parallel workers
51116

52117
self.conn.close()
53118

engine/clients/pgvector/search.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,16 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic
2626
cls.distance = distance
2727
cls.search_params = search_params["search_params"]
2828

29+
# For FLAT searches, disable index usage to force full scan
30+
if "force_flat" in cls.search_params and cls.search_params["force_flat"]:
31+
cls.cur.execute("SET enable_indexscan = off")
32+
cls.cur.execute("SET enable_bitmapscan = off")
33+
2934
@classmethod
3035
def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
31-
cls.cur.execute(f"SET hnsw.ef_search = {cls.search_params['hnsw_ef']}")
36+
# Set HNSW ef_search parameter only if using HNSW index
37+
if "hnsw_ef" in cls.search_params:
38+
cls.cur.execute(f"SET hnsw.ef_search = {cls.search_params['hnsw_ef']}")
3239

3340
if cls.distance == Distance.COSINE:
3441
query = f"SELECT id, embedding <=> %s AS _score FROM items ORDER BY _score LIMIT {top};"
@@ -46,5 +53,12 @@ def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
4653
@classmethod
4754
def delete_client(cls):
4855
if cls.cur:
56+
# Reset index settings if they were disabled for FLAT searches
57+
if "force_flat" in cls.search_params and cls.search_params["force_flat"]:
58+
try:
59+
cls.cur.execute("SET enable_indexscan = on")
60+
cls.cur.execute("SET enable_bitmapscan = on")
61+
except:
62+
pass # Connection might be closed already
4963
cls.cur.close()
5064
cls.conn.close()

engine/clients/pgvector/upload.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from engine.clients.pgvector.config import get_db_config
99

1010

11-
class PgvectorUploader(BaseUploader):
11+
class PgVectorUploader(BaseUploader):
1212
conn = None
1313
cur = None
1414
upload_params = {}
@@ -20,6 +20,32 @@ def init_client(cls, host, distance, connection_params, upload_params):
2020
cls.cur = cls.conn.cursor()
2121
cls.upload_params = upload_params
2222

23+
# Auto-detect core count for parallel maintenance workers
24+
try:
25+
# Get max_worker_processes setting as baseline
26+
worker_result = cls.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_worker_processes'").fetchone()
27+
available_workers = int(worker_result[0]) if worker_result else 8
28+
29+
# Try to get actual CPU cores if available (PostgreSQL 13+)
30+
try:
31+
cpu_cores_result = cls.conn.execute("SELECT setting FROM pg_settings WHERE name = 'max_parallel_workers'").fetchone()
32+
if cpu_cores_result:
33+
available_workers = min(available_workers, int(cpu_cores_result[0]))
34+
except:
35+
pass # Fallback to max_worker_processes
36+
37+
# Use AWS recommendation: total cores - 2 (but at least 1, max 16 for maintenance)
38+
max_maintenance_workers = min(16, max(1, available_workers - 2))
39+
print(f"Auto-detected {available_workers} worker processes, using {max_maintenance_workers} parallel maintenance workers for uploads")
40+
41+
except Exception as e:
42+
print(f"Failed to auto-detect workers for uploads, using default of 8: {e}")
43+
max_maintenance_workers = 8
44+
45+
# Optimize memory settings for large uploads based on AWS recommendations
46+
cls.conn.execute("SET maintenance_work_mem = '2GB'")
47+
cls.conn.execute(f"SET max_parallel_maintenance_workers = {max_maintenance_workers}")
48+
2349
@classmethod
2450
def upload_batch(
2551
cls, ids: List[int], vectors: List[list], metadata: Optional[List[dict]]

experiments/configurations/pgvector-single-node.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,23 @@
102102
{ "parallel": 100, "search_params": { "hnsw_ef": 64 } }, { "parallel": 100, "search_params": { "hnsw_ef": 128 } }, { "parallel": 100, "search_params": { "hnsw_ef": 256 } }, { "parallel": 100, "search_params": { "hnsw_ef": 512 } }
103103
],
104104
"upload_params": { "parallel": 16 }
105+
},
106+
{
107+
"name": "pgvector-flat",
108+
"engine": "pgvector",
109+
"connection_params": {},
110+
"collection_params": {
111+
"flat_config": {
112+
"create_index": false,
113+
"max_parallel_workers": "auto"
114+
}
115+
},
116+
"search_params": [
117+
{ "parallel": 1, "search_params": { "force_flat": true } },
118+
{ "parallel": 8, "search_params": { "force_flat": true } },
119+
{ "parallel": 16, "search_params": { "force_flat": true } },
120+
{ "parallel": 32, "search_params": { "force_flat": true } }
121+
],
122+
"upload_params": { "parallel": 16 }
105123
}
106124
]

0 commit comments

Comments
 (0)