|
3 | 3 | # SPDX-License-Identifier: MIT
|
4 | 4 |
|
5 | 5 | import logging
|
6 |
| -import sched |
7 | 6 | import signal
|
8 | 7 | import sys
|
9 | 8 | import threading
|
10 | 9 | import time
|
11 |
| -from apscheduler.schedulers.background import BackgroundScheduler |
12 |
| -from apscheduler.executors.pool import ThreadPoolExecutor |
13 |
| -from apscheduler.triggers.interval import IntervalTrigger |
14 | 10 | from argparse import ArgumentParser
|
15 | 11 | from collections.abc import Callable
|
16 | 12 | from datetime import datetime, timedelta, timezone
|
|
20 | 16 | from threading import Lock, RLock, active_count
|
21 | 17 | from typing import Any, ClassVar, NamedTuple
|
22 | 18 |
|
| 19 | +from apscheduler.executors.pool import ThreadPoolExecutor # type: ignore |
| 20 | +from apscheduler.schedulers.background import BackgroundScheduler # type: ignore |
| 21 | +from apscheduler.triggers.interval import IntervalTrigger # type: ignore |
| 22 | + |
23 | 23 | from .activation import ActivationConfig, ActivationType
|
24 | 24 | from .callback import WrappedCallback
|
25 | 25 | from .communication import CommunicationClient, DebugClient, HttpClient
|
@@ -175,9 +175,9 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None):
|
175 | 175 |
|
176 | 176 |
|
177 | 177 | class ExecutorType(str, Enum):
|
178 |
| - CALLBACKS = 'callbacks' |
179 |
| - INTERNAL = 'internal' |
180 |
| - HEARTBEAT = 'heartbeat' |
| 178 | + CALLBACKS = "callbacks" |
| 179 | + INTERNAL = "internal" |
| 180 | + HEARTBEAT = "heartbeat" |
181 | 181 |
|
182 | 182 |
|
183 | 183 | class Extension:
|
@@ -249,11 +249,13 @@ def __init__(self, name: str = "") -> None:
|
249 | 249 | self._running_callbacks_lock: Lock = Lock()
|
250 | 250 |
|
251 | 251 | # Scheduler and executors for the callbacks and internal methods
|
252 |
| - self._scheduler = BackgroundScheduler(executors={ |
253 |
| - ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), |
254 |
| - ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), |
255 |
| - ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE) |
256 |
| - }) |
| 252 | + self._scheduler = BackgroundScheduler( |
| 253 | + executors={ |
| 254 | + ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), |
| 255 | + ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), |
| 256 | + ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE), |
| 257 | + } |
| 258 | + ) |
257 | 259 |
|
258 | 260 | # Extension metrics
|
259 | 261 | self._metrics_lock = RLock()
|
@@ -376,10 +378,12 @@ def _schedule_callback(self, callback: WrappedCallback):
|
376 | 378 | callback.running_in_sim = self._running_in_sim
|
377 | 379 | self._scheduled_callbacks.append(callback)
|
378 | 380 |
|
379 |
| - self._scheduler.add_job(self._run_callback, args=[callback], |
| 381 | + self._scheduler.add_job( |
| 382 | + self._run_callback, |
| 383 | + args=[callback], |
380 | 384 | executor=ExecutorType.CALLBACKS,
|
381 | 385 | trigger=IntervalTrigger(seconds=callback.interval.total_seconds()),
|
382 |
| - next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()) |
| 386 | + next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()), |
383 | 387 | )
|
384 | 388 |
|
385 | 389 | def schedule(
|
@@ -813,10 +817,15 @@ def _parse_args(self):
|
813 | 817 |
|
814 | 818 | if not self._is_fastcheck:
|
815 | 819 | try:
|
816 |
| - # TODO: is it surely okay to schedule hearbeat this way? Originally it was scheduled in the very same scheduler, which would starve heartbeat if any callback took too long |
| 820 | + # TODO: is it surely okay to schedule hearbeat this way? Originally it was scheduled in the very same scheduler, |
| 821 | + # which would starve heartbeat if any callback took too long |
817 | 822 | # On the other hand, those callbacks inserted specific potentially risky jobs to different executors, so it should be okay?
|
818 | 823 | # Why did heartbeat have a different priority (higher or lower?)
|
819 |
| - self._scheduler.add_job(self._heartbeat, executor=ExecutorType.HEARTBEAT, trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds())) |
| 824 | + self._scheduler.add_job( |
| 825 | + self._heartbeat, |
| 826 | + executor=ExecutorType.HEARTBEAT, |
| 827 | + trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds()), |
| 828 | + ) |
820 | 829 | self.initialize()
|
821 | 830 | if not self.is_helper:
|
822 | 831 | self.schedule(self.query, timedelta(minutes=1))
|
@@ -870,40 +879,49 @@ def _run_callback(self, callback: WrappedCallback):
|
870 | 879 | with self._running_callbacks_lock:
|
871 | 880 | self._running_callbacks.pop(current_thread_id, None)
|
872 | 881 |
|
873 |
| - |
874 | 882 | def _start_extension_loop(self):
|
875 | 883 | api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'")
|
876 | 884 |
|
877 | 885 | # These were scheduled before the extension started, schedule them now
|
878 | 886 | for callback in self._scheduled_callbacks_before_run:
|
879 | 887 | self._schedule_callback(callback)
|
880 | 888 |
|
881 |
| - |
882 |
| - self._scheduler.add_job(self._send_metrics, executor=ExecutorType.INTERNAL, |
| 889 | + self._scheduler.add_job( |
| 890 | + self._send_metrics, |
| 891 | + executor=ExecutorType.INTERNAL, |
883 | 892 | trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
|
884 |
| - next_run_time=datetime.now()) |
| 893 | + next_run_time=datetime.now(), |
| 894 | + ) |
885 | 895 |
|
886 |
| - self._scheduler.add_job(self._send_buffered_events, executor=ExecutorType.INTERNAL, |
| 896 | + self._scheduler.add_job( |
| 897 | + self._send_buffered_events, |
| 898 | + executor=ExecutorType.INTERNAL, |
887 | 899 | trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()),
|
888 |
| - next_run_time=datetime.now()) |
| 900 | + next_run_time=datetime.now(), |
| 901 | + ) |
889 | 902 |
|
890 |
| - self._scheduler.add_job(self._send_sfm_metrics, executor=ExecutorType.INTERNAL, |
| 903 | + self._scheduler.add_job( |
| 904 | + self._send_sfm_metrics, |
| 905 | + executor=ExecutorType.INTERNAL, |
891 | 906 | trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()),
|
892 |
| - next_run_time=datetime.now()) |
| 907 | + next_run_time=datetime.now(), |
| 908 | + ) |
893 | 909 |
|
894 |
| - self._scheduler.add_job(self._update_cluster_time_diff, executor=ExecutorType.INTERNAL, |
| 910 | + self._scheduler.add_job( |
| 911 | + self._update_cluster_time_diff, |
| 912 | + executor=ExecutorType.INTERNAL, |
895 | 913 | trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()),
|
896 |
| - next_run_time=datetime.now()) |
| 914 | + next_run_time=datetime.now(), |
| 915 | + ) |
897 | 916 |
|
898 | 917 | self._scheduler.start()
|
899 |
| - |
| 918 | + |
900 | 919 | try:
|
901 | 920 | while self._scheduler.running:
|
902 | 921 | time.sleep(1)
|
903 | 922 | except Exception:
|
904 | 923 | self._scheduler.shutdown()
|
905 | 924 |
|
906 |
| - |
907 | 925 | def _send_metrics(self):
|
908 | 926 | with self._metrics_lock:
|
909 | 927 | with self._internal_callbacks_results_lock:
|
@@ -1167,11 +1185,6 @@ def _send_buffered_events(self):
|
1167 | 1185 | def _send_dt_event(self, event: dict[str, str | int | dict[str, str]]):
|
1168 | 1186 | self._client.send_dt_event(event)
|
1169 | 1187 |
|
1170 |
| - def _get_and_set_next_internal_callback_timestamp(self, callback_name: str, interval: timedelta): |
1171 |
| - next_timestamp = self._next_internal_callbacks_timestamps[callback_name] |
1172 |
| - self._next_internal_callbacks_timestamps[callback_name] += interval |
1173 |
| - return next_timestamp.timestamp() |
1174 |
| - |
1175 | 1188 | def get_version(self) -> str:
|
1176 | 1189 | """Return the extension version."""
|
1177 | 1190 | return self.activation_config.version
|
|
0 commit comments