@@ -758,12 +758,12 @@ def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp
758
758
759
759
def __send_publish_with_limitations (self , kwargs , timeout , device = None , msg_rate_limit : RateLimit = None ,
760
760
dp_rate_limit : RateLimit = None ):
761
- data_for_analysis = data = kwargs .get ("payload" )
761
+ data = kwargs .get ("payload" )
762
762
if isinstance (data , str ):
763
- data_for_analysis = loads (data )
763
+ data = loads (data )
764
764
datapoints = - 1
765
765
if dp_rate_limit .has_limit ():
766
- datapoints = self ._count_datapoints_in_message (data_for_analysis , device = device )
766
+ datapoints = self ._count_datapoints_in_message (data , device = device )
767
767
payload = data
768
768
if dp_rate_limit .has_limit () and datapoints >= 0 and dp_rate_limit .get_minimal_limit () < datapoints :
769
769
log .debug ("Rate limit is too low, cannot send message with %i datapoints, "
@@ -779,10 +779,12 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
779
779
device_split_messages = self ._split_message (device_data , dp_rate_limit .get_minimal_limit (),
780
780
self .max_payload_size )
781
781
split_messages = [
782
- {'message' : {device : [split_message ['data' ]]}, 'datapoints' : split_message ['datapoints' ]}
783
- for split_message in device_split_messages ]
782
+ {'message' : {device : [split_message ['data' ]]}, 'datapoints' : split_message ['datapoints' ],
783
+ 'metadata' : split_message .get ('metadata' )} for split_message in device_split_messages ]
784
+
784
785
if len (split_messages ) == 0 :
785
786
log .debug ("Cannot split message to smaller parts!" )
787
+
786
788
results = []
787
789
for part in split_messages :
788
790
dp_rate_limit .increase_rate_limit_counter (part ['datapoints' ])
@@ -1036,6 +1038,7 @@ def _split_message(message_pack, max_size, max_payload_size):
1036
1038
values = message .get ("values" )
1037
1039
else :
1038
1040
values = message
1041
+
1039
1042
values_data_keys = tuple (values .keys ())
1040
1043
if len (values_data_keys ) == 1 :
1041
1044
if ts is not None :
@@ -1065,7 +1068,8 @@ def _split_message(message_pack, max_size, max_payload_size):
1065
1068
or current_data_key_index == len (values_data_keys ) - 1 ) or len (
1066
1069
str (message_item_values_with_allowed_size )) >= max_payload_size :
1067
1070
if ts is not None :
1068
- final_message_item ['data' ] = {"ts" : ts , "values" : message_item_values_with_allowed_size }
1071
+ final_message_item ['data' ] = {"ts" : ts , "values" : message_item_values_with_allowed_size ,
1072
+ 'metadata' : message .get ('metadata' )}
1069
1073
else :
1070
1074
final_message_item ['data' ] = message_item_values_with_allowed_size
1071
1075
final_message_item ['datapoints' ] = len (message_item_values_with_allowed_size )
0 commit comments