@@ -39,8 +39,6 @@ import (
39
39
40
40
var isGcp = false
41
41
var printCounter = 500
42
- var bytesInThreshold = 200 * 1024 * 1024
43
- var bytesInSleepDuration = time .Second * 120
44
42
var assemblerMap = make (map [int ]* tcpassembly.Assembler )
45
43
var incomingCountMap = make (map [string ]utils.IncomingCounter )
46
44
var outgoingCountMap = make (map [string ]utils.OutgoingCounter )
@@ -390,18 +388,6 @@ func run(handle *pcap.Handle, apiCollectionId int, source string) {
390
388
}
391
389
log .Println ("kafka_url" , kafka_url )
392
390
393
- bytesInThresholdInput := os .Getenv ("AKTO_BYTES_IN_THRESHOLD" )
394
- if len (bytesInThresholdInput ) > 0 {
395
- bytesInThreshold , err = strconv .Atoi (bytesInThresholdInput )
396
- if err != nil {
397
- log .Println ("AKTO_BYTES_IN_THRESHOLD should be valid integer. Found " , bytesInThresholdInput )
398
- return
399
- } else {
400
- log .Println ("Setting bytes in threshold at " , bytesInThreshold )
401
- }
402
-
403
- }
404
-
405
391
kafka_batch_size , e := strconv .Atoi (os .Getenv ("AKTO_TRAFFIC_BATCH_SIZE" ))
406
392
if e != nil {
407
393
log .Printf ("AKTO_TRAFFIC_BATCH_SIZE should be valid integer" )
@@ -430,8 +416,6 @@ func run(handle *pcap.Handle, apiCollectionId int, source string) {
430
416
431
417
log .Println ("reading in packets" )
432
418
// Read in packets, pass to assembler.
433
- var bytesIn = 0
434
- var bytesInEpoch = time .Now ()
435
419
packetSource := gopacket .NewPacketSource (handle , handle .LinkType ())
436
420
for packet := range packetSource .Packets () {
437
421
innerPacket := packet
@@ -472,24 +456,6 @@ func run(handle *pcap.Handle, apiCollectionId int, source string) {
472
456
assembler := createAndGetAssembler (vxlanID , source )
473
457
assembler .AssembleWithTimestamp (innerPacket .NetworkLayer ().NetworkFlow (), tcp , packet .Metadata ().Timestamp )
474
458
475
- bytesIn += len (tcp .Payload )
476
-
477
- if bytesIn % 1_000_000 == 0 {
478
- log .Println ("Bytes in: " , bytesIn )
479
- }
480
-
481
- if bytesIn > bytesInThreshold {
482
- if time .Now ().Sub (bytesInEpoch ).Seconds () < 60 {
483
- log .Println ("exceeded bytesInThreshold: " , bytesInThreshold , " with curr: " , bytesIn )
484
- log .Println ("sleeping for: " , bytesInSleepDuration )
485
- flushAll ()
486
- time .Sleep (bytesInSleepDuration )
487
- }
488
-
489
- bytesIn = 0
490
- bytesInEpoch = time .Now ()
491
- }
492
-
493
459
}
494
460
}
495
461
}
0 commit comments