Skip to content

Commit 288b068

Browse files
authored
Merge pull request #62 from samson0v/master
Added rate limit for gateway devices
2 parents 45565cd + 44b68f2 commit 288b068

File tree

2 files changed

+116
-56
lines changed

2 files changed

+116
-56
lines changed

tb_device_mqtt.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import logging
15-
import uuid
1615
from collections import deque
1716
from inspect import signature
1817
from time import sleep
@@ -235,6 +234,22 @@ def get_minimal_limit(self):
235234
def get_minimal_timeout(self):
236235
return self.__minimal_timeout
237236

237+
@staticmethod
238+
def get_rate_limit_by_host(host, rate_limit):
239+
if rate_limit == "DEFAULT_RATE_LIMIT":
240+
if "thingsboard.cloud" in host:
241+
rate_limit = "8:1,450:60,30000:3600,"
242+
elif "tb" in host and "cloud" in host:
243+
rate_limit = "8:1,450:60,30000:3600,"
244+
elif "demo.thingsboard.io" in host:
245+
rate_limit = "8:1,450:60,30000:3600,"
246+
else:
247+
rate_limit = "0:0,"
248+
else:
249+
rate_limit = rate_limit
250+
251+
return rate_limit
252+
238253

239254
class TBQueue:
240255
def __init__(self, maxsize=None):
@@ -329,17 +344,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
329344
self.__device_sub_dict = {}
330345
self.__device_client_rpc_dict = {}
331346
self.__attr_request_number = 0
332-
if rate_limit == "DEFAULT_RATE_LIMIT":
333-
if "thingsboard.cloud" in self.__host:
334-
rate_limit = "8:1,450:60,30000:3600,"
335-
elif "tb" in self.__host and "cloud" in self.__host:
336-
rate_limit = "8:1,450:60,30000:3600,"
337-
elif "demo.thingsboard.io" in self.__host:
338-
rate_limit = "8:1,450:60,30000:3600,"
339-
else:
340-
rate_limit = "0:0,"
341-
else:
342-
rate_limit = rate_limit
347+
rate_limit = RateLimit.get_rate_limit_by_host(self.__host, rate_limit)
343348
self.__rate_limit = RateLimit(rate_limit)
344349
self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
345350
self.__attrs_request_timeout = {}
@@ -662,7 +667,7 @@ def _send_request(self, type, kwargs, timeout=DEFAULT_TIMEOUT):
662667
return TBPublishInfo(self._client.publish(**kwargs))
663668
elif type == TBSendMethod.SUBSCRIBE:
664669
self.__rate_limit.add_counter()
665-
return TBPublishInfo(self._client.subscribe(**kwargs))
670+
return self._client.subscribe(**kwargs)
666671
elif type == TBSendMethod.UNSUBSCRIBE:
667672
self.__rate_limit.add_counter()
668673
return TBPublishInfo(self._client.unsubscribe(**kwargs))
@@ -674,7 +679,7 @@ def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
674679
waiting_for_connection_message_time = 0
675680
while not self.is_connected():
676681
if self.stopped:
677-
return -1, 128
682+
return TBPublishInfo(paho.MQTTMessageInfo(None))
678683
if time() - waiting_for_connection_message_time > 10.0:
679684
log.warning("Waiting for connection to be established before subscribing for data on ThingsBoard!")
680685
waiting_for_connection_message_time = time()

tb_gateway_mqtt.py

Lines changed: 97 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,16 @@
1414
#
1515

1616
import logging
17-
import time
18-
from tb_device_mqtt import TBDeviceMqttClient
17+
from time import sleep
18+
19+
import paho.mqtt.client as paho
20+
21+
try:
22+
from time import monotonic as time
23+
except ImportError:
24+
from time import time
25+
26+
from tb_device_mqtt import TBDeviceMqttClient, RateLimit, TBPublishInfo, TBSendMethod
1927

2028
GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"
2129
GATEWAY_ATTRIBUTES_REQUEST_TOPIC = "v1/gateway/attributes/request"
@@ -37,9 +45,11 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
3745
rate_limit="DEFAULT_RATE_LIMIT"):
3846
super().__init__(host, port, username, password, quality_of_service, client_id, rate_limit=rate_limit)
3947
self.quality_of_service = quality_of_service
48+
self._rate_limit = RateLimit.get_rate_limit_by_host(host, rate_limit)
4049
self.__max_sub_id = 0
4150
self.__sub_dict = {}
4251
self.__connected_devices = set("*")
52+
self._devices_rate_limit = {}
4353
self.devices_server_side_rpc_request_handler = None
4454
self._client.on_connect = self._on_connect
4555
self._client.on_message = self._on_message
@@ -51,44 +61,38 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
5161
def _on_connect(self, client, userdata, flags, result_code, *extra_params):
5262
super()._on_connect(client, userdata, flags, result_code, *extra_params)
5363
if result_code == 0:
54-
gateway_attributes_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1,
55-
wait_for_result=True)[1])
56-
if gateway_attributes_topic_sub_id == 128:
57-
log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_TOPIC)
58-
if gateway_attributes_topic_sub_id in self._gw_subscriptions:
59-
del self._gw_subscriptions[gateway_attributes_topic_sub_id]
60-
else:
61-
self._gw_subscriptions[gateway_attributes_topic_sub_id] = GATEWAY_ATTRIBUTES_TOPIC
62-
gateway_attributes_resp_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1,
63-
wait_for_result=True)[1])
64-
if gateway_attributes_resp_sub_id == 128:
65-
log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_RESPONSE_TOPIC)
66-
if gateway_attributes_resp_sub_id in self._gw_subscriptions:
67-
del self._gw_subscriptions[gateway_attributes_resp_sub_id]
68-
else:
69-
self._gw_subscriptions[gateway_attributes_resp_sub_id] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
70-
gateway_rpc_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_RPC_TOPIC, qos=1,
71-
wait_for_result=True)[1])
72-
if gateway_rpc_topic_sub_id == 128:
73-
log.error("Service subscription to topic %s - failed.", GATEWAY_RPC_TOPIC)
74-
if gateway_rpc_topic_sub_id in self._gw_subscriptions:
75-
del self._gw_subscriptions[gateway_rpc_topic_sub_id]
76-
else:
77-
self._gw_subscriptions[gateway_rpc_topic_sub_id] = GATEWAY_RPC_TOPIC
78-
# gateway_rpc_topic_response_sub_id = int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1])
79-
# self._gw_subscriptions[gateway_rpc_topic_response_sub_id] = GATEWAY_RPC_RESPONSE_TOPIC
64+
d = self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1)
65+
gateway_attributes_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])
66+
self._add_or_delete_subscription(GATEWAY_ATTRIBUTES_TOPIC, gateway_attributes_topic_sub_id)
67+
68+
gateway_attributes_resp_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1)[1])
69+
self._add_or_delete_subscription(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, gateway_attributes_resp_sub_id)
70+
71+
gateway_rpc_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_RPC_TOPIC, qos=1)[1])
72+
self._add_or_delete_subscription(GATEWAY_RPC_TOPIC, gateway_rpc_topic_sub_id)
8073

8174
def _on_subscribe(self, client, userdata, mid, reasoncodes, properties=None):
8275
subscription = self._gw_subscriptions.get(mid)
8376
if subscription is not None:
8477
if mid == 128:
85-
log.error("Service subscription to topic %s - failed.", subscription)
86-
del self._gw_subscriptions[mid]
78+
self._delete_subscription(subscription, mid)
8779
else:
8880
log.debug("Service subscription to topic %s - successfully completed.", subscription)
8981
del self._gw_subscriptions[mid]
9082

91-
def _on_unsubscribe(self, *args):
83+
def _delete_subscription(self, topic, subscription_id):
84+
log.error("Service subscription to topic %s - failed.", topic)
85+
if subscription_id in self._gw_subscriptions:
86+
del self._gw_subscriptions[subscription_id]
87+
88+
def _add_or_delete_subscription(self, topic, subscription_id):
89+
if subscription_id == 128:
90+
self._delete_subscription(topic, subscription_id)
91+
else:
92+
self._gw_subscriptions[subscription_id] = topic
93+
94+
@staticmethod
95+
def _on_unsubscribe(*args):
9296
log.debug(args)
9397

9498
def get_subscriptions_in_progress(self):
@@ -140,42 +144,66 @@ def __request_attributes(self, device, keys, callback, type_is_client=False):
140144
log.error("There are no keys to request")
141145
return False
142146

143-
ts_in_millis = int(round(time.time() * 1000))
147+
ts_in_millis = int(round(time() * 1000))
144148
attr_request_number = self._add_attr_request_callback(callback)
145149
msg = {"keys": keys,
146150
"device": device,
147151
"client": type_is_client,
148152
"id": attr_request_number}
149-
info = self._publish_data(msg, GATEWAY_ATTRIBUTES_REQUEST_TOPIC, 1, high_priority=True)
153+
info = self._send_device_request(TBSendMethod.PUBLISH, device, topic=GATEWAY_ATTRIBUTES_REQUEST_TOPIC, data=msg,
154+
qos=1)
150155
self._add_timeout(attr_request_number, ts_in_millis + 30000)
151156
return info
152157

158+
def _send_device_request(self, _type, device_name, **kwargs):
159+
if _type == TBSendMethod.PUBLISH:
160+
is_reached = self.check_device_rate_limit(device_name)
161+
if is_reached:
162+
return is_reached
163+
164+
info = self._publish_data(**kwargs)
165+
return info
166+
153167
def gw_request_shared_attributes(self, device_name, keys, callback):
154168
return self.__request_attributes(device_name, keys, callback, False)
155169

156170
def gw_request_client_attributes(self, device_name, keys, callback):
157171
return self.__request_attributes(device_name, keys, callback, True)
158172

159173
def gw_send_attributes(self, device, attributes, quality_of_service=1):
160-
return self._publish_data({device: attributes}, GATEWAY_MAIN_TOPIC + "attributes", quality_of_service)
174+
return self._send_device_request(TBSendMethod.PUBLISH,
175+
device,
176+
topic=GATEWAY_MAIN_TOPIC + "attributes",
177+
data={device: attributes},
178+
qos=quality_of_service)
161179

162180
def gw_send_telemetry(self, device, telemetry, quality_of_service=1):
163181
if not isinstance(telemetry, list) and not (isinstance(telemetry, dict) and telemetry.get("ts") is not None):
164182
telemetry = [telemetry]
165-
return self._publish_data({device: telemetry}, GATEWAY_MAIN_TOPIC + "telemetry", quality_of_service)
183+
184+
return self._send_device_request(TBSendMethod.PUBLISH,
185+
device,
186+
topic=GATEWAY_MAIN_TOPIC + "telemetry",
187+
data={device: telemetry},
188+
qos=quality_of_service)
166189

167190
def gw_connect_device(self, device_name, device_type="default"):
168-
info = self._publish_data({"device": device_name, "type": device_type}, GATEWAY_MAIN_TOPIC + "connect",
169-
self.quality_of_service)
191+
info = self._send_device_request(TBSendMethod.PUBLISH, device_name, topic=GATEWAY_MAIN_TOPIC + "connect",
192+
data={"device": device_name, "type": device_type},
193+
qos=self.quality_of_service)
194+
170195
self.__connected_devices.add(device_name)
196+
171197
log.debug("Connected device %s", device_name)
172198
return info
173199

174200
def gw_disconnect_device(self, device_name):
175-
info = self._publish_data({"device": device_name}, GATEWAY_MAIN_TOPIC + "disconnect",
176-
self.quality_of_service)
201+
info = self._send_device_request(TBSendMethod.PUBLISH, device_name, topic=GATEWAY_MAIN_TOPIC + "disconnect",
202+
data={"device": device_name}, qos=self.quality_of_service)
203+
177204
if device_name in self.__connected_devices:
178205
self.__connected_devices.remove(device_name)
206+
179207
log.debug("Disconnected device %s", device_name)
180208
return info
181209

@@ -217,8 +245,10 @@ def gw_send_rpc_reply(self, device, req_id, resp, quality_of_service=None):
217245
if quality_of_service not in (0, 1):
218246
log.error("Quality of service (qos) value must be 0 or 1")
219247
return None
220-
info = self._publish_data({"device": device, "id": req_id, "data": resp}, GATEWAY_RPC_TOPIC,
221-
quality_of_service)
248+
249+
info = self._send_device_request(TBSendMethod.PUBLISH, device, topic=GATEWAY_RPC_TOPIC,
250+
data={"device": device, "id": req_id, "data": resp},
251+
qos=quality_of_service)
222252
return info
223253

224254
def gw_claim(self, device_name, secret_key, duration, claiming_request=None):
@@ -229,5 +259,30 @@ def gw_claim(self, device_name, secret_key, duration, claiming_request=None):
229259
"durationMs": duration
230260
}
231261
}
232-
info = self._publish_data(claiming_request, GATEWAY_CLAIMING_TOPIC, self.quality_of_service)
262+
263+
info = self._send_device_request(TBSendMethod.PUBLISH, device_name, topic=GATEWAY_CLAIMING_TOPIC,
264+
data=claiming_request, qos=self.quality_of_service)
233265
return info
266+
267+
def _add_device_rate_limit(self, device_name):
268+
rate_limit = RateLimit(self._rate_limit)
269+
self._devices_rate_limit[device_name] = {'rate_limit': rate_limit}
270+
271+
def check_device_rate_limit(self, device_name):
272+
if self._devices_rate_limit.get(device_name) is None:
273+
self._add_device_rate_limit(device_name)
274+
275+
is_reached = self._check_device_rate_limit(device_name)
276+
if is_reached:
277+
return is_reached
278+
279+
self._devices_rate_limit[device_name]['rate_limit'].add_counter()
280+
281+
def _check_device_rate_limit(self, device_name):
282+
start_time = time()
283+
timeout = self._devices_rate_limit[device_name]['rate_limit'].get_minimal_timeout()
284+
while self._devices_rate_limit[device_name]['rate_limit'].check_limit_reached():
285+
if time() >= timeout + start_time:
286+
log.error("Timeout while waiting for rate limit to be released!")
287+
return TBPublishInfo(paho.MQTTMessageInfo(None))
288+
sleep(0.001)

0 commit comments

Comments
 (0)