Skip to content

Commit 0cfd1b7

Browse files
committed
Corrected processing with rate limits for EACH device connected through the gateway
1 parent 85e8207 commit 0cfd1b7

File tree

2 files changed

+80
-46
lines changed

2 files changed

+80
-46
lines changed

tb_device_mqtt.py

Lines changed: 76 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -148,26 +148,35 @@ def get(self):
148148

149149

150150
class RateLimit:
151-
def __init__(self, rate_limit, percentage=80):
152-
self.__start_time = monotonic()
151+
def __init__(self, rate_limit, name=None, percentage=80):
153152
self.__no_limit = False
154-
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
155-
self.__no_limit = True
156153
self.__rate_limit_dict = {}
157154
self.__lock = RLock()
158-
rate_configs = rate_limit.split(";")
159-
if "," in rate_limit:
160-
rate_configs = rate_limit.split(",")
161-
for rate in rate_configs:
162-
if rate == "":
163-
continue
164-
rate = rate.split(":")
165-
self.__rate_limit_dict[int(rate[1])] = {"counter": 0,
166-
"start": int(monotonic()),
167-
"limit": int(int(rate[0]) * percentage/100)}
168-
log.debug("Rate limit set to values: ")
169155
self.__minimal_timeout = DEFAULT_TIMEOUT
170156
self.__minimal_limit = 1000000000
157+
from_dict = isinstance(rate_limit, dict)
158+
if from_dict:
159+
self.__rate_limit_dict = rate_limit.get('rateLimit', rate_limit)
160+
name = rate_limit.get('name', name)
161+
percentage = rate_limit.get('percentage', percentage)
162+
self.no_limit = rate_limit.get('no_limit', False)
163+
self.name = name
164+
self.percentage = percentage
165+
self.__start_time = int(monotonic())
166+
if not from_dict:
167+
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
168+
self.__no_limit = True
169+
rate_configs = rate_limit.split(";")
170+
if "," in rate_limit:
171+
rate_configs = rate_limit.split(",")
172+
for rate in rate_configs:
173+
if rate == "":
174+
continue
175+
rate = rate.split(":")
176+
self.__rate_limit_dict[int(rate[1])] = {"counter": 0,
177+
"start": int(monotonic()),
178+
"limit": int(int(rate[0]) * self.percentage / 100)}
179+
log.debug("Rate limit set to values: ")
171180
with self.__lock:
172181
if not self.__no_limit:
173182
for rate_limit_time in self.__rate_limit_dict:
@@ -209,10 +218,11 @@ def get_minimal_timeout(self):
209218
def has_limit(self):
210219
return not self.__no_limit
211220

212-
def set_limit(self, rate_limit, percentage=0):
221+
def set_limit(self, rate_limit, percentage=80):
213222
with self.__lock:
214223
old_rate_limit_dict = deepcopy(self.__rate_limit_dict)
215224
self.__rate_limit_dict = {}
225+
self.percentage = percentage if percentage != 0 else self.percentage
216226
rate_configs = rate_limit.split(";")
217227
if "," in rate_limit:
218228
rate_configs = rate_limit.split(",")
@@ -222,9 +232,10 @@ def set_limit(self, rate_limit, percentage=0):
222232
rate = rate.split(":")
223233
rate_limit_time = int(rate[1])
224234
limit = int(int(rate[0]) * percentage / 100)
225-
self.__rate_limit_dict[int(rate[1])] = {"counter": old_rate_limit_dict.get(rate_limit_time, {}).get('counter', 0),
226-
"start": self.__rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
227-
"limit": limit}
235+
self.__rate_limit_dict[int(rate[1])] = {
236+
"counter": old_rate_limit_dict.get(rate_limit_time, {}).get('counter', 0),
237+
"start": self.__rate_limit_dict.get(rate_limit_time, {}).get('start', int(monotonic())),
238+
"limit": limit}
228239
if rate_limit_time < self.__minimal_limit:
229240
self.__minimal_timeout = rate_limit_time + 1
230241
if limit < self.__minimal_limit:
@@ -235,6 +246,14 @@ def set_limit(self, rate_limit, percentage=0):
235246
for rate_limit_time in self.__rate_limit_dict:
236247
log.debug("Time: %s, Limit: %s", rate_limit_time, self.__rate_limit_dict[rate_limit_time]["limit"])
237248

249+
def __dict__(self):
250+
return {
251+
"rateLimit": self.__rate_limit_dict,
252+
"name": self.name,
253+
"percentage": self.percentage,
254+
"no_limit": self.__no_limit
255+
}
256+
238257
@staticmethod
239258
def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
240259
rate_limit = RateLimit.get_rate_limit_by_host(host, rate_limit)
@@ -271,11 +290,11 @@ def get_rate_limit_by_host(host, rate_limit):
271290
def get_dp_rate_limit_by_host(host, dp_rate_limit):
272291
if dp_rate_limit == "DEFAULT_TELEMETRY_DP_RATE_LIMIT":
273292
if "thingsboard.cloud" in host:
274-
dp_rate_limit = "5:1,60:60,"
293+
dp_rate_limit = "10:1,300:60,"
275294
elif "tb" in host and "cloud" in host:
276-
dp_rate_limit = "5:1,60:60,"
295+
dp_rate_limit = "10:1,300:60,"
277296
elif "demo.thingsboard.io" in host:
278-
dp_rate_limit = "5:1,60:60,"
297+
dp_rate_limit = "10:1,300:60,"
279298
else:
280299
dp_rate_limit = "0:0,"
281300
else:
@@ -287,7 +306,7 @@ def get_dp_rate_limit_by_host(host, dp_rate_limit):
287306
class TBDeviceMqttClient:
288307
"""ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from"""
289308

290-
EMPTY_RATE_LIMIT = RateLimit('0:0,')
309+
EMPTY_RATE_LIMIT = RateLimit('0:0,', "EMPTY_RATE_LIMIT")
291310

292311
def __init__(self, host, port=1883, username=None, password=None, quality_of_service=None, client_id="",
293312
chunk_size=0, messages_rate_limit="DEFAULT_MESSAGES_RATE_LIMIT",
@@ -325,10 +344,10 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
325344
telemetry_dp_rate_limit)
326345
messages_rate_limit = RateLimit.get_rate_limit_by_host(self.__host, messages_rate_limit)
327346

328-
self._messages_rate_limit = RateLimit(messages_rate_limit)
329-
self.__telemetry_rate_limit = RateLimit(telemetry_rate_limit)
330-
self.__telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit)
331-
self._client.max_inflight_messages_set(self.__telemetry_rate_limit.get_minimal_limit())
347+
self._messages_rate_limit = RateLimit(messages_rate_limit, "Rate limit for messages")
348+
self._telemetry_rate_limit = RateLimit(telemetry_rate_limit, "Rate limit for telemetry messages")
349+
self._telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit, "Rate limit for telemetry data points")
350+
self._client.max_inflight_messages_set(self._telemetry_rate_limit.get_minimal_limit())
332351
self.__attrs_request_timeout = {}
333352
self.__timeout_thread = Thread(target=self.__timeout_check)
334353
self.__timeout_thread.daemon = True
@@ -651,13 +670,13 @@ def on_service_configuration(self, _, response, *args, **kwargs):
651670
if rate_limits_config.get('messages'):
652671
self._messages_rate_limit.set_limit(rate_limits_config.get('messages'), percentage=80)
653672
if rate_limits_config.get('telemetryMessages'):
654-
self.__telemetry_rate_limit.set_limit(rate_limits_config.get('telemetryMessages'), percentage=80)
673+
self._telemetry_rate_limit.set_limit(rate_limits_config.get('telemetryMessages'), percentage=80)
655674
if rate_limits_config.get('telemetryDataPoints'):
656-
self.__telemetry_dp_rate_limit.set_limit(rate_limits_config.get('telemetryDataPoints'), percentage=80)
675+
self._telemetry_dp_rate_limit.set_limit(rate_limits_config.get('telemetryDataPoints'), percentage=80)
657676
if service_config.get('maxInflightMessages'):
658677
max_inflight_messages = int(min(self._messages_rate_limit.get_minimal_limit(),
659-
self.__telemetry_rate_limit.get_minimal_limit(),
660-
service_config.get('maxInflightMessages', 100)) * 80 / 100)
678+
self._telemetry_rate_limit.get_minimal_limit(),
679+
service_config.get('maxInflightMessages', 100)) * 80 / 100)
661680
self.max_inflight_messages_set(max_inflight_messages)
662681
if service_config.get('maxPayloadSize'):
663682
self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * 80 / 100)
@@ -677,6 +696,7 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
677696
disconnected = False
678697
limit_reached_check = True
679698
log_posted = False
699+
waited = False
680700
while limit_reached_check:
681701
limit_reached_check = (message_rate_limit.check_limit_reached()
682702
or (dp_rate_limit is not None and dp_rate_limit.check_limit_reached(amount=amount))
@@ -695,11 +715,15 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
695715
return TBPublishInfo(paho.MQTTMessageInfo(None))
696716
if not log_posted and limit_reached_check:
697717
if isinstance(limit_reached_check, int):
698-
log.debug("Rate limit reached for %i seconds, waiting for rate limit to be released...", limit_reached_check)
718+
log.debug("Rate limit reached for %i seconds, waiting for rate limit to be released...",
719+
limit_reached_check)
720+
waited = True
699721
else:
700722
log.debug("Waiting for rate limit to be released...")
701723
log_posted = True
702724
sleep(.01)
725+
if waited:
726+
log.debug("Rate limit released, sending data to ThingsBoard...")
703727

704728
def wait_until_current_queued_messages_processed(self):
705729
previous_notification_time = 0
@@ -718,12 +742,12 @@ def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
718742
msg_rate_limit=None, dp_rate_limit=None):
719743
if msg_rate_limit is None:
720744
if kwargs.get('topic') == TELEMETRY_TOPIC:
721-
msg_rate_limit = self.__telemetry_rate_limit
745+
msg_rate_limit = self._telemetry_rate_limit
722746
else:
723747
msg_rate_limit = self._messages_rate_limit
724748
if dp_rate_limit is None:
725749
if kwargs.get('topic') == TELEMETRY_TOPIC:
726-
dp_rate_limit = self.__telemetry_dp_rate_limit
750+
dp_rate_limit = self._telemetry_dp_rate_limit
727751
else:
728752
dp_rate_limit = self.EMPTY_RATE_LIMIT
729753
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
@@ -758,7 +782,7 @@ def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp
758782
return msg_rate_limit, dp_rate_limit
759783
else:
760784
if topic == TELEMETRY_TOPIC:
761-
return self.__telemetry_rate_limit, self.__telemetry_dp_rate_limit
785+
return self._telemetry_rate_limit, self._telemetry_dp_rate_limit
762786
else:
763787
return self._messages_rate_limit, None
764788

@@ -771,22 +795,20 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
771795
attributes_format = topic.endswith('attributes')
772796
if topic.endswith('telemetry') or attributes_format:
773797
if device is None or data.get(device) is None:
774-
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit(),
775-
self.max_payload_size)
798+
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit(), self.max_payload_size)
776799
if attributes_format:
777800
split_messages = [{'message': msg_data, 'datapoints': len(msg_data)} for split_message in device_split_messages for msg_data in split_message['data']]
778801
else:
779802
split_messages = [{'message': split_message['data'], 'datapoints': split_message['datapoints']} for split_message in device_split_messages]
780803
else:
781804
device_data = data.get(device)
782-
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit(),
783-
self.max_payload_size)
805+
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit(), self.max_payload_size)
784806
if attributes_format:
785807
split_messages = [{'message': {device: msg_data}, 'datapoints': len(msg_data)} for split_message in device_split_messages for msg_data in split_message['data']]
786808
else:
787809
split_messages = [{'message': {device: split_message['data']}, 'datapoints': split_message['datapoints']} for split_message in device_split_messages]
788810
else:
789-
split_messages = [{'message': data, 'datapoints': 0}]
811+
split_messages = [{'message': data, 'datapoints': self._count_datapoints_in_message(data, device)}]
790812

791813
results = []
792814
for part in split_messages:
@@ -801,10 +823,18 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
801823
kwargs["payload"] = dumps(part['message'])
802824
self.wait_until_current_queued_messages_processed()
803825
if not self.stopped:
826+
if device is not None:
827+
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
804828
if part['datapoints'] > 0:
805829
log.debug("Sending message with %i datapoints", part['datapoints'])
830+
log.debug("Message payload: %r", kwargs["payload"])
831+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
832+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
806833
else:
807834
log.debug("Sending message with %r", kwargs["payload"])
835+
log.debug("Message payload: %r", kwargs["payload"])
836+
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
837+
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
808838
results.append(self._client.publish(**kwargs))
809839
return TBPublishInfo(results)
810840

@@ -956,14 +986,18 @@ def _count_datapoints_in_message(data, device=None):
956986
if isinstance(data.get(device), list):
957987
for data_object in data[device]:
958988
datapoints += TBDeviceMqttClient._count_datapoints_in_message(data_object) # noqa
959-
else:
989+
elif isinstance(data.get(device), dict):
960990
datapoints += TBDeviceMqttClient._count_datapoints_in_message(data.get(device, data.get('device')))
991+
else:
992+
datapoints += 1
961993
else:
962994
if isinstance(data, dict):
963995
datapoints += TBDeviceMqttClient._get_data_points_from_message(data)
964-
else:
996+
elif isinstance(data, list):
965997
for item in data:
966998
datapoints += TBDeviceMqttClient._get_data_points_from_message(item)
999+
else:
1000+
datapoints += 1
9671001
return datapoints
9681002

9691003
@staticmethod

tb_gateway_mqtt.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,18 +300,18 @@ def gw_claim(self, device_name, secret_key, duration, claiming_request=None):
300300
return info
301301

302302
def _add_device_rate_limit(self, device_name):
303-
telemetry_rate_limit = RateLimit(self.__device_telemetry_rate_limit)
304-
telemetry_dp_rate_limit = RateLimit(self.__device_telemetry_dp_rate_limit)
303+
telemetry_rate_limit = RateLimit(self.__device_telemetry_rate_limit, "Rate limit for device %s telemetry messages" % device_name)
304+
telemetry_dp_rate_limit = RateLimit(self.__device_telemetry_dp_rate_limit, "Rate limit for device %s telemetry data points" % device_name)
305305
msg_dp_rate_limit = self.EMPTY_RATE_LIMIT
306-
msg_rate_limit = RateLimit(self.__device_messages_rate_limit)
306+
msg_rate_limit = RateLimit(self.__device_messages_rate_limit, "Rate limit for device %s messages" % device_name)
307307
self._devices_rate_limit[device_name] = {
308308
'msg_rate_limit': msg_rate_limit,
309309
'dp_rate_limit': msg_dp_rate_limit,
310310
'telemetry_msg_rate_limit': telemetry_rate_limit,
311311
'telemetry_dp_rate_limit': telemetry_dp_rate_limit
312312
}
313313

314-
def _change_devices_rate_limit(self, rate_limit_key, rate_limit_value, percentage=80):
314+
def _change_devices_rate_limit(self, rate_limit_key, rate_limit_value, percentage=50):
315315
for device in self._devices_rate_limit.values():
316316
device[rate_limit_key].set_limit(rate_limit_value, percentage=percentage)
317317

0 commit comments

Comments
 (0)