Skip to content

Commit 0bd309c

Browse files
Feature/sfm logs per endpoint (#120)
* Basic sfm sending * EndpointStatuses reported explicitly * Sending SFM log per endpoint * SFM logs blocked for custom extensions * Unit tests for SFM statuses per endpoint * Refactored building of overall status, fixed enrichment, linter fixes. * Removed `clear_endpoint_error` (redundant and potentially misleading) * Status implementation extracted to a separate file * Additional log if SFMs not allowed for custom extensions.
1 parent c62bc6a commit 0bd309c

File tree

9 files changed

+624
-287
lines changed

9 files changed

+624
-287
lines changed

dynatrace_extension/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
# SPDX-License-Identifier: MIT
44

55

6-
__version__ = "1.6.3"
6+
__version__ = "1.7.0"

dynatrace_extension/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
# ruff: noqa: F401
77

88
from .sdk.activation import ActivationConfig, ActivationType
9-
from .sdk.communication import EndpointStatus, EndpointStatuses, IgnoreStatus, MultiStatus, Status, StatusValue
109
from .sdk.event import Severity
1110
from .sdk.extension import DtEventType, Extension
1211
from .sdk.helper import (
@@ -25,3 +24,4 @@
2524
schedule_function,
2625
)
2726
from .sdk.metric import Metric, MetricType, SummaryStat
27+
from .sdk.status import EndpointStatus, EndpointStatuses, IgnoreStatus, MultiStatus, Status, StatusValue

dynatrace_extension/sdk/callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from timeit import default_timer as timer
1010

1111
from .activation import ActivationType
12-
from .communication import EndpointStatuses, IgnoreStatus, MultiStatus, Status, StatusValue
12+
from .status import EndpointStatuses, IgnoreStatus, MultiStatus, Status, StatusValue
1313

1414

1515
class WrappedCallback:

dynatrace_extension/sdk/communication.py

Lines changed: 24 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@
1010
from abc import ABC, abstractmethod
1111
from collections.abc import Generator, Sequence
1212
from dataclasses import dataclass
13-
from enum import Enum
1413
from pathlib import Path
15-
from threading import RLock
1614
from typing import Any, TypeVar
1715

16+
from .status import Status
1817
from .vendor.mureq.mureq import HTTPException, Response, request
1918

2019
CONTENT_TYPE_JSON = "application/json;charset=utf-8"
@@ -30,173 +29,6 @@
3029
HTTP_BAD_REQUEST = 400
3130

3231

33-
class StatusValue(Enum):
34-
EMPTY = ""
35-
OK = "OK"
36-
GENERIC_ERROR = "GENERIC_ERROR"
37-
INVALID_ARGS_ERROR = "INVALID_ARGS_ERROR"
38-
EEC_CONNECTION_ERROR = "EEC_CONNECTION_ERROR"
39-
INVALID_CONFIG_ERROR = "INVALID_CONFIG_ERROR"
40-
AUTHENTICATION_ERROR = "AUTHENTICATION_ERROR"
41-
DEVICE_CONNECTION_ERROR = "DEVICE_CONNECTION_ERROR"
42-
WARNING = "WARNING"
43-
UNKNOWN_ERROR = "UNKNOWN_ERROR"
44-
45-
46-
class IgnoreStatus:
47-
pass
48-
49-
50-
class Status:
51-
def __init__(self, status: StatusValue = StatusValue.EMPTY, message: str = "", timestamp: int | None = None):
52-
self.status = status
53-
self.message = message
54-
self.timestamp = timestamp
55-
56-
def to_json(self) -> dict:
57-
status = {"status": self.status.value, "message": self.message}
58-
if self.timestamp:
59-
status["timestamp"] = self.timestamp # type: ignore
60-
return status
61-
62-
def __repr__(self):
63-
return json.dumps(self.to_json())
64-
65-
def is_error(self) -> bool:
66-
# WARNING is treated as an error
67-
return self.status not in (StatusValue.OK, StatusValue.EMPTY)
68-
69-
def is_warning(self) -> bool:
70-
return self.status == StatusValue.WARNING
71-
72-
73-
class MultiStatus:
74-
def __init__(self):
75-
self.statuses: list[Status] = []
76-
77-
def add_status(self, status: StatusValue, message):
78-
self.statuses.append(Status(status, message))
79-
80-
def build(self) -> Status:
81-
ret = Status(StatusValue.OK)
82-
if len(self.statuses) == 0:
83-
return ret
84-
85-
messages = []
86-
all_ok = True
87-
all_err = True
88-
any_warning = False
89-
90-
for stored_status in self.statuses:
91-
if stored_status.message != "":
92-
messages.append(stored_status.message)
93-
94-
if stored_status.is_warning():
95-
any_warning = True
96-
97-
if stored_status.is_error():
98-
all_ok = False
99-
else:
100-
all_err = False
101-
102-
ret.message = ", ".join(messages)
103-
104-
if any_warning:
105-
ret.status = StatusValue.WARNING
106-
elif all_ok:
107-
ret.status = StatusValue.OK
108-
elif all_err:
109-
ret.status = StatusValue.GENERIC_ERROR
110-
else:
111-
ret.status = StatusValue.WARNING
112-
113-
return ret
114-
115-
116-
class EndpointStatus:
117-
def __init__(self, endpoint_hint: str, short_status: StatusValue, message: str):
118-
self.endpoint = endpoint_hint
119-
self.status: StatusValue = short_status
120-
self.message = message
121-
122-
def __str__(self):
123-
return str(self.__dict__)
124-
125-
126-
class EndpointStatuses:
127-
class TooManyEndpointStatusesError(Exception):
128-
pass
129-
130-
class MergeConflictError(Exception):
131-
def __init__(self, first: EndpointStatus, second: EndpointStatus):
132-
super().__init__(f"Endpoint Statuses conflict while merging - first: {first}; second: {second}")
133-
134-
def __init__(self, total_endpoints_number: int):
135-
self._lock = RLock()
136-
self._faulty_endpoints: dict[str, EndpointStatus] = {}
137-
self._num_endpoints = total_endpoints_number
138-
139-
def add_endpoint_status(self, status: EndpointStatus):
140-
with self._lock:
141-
if status.status == StatusValue.OK:
142-
self.clear_endpoint_error(status.endpoint)
143-
else:
144-
if len(self._faulty_endpoints) == self._num_endpoints:
145-
message = "Cannot add another endpoint status. \
146-
The number of reported statuses already has reached preconfigured maximum of {self._num_endpoints} endpoints."
147-
raise EndpointStatuses.TooManyEndpointStatusesError(message)
148-
149-
self._faulty_endpoints[status.endpoint] = status
150-
151-
def clear_endpoint_error(self, endpoint_hint: str):
152-
with self._lock:
153-
try:
154-
del self._faulty_endpoints[endpoint_hint]
155-
except KeyError:
156-
pass
157-
158-
def merge(self, other: EndpointStatuses):
159-
with self._lock:
160-
with other._lock:
161-
self._num_endpoints += other._num_endpoints
162-
163-
for endpoint, status in other._faulty_endpoints.items():
164-
if endpoint not in self._faulty_endpoints.keys():
165-
self._faulty_endpoints[endpoint] = status
166-
else:
167-
self._num_endpoints -= 1
168-
raise EndpointStatuses.MergeConflictError(
169-
self._faulty_endpoints[endpoint], other._faulty_endpoints[endpoint]
170-
)
171-
172-
def build_common_status(self) -> Status:
173-
with self._lock:
174-
ok_count = self._num_endpoints - len(self._faulty_endpoints)
175-
nok_count = len(self._faulty_endpoints)
176-
177-
if nok_count == 0:
178-
return Status(StatusValue.OK, f"Endpoints OK: {self._num_endpoints} NOK: 0")
179-
180-
error_messages = []
181-
for ep_status in self._faulty_endpoints.values():
182-
error_messages.append(f"{ep_status.endpoint} - {ep_status.status.value} {ep_status.message}")
183-
common_msg = ", ".join(error_messages)
184-
185-
# Determine status value
186-
all_endpoints_faulty = nok_count == self._num_endpoints
187-
has_warning_status = StatusValue.WARNING in [
188-
ep_status.status for ep_status in self._faulty_endpoints.values()
189-
]
190-
191-
if all_endpoints_faulty and not has_warning_status:
192-
status_value = StatusValue.GENERIC_ERROR
193-
else:
194-
status_value = StatusValue.WARNING
195-
196-
message = f"Endpoints OK: {ok_count} NOK: {nok_count} NOK_reported_errors: {common_msg}"
197-
return Status(status=status_value, message=message)
198-
199-
20032
class CommunicationClient(ABC):
20133
"""
20234
Abstract class for extension communication
@@ -250,6 +82,10 @@ def get_cluster_time_diff(self) -> int:
25082
def send_dt_event(self, event: dict) -> None:
25183
pass
25284

85+
@abstractmethod
def send_sfm_logs(self, sfm_logs: dict | list[dict]) -> list[dict | None]:
    """Send SFM logs; concrete clients must provide the implementation.

    :param sfm_logs: a single log as a dict, or several logs as a list of dicts
    :return: per the annotation, a list whose items are a dict or None
    """
88+
25389

25490
class HttpClient(CommunicationClient):
25591
"""
@@ -261,6 +97,7 @@ def __init__(self, base_url: str, datasource_id: str, id_token_file_path: str, l
26197
self._extension_config_url = f"{base_url}/extconfig/{datasource_id}"
26298
self._metric_url = f"{base_url}/mint/{datasource_id}"
26399
self._sfm_url = f"{base_url}/sfm/{datasource_id}"
100+
self._sfm_logs_url = f"{base_url}/sfmlogs/{datasource_id}"
264101
self._keep_alive_url = f"{base_url}/alive/{datasource_id}"
265102
self._timediff_url = f"{base_url}/timediffms"
266103
self._events_url = f"{base_url}/logs/{datasource_id}"
@@ -417,7 +254,13 @@ def send_metrics(self, mint_lines: list[str]) -> list[MintResponse]:
417254

418255
def send_events(self, events: dict | list[dict], eec_enrichment: bool = True) -> list[dict | None]:
    """Send log events to the events URL of this client.

    :param events: a single event as a dict, or several events as a list of dicts
    :param eec_enrichment: forwarded unchanged to ``_send_events``
    :return: whatever ``_send_events`` returns for the events URL
    """
    self.logger.debug(f"Sending log events: {events}")
    target_url = self._events_url
    responses = self._send_events(target_url, events, eec_enrichment)
    return responses
258+
259+
def send_sfm_logs(self, sfm_logs: dict | list[dict]) -> list[dict | None]:
    """Send SFM logs to the SFM logs URL of this client.

    The return annotation matches the abstract declaration on the base client
    and the sibling ``send_events`` method.

    :param sfm_logs: a single log as a dict, or several logs as a list of dicts
    :return: whatever ``_send_events`` returns for the SFM logs URL
    """
    self.logger.debug(f"Sending SFM logs: {sfm_logs}")
    # eec_enrichment is left at the default of _send_events, as in the original call.
    return self._send_events(self._sfm_logs_url, sfm_logs)
420262

263+
def _send_events(self, url, events: dict | list[dict], eec_enrichment: bool = True) -> list[dict | None]:
421264
responses = []
422265
if isinstance(events, dict):
423266
events = [events]
@@ -426,7 +269,7 @@ def send_events(self, events: dict | list[dict], eec_enrichment: bool = True) ->
426269
for batch in batches:
427270
try:
428271
eec_response = self._make_request(
429-
self._events_url,
272+
url,
430273
"POST",
431274
batch,
432275
extra_headers={"Content-Type": CONTENT_TYPE_JSON, "eec-enrichment": str(eec_enrichment).lower()},
@@ -583,6 +426,17 @@ def replace_secrets_in_activation_config(self, secrets: dict, activation_config_
583426

584427
return activation_config_string
585428

429+
def send_sfm_logs(self, sfm_logs: dict | list[dict]) -> list[dict | None]:
    """Log the SFM logs through ``self.logger`` instead of sending them anywhere.

    :param sfm_logs: a single log as a dict, or several logs as a list of dicts
    :return: always an empty list
    """
    batch = [sfm_logs] if isinstance(sfm_logs, dict) else sfm_logs

    self.logger.info(f"send_sfm_logs: {len(batch)} logs")

    # Individual entries are only written out when print_metrics is enabled.
    if not self.print_metrics:
        return []

    for entry in batch:
        self.logger.info(f"send_sfm_log: {entry}")
    return []
439+
586440

587441
def divide_into_batches(
588442
items: Sequence[dict | str], max_size_bytes: int, join_with: str | None = None

dynatrace_extension/sdk/extension.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,12 @@
2020

2121
from .activation import ActivationConfig, ActivationType
2222
from .callback import WrappedCallback
23-
from .communication import (
24-
CommunicationClient,
25-
DebugClient,
26-
EndpointStatuses,
27-
HttpClient,
28-
IgnoreStatus,
29-
Status,
30-
StatusValue,
31-
)
23+
from .communication import CommunicationClient, DebugClient, HttpClient
3224
from .event import Severity
3325
from .metric import Metric, MetricType, SfmMetric, SummaryStat
3426
from .runtime import RuntimeProperties
3527
from .snapshot import Snapshot
28+
from .status import EndpointStatuses, EndpointStatusesMap, IgnoreStatus, Status, StatusValue
3629
from .throttled_logger import StrictThrottledHandler, ThrottledHandler
3730

3831
HEARTBEAT_INTERVAL = timedelta(seconds=50)
@@ -285,6 +278,13 @@ def __init__(self, name: str = "") -> None:
285278
# Error message from caught exception in self.initialize()
286279
self._initialization_error: str = ""
287280

281+
# Map of all Endpoint Statuses
282+
self._sfm_logs_allowed = not self.extension_name.startswith("custom:")
283+
if not self._sfm_logs_allowed:
284+
self.logger.warning("SFM logs not allowed for custom extensions.")
285+
286+
self._ep_statuses = EndpointStatusesMap(send_sfm_logs_function=self._send_sfm_logs)
287+
288288
self._parse_args()
289289

290290
for function, interval, args, activation_type in Extension.schedule_decorators:
@@ -1037,8 +1037,7 @@ def _build_current_status(self):
10371037
if internal_callback_error:
10381038
return Status(overall_status_value, "\n".join(messages))
10391039

1040-
# Handle regular statuses, merge all EndpointStatuses
1041-
ep_status_merged = EndpointStatuses(0)
1040+
# Handle regular statuses, report all EndpointStatuses
10421041
all_ok = True
10431042
all_err = True
10441043
any_warning = False
@@ -1048,10 +1047,7 @@ def _build_current_status(self):
10481047
continue
10491048

10501049
if isinstance(callback.status, EndpointStatuses):
1051-
try:
1052-
ep_status_merged.merge(callback.status)
1053-
except EndpointStatuses.MergeConflictError as e:
1054-
self.logger.exception(e)
1050+
self._ep_statuses.update_ep_statuses(callback.status)
10551051
continue
10561052

10571053
if callback.status.is_warning():
@@ -1066,8 +1062,9 @@ def _build_current_status(self):
10661062
messages.append(f"{callback.name()}: {callback.status.status.value} - {callback.status.message}")
10671063

10681064
# Handle merged EndpointStatuses
1069-
if ep_status_merged._num_endpoints > 0:
1070-
ep_status_merged = ep_status_merged.build_common_status()
1065+
if self._ep_statuses.contains_any_status():
1066+
self._ep_statuses.send_ep_logs()
1067+
ep_status_merged = self._ep_statuses.build_common_status()
10711068
messages.insert(0, ep_status_merged.message)
10721069

10731070
if ep_status_merged.is_warning():
@@ -1221,3 +1218,32 @@ def get_snapshot(self, snapshot_file: Path | str | None = None) -> Snapshot:
12211218
raise FileNotFoundError(msg)
12221219

12231220
return Snapshot.parse_from_file(snapshot_file)
1221+
1222+
def _send_sfm_logs_internal(self, logs: dict | list[dict]):
    """Send SFM logs through the client and record the outcome as an internal callback status.

    Every response returned by ``self._client.send_sfm_logs`` is inspected. A
    response counts as failed when it is truthy and carries
    ``response["error"]["message"]``. If any response failed, a GENERIC_ERROR
    status with all collected messages is stored; otherwise OK is stored.
    Previously the loop returned at the first error-free response and reset the
    status to OK on every iteration, so errors could be skipped or overwritten.

    When the client returns no responses, the stored status is left untouched.
    Any exception is logged and stored as a GENERIC_ERROR status, not raised.

    :param logs: a single log as a dict, or several logs as a list of dicts
    """
    status_key = self._send_sfm_logs_internal.__name__
    try:
        responses = self._client.send_sfm_logs(logs)
        if not responses:
            # Nothing was reported back, so there is nothing to record.
            return

        error_messages = [
            str(response["error"]["message"])
            for response in responses
            if response and "error" in response and "message" in response["error"]
        ]
        if error_messages:
            status = Status(StatusValue.GENERIC_ERROR, "; ".join(error_messages))
        else:
            status = Status(StatusValue.OK)
    except Exception as e:
        api_logger.error(f"Error sending SFM logs: {e!r}", exc_info=True)
        status = Status(StatusValue.GENERIC_ERROR, str(e))

    # Single write under the lock so readers never observe an intermediate value.
    with self._internal_callbacks_results_lock:
        self._internal_callbacks_results[status_key] = status
1238+
1239+
1240+
def _send_sfm_logs(self, logs: dict | list[dict]):
    """Enrich SFM logs with extension metadata and submit them for sending.

    Does nothing when SFM logs are not allowed for this extension or when
    ``logs`` is empty. Otherwise each log dict is updated in place with
    ``self._metadata``, gets ``dt.extension.config.label`` set to the monitoring
    configuration name, and has ``monitoring.configuration`` removed. The
    resulting list is handed to ``_send_sfm_logs_internal`` on the internal
    executor.

    :param logs: a single log as a dict, or several logs as a list of dicts
    """
    if not self._sfm_logs_allowed or not logs:
        return

    # A single log may arrive as a bare dict; iterating it directly would walk
    # its keys (strings) and fail on ``.update``.
    if isinstance(logs, dict):
        logs = [logs]

    for log in logs:
        log.update(self._metadata)
        log["dt.extension.config.label"] = self.monitoring_config_name
        log.pop("monitoring.configuration", None)

    self._internal_executor.submit(self._send_sfm_logs_internal, logs)

0 commit comments

Comments
 (0)