Skip to content

Commit f1204eb

Browse files
committed
adjust herd and shard with mset, mget
1 parent 75ca1f1 commit f1204eb

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

django_valkey/client/herd.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import random
22
import socket
33
import time
4-
from collections import OrderedDict
5-
from typing import Tuple, Any
4+
from typing import Tuple, Any, Iterable
65

76
from django.conf import settings
87
from valkey import Valkey
98
from valkey.exceptions import ConnectionError, ResponseError, TimeoutError
109
from valkey.typing import KeyT, EncodableT
1110

12-
from django_valkey.base_client import DEFAULT_TIMEOUT
11+
from django_valkey.base_client import DEFAULT_TIMEOUT, Backend
1312
from django_valkey.client.default import DefaultClient
1413
from django_valkey.exceptions import ConnectionInterrupted
1514

@@ -113,13 +112,16 @@ def get_many(self, keys, version=None, client=None):
113112
if not keys:
114113
return {}
115114

116-
recovered_data = OrderedDict()
115+
recovered_data = {}
117116

118117
new_keys = [self.make_key(key, version=version) for key in keys]
119118
map_keys = dict(zip(new_keys, keys))
120119

121120
try:
122-
results = client.mget(*new_keys)
121+
pipeline = client.pipeline()
122+
for key in new_keys:
123+
pipeline.get(key)
124+
results = pipeline.execute()
123125
except _main_exceptions as e:
124126
raise ConnectionInterrupted(connection=client) from e
125127

@@ -132,6 +134,32 @@ def get_many(self, keys, version=None, client=None):
132134

133135
return recovered_data
134136

137+
def mget(
138+
self,
139+
keys: Iterable[KeyT],
140+
version: int | None = None,
141+
client: Backend | Any | None = None,
142+
) -> dict:
143+
client = self._get_client(write=False, client=client)
144+
if not keys:
145+
return {}
146+
147+
recovered_data = {}
148+
149+
new_keys = [self.make_key(key, version=version) for key in keys]
150+
151+
try:
152+
results = client.mget(new_keys)
153+
except _main_exceptions as e:
154+
raise ConnectionInterrupted(connection=client) from e
155+
156+
for key, value in zip(keys, results):
157+
if value is None:
158+
continue
159+
val, refresh = self._unpack(self.decode(value))
160+
recovered_data[key] = None if refresh else val
161+
return recovered_data
162+
135163
def set_many(
136164
self, data, timeout=DEFAULT_TIMEOUT, version=None, client=None, herd=True
137165
):

django_valkey/client/sharded.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ def get_many(
9898
recovered_data[map_keys[key]] = value
9999
return recovered_data
100100

101+
def mget(self, *args, **kwargs):
102+
raise NotImplementedError
103+
101104
def set(
102105
self,
103106
key: KeyT,
@@ -142,6 +145,9 @@ def set_many(
142145
for key, value in data.items():
143146
self.set(key, value, timeout, version=version, client=client)
144147

148+
def mset(self, *args, **kwargs):
149+
raise NotImplementedError
150+
145151
def has_key(
146152
self, key: KeyT, version: int | None = None, client: Valkey | Any | None = None
147153
) -> bool:

0 commit comments

Comments
 (0)