15
15
import logging
16
16
from inspect import signature
17
17
from time import sleep
18
- from time import time as timestamp
19
18
20
19
import paho .mqtt .client as paho
21
20
from math import ceil
@@ -738,8 +737,6 @@ def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
738
737
if msg_rate_limit .has_limit ():
739
738
return self .__send_publish_with_limitations (kwargs , timeout , device , msg_rate_limit , dp_rate_limit )
740
739
else :
741
- if self .__is_test_latency_message (kwargs ['payload' ]):
742
- kwargs = self .__convert_test_latency_message (kwargs )
743
740
744
741
if "payload" in kwargs and not isinstance (kwargs ["payload" ], str ):
745
742
kwargs ["payload" ] = dumps (kwargs ["payload" ])
@@ -761,12 +758,12 @@ def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp
761
758
762
759
def __send_publish_with_limitations (self , kwargs , timeout , device = None , msg_rate_limit : RateLimit = None ,
763
760
dp_rate_limit : RateLimit = None ):
764
- data_for_analysis = data = kwargs .get ("payload" )
761
+ data = kwargs .get ("payload" )
765
762
if isinstance (data , str ):
766
- data_for_analysis = loads (data )
763
+ data = loads (data )
767
764
datapoints = - 1
768
765
if dp_rate_limit .has_limit ():
769
- datapoints = self ._count_datapoints_in_message (data_for_analysis , device = device )
766
+ datapoints = self ._count_datapoints_in_message (data , device = device )
770
767
payload = data
771
768
if dp_rate_limit .has_limit () and datapoints >= 0 and dp_rate_limit .get_minimal_limit () < datapoints :
772
769
log .debug ("Rate limit is too low, cannot send message with %i datapoints, "
@@ -782,19 +779,19 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
782
779
device_split_messages = self ._split_message (device_data , dp_rate_limit .get_minimal_limit (),
783
780
self .max_payload_size )
784
781
split_messages = [
785
- {'message' : {device : [split_message ['data' ]]}, 'datapoints' : split_message ['datapoints' ]}
786
- for split_message in device_split_messages ]
782
+ {'message' : {device : [split_message ['data' ]]}, 'datapoints' : split_message ['datapoints' ],
783
+ 'metadata' : split_message .get ('metadata' )} for split_message in device_split_messages ]
784
+
787
785
if len (split_messages ) == 0 :
788
786
log .debug ("Cannot split message to smaller parts!" )
787
+
789
788
results = []
790
789
for part in split_messages :
791
790
dp_rate_limit .increase_rate_limit_counter (part ['datapoints' ])
792
791
self ._wait_for_rate_limit_released (timeout ,
793
792
message_rate_limit = msg_rate_limit ,
794
793
dp_rate_limit = dp_rate_limit ,
795
794
amount = dp_rate_limit .get_minimal_limit ())
796
- if self .__is_test_latency_message (kwargs ['payload' ]):
797
- kwargs = self .__convert_test_latency_message (kwargs )
798
795
kwargs ["payload" ] = dumps (part ['message' ])
799
796
self .wait_until_current_queued_messages_processed ()
800
797
results .append (self ._client .publish (** kwargs ))
@@ -806,8 +803,6 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
806
803
message_rate_limit = msg_rate_limit ,
807
804
dp_rate_limit = dp_rate_limit ,
808
805
amount = datapoints )
809
- if self .__is_test_latency_message (kwargs ['payload' ]):
810
- kwargs = self .__convert_test_latency_message (kwargs )
811
806
kwargs ["payload" ] = dumps (payload )
812
807
return TBPublishInfo (self ._client .publish (** kwargs ))
813
808
@@ -1043,6 +1038,7 @@ def _split_message(message_pack, max_size, max_payload_size):
1043
1038
values = message .get ("values" )
1044
1039
else :
1045
1040
values = message
1041
+
1046
1042
values_data_keys = tuple (values .keys ())
1047
1043
if len (values_data_keys ) == 1 :
1048
1044
if ts is not None :
@@ -1072,7 +1068,8 @@ def _split_message(message_pack, max_size, max_payload_size):
1072
1068
or current_data_key_index == len (values_data_keys ) - 1 ) or len (
1073
1069
str (message_item_values_with_allowed_size )) >= max_payload_size :
1074
1070
if ts is not None :
1075
- final_message_item ['data' ] = {"ts" : ts , "values" : message_item_values_with_allowed_size }
1071
+ final_message_item ['data' ] = {"ts" : ts , "values" : message_item_values_with_allowed_size ,
1072
+ 'metadata' : message .get ('metadata' )}
1076
1073
else :
1077
1074
final_message_item ['data' ] = message_item_values_with_allowed_size
1078
1075
final_message_item ['datapoints' ] = len (message_item_values_with_allowed_size )
@@ -1085,24 +1082,3 @@ def _split_message(message_pack, max_size, max_payload_size):
1085
1082
if add_last_item :
1086
1083
split_messages .append (final_message_item )
1087
1084
return split_messages
1088
-
1089
- @staticmethod
1090
- def __is_test_latency_message (payload ):
1091
- if isinstance (payload , list ) and payload [0 ].get ('values' , {}).get ('isTestLatencyMessageType' , False ):
1092
- return True
1093
-
1094
- return False
1095
-
1096
- @staticmethod
1097
- def __convert_test_latency_message (kwargs ):
1098
- try :
1099
- values = kwargs ['payload' ][0 ]['values' ]
1100
- payload = {
1101
- values ['connectorName' ]: {'receivedTs' : values ['receivedTs' ], 'publishedTs' : int (timestamp () * 1000 )}}
1102
-
1103
- kwargs ['payload' ] = payload
1104
- kwargs ['topic' ] = 'v1/gateway/metrics'
1105
- return kwargs
1106
- except Exception as e :
1107
- log .error (e )
1108
- return kwargs
0 commit comments