Skip to content

Commit c0bcd88

Browse files
committed
Handle valid_from=None in configuration version selection
1 parent 7a49f76 commit c0bcd88

File tree

2 files changed

+1159
-13
lines changed

2 files changed

+1159
-13
lines changed

quixstreams/dataframe/joins/lookups/quix_configuration_service/models.py

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class EventMetadata(TypedDict):
3030

3131
type: str
3232
target_key: str
33-
valid_from: str
33+
valid_from: Optional[str]
3434
category: str
3535
version: int
3636
created_at: str
@@ -132,7 +132,7 @@ class ConfigurationVersion:
132132
version: int
133133
contentUrl: str
134134
sha256sum: str
135-
valid_from: float # timestamp ms
135+
valid_from: Optional[float] # timestamp ms
136136
retry_count: int = dataclasses.field(default=0, hash=False, init=False)
137137
retry_at: int = dataclasses.field(default=sys.maxsize, hash=False, init=False)
138138

@@ -145,15 +145,25 @@ def from_event(cls, event: Event) -> "ConfigurationVersion":
145145
146146
:returns: ConfigurationVersion: The created configuration version.
147147
"""
148+
149+
raw_valid_from = event["metadata"]["valid_from"]
150+
if raw_valid_from is None:
151+
valid_from: Optional[float] = None
152+
else:
153+
# TODO python 3.11: Use `datetime.fromisoformat` when additional format are available
154+
try:
155+
parsed = datetime.strptime(raw_valid_from, "%Y-%m-%dT%H:%M:%S.%f%z")
156+
except ValueError:
157+
parsed = datetime.strptime(raw_valid_from, "%Y-%m-%dT%H:%M:%S%z")
158+
159+
valid_from = parsed.timestamp() * 1000
160+
148161
return cls(
149162
id=event["id"],
150163
version=event["metadata"]["version"],
151164
contentUrl=event["contentUrl"],
152165
sha256sum=event["metadata"]["sha256sum"],
153-
valid_from=datetime.fromisoformat(
154-
event["metadata"]["valid_from"]
155-
).timestamp()
156-
* 1000,
166+
valid_from=valid_from,
157167
)
158168

159169
def success(self) -> None:
@@ -235,15 +245,25 @@ def find_valid_version(self, timestamp: int) -> Optional[ConfigurationVersion]:
235245
self._find_versions(timestamp)
236246
)
237247
return self.version
238-
if self.next_version and self.next_version.valid_from <= timestamp:
248+
if (
249+
self.next_version
250+
and self.next_version.valid_from
251+
and self.next_version.valid_from <= timestamp
252+
):
239253
self.previous_version, self.version, self.next_version = (
240254
self._find_versions(timestamp)
241255
)
242256
return self.version
243-
if self.version and self.version.valid_from <= timestamp:
244-
return self.version
245-
if self.previous_version and self.previous_version.valid_from <= timestamp:
246-
return self.previous_version
257+
if self.version:
258+
if self.version.valid_from is None:
259+
return self.version
260+
if self.version.valid_from <= timestamp:
261+
return self.version
262+
if self.previous_version:
263+
if self.previous_version.valid_from is None:
264+
return self.previous_version
265+
if self.previous_version.valid_from <= timestamp:
266+
return self.previous_version
247267

248268
self.previous_version, self.version, self.next_version = self._find_versions(
249269
timestamp
@@ -276,10 +296,19 @@ def _find_versions(
276296
for _, version in sorted(
277297
self.versions.items(), reverse=True, key=lambda x: x[0]
278298
):
279-
if version.valid_from > timestamp:
299+
if version.valid_from is None:
300+
if current_version is None:
301+
current_version = version
302+
elif previous_version is None:
303+
previous_version = version
304+
return previous_version, current_version, next_version
305+
elif version.valid_from > timestamp:
280306
if next_version is None:
281307
next_version = version
282-
elif version.valid_from < next_version.valid_from:
308+
elif (
309+
next_version.valid_from is not None
310+
and version.valid_from < next_version.valid_from
311+
):
283312
next_version = version
284313
elif current_version is None:
285314
current_version = version

0 commit comments

Comments
 (0)