Skip to content

Commit 5cc2844

Browse files
committed
Handle valid_from=None in configuration version selection
1 parent 7a49f76 commit 5cc2844

File tree

1 file changed

+31
-10
lines changed
  • quixstreams/dataframe/joins/lookups/quix_configuration_service

1 file changed

+31
-10
lines changed

quixstreams/dataframe/joins/lookups/quix_configuration_service/models.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class EventMetadata(TypedDict):
3030

3131
type: str
3232
target_key: str
33-
valid_from: str
33+
valid_from: Optional[str]
3434
category: str
3535
version: int
3636
created_at: str
@@ -132,7 +132,7 @@ class ConfigurationVersion:
132132
version: int
133133
contentUrl: str
134134
sha256sum: str
135-
valid_from: float # timestamp ms
135+
valid_from: Optional[float] # timestamp ms
136136
retry_count: int = dataclasses.field(default=0, hash=False, init=False)
137137
retry_at: int = dataclasses.field(default=sys.maxsize, hash=False, init=False)
138138

@@ -153,7 +153,9 @@ def from_event(cls, event: Event) -> "ConfigurationVersion":
153153
valid_from=datetime.fromisoformat(
154154
event["metadata"]["valid_from"]
155155
).timestamp()
156-
* 1000,
156+
* 1000
157+
if event["metadata"]["valid_from"]
158+
else None,
157159
)
158160

159161
def success(self) -> None:
@@ -235,15 +237,25 @@ def find_valid_version(self, timestamp: int) -> Optional[ConfigurationVersion]:
235237
self._find_versions(timestamp)
236238
)
237239
return self.version
238-
if self.next_version and self.next_version.valid_from <= timestamp:
240+
if (
241+
self.next_version
242+
and self.next_version.valid_from
243+
and self.next_version.valid_from <= timestamp
244+
):
239245
self.previous_version, self.version, self.next_version = (
240246
self._find_versions(timestamp)
241247
)
242248
return self.version
243-
if self.version and self.version.valid_from <= timestamp:
244-
return self.version
245-
if self.previous_version and self.previous_version.valid_from <= timestamp:
246-
return self.previous_version
249+
if self.version:
250+
if self.version.valid_from is None:
251+
return self.version
252+
if self.version.valid_from <= timestamp:
253+
return self.version
254+
if self.previous_version:
255+
if self.previous_version.valid_from is None:
256+
return self.previous_version
257+
if self.previous_version.valid_from <= timestamp:
258+
return self.previous_version
247259

248260
self.previous_version, self.version, self.next_version = self._find_versions(
249261
timestamp
@@ -276,10 +288,19 @@ def _find_versions(
276288
for _, version in sorted(
277289
self.versions.items(), reverse=True, key=lambda x: x[0]
278290
):
279-
if version.valid_from > timestamp:
291+
if version.valid_from is None:
292+
if current_version is None:
293+
current_version = version
294+
elif previous_version is None:
295+
previous_version = version
296+
return previous_version, current_version, next_version
297+
elif version.valid_from > timestamp:
280298
if next_version is None:
281299
next_version = version
282-
elif version.valid_from < next_version.valid_from:
300+
elif (
301+
next_version.valid_from is not None
302+
and version.valid_from < next_version.valid_from
303+
):
283304
next_version = version
284305
elif current_version is None:
285306
current_version = version

0 commit comments

Comments
 (0)