|
20 | 20 | from math import ceil
|
21 | 21 |
|
22 | 22 | try:
|
23 |
| - from time import monotonic as time |
| 23 | + from time import monotonic as time, time as timestamp |
24 | 24 | except ImportError:
|
25 |
| - from time import time |
| 25 | + from time import time, time as timestamp |
26 | 26 | import ssl
|
27 | 27 | from threading import RLock, Thread
|
28 | 28 | from enum import Enum
|
@@ -735,18 +735,41 @@ def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
|
735 | 735 |
|
736 | 736 | if _type == TBSendMethod.PUBLISH:
|
737 | 737 | if msg_rate_limit.has_limit():
|
| 738 | + self.__add_metadata_to_data_dict_from_device(kwargs["payload"]) |
738 | 739 | return self.__send_publish_with_limitations(kwargs, timeout, device, msg_rate_limit, dp_rate_limit)
|
739 | 740 | else:
|
740 |
| - |
741 |
| - if "payload" in kwargs and not isinstance(kwargs["payload"], str): |
742 |
| - kwargs["payload"] = dumps(kwargs["payload"]) |
743 |
| - |
| 741 | + if "payload" in kwargs: |
| 742 | + not_converted_to_str = True |
| 743 | + if isinstance(kwargs["payload"], dict): |
| 744 | + self.__add_metadata_to_data_dict_from_device(kwargs["payload"]) |
| 745 | + kwargs["payload"] = dumps(kwargs["payload"]) |
| 746 | + not_converted_to_str = False |
| 747 | + elif isinstance(kwargs["payload"], str): |
| 748 | + if 'metadata' in kwargs["payload"]: |
| 749 | + payload = loads(kwargs["payload"]) |
| 750 | + self.__add_metadata_to_data_dict_from_device(payload) |
| 751 | + not_converted_to_str = False |
| 752 | + if not_converted_to_str and not isinstance(kwargs["payload"], str): |
| 753 | + kwargs["payload"] = dumps(kwargs["payload"]) |
744 | 754 | return TBPublishInfo(self._client.publish(**kwargs))
|
745 | 755 | elif _type == TBSendMethod.SUBSCRIBE:
|
746 | 756 | return self._client.subscribe(**kwargs)
|
747 | 757 | elif _type == TBSendMethod.UNSUBSCRIBE:
|
748 | 758 | return self._client.unsubscribe(**kwargs)
|
749 | 759 |
|
| 760 | + def __add_metadata_to_data_dict_from_device(self, data): |
| 761 | + if isinstance(data, dict) and "metadata" in data: |
| 762 | + data["metadata"]["publishedTs"] = int(timestamp() * 1000) |
| 763 | + elif isinstance(data, list): |
| 764 | + current_time = int(timestamp() * 1000) |
| 765 | + for data_item in data: |
| 766 | + if isinstance(data_item, dict): |
| 767 | + if 'ts' in data_item and 'metadata' in data_item: |
| 768 | + data_item["metadata"]["publishedTs"] = current_time |
| 769 | + elif isinstance(data, dict): |
| 770 | + for key, value in data.items(): |
| 771 | + self.__add_metadata_to_data_dict_from_device(value) |
| 772 | + |
750 | 773 | def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp_rate_limit=None):
|
751 | 774 | if device is not None:
|
752 | 775 | return msg_rate_limit, dp_rate_limit
|
@@ -921,7 +944,7 @@ def _add_attr_request_callback(self, callback):
|
921 | 944 |
|
922 | 945 | def __timeout_check(self):
|
923 | 946 | while not self.stopped:
|
924 |
| - current_ts_in_millis = int(time()) |
| 947 | + current_ts_in_millis = int(time() * 1000) |
925 | 948 | for (attr_request_number, ts) in tuple(self.__attrs_request_timeout.items()):
|
926 | 949 | if current_ts_in_millis < ts:
|
927 | 950 | continue
|
|
0 commit comments