|
50 | 50 | CLAIMING_TOPIC = 'v1/devices/me/claim'
|
51 | 51 | PROVISION_TOPIC_REQUEST = '/provision/request'
|
52 | 52 | PROVISION_TOPIC_RESPONSE = '/provision/response'
|
53 |
| -log = logging.getLogger(__name__) |
| 53 | +log = logging.getLogger('tb_connection') |
54 | 54 |
|
55 | 55 | RESULT_CODES = {
|
56 | 56 | 1: "incorrect protocol version",
|
@@ -224,7 +224,7 @@ def check_limit_reached(self, amount=1):
|
224 | 224 | log.debug("Rate limit reset for %s second for config %s", rate_limit_time, rate_limit_info)
|
225 | 225 | self.__rate_limit_dict[rate_limit_time]["counter"] = 0
|
226 | 226 | if rate_limit_info['counter'] + amount >= rate_limit_info['limit']:
|
227 |
| - return True |
| 227 | + return rate_limit_time |
228 | 228 | return False
|
229 | 229 |
|
230 | 230 | def get_minimal_limit(self):
|
@@ -289,6 +289,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
|
289 | 289 | rate_limit, dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host, rate_limit, dp_rate_limit)
|
290 | 290 | self.__rate_limit = RateLimit(rate_limit)
|
291 | 291 | self.__dp_rate_limit = RateLimit(dp_rate_limit)
|
| 292 | + self._client.max_queued_messages_set(self.__rate_limit.get_minimal_limit()) |
292 | 293 | self.__attrs_request_timeout = {}
|
293 | 294 | self.__timeout_thread = Thread(target=self.__timeout_check)
|
294 | 295 | self.__timeout_thread.daemon = True
|
@@ -599,7 +600,22 @@ def set_server_side_rpc_request_handler(self, handler):
|
599 | 600 | def _wait_for_rate_limit_released(self, timeout, amount=1):
|
600 | 601 | start_time = int(time())
|
601 | 602 | timeout = max(self.__rate_limit.get_minimal_timeout(), self.__dp_rate_limit.get_minimal_timeout(), timeout)
|
602 |
| - while self.__rate_limit.check_limit_reached() or self.__dp_rate_limit.check_limit_reached(amount=amount): |
| 603 | + timeout_updated = False |
| 604 | + disconnected = False |
| 605 | + limit_reached_check = True |
| 606 | + while limit_reached_check: |
| 607 | + limit_reached_check = (self.__rate_limit.check_limit_reached() |
| 608 | + or self.__dp_rate_limit.check_limit_reached(amount=amount) |
| 609 | + or not self.is_connected()) |
| 610 | + if not timeout_updated and limit_reached_check: |
| 611 | + timeout = max(timeout, limit_reached_check) |
| 612 | + timeout_updated = True |
| 613 | + if self.stopped: |
| 614 | + return TBPublishInfo(paho.MQTTMessageInfo(None)) |
| 615 | + if not disconnected and not self.is_connected(): |
| 616 | + log.warning("Waiting for connection to be established before sending data to ThingsBoard!") |
| 617 | + disconnected = True |
| 618 | + timeout = max(timeout, 180) |
603 | 619 | if int(time()) >= timeout + start_time:
|
604 | 620 | log.error("Timeout while waiting for rate limit to be released!")
|
605 | 621 | return TBPublishInfo(paho.MQTTMessageInfo(None))
|
@@ -783,19 +799,20 @@ def _count_datapoints_in_message_and_increase_rate_limit(data, rate_limit):
|
783 | 799 |
|
784 | 800 | log.debug("Data points in message: %s", datapoints)
|
785 | 801 | rate_limit.increase_rate_limit_counter(datapoints)
|
| 802 | + log.debug("Rate limit counter increased by %s and is now %s", datapoints, rate_limit._RateLimit__rate_limit_dict.values()) |
786 | 803 |
|
787 | 804 | return datapoints
|
788 | 805 |
|
789 | 806 | @staticmethod
|
790 | 807 | def _get_data_points_from_message(data):
|
791 | 808 | if isinstance(data, dict):
|
792 | 809 | if data.get("ts") is not None and data.get("values") is not None:
|
793 |
| - datapoints_in_message_amount = len(data.get("values")) |
| 810 | + datapoints_in_message_amount = len(data['values']) + len(str(data['values']))/1000 |
794 | 811 | else:
|
795 |
| - datapoints_in_message_amount = len(data.keys()) |
| 812 | + datapoints_in_message_amount = len(data.keys()) + len(str(data))/1000 |
796 | 813 | else:
|
797 |
| - datapoints_in_message_amount = len(data) |
798 |
| - return datapoints_in_message_amount |
| 814 | + datapoints_in_message_amount = len(data) + len(str(data))/1000 |
| 815 | + return int(datapoints_in_message_amount) |
799 | 816 |
|
800 | 817 | @staticmethod
|
801 | 818 | def provision(host,
|
|
0 commit comments