Skip to content

Commit 45565cd

Browse files
authored
Merge pull request #61 from samson0v/master
Refactored rate limits
2 parents 85f1f10 + 069d13c commit 45565cd

File tree

1 file changed

+60
-140
lines changed

1 file changed

+60
-140
lines changed

tb_device_mqtt.py

Lines changed: 60 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from time import monotonic as time
2525
except ImportError:
2626
from time import time
27-
import queue
2827
import ssl
2928
from threading import Lock, RLock, Thread, Condition
3029
from enum import Enum
@@ -123,6 +122,7 @@ def get_credentials(self):
123122
class TBSendMethod(Enum):
124123
SUBSCRIBE = 0
125124
PUBLISH = 1
125+
UNSUBSCRIBE = 2
126126

127127

128128
class TBPublishInfo:
@@ -165,7 +165,7 @@ class TBPublishInfo:
165165
16: 'The keepalive time has been exceeded.'
166166
}
167167

168-
def __init__(self, message_info):
168+
def __init__(self, message_info: paho.MQTTMessageInfo):
169169
self.message_info = message_info
170170

171171
# pylint: disable=invalid-name
@@ -196,43 +196,44 @@ def __init__(self, rate_limit):
196196
if rate == "":
197197
continue
198198
rate = rate.split(":")
199-
self.__rate_limit_dict[int(rate[1])] = {"queue": queue.Queue(int(rate[0])), "start": time()}
199+
self.__rate_limit_dict[int(rate[1])] = {"counter": 0, "start": time(), "limit": int(rate[0])}
200200
log.debug("Rate limit set to values: ")
201-
for rate_limit_time in self.__rate_limit_dict:
202-
log.debug("Time: %s, Limit: %s", rate_limit_time,
203-
self.__rate_limit_dict[rate_limit_time]["queue"].maxsize)
201+
self.__minimal_timeout = DEFAULT_TIMEOUT * 10
202+
self.__minimal_limit = 1000000000
203+
if not self.__no_limit:
204+
for rate_limit_time in self.__rate_limit_dict:
205+
log.debug("Time: %s, Limit: %s", rate_limit_time,
206+
self.__rate_limit_dict[rate_limit_time]["limit"])
207+
if self.__rate_limit_dict[rate_limit_time]["limit"] < self.__minimal_limit:
208+
self.__minimal_limit = self.__rate_limit_dict[rate_limit_time]["limit"]
209+
if rate_limit_time < self.__minimal_limit:
210+
self.__minimal_timeout = rate_limit_time + 1
204211

205212
def add_counter(self):
206213
if self.__no_limit:
207214
return
208215
with self.__lock:
209216
for rate_limit_time in self.__rate_limit_dict:
210-
self.__rate_limit_dict[rate_limit_time]["queue"].put(1)
217+
self.__rate_limit_dict[rate_limit_time]["counter"] += 1
211218

212219
def check_limit_reached(self):
213220
if self.__no_limit:
214221
return False
215-
with self.__lock:
216-
for rate_limit_time in self.__rate_limit_dict:
217-
rate_limit_point_queue = self.__rate_limit_dict[rate_limit_time]["queue"]
218-
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time < time():
219-
self.__rate_limit_dict[rate_limit_time]["start"] = time()
220-
rate_limit_point_queue = queue.Queue(rate_limit_point_queue.maxsize)
221-
self.__rate_limit_dict[rate_limit_time]["queue"] = rate_limit_point_queue
222-
if rate_limit_point_queue.full():
223-
log.debug("Rate limit exceeded for %s second", rate_limit_time)
224-
log.debug("Queue size: %s", rate_limit_point_queue.qsize())
225-
return True
226-
return False
222+
for rate_limit_time, rate_limit_info in self.__rate_limit_dict.items():
223+
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time <= time():
224+
self.__rate_limit_dict[rate_limit_time]["start"] = time()
225+
self.__rate_limit_dict[rate_limit_time]["counter"] = 0
226+
if rate_limit_info['counter'] >= rate_limit_info['limit']:
227+
log.debug("Rate limit exceeded for %s second", rate_limit_time)
228+
log.debug("Rate limit counter: %s", rate_limit_info['counter'])
229+
return True
230+
return False
227231

228232
def get_minimal_limit(self):
229-
minimal_limit = 1000000000
230-
if self.__no_limit:
231-
return 1000000000
232-
for rate_limit_time in self.__rate_limit_dict:
233-
if self.__rate_limit_dict[rate_limit_time]["queue"].maxsize < minimal_limit:
234-
minimal_limit = self.__rate_limit_dict[rate_limit_time]["queue"].maxsize
235-
return minimal_limit
233+
return self.__minimal_limit
234+
235+
def get_minimal_timeout(self):
236+
return self.__minimal_timeout
236237

237238

238239
class TBQueue:
@@ -328,7 +329,6 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
328329
self.__device_sub_dict = {}
329330
self.__device_client_rpc_dict = {}
330331
self.__attr_request_number = 0
331-
# rate_limit = rate_limit if rate_limit != "DEFAULT_RATE_LIMIT" else "8:1;2000:60;30000:3600;"
332332
if rate_limit == "DEFAULT_RATE_LIMIT":
333333
if "thingsboard.cloud" in self.__host:
334334
rate_limit = "8:1,450:60,30000:3600,"
@@ -342,15 +342,6 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
342342
rate_limit = rate_limit
343343
self.__rate_limit = RateLimit(rate_limit)
344344
self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
345-
self.__sending_queue = TBQueue()
346-
self.__sending_queue_warning_published = 0
347-
self.__responses = {}
348-
self.__sending_thread = Thread(target=self.__sending_thread_main, name="Sending thread", daemon=True)
349-
self.__sending_thread.start()
350-
self.__housekeeping_thread = Thread(target=self.__housekeeping_thread_main,
351-
name="Housekeeping thread",
352-
daemon=True)
353-
self.__housekeeping_thread.start()
354345
self.__attrs_request_timeout = {}
355346
self.__timeout_thread = Thread(target=self.__timeout_check)
356347
self.__timeout_thread.daemon = True
@@ -427,8 +418,7 @@ def __request_firmware_info(self):
427418
self.__request_id = self.__request_id + 1
428419
self._publish_data({"sharedKeys": REQUIRED_SHARED_KEYS},
429420
f"v1/devices/me/attributes/request/{self.__request_id}",
430-
1,
431-
high_priority=True)
421+
1)
432422

433423
def is_connected(self):
434424
return self.__is_connected
@@ -577,7 +567,7 @@ def __get_firmware(self):
577567
payload = '' if not self.__chunk_size or self.__chunk_size > self.firmware_info.get(FW_SIZE_ATTR, 0) \
578568
else str(self.__chunk_size).encode()
579569
self._publish_data(payload, f"v2/fw/request/{self.__firmware_request_id}/chunk/{self.__current_chunk}",
580-
1, high_priority=True)
570+
1)
581571

582572
def __on_firmware_received(self, version_to):
583573
with open(self.firmware_info.get(FW_TITLE_ATTR), "wb") as firmware_file:
@@ -640,7 +630,7 @@ def send_rpc_reply(self, req_id, resp, quality_of_service=None, wait_for_publish
640630
if quality_of_service not in (0, 1):
641631
log.error("Quality of service (qos) value must be 0 or 1")
642632
return None
643-
info = self._publish_data(resp, RPC_RESPONSE_TOPIC + req_id, quality_of_service, high_priority=True)
633+
info = self._publish_data(resp, RPC_RESPONSE_TOPIC + req_id, quality_of_service)
644634
if wait_for_publish:
645635
info.get()
646636

@@ -653,74 +643,33 @@ def send_rpc_call(self, method, params, callback):
653643
payload = {"method": method, "params": params}
654644
self._publish_data(payload,
655645
RPC_REQUEST_TOPIC + str(rpc_request_id),
656-
self.quality_of_service,
657-
high_priority=True)
646+
self.quality_of_service)
658647

659648
def set_server_side_rpc_request_handler(self, handler):
660649
"""Set the callback that will be called when a server-side RPC is received."""
661650
self.__device_on_server_side_rpc_response = handler
662-
663-
def __sending_thread_main(self):
664-
while not self.stopped:
665-
try:
666-
if not self.is_connected():
667-
sleep(.1)
668-
continue
669-
if not self.__rate_limit.check_limit_reached():
670-
if not self.__sending_queue.empty():
671-
item = self.__sending_queue.get(False)
672-
if item is not None:
673-
if item["method"] == TBSendMethod.PUBLISH:
674-
info = self._client.publish(item["topic"], item["data"], qos=item["qos"])
675-
if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc:
676-
self.__sending_queue.put_left(item, True)
677-
continue
678-
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
679-
self.__rate_limit.add_counter()
680-
elif item["method"] == TBSendMethod.SUBSCRIBE:
681-
result = self._client.subscribe(item["topic"], qos=item["qos"])
682-
self.__responses[item['id']] = {"info": result, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
683-
self.__rate_limit.add_counter()
684-
else:
685-
sleep(.01)
686-
else:
687-
sleep(0.001)
688-
except Exception as e:
689-
log.exception("Error during data sending:", exc_info=e)
690-
sleep(1)
691-
692-
def __housekeeping_thread_main(self):
693-
while not self.stopped:
694-
if not self.__responses:
695-
sleep(0.1)
696-
else:
697-
for req_id in list(self.__responses.keys()):
698-
if int(time()) > self.__responses[req_id]["timeout_ts"]:
699-
try:
700-
if (req_id in self.__responses
701-
and ((self.__responses[req_id]["method"] == TBSendMethod.PUBLISH
702-
and self.__responses[req_id]["info"].is_published())
703-
or (self.__responses[req_id]["method"] == TBSendMethod.SUBSCRIBE))):
704-
self.__responses.pop(req_id)
705-
except (KeyError, AttributeError):
706-
pass
707-
except (Exception, RuntimeError, ValueError) as e:
708-
pass
709-
# log.debug("Error during housekeeping sent messages:", exc_info=e)
710-
# log.debug("Timeout occurred while waiting for a reply from ThingsBoard!")
711-
sleep(0.01)
712-
713-
def _subscribe_to_topic(self, topic, callback=None, qos=None, wait_for_result=False, timeout=DEFAULT_TIMEOUT):
651+
652+
def _send_request(self, type, kwargs, timeout=DEFAULT_TIMEOUT):
653+
start_time = time()
654+
timeout = max(self.__rate_limit.get_minimal_timeout(), timeout)
655+
while self.__rate_limit.check_limit_reached():
656+
if time() >= timeout + start_time:
657+
log.error("Timeout while waiting for rate limit to be released!")
658+
return TBPublishInfo(paho.MQTTMessageInfo(None))
659+
sleep(0.001)
660+
if type == TBSendMethod.PUBLISH:
661+
self.__rate_limit.add_counter()
662+
return TBPublishInfo(self._client.publish(**kwargs))
663+
elif type == TBSendMethod.SUBSCRIBE:
664+
self.__rate_limit.add_counter()
665+
return TBPublishInfo(self._client.subscribe(**kwargs))
666+
elif type == TBSendMethod.UNSUBSCRIBE:
667+
self.__rate_limit.add_counter()
668+
return TBPublishInfo(self._client.unsubscribe(**kwargs))
669+
670+
def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
714671
if qos is None:
715672
qos = self.quality_of_service
716-
req_id = uuid.uuid4()
717-
self.__sending_queue.put_left({"topic": topic, "qos": qos, "callback": callback, "id": req_id,
718-
"method": TBSendMethod.SUBSCRIBE}, True)
719-
sending_queue_size = self.__sending_queue.qsize()
720-
if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5:
721-
self.__sending_queue_warning_published = int(time())
722-
log.warning("Sending queue is bigger than 1000000 messages (%r), consider increasing the rate limit, "
723-
"or decreasing the amount of messages sent!", sending_queue_size)
724673

725674
waiting_for_connection_message_time = 0
726675
while not self.is_connected():
@@ -731,35 +680,15 @@ def _subscribe_to_topic(self, topic, callback=None, qos=None, wait_for_result=Fa
731680
waiting_for_connection_message_time = time()
732681
sleep(0.01)
733682

734-
start_time = int(time())
735-
if wait_for_result:
736-
while req_id not in list(self.__responses.keys()):
737-
if 0 < timeout < int(time()) - start_time or self.stopped:
738-
log.error("Timeout while waiting for a subscribe to ThingsBoard!")
739-
return -1, 128
740-
sleep(0.01)
741-
742-
return self.__responses.pop(req_id)["info"]
683+
return self._send_request(TBSendMethod.SUBSCRIBE, {"topic": topic, "qos": qos}, timeout)
743684

744-
def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=False, timeout=DEFAULT_TIMEOUT):
685+
def _publish_data(self, data, topic, qos, timeout=DEFAULT_TIMEOUT):
745686
data = dumps(data)
746687
if qos is None:
747688
qos = self.quality_of_service
748689
if qos not in (0, 1):
749690
log.exception("Quality of service (qos) value must be 0 or 1")
750691
raise TBQoSException("Quality of service (qos) value must be 0 or 1")
751-
req_id = uuid.uuid4()
752-
if high_priority:
753-
self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id,
754-
"method": TBSendMethod.PUBLISH}, False)
755-
else:
756-
self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id,
757-
"method": TBSendMethod.PUBLISH}, False)
758-
sending_queue_size = self.__sending_queue.qsize()
759-
if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5:
760-
self.__sending_queue_warning_published = int(time())
761-
log.warning("Sending queue is bigger than 1000000 messages (%r), consider increasing the rate limit, "
762-
"or decreasing the amount of messages sent!", sending_queue_size)
763692

764693
waiting_for_connection_message_time = 0
765694
while not self.is_connected():
@@ -770,27 +699,19 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F
770699
waiting_for_connection_message_time = time()
771700
sleep(0.01)
772701

773-
start_time = int(time())
774-
if wait_for_publish:
775-
while req_id not in list(self.__responses.keys()):
776-
if 0 < timeout < int(time()) - start_time:
777-
log.error("Timeout while waiting for a publish to ThingsBoard!")
778-
return TBPublishInfo(paho.MQTTMessageInfo(None))
779-
sleep(0.01)
780-
781-
return TBPublishInfo(self.__responses.pop(req_id)["info"])
702+
return self._send_request(TBSendMethod.PUBLISH, {"topic": topic, "payload": data, "qos": qos}, timeout)
782703

783-
def send_telemetry(self, telemetry, quality_of_service=None, wait_for_publish=True, high_priority=False):
704+
def send_telemetry(self, telemetry, quality_of_service=None, wait_for_publish=True):
784705
"""Send telemetry to ThingsBoard. The telemetry can be a single dictionary or a list of dictionaries."""
785706
quality_of_service = quality_of_service if quality_of_service is not None else self.quality_of_service
786707
if not isinstance(telemetry, list) and not (isinstance(telemetry, dict) and telemetry.get("ts") is not None):
787708
telemetry = [telemetry]
788-
return self._publish_data(telemetry, TELEMETRY_TOPIC, quality_of_service, wait_for_publish, high_priority)
709+
return self._publish_data(telemetry, TELEMETRY_TOPIC, quality_of_service, wait_for_publish)
789710

790-
def send_attributes(self, attributes, quality_of_service=None, wait_for_publish=True, high_priority=False):
711+
def send_attributes(self, attributes, quality_of_service=None, wait_for_publish=True):
791712
"""Send attributes to ThingsBoard. The attributes can be a single dictionary or a list of dictionaries."""
792713
quality_of_service = quality_of_service if quality_of_service is not None else self.quality_of_service
793-
return self._publish_data(attributes, ATTRIBUTES_TOPIC, quality_of_service, wait_for_publish, high_priority)
714+
return self._publish_data(attributes, ATTRIBUTES_TOPIC, quality_of_service, wait_for_publish)
794715

795716
def unsubscribe_from_attribute(self, subscription_id):
796717
"""Unsubscribe from attribute updates for subscription_id."""
@@ -842,8 +763,7 @@ def request_attributes(self, client_keys=None, shared_keys=None, callback=None):
842763

843764
attr_request_number = self._add_attr_request_callback(callback)
844765

845-
info = self._publish_data(msg, ATTRIBUTES_TOPIC_REQUEST + str(attr_request_number), self.quality_of_service,
846-
high_priority=True)
766+
info = self._publish_data(msg, ATTRIBUTES_TOPIC_REQUEST + str(attr_request_number), self.quality_of_service)
847767

848768
self._add_timeout(attr_request_number, ts_in_millis, timeout=20)
849769
return info
@@ -888,7 +808,7 @@ def claim(self, secret_key, duration=30000):
888808
"secretKey": secret_key,
889809
"durationMs": duration
890810
}
891-
info = self._publish_data(claiming_request, CLAIMING_TOPIC, self.quality_of_service, high_priority=True)
811+
info = self._publish_data(claiming_request, CLAIMING_TOPIC, self.quality_of_service)
892812
return info
893813

894814
@staticmethod

0 commit comments

Comments
 (0)