@@ -1411,9 +1411,7 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
         message_pack = [message_pack]

     def _get_metadata_repr(metadata):
-        if isinstance(metadata, dict):
-            return tuple(sorted(metadata.items()))
-        return None
+        return tuple(sorted(metadata.items())) if isinstance(metadata, dict) else None

     def estimate_chunk_size(chunk):
         if isinstance(chunk, dict) and "values" in chunk:
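Note: the one-liner above builds a hashable, order-independent representation of a metadata dict, apparently used as part of the ts_group_cache grouping key (see flush_ts_group below). A quick standalone check of the behaviour, equivalent for both the old and the new form:

    def _get_metadata_repr(metadata):
        # Conditional-expression form introduced by this diff.
        return tuple(sorted(metadata.items())) if isinstance(metadata, dict) else None

    # sorted() makes the result order-independent, so equal dicts share a key.
    assert _get_metadata_repr({"b": 2, "a": 1}) == (("a", 1), ("b", 2))
    assert _get_metadata_repr({"a": 1, "b": 2}) == _get_metadata_repr({"b": 2, "a": 1})
    assert _get_metadata_repr(None) is None  # anything non-dict maps to None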
@@ -1427,47 +1425,46 @@ def estimate_chunk_size(chunk):
         else:
             return len(str(chunk)) + 20

-
     ts_group_cache = {}
     current_message = {"data": [], "datapoints": 0}
-    current_datapoints = 0
     current_size = 0
+    current_datapoints = 0

     def flush_current_message():
-        nonlocal current_message, current_datapoints, current_size
+        nonlocal current_message, current_size, current_datapoints
         if current_message["data"]:
             split_messages.append(current_message)
             current_message = {"data": [], "datapoints": 0}
-            current_datapoints = 0
             current_size = 0
+            current_datapoints = 0

     def split_and_add_chunk(chunk, chunk_datapoints):
-        nonlocal current_message, current_datapoints, current_size
+        nonlocal current_message, current_size, current_datapoints
+
         chunk_size = estimate_chunk_size(chunk)

-        if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count) or \
+        if (0 < datapoints_max_count <= current_datapoints + chunk_datapoints) or \
                 (current_size + chunk_size > max_payload_size):
             flush_current_message()

         if chunk_datapoints > datapoints_max_count > 0 or chunk_size > max_payload_size:
-            keys = list(chunk["values"].keys()) if "values" in chunk else list(chunk.keys())
+            keys = list(chunk["values"].keys()) if isinstance(chunk, dict) and "values" in chunk else list(chunk.keys())
             if len(keys) == 1:
+                flush_current_message()
                 current_message["data"].append(chunk)
                 current_message["datapoints"] += chunk_datapoints
                 current_size += chunk_size
+                current_datapoints += chunk_datapoints
+                flush_current_message()
                 return

-            max_step = int(datapoints_max_count) if datapoints_max_count > 0 else len(keys)
-            if max_step < 1:
-                max_step = 1
-
+            max_step = max(1, int(datapoints_max_count) if datapoints_max_count > 0 else len(keys))
             for i in range(0, len(keys), max_step):
                 sub_values = (
-                    {k: chunk["values"][k] for k in keys[i:i + max_step]}
-                    if "values" in chunk else
-                    {k: chunk[k] for k in keys[i:i + max_step]}
+                    {k: chunk["values"][k] for k in keys[i:i + max_step]} if "values" in chunk
+                    else {k: chunk[k] for k in keys[i:i + max_step]}
                 )
-
+                sub_chunk = {}
                 if "ts" in chunk:
                     sub_chunk = {"ts": chunk["ts"], "values": sub_values}
                     if "metadata" in chunk:
@@ -1478,46 +1475,55 @@ def split_and_add_chunk(chunk, chunk_datapoints):
                 sub_datapoints = len(sub_values)
                 sub_size = estimate_chunk_size(sub_chunk)

-                if sub_size > max_payload_size:
+                if sub_size > max_payload_size or (0 < datapoints_max_count <= sub_datapoints):
+                    flush_current_message()
                     current_message["data"].append(sub_chunk)
                     current_message["datapoints"] += sub_datapoints
                     current_size += sub_size
-                    continue
-
-                split_and_add_chunk(sub_chunk, sub_datapoints)
+                    current_datapoints += sub_datapoints
+                    flush_current_message()
+                else:
+                    split_and_add_chunk(sub_chunk, sub_datapoints)
             return

         current_message["data"].append(chunk)
         current_message["datapoints"] += chunk_datapoints
         current_size += chunk_size
+        current_datapoints += chunk_datapoints
+
+        if 0 < datapoints_max_count == current_datapoints:
+            flush_current_message()

     def add_chunk_to_current_message(chunk, chunk_datapoints):
-        nonlocal current_message, current_datapoints, current_size
+        nonlocal current_message, current_size, current_datapoints
+
        chunk_size = estimate_chunk_size(chunk)

-        if (datapoints_max_count > 0 and chunk_datapoints > datapoints_max_count) or chunk_size > max_payload_size:
+        if (0 < datapoints_max_count <= chunk_datapoints) or chunk_size > max_payload_size:
             split_and_add_chunk(chunk, chunk_datapoints)
             return

-        if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count) or \
+        if (0 < datapoints_max_count <= current_datapoints + chunk_datapoints) or \
                 (current_size + chunk_size > max_payload_size):
             flush_current_message()

         current_message["data"].append(chunk)
         current_message["datapoints"] += chunk_datapoints
         current_size += chunk_size
+        current_datapoints += chunk_datapoints

-        if datapoints_max_count > 0 and current_message["datapoints"] == datapoints_max_count:
+        if 0 < datapoints_max_count == current_datapoints:
             flush_current_message()

     def flush_ts_group(ts_key, ts, metadata_repr):
+        nonlocal current_message, current_size, current_datapoints
         if ts_key not in ts_group_cache:
             return
+
         values, _, metadata = ts_group_cache.pop(ts_key)
         keys = list(values.keys())
-        step = int(datapoints_max_count) if datapoints_max_count > 0 else len(keys)
-        if step < 1:
-            step = 1
+
+        step = max(1, int(datapoints_max_count) if datapoints_max_count > 0 else len(keys))
         for i in range(0, len(keys), step):
             chunk_values = {k: values[k] for k in keys[i:i + step]}
             if ts is not None:
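Note: the recurring guard rewrite in these hunks is not a pure restyling. The old form "datapoints_max_count > 0 and n > datapoints_max_count" becomes the chained comparison "0 < datapoints_max_count <= n", which also fires when the cap is exactly reached, not only when it is exceeded. A boundary check:

    datapoints_max_count = 100
    for n in (99, 100, 101):
        old = datapoints_max_count > 0 and n > datapoints_max_count
        new = 0 < datapoints_max_count <= n
        print(n, old, new)
    # 99 False False; 100 False True; 101 True True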
@@ -1526,13 +1532,25 @@ def flush_ts_group(ts_key, ts, metadata_repr):
                 chunk["metadata"] = metadata
             else:
                 chunk = chunk_values.copy()
-            add_chunk_to_current_message(chunk, len(chunk_values))
+
+            chunk_datapoints = len(chunk_values)
+            chunk_size = estimate_chunk_size(chunk)
+
+            if chunk_size > max_payload_size or (0 < datapoints_max_count <= chunk_datapoints):
+                flush_current_message()
+                current_message["data"].append(chunk)
+                current_message["datapoints"] += chunk_datapoints
+                current_size += chunk_size
+                current_datapoints += chunk_datapoints
+                flush_current_message()
+            else:
+                add_chunk_to_current_message(chunk, chunk_datapoints)

     for message in message_pack:
         if not isinstance(message, dict):
             continue

-        ts = message.get("ts", None)
+        ts = message.get("ts")
         metadata = message.get("metadata") if isinstance(message.get("metadata"), dict) else None
         values = message.get("values") if isinstance(message.get("values"), dict) else \
             message if isinstance(message, dict) else {}
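Taken together, the change moves datapoint-cap enforcement onto the new current_datapoints running counter and flushes eagerly once a cap is reached. A minimal usage sketch, assuming _split_message returns the accumulated split_messages list (its return statement lies outside these hunks):

    # Hypothetical input: one message with 250 values under a single timestamp.
    message_pack = [{
        "ts": 1700000000000,
        "values": {f"key_{i}": i for i in range(250)},
        "metadata": {"source": "example"},
    }]

    split = _split_message(message_pack, datapoints_max_count=100, max_payload_size=65536)

    for msg in split:
        # With the new counter, every emitted message should respect the cap.
        assert msg["datapoints"] <= 100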