Skip to content

Commit 41dea63

Browse files
committed
Fixed callbacks' first iteration bug with scheduler refactor to use APScheduler
1 parent 26d230d commit 41dea63

File tree

7 files changed

+169
-200
lines changed

7 files changed

+169
-200
lines changed

dynatrace_extension/sdk/callback.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def __init__(
3333
self.interval: timedelta = interval
3434
self.logger = logger
3535
self.running: bool = False
36-
self.status = Status(StatusValue.OK)
36+
self.status = IgnoreStatus()
3737
self.executions_total = 0 # global counter
3838
self.executions_per_interval = 0 # counter per interval = 1 min by default
3939
self.duration = 0 # global counter
@@ -45,7 +45,6 @@ def __init__(
4545
self.ok_count = 0 # counter per interval = 1 min by default
4646
self.timeouts_count = 0 # counter per interval = 1 min by default
4747
self.exception_count = 0 # counter per interval = 1 min by default
48-
self.iterations = 0 # how many times we ran the callback iterator for this callback
4948

5049
def get_current_time_with_cluster_diff(self):
5150
return datetime.now() + timedelta(milliseconds=self.cluster_time_diff)
@@ -142,13 +141,3 @@ def clear_sfm_metrics(self):
142141
self.duration_interval_total = 0
143142
self.exception_count = 0
144143
self.executions_per_interval = 0
145-
146-
def get_next_execution_timestamp(self) -> float:
147-
"""
148-
Get the timestamp for the next execution of the callback
149-
This is done using execution total, the interval and the start timestamp
150-
:return: datetime
151-
"""
152-
return (
153-
self.start_timestamp + timedelta(seconds=self.interval.total_seconds() * (self.iterations or 1))
154-
).timestamp()

dynatrace_extension/sdk/extension.py

Lines changed: 55 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import sys
99
import threading
1010
import time
11+
from apscheduler.schedulers.background import BackgroundScheduler
12+
from apscheduler.executors.pool import ThreadPoolExecutor
13+
from apscheduler.triggers.interval import IntervalTrigger
1114
from argparse import ArgumentParser
1215
from collections.abc import Callable
13-
from concurrent.futures import ThreadPoolExecutor
1416
from datetime import datetime, timedelta, timezone
1517
from enum import Enum
1618
from itertools import chain
@@ -172,6 +174,12 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None):
172174
sfm_metrics.append(metric)
173175

174176

177+
class ExecutorType(str, Enum):
178+
CALLBACKS = 'callbacks'
179+
INTERNAL = 'internal'
180+
HEARTBEAT = 'heartbeat'
181+
182+
175183
class Extension:
176184
"""Base class for Python extensions.
177185
@@ -240,21 +248,12 @@ def __init__(self, name: str = "") -> None:
240248
self._running_callbacks: dict[int, WrappedCallback] = {}
241249
self._running_callbacks_lock: Lock = Lock()
242250

243-
self._scheduler = sched.scheduler(time.time, time.sleep)
244-
245-
# Timestamps for scheduling of internal callbacks
246-
self._next_internal_callbacks_timestamps: dict[str, datetime] = {
247-
"timediff": datetime.now() + TIME_DIFF_INTERVAL,
248-
"heartbeat": datetime.now() + HEARTBEAT_INTERVAL,
249-
"metrics": datetime.now() + METRIC_SENDING_INTERVAL,
250-
"events": datetime.now() + METRIC_SENDING_INTERVAL,
251-
"sfm_metrics": datetime.now() + SFM_METRIC_SENDING_INTERVAL,
252-
}
253-
254-
# Executors for the callbacks and internal methods
255-
self._callbacks_executor = ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE)
256-
self._internal_executor = ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE)
257-
self._heartbeat_executor = ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE)
251+
# Scheduler and executors for the callbacks and internal methods
252+
self._scheduler = BackgroundScheduler(executors={
253+
ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE),
254+
ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE),
255+
ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE)
256+
})
258257

259258
# Extension metrics
260259
self._metrics_lock = RLock()
@@ -376,7 +375,12 @@ def _schedule_callback(self, callback: WrappedCallback):
376375
callback.cluster_time_diff = self._cluster_time_diff
377376
callback.running_in_sim = self._running_in_sim
378377
self._scheduled_callbacks.append(callback)
379-
self._scheduler.enter(callback.initial_wait_time(), 1, self._callback_iteration, (callback,))
378+
379+
self._scheduler.add_job(self._run_callback, args=[callback],
380+
executor=ExecutorType.CALLBACKS,
381+
trigger=IntervalTrigger(seconds=callback.interval.total_seconds()),
382+
next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time())
383+
)
380384

381385
def schedule(
382386
self,
@@ -809,7 +813,10 @@ def _parse_args(self):
809813

810814
if not self._is_fastcheck:
811815
try:
812-
self._heartbeat_iteration()
816+
# TODO: is it definitely okay to schedule the heartbeat this way? Originally it was scheduled in the very same scheduler, which would starve the heartbeat if any callback took too long
817+
# On the other hand, those callbacks inserted specific potentially risky jobs to different executors, so it should be okay?
818+
# Why did the heartbeat have a different priority (and was it higher or lower)?
819+
self._scheduler.add_job(self._heartbeat, executor=ExecutorType.HEARTBEAT, trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds()))
813820
self.initialize()
814821
if not self.is_helper:
815822
self.schedule(self.query, timedelta(minutes=1))
@@ -863,48 +870,39 @@ def _run_callback(self, callback: WrappedCallback):
863870
with self._running_callbacks_lock:
864871
self._running_callbacks.pop(current_thread_id, None)
865872

866-
def _callback_iteration(self, callback: WrappedCallback):
867-
self._callbacks_executor.submit(self._run_callback, callback)
868-
callback.iterations += 1
869-
next_timestamp = callback.get_next_execution_timestamp()
870-
self._scheduler.enterabs(next_timestamp, 1, self._callback_iteration, (callback,))
871873

872874
def _start_extension_loop(self):
873875
api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'")
874876

875877
# These were scheduled before the extension started, schedule them now
876878
for callback in self._scheduled_callbacks_before_run:
877879
self._schedule_callback(callback)
878-
self._metrics_iteration()
879-
self._events_iteration()
880-
self._sfm_metrics_iteration()
881-
self._timediff_iteration()
882-
self._scheduler.run()
883-
884-
def _timediff_iteration(self):
885-
self._internal_executor.submit(self._update_cluster_time_diff)
886-
next_timestamp = self._get_and_set_next_internal_callback_timestamp("timediff", TIME_DIFF_INTERVAL)
887-
self._scheduler.enterabs(next_timestamp, 1, self._timediff_iteration)
888-
889-
def _heartbeat_iteration(self):
890-
self._heartbeat_executor.submit(self._heartbeat)
891-
next_timestamp = self._get_and_set_next_internal_callback_timestamp("heartbeat", HEARTBEAT_INTERVAL)
892-
self._scheduler.enterabs(next_timestamp, 2, self._heartbeat_iteration)
893-
894-
def _metrics_iteration(self):
895-
self._internal_executor.submit(self._send_metrics)
896-
next_timestamp = self._get_and_set_next_internal_callback_timestamp("metrics", METRIC_SENDING_INTERVAL)
897-
self._scheduler.enterabs(next_timestamp, 1, self._metrics_iteration)
898-
899-
def _events_iteration(self):
900-
self._internal_executor.submit(self._send_buffered_events)
901-
next_timestamp = self._get_and_set_next_internal_callback_timestamp("events", METRIC_SENDING_INTERVAL)
902-
self._scheduler.enterabs(next_timestamp, 1, self._events_iteration)
903-
904-
def _sfm_metrics_iteration(self):
905-
self._internal_executor.submit(self._send_sfm_metrics)
906-
next_timestamp = self._get_and_set_next_internal_callback_timestamp("sfm_metrics", SFM_METRIC_SENDING_INTERVAL)
907-
self._scheduler.enterabs(next_timestamp, 1, self._sfm_metrics_iteration)
880+
881+
882+
self._scheduler.add_job(self._send_metrics, executor=ExecutorType.INTERNAL,
883+
trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
884+
next_run_time=datetime.now())
885+
886+
self._scheduler.add_job(self._send_buffered_events, executor=ExecutorType.INTERNAL,
887+
trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
888+
next_run_time=datetime.now())
889+
890+
self._scheduler.add_job(self._send_sfm_metrics, executor=ExecutorType.INTERNAL,
891+
trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()),
892+
next_run_time=datetime.now())
893+
894+
self._scheduler.add_job(self._update_cluster_time_diff, executor=ExecutorType.INTERNAL,
895+
trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()),
896+
next_run_time=datetime.now())
897+
898+
self._scheduler.start()
899+
900+
try:
901+
while self._scheduler.running:
902+
time.sleep(1)
903+
except Exception:
904+
self._scheduler.shutdown()
905+
908906

909907
def _send_metrics(self):
910908
with self._metrics_lock:
@@ -1105,8 +1103,8 @@ def _heartbeat(self):
11051103
api_logger.error(f"Heartbeat failed because {e}, response {response}", exc_info=True)
11061104

11071105
def __del__(self):
1108-
self._callbacks_executor.shutdown()
1109-
self._internal_executor.shutdown()
1106+
if self._scheduler.running:
1107+
self._scheduler.shutdown()
11101108

11111109
def _add_metric(self, metric: Metric):
11121110
metric.validate()
@@ -1150,7 +1148,7 @@ def _send_events_internal(self, events: dict | list[dict]):
11501148

11511149
def _send_events(self, events: dict | list[dict], send_immediately: bool = False):
11521150
if send_immediately:
1153-
self._internal_executor.submit(self._send_events_internal, events)
1151+
self._scheduler.add_job(self._send_events_internal, args=[events], executor=ExecutorType.INTERNAL)
11541152
return
11551153
with self._logs_lock:
11561154
if isinstance(events, dict):
@@ -1247,4 +1245,4 @@ def _send_sfm_logs(self, logs: dict | list[dict]):
12471245
log["dt.extension.config.label"] = self.monitoring_config_name
12481246
log.pop("monitoring.configuration", None)
12491247

1250-
self._internal_executor.submit(self._send_sfm_logs_internal, logs)
1248+
self._scheduler.add_job(self._send_sfm_logs_internal, args=[logs], executor=ExecutorType.INTERNAL)

dynatrace_extension/sdk/status.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def __init__(self, send_sfm_logs_function: Callable) -> None:
159159
self._ep_records: dict[str, EndpointStatusRecord] = {}
160160
self._send_sfm_logs_function = send_sfm_logs_function
161161
self._logs_to_send: list[str] = []
162+
self._datetime_now = datetime.now # Mockable datetime function
162163

163164
def contains_any_status(self) -> bool:
164165
return len(self._ep_records) > 0
@@ -190,7 +191,7 @@ def send_ep_logs(self):
190191
ep_record.ep_status.message,
191192
)
192193
)
193-
ep_record.last_sent = datetime.now()
194+
ep_record.last_sent = self._datetime_now()
194195
ep_record.state = StatusState.ONGOING
195196

196197
if logs_to_send:
@@ -202,7 +203,7 @@ def _should_be_reported(self, ep_record: EndpointStatusRecord):
202203
elif ep_record.state in (StatusState.INITIAL, StatusState.NEW):
203204
return True
204205
elif ep_record.state == StatusState.ONGOING and (
205-
ep_record.last_sent is None or datetime.now() - ep_record.last_sent >= self.RESENDING_INTERVAL
206+
ep_record.last_sent is None or self._datetime_now() - ep_record.last_sent >= self.RESENDING_INTERVAL
206207
):
207208
return True
208209
else:

pyproject.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ dependencies = [
4747
"pytest",
4848
"typer[all]",
4949
"pyyaml",
50-
"dt-cli>=1.6.13",
51-
"freezegun"
50+
"dt-cli>=1.6.13"
5251
]
5352

5453
[tool.hatch.envs.default.scripts]
@@ -74,8 +73,7 @@ dependencies = [
7473
"ruff>=0.9.10",
7574
"typer[all]",
7675
"pyyaml",
77-
"pytest",
78-
"freezegun"
76+
"pytest"
7977
]
8078

8179
[tool.hatch.envs.lint.scripts]

tests/sdk/test_endpoints_sfm.py

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,42 @@
1-
import threading
21
import time
32
import unittest
43
from datetime import datetime, timedelta
54
from unittest.mock import MagicMock
65

7-
from freezegun import freeze_time
8-
96
from dynatrace_extension import EndpointStatus, EndpointStatuses, Extension, Severity, StatusValue
107

118

12-
class KillSchedulerError(Exception):
13-
pass
14-
15-
169
class TestSfmPerEndpont(unittest.TestCase):
1710
def setUp(self, extension_name=""):
1811
self.ext = Extension(name=extension_name)
1912
self.ext.logger = MagicMock()
2013
self.ext._running_in_sim = True
2114
self.ext._client = MagicMock()
15+
self.ext._client.send_sfm_logs = MagicMock(side_effect=lambda *args, **kwargs: [])
2216
self.ext._is_fastcheck = False
2317
self.i = 0
2418
self.test_cases = None
2519
self.time_machine_idx = None
2620

2721
self.ext.schedule(self.callback, timedelta(seconds=1))
28-
self.scheduler_thread = threading.Thread(target=self.scheduler_thread_fun)
29-
30-
def scheduler_thread_fun(self):
31-
with self.assertRaises(KillSchedulerError):
32-
self.ext._scheduler.run()
3322

3423
def tearDown(self) -> None:
35-
self.ext._scheduler.enter(delay=0, priority=1, action=lambda: (_ for _ in ()).throw(KillSchedulerError()))
36-
self.scheduler_thread.join()
24+
if Extension._instance and Extension._instance._scheduler.running:
25+
Extension._instance._scheduler.shutdown(wait=True)
26+
3727
Extension._instance = None
3828

3929
def run_test(self):
40-
self.scheduler_thread.start()
30+
self.ext._scheduler.start()
4131
time.sleep(0.1)
4232

4333
for case in self.test_cases[: self.time_machine_idx]:
4434
self.single_test_iteration(case)
4535

4636
if self.time_machine_idx:
47-
with freeze_time(datetime.now() + timedelta(hours=2), tick=True):
48-
for case in self.test_cases[self.time_machine_idx :]:
49-
self.single_test_iteration(case)
37+
self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=2)
38+
for case in self.test_cases[self.time_machine_idx :]:
39+
self.single_test_iteration(case)
5040

5141
def single_test_iteration(self, case):
5242
self.ext._client.send_sfm_logs.reset_mock()
@@ -247,7 +237,7 @@ def test_endpoint_independent_ongoing(self):
247237
),
248238
},
249239
{
250-
"status": EndpointStatus("1.2.3.4:1", StatusValue.WARNING, "Warning 1"),
240+
"status": None,
251241
"expected": self.expected_sfm_dict(
252242
device_address="1.2.3.4:1",
253243
level="WARN",
@@ -257,7 +247,7 @@ def test_endpoint_independent_ongoing(self):
257247
),
258248
},
259249
{
260-
"status": EndpointStatus("1.2.3.4:2", StatusValue.WARNING, "Warning 2"),
250+
"status": None,
261251
"expected": self.expected_sfm_dict(
262252
device_address="1.2.3.4:2",
263253
level="WARN",
@@ -287,19 +277,21 @@ def test_endpoint_independent_ongoing(self):
287277
},
288278
]
289279

290-
self.scheduler_thread.start()
280+
self.ext._scheduler.start()
291281
time.sleep(0.1)
292282

293283
self.single_test_iteration(self.test_cases[0])
294284

295-
with freeze_time(datetime.now() + timedelta(hours=1), tick=True):
296-
self.single_test_iteration(self.test_cases[1])
285+
self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=1)
286+
self.single_test_iteration(self.test_cases[1])
287+
288+
self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=2)
289+
self.single_test_iteration(self.test_cases[2])
297290

298-
with freeze_time(datetime.now() + timedelta(hours=1), tick=True):
299-
self.single_test_iteration(self.test_cases[2])
291+
self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=3)
292+
self.single_test_iteration(self.test_cases[3])
300293

301-
with freeze_time(datetime.now() + timedelta(hours=1), tick=True):
302-
self.single_test_iteration(self.test_cases[3])
294+
self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=5)
295+
self.single_test_iteration(self.test_cases[4])
303296

304-
with freeze_time(datetime.now() + timedelta(hours=2), tick=True):
305-
self.single_test_iteration(self.test_cases[4])
297+
self.ext._scheduler.shutdown(wait=True)

0 commit comments

Comments (0)