Skip to content

Commit c37875c

Browse files
adiaybguomerdor001
authored and committed
Adding locks to ARC and WTINYLFU caches and adding concurrency unit tests
Co-authored-by: omerdor001 <omerdo@post.bgu.ac.il>
Co-authored-by: adiaybgu <adiay@post.bgu.ac.il>
1 parent 512c5b1 commit c37875c

File tree

9 files changed

+772
-307
lines changed

9 files changed

+772
-307
lines changed

modelcache/cache.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
#==================== Cache class definition =========================#
2828
#=====================================================================#
2929

30-
executor = ThreadPoolExecutor(max_workers=6)
30+
executor = ThreadPoolExecutor(max_workers=2)
3131

3232
def response_text(cache_resp):
3333
return cache_resp['data']
@@ -300,8 +300,8 @@ async def init(
300300
config=vector_config,
301301
metric_type=similarity_metric_type,
302302
),
303-
eviction='WTINYLFU',
304-
max_size=100000,
303+
eviction='ARC',
304+
max_size=10000,
305305
normalize=normalize,
306306
)
307307

modelcache/manager/data_manager.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ def __init__(
182182
self.eviction_base = MemoryCacheEviction(
183183
policy=policy,
184184
maxsize=max_size,
185-
clean_size=clean_size,
186-
on_evict=self._evict_ids)
185+
clean_size=clean_size)
187186

188187
def save(self, questions: List[any], answers: List[any], embedding_datas: List[any], **kwargs):
189188
model = kwargs.pop("model", None)
@@ -314,31 +313,6 @@ def truncate(self, model):
314313
'ScalarDB': 'truncate scalar data failed, please check! e: {}'.format(e)}
315314
return {'status': 'success', 'VectorDB': 'rebuild', 'ScalarDB': 'delete_count: ' + str(delete_count)}
316315

317-
# added
318-
def _evict_ids(self, ids, **kwargs):
319-
model = kwargs.get("model")
320-
if not ids or any(i is None for i in ids):
321-
modelcache_log.warning("Skipping eviction for invalid IDs: %s", ids)
322-
return
323-
324-
if isinstance(ids,str):
325-
ids = [ids]
326-
327-
for _id in ids:
328-
self.eviction_base.get_cache(model).pop(_id, None)
329-
330-
try:
331-
self.s.mark_deleted(ids)
332-
modelcache_log.info("Evicted from scalar storage: %s", ids)
333-
except Exception as e:
334-
modelcache_log.error("Failed to delete from scalar storage: %s", str(e))
335-
336-
try:
337-
self.v.delete(ids, model=model)
338-
modelcache_log.info("Evicted from vector storage (model=%s): %s", model, ids)
339-
except Exception as e:
340-
modelcache_log.error("Failed to delete from vector storage (model=%s): %s", model, str(e))
341-
342316
def flush(self):
343317
self.s.flush()
344318
self.v.flush()

modelcache/manager/eviction/arc_cache.py

Lines changed: 59 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,18 @@
11
from cachetools import Cache
22
from collections import OrderedDict
3+
from readerwriterlock import rwlock
34

4-
class ARC(Cache):
5-
"""
6-
Adaptive Replacement Cache (ARC) implementation with on_evict callback.
7-
Balances recency and frequency via two active lists (T1, T2) and two ghost lists (B1, B2).
8-
Calls on_evict([key]) whenever an item is evicted from the active cache.
9-
"""
5+
_sentinel = object()
106

11-
def __init__(self, maxsize, getsizeof=None, on_evict=None):
12-
"""
13-
Args:
14-
maxsize (int): Maximum cache size.
15-
getsizeof (callable, optional): Sizing function for items.
16-
on_evict (callable, optional): Callback called as on_evict([key]) when a key is evicted.
17-
"""
7+
class ARC(Cache):
8+
def __init__(self, maxsize, getsizeof=None):
189
super().__init__(maxsize, getsizeof)
1910
self.t1 = OrderedDict()
2011
self.t2 = OrderedDict()
2112
self.b1 = OrderedDict()
2213
self.b2 = OrderedDict()
23-
self.p = 0 # Adaptive target for T1 size.
24-
self.on_evict = on_evict
14+
self.p = 0
15+
self._rw_lock = rwlock.RWLockWrite()
2516

2617
def __len__(self):
2718
return len(self.t1) + len(self.t2)
@@ -30,96 +21,80 @@ def __contains__(self, key):
3021
return key in self.t1 or key in self.t2
3122

3223
def _evict_internal(self):
33-
"""
34-
Evicts items from T1 or T2 if cache is over capacity, and prunes ghost lists.
35-
Calls on_evict for each evicted key.
36-
"""
37-
# Evict from T1 or T2 if active cache > maxsize
3824
while len(self.t1) + len(self.t2) > self.maxsize:
3925
if len(self.t1) > self.p or (len(self.t1) == 0 and len(self.t2) > 0):
4026
key, value = self.t1.popitem(last=False)
4127
self.b1[key] = value
42-
if self.on_evict:
43-
self.on_evict([key])
4428
else:
4529
key, value = self.t2.popitem(last=False)
4630
self.b2[key] = value
47-
if self.on_evict:
48-
self.on_evict([key])
49-
# Prune ghost lists to their max lengths
5031
while len(self.b1) > (self.maxsize - self.p):
5132
self.b1.popitem(last=False)
5233
while len(self.b2) > self.p:
5334
self.b2.popitem(last=False)
5435

5536
def __setitem__(self, key, value):
56-
# Remove from all lists before re-inserting
57-
for l in (self.t1, self.t2, self.b1, self.b2):
58-
l.pop(key, None)
59-
self.t1[key] = value
60-
self.t1.move_to_end(key)
61-
self._evict_internal()
37+
with self._rw_lock.gen_wlock():
38+
for l in (self.t1, self.t2, self.b1, self.b2):
39+
l.pop(key, None)
40+
self.t1[key] = value
41+
self.t1.move_to_end(key)
42+
self._evict_internal()
6243

6344
def __getitem__(self, key):
64-
# Case 1: Hit in T1 → promote to T2
65-
if key in self.t1:
66-
value = self.t1.pop(key)
67-
self.t2[key] = value
68-
self.t2.move_to_end(key)
69-
self.p = max(0, self.p - 1)
70-
self._evict_internal()
71-
return value
72-
# Case 2: Hit in T2 → refresh in T2
73-
if key in self.t2:
74-
value = self.t2.pop(key)
75-
self.t2[key] = value
76-
self.t2.move_to_end(key)
77-
self.p = min(self.maxsize, self.p + 1)
78-
self._evict_internal()
79-
return value
80-
# Case 3: Hit in B1 (ghost) → fetch and promote to T2
81-
if key in self.b1:
82-
self.b1.pop(key)
83-
self.p = min(self.maxsize, self.p + 1)
84-
self._evict_internal()
85-
value = super().__missing__(key)
86-
self.t2[key] = value
87-
self.t2.move_to_end(key)
88-
return value
89-
# Case 4: Hit in B2 (ghost) → fetch and promote to T2
90-
if key in self.b2:
91-
self.b2.pop(key)
92-
self.p = max(0, self.p - 1)
93-
self._evict_internal()
94-
value = super().__missing__(key)
95-
self.t2[key] = value
96-
self.t2.move_to_end(key)
97-
return value
98-
# Case 5: Cold miss → handled by Cache base class (calls __setitem__ after __missing__)
99-
return super().__getitem__(key)
45+
with self._rw_lock.gen_wlock():
46+
if key in self.t1:
47+
value = self.t1.pop(key)
48+
self.t2[key] = value
49+
self.t2.move_to_end(key)
50+
self.p = max(0, self.p - 1)
51+
self._evict_internal()
52+
return value
53+
if key in self.t2:
54+
value = self.t2.pop(key)
55+
self.t2[key] = value
56+
self.t2.move_to_end(key)
57+
self.p = min(self.maxsize, self.p + 1)
58+
self._evict_internal()
59+
return value
60+
if key in self.b1:
61+
self.b1.pop(key)
62+
self.p = min(self.maxsize, self.p + 1)
63+
self._evict_internal()
64+
value = super().__missing__(key)
65+
self.t2[key] = value
66+
self.t2.move_to_end(key)
67+
return value
68+
if key in self.b2:
69+
self.b2.pop(key)
70+
self.p = max(0, self.p - 1)
71+
self._evict_internal()
72+
value = super().__missing__(key)
73+
self.t2[key] = value
74+
self.t2.move_to_end(key)
75+
return value
76+
return super().__getitem__(key)
10077

10178
def __missing__(self, key):
102-
"""
103-
Override this in a subclass, or rely on direct assignment (cache[key] = value).
104-
"""
10579
raise KeyError(key)
10680

107-
def pop(self, key, default=None):
108-
"""
109-
Remove key from all lists.
110-
"""
111-
for l in (self.t1, self.t2, self.b1, self.b2):
112-
if key in l:
113-
return l.pop(key)
114-
return default
81+
def pop(self, key, default=_sentinel):
82+
with self._rw_lock.gen_wlock():
83+
for l in (self.t1, self.t2, self.b1, self.b2):
84+
if key in l:
85+
return l.pop(key)
86+
if default is _sentinel:
87+
raise KeyError(key)
88+
return default
11589

11690
def clear(self):
117-
self.t1.clear()
118-
self.t2.clear()
119-
self.b1.clear()
120-
self.b2.clear()
121-
self.p = 0
122-
super().clear()
91+
with self._rw_lock.gen_wlock():
92+
self.t1.clear()
93+
self.t2.clear()
94+
self.b1.clear()
95+
self.b2.clear()
96+
self.p = 0
97+
super().clear()
12398

12499
def __iter__(self):
125100
yield from self.t1

modelcache/manager/eviction/memory_cache.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,14 @@ def wrapper(*args, **kwargs):
1919

2020

2121
class MemoryCacheEviction(EvictionBase):
22-
def __init__(self, policy: str, maxsize: int, clean_size: int, on_evict: Callable[[List[Any]], None], **kwargs):
22+
def __init__(self, policy: str, maxsize: int, clean_size: int, **kwargs):
2323
self._policy = policy.upper()
2424
self.model_to_cache = dict()
2525
self.maxsize = maxsize
2626
self.clean_size = clean_size
27-
self.on_evict = on_evict
2827
self.kwargs = kwargs
2928

3029
def create_cache(self, model: str):
31-
32-
################# Not integrated with on_evict yet #######################
3330
if self._policy == "LRU":
3431
cache = cachetools.LRUCache(maxsize=self.maxsize, **self.kwargs)
3532
elif self._policy == "LFU":
@@ -38,18 +35,14 @@ def create_cache(self, model: str):
3835
cache = cachetools.FIFOCache(maxsize=self.maxsize, **self.kwargs)
3936
elif self._policy == "RR":
4037
cache = cachetools.RRCache(maxsize=self.maxsize, **self.kwargs)
41-
###########################################################################
42-
4338
elif self._policy == "WTINYLFU":
44-
cache = W2TinyLFU(maxsize=self.maxsize, on_evict=lambda x: self.on_evict(x,model=model))
39+
cache = W2TinyLFU(maxsize=self.maxsize)
4540
elif self._policy == "ARC":
46-
cache = ARC(maxsize=self.maxsize, on_evict=lambda x: self.on_evict(x,model=model))
41+
cache = ARC(maxsize=self.maxsize)
4742
else:
4843
raise ValueError(f"Unknown policy {self.policy}")
49-
cache.popitem = popitem_wrapper(cache.popitem, self.on_evict, self.clean_size)
5044
return cache
5145

52-
5346
def put(self, objs: List[Tuple[Any, Any]], model: str):
5447
cache = self.get_cache(model)
5548
for key, value in objs:

0 commit comments

Comments (0)