Skip to content

Commit 5da1d55

Browse files
committed
Updated rate limits processing due to realisation on ThingsBoard
1 parent 0cfd1b7 commit 5da1d55

File tree

3 files changed

+31
-64
lines changed

3 files changed

+31
-64
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
with open(path.join(this_directory, 'README.md')) as f:
2222
long_description = f.read()
2323

24-
VERSION = "1.10.4"
24+
VERSION = "1.10.5"
2525

2626
setup(
2727
version=VERSION,

tb_device_mqtt.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ def __init__(self, rate_limit, name=None, percentage=80):
176176
self.__rate_limit_dict[int(rate[1])] = {"counter": 0,
177177
"start": int(monotonic()),
178178
"limit": int(int(rate[0]) * self.percentage / 100)}
179-
log.debug("Rate limit set to values: ")
179+
log.debug("Rate limit %s set to values: " % self.name)
180180
with self.__lock:
181181
if not self.__no_limit:
182182
for rate_limit_time in self.__rate_limit_dict:
@@ -701,6 +701,8 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
701701
limit_reached_check = (message_rate_limit.check_limit_reached()
702702
or (dp_rate_limit is not None and dp_rate_limit.check_limit_reached(amount=amount))
703703
or not self.is_connected())
704+
if timeout < limit_reached_check:
705+
timeout = limit_reached_check
704706
if not timeout_updated and limit_reached_check:
705707
timeout += 10
706708
timeout_updated = True
@@ -711,12 +713,12 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
711713
disconnected = True
712714
timeout = max(timeout, 180) + 10
713715
if int(monotonic()) >= timeout + start_time:
714-
log.error("Timeout while waiting for rate limit to be released!")
716+
log.warning("Timeout while waiting for rate limit for %i seconds to be released!", limit_reached_check)
715717
return TBPublishInfo(paho.MQTTMessageInfo(None))
716718
if not log_posted and limit_reached_check:
717719
if isinstance(limit_reached_check, int):
718-
log.debug("Rate limit reached for %i seconds, waiting for rate limit to be released...",
719-
limit_reached_check)
720+
log.warning("Rate limit reached for %i seconds, waiting for rate limit to be released...",
721+
limit_reached_check)
720722
waited = True
721723
else:
722724
log.debug("Waiting for rate limit to be released...")
@@ -815,10 +817,12 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
815817
if not part:
816818
continue
817819
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
818-
self._wait_for_rate_limit_released(timeout,
819-
message_rate_limit=msg_rate_limit,
820-
dp_rate_limit=dp_rate_limit,
821-
amount=part['datapoints'])
820+
rate_limited = self._wait_for_rate_limit_released(timeout,
821+
message_rate_limit=msg_rate_limit,
822+
dp_rate_limit=dp_rate_limit,
823+
amount=part['datapoints'])
824+
if rate_limited:
825+
return rate_limited
822826
msg_rate_limit.increase_rate_limit_counter()
823827
kwargs["payload"] = dumps(part['message'])
824828
self.wait_until_current_queued_messages_processed()
@@ -1004,11 +1008,11 @@ def _count_datapoints_in_message(data, device=None):
10041008
def _get_data_points_from_message(data):
10051009
if isinstance(data, dict):
10061010
if data.get("ts") is not None and data.get("values") is not None:
1007-
datapoints_in_message_amount = len(data['values']) + len(str(data['values']))/1000
1011+
datapoints_in_message_amount = len(data['values']) + len(str(data['values'])) / 1000
10081012
else:
1009-
datapoints_in_message_amount = len(data.keys()) + len(str(data))/1000
1013+
datapoints_in_message_amount = len(data.keys()) + len(str(data)) / 1000
10101014
else:
1011-
datapoints_in_message_amount = len(data) + len(str(data))/1000
1015+
datapoints_in_message_amount = len(data) + len(str(data)) / 1000
10121016
return int(datapoints_in_message_amount)
10131017

10141018
@staticmethod

tb_gateway_mqtt.py

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,15 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
6363
host, device_telemetry_rate_limit, device_telemetry_dp_rate_limit)
6464
self.__device_messages_rate_limit = RateLimit.get_rate_limit_by_host(host, device_messages_rate_limit)
6565

66+
self._devices_connected_through_gateway_telemetry_messages_rate_limit = RateLimit(self.__device_telemetry_rate_limit, "Rate limit for devices connected through gateway telemetry messages")
67+
self._devices_connected_through_gateway_telemetry_datapoints_rate_limit = RateLimit(self.__device_telemetry_dp_rate_limit, "Rate limit for devices connected through gateway telemetry data points")
68+
self._devices_connected_through_gateway_messages_rate_limit = RateLimit(self.__device_messages_rate_limit, "Rate limit for devices connected through gateway messages")
69+
6670
self.service_configuration_callback = self.__on_service_configuration
6771
self.quality_of_service = quality_of_service
6872
self.__max_sub_id = 0
6973
self.__sub_dict = {}
7074
self.__connected_devices = set("*")
71-
self._devices_rate_limit = {}
7275
self.devices_server_side_rpc_request_handler = None
7376
self._client.on_connect = self._on_connect
7477
self._client.on_message = self._on_message
@@ -125,6 +128,7 @@ def _on_decoded_message(self, content, message, **kwargs):
125128
if message.topic.startswith(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC):
126129
with self._lock:
127130
req_id = content["id"]
131+
self._devices_connected_through_gateway_messages_rate_limit.increase_rate_limit_counter(1)
128132
# pop callback and use it
129133
if self._attr_request_dict[req_id]:
130134
callback = self._attr_request_dict.pop(req_id)
@@ -143,6 +147,7 @@ def _on_decoded_message(self, content, message, **kwargs):
143147
# callbacks for device. in this case callback executes for all attributes in message
144148
if content.get("device") is None:
145149
return
150+
self._devices_connected_through_gateway_messages_rate_limit.increase_rate_limit_counter(1)
146151
target = content["device"] + "|*"
147152
if self.__sub_dict.get(target):
148153
for device in self.__sub_dict[target]:
@@ -154,6 +159,7 @@ def _on_decoded_message(self, content, message, **kwargs):
154159
for device in self.__sub_dict[target]:
155160
self.__sub_dict[target][device](content)
156161
elif message.topic == GATEWAY_RPC_TOPIC:
162+
self._devices_connected_through_gateway_messages_rate_limit.increase_rate_limit_counter(1)
157163
if self.devices_server_side_rpc_request_handler:
158164
self.devices_server_side_rpc_request_handler(self, content)
159165

@@ -162,26 +168,23 @@ def __request_attributes(self, device, keys, callback, type_is_client=False):
162168
log.error("There are no keys to request")
163169
return False
164170

165-
ts_in_millis = int(round(time() * 1000))
166171
attr_request_number = self._add_attr_request_callback(callback)
167172
msg = {"keys": keys,
168173
"device": device,
169174
"client": type_is_client,
170175
"id": attr_request_number}
171176
info = self._send_device_request(TBSendMethod.PUBLISH, device, topic=GATEWAY_ATTRIBUTES_REQUEST_TOPIC, data=msg,
172177
qos=1)
173-
self._add_timeout(attr_request_number, ts_in_millis + 30000)
178+
self.__attrs_request_timeout[attr_request_number] = int(time()) + 20
174179
return info
175180

176181
def _send_device_request(self, _type, device_name, **kwargs):
177182
if _type == TBSendMethod.PUBLISH:
178-
if self._devices_rate_limit.get(device_name) is None:
179-
self._add_device_rate_limit(device_name)
180-
device_msg_rate_limit = self._devices_rate_limit[device_name]['msg_rate_limit']
181-
device_dp_rate_limit = self._devices_rate_limit[device_name]['dp_rate_limit']
183+
device_msg_rate_limit = self._devices_connected_through_gateway_messages_rate_limit
184+
device_dp_rate_limit = self.EMPTY_RATE_LIMIT
182185
if kwargs.get('topic') == GATEWAY_TELEMETRY_TOPIC:
183-
device_msg_rate_limit = self._devices_rate_limit[device_name]['telemetry_msg_rate_limit']
184-
device_dp_rate_limit = self._devices_rate_limit[device_name]['telemetry_dp_rate_limit']
186+
device_msg_rate_limit = self._devices_connected_through_gateway_telemetry_messages_rate_limit
187+
device_dp_rate_limit = self._devices_connected_through_gateway_telemetry_datapoints_rate_limit
185188
info = self._publish_data(**kwargs, device=device_name,
186189
msg_rate_limit=device_msg_rate_limit,
187190
dp_rate_limit=device_dp_rate_limit)
@@ -242,16 +245,6 @@ def gw_subscribe_to_attribute(self, device, attribute, callback):
242245
return False
243246

244247
with self._lock:
245-
# if device == '*':
246-
# for device_name in self._devices_rate_limit.keys():
247-
# is_reached = self.check_device_rate_limit(device_name)
248-
# if is_reached:
249-
# return is_reached
250-
# else:
251-
# is_reached = self.check_device_rate_limit(device)
252-
# if is_reached:
253-
# return is_reached
254-
255248
self.__max_sub_id += 1
256249
key = device + "|" + attribute
257250
if key not in self.__sub_dict:
@@ -299,22 +292,6 @@ def gw_claim(self, device_name, secret_key, duration, claiming_request=None):
299292
data=claiming_request, qos=self.quality_of_service)
300293
return info
301294

302-
def _add_device_rate_limit(self, device_name):
303-
telemetry_rate_limit = RateLimit(self.__device_telemetry_rate_limit, "Rate limit for device %s telemetry messages" % device_name)
304-
telemetry_dp_rate_limit = RateLimit(self.__device_telemetry_dp_rate_limit, "Rate limit for device %s telemetry data points" % device_name)
305-
msg_dp_rate_limit = self.EMPTY_RATE_LIMIT
306-
msg_rate_limit = RateLimit(self.__device_messages_rate_limit, "Rate limit for device %s messages" % device_name)
307-
self._devices_rate_limit[device_name] = {
308-
'msg_rate_limit': msg_rate_limit,
309-
'dp_rate_limit': msg_dp_rate_limit,
310-
'telemetry_msg_rate_limit': telemetry_rate_limit,
311-
'telemetry_dp_rate_limit': telemetry_dp_rate_limit
312-
}
313-
314-
def _change_devices_rate_limit(self, rate_limit_key, rate_limit_value, percentage=50):
315-
for device in self._devices_rate_limit.values():
316-
device[rate_limit_key].set_limit(rate_limit_value, percentage=percentage)
317-
318295
def __on_service_configuration(self, _, response, *args, **kwargs):
319296
if "error" in response:
320297
log.warning("Timeout while waiting for service configuration!, session will use default configuration.")
@@ -327,23 +304,9 @@ def __on_service_configuration(self, _, response, *args, **kwargs):
327304
super().on_service_configuration(_, {'rateLimit': gateway_device_itself_rate_limit_config, **service_config}, *args, **kwargs)
328305

329306
if gateway_devices_rate_limit_config.get("messages"):
330-
# change global rate limit for future devices
331-
self.__device_messages_rate_limit = gateway_devices_rate_limit_config.get('messages')
332-
333-
# change rate limit for already connected devices
334-
self._change_devices_rate_limit('msg_rate_limit', gateway_devices_rate_limit_config.get('messages'))
307+
self._devices_connected_through_gateway_messages_rate_limit.set_limit(gateway_devices_rate_limit_config.get("messages"))
335308
if gateway_devices_rate_limit_config.get('telemetryMessages'):
336-
# change global rate limit for future devices
337-
self.__device_telemetry_rate_limit = gateway_devices_rate_limit_config.get('telemetryMessages')
338-
339-
# change rate limit for already connected devices
340-
self._change_devices_rate_limit('telemetry_msg_rate_limit',
341-
gateway_devices_rate_limit_config.get('telemetryMessages'))
309+
self._devices_connected_through_gateway_telemetry_messages_rate_limit.set_limit(gateway_devices_rate_limit_config.get('telemetryMessages'))
342310
if gateway_devices_rate_limit_config.get('telemetryDataPoints'):
343-
# change global rate limit for future devices
344-
self.__device_telemetry_dp_rate_limit = gateway_devices_rate_limit_config.get('telemetryDataPoints')
345-
346-
# change rate limit for already connected devices
347-
self._change_devices_rate_limit('telemetry_dp_rate_limit',
348-
gateway_devices_rate_limit_config.get('telemetryDataPoints'))
311+
self._devices_connected_through_gateway_telemetry_datapoints_rate_limit.set_limit(gateway_devices_rate_limit_config.get('telemetryDataPoints'))
349312
self.rate_limits_received = True

0 commit comments

Comments
 (0)