Skip to content

Commit 0527156

Browse files
authored
Merge pull request #108 from akto-api-security/feat/tags-only
Revert "Merge pull request #105 from akto-api-security/event_loss_fixes"
2 parents cae6012 + 032c366 commit 0527156

File tree

5 files changed

+68
-102
lines changed

5 files changed

+68
-102
lines changed

ebpf/bpfwrapper/perfbufferreaders.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package bpfwrapper
22

33
import (
44
"fmt"
5-
"github.com/akto-api-security/mirroring-api-logging/trafficUtil/kafkaUtil"
65
"log"
76

87
"github.com/iovisor/gobpf/bcc"
@@ -37,7 +36,7 @@ func NewProbeChannel(name string, handler ProbeEventLoop) *ProbeChannel {
3736

3837
// Start initiate a goroutine for the event loop handler, for a lost events messages and the perf map.
3938
func (probeChannel *ProbeChannel) Start(module *bcc.Module, connectionFactory *connections.Factory) error {
40-
probeChannel.eventChannel = make(chan []byte, kafkaUtil.EventChanBuffSize)
39+
probeChannel.eventChannel = make(chan []byte)
4140
probeChannel.lostEventsChannel = make(chan uint64)
4241

4342
table := bcc.NewTable(module.TableId(probeChannel.name), module)

ebpf/connections/factory.go

Lines changed: 47 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package connections
22

33
import (
4-
"bytes"
54
"encoding/binary"
65
"fmt"
76
"log/slog"
@@ -16,86 +15,19 @@ import (
1615
"github.com/google/uuid"
1716
)
1817

19-
var httpBytes = []byte("HTTP")
20-
2118
// Factory is a routine-safe container that holds a trackers with unique ID, and able to create new tracker.
2219
type Factory struct {
23-
processor map[structs.ConnID]chan interface{}
24-
connections map[structs.ConnID]*Tracker
25-
mutex *sync.RWMutex
26-
trackersToDelete sync.Map
20+
processor map[structs.ConnID]chan interface{}
21+
connections map[structs.ConnID]*Tracker
22+
mutex *sync.RWMutex
2723
}
2824

2925
// NewFactory creates a new instance of the factory.
3026
func NewFactory() *Factory {
31-
f := &Factory{
32-
processor: make(map[structs.ConnID]chan interface{}),
33-
connections: make(map[structs.ConnID]*Tracker),
34-
mutex: &sync.RWMutex{},
35-
trackersToDelete: sync.Map{},
36-
}
37-
38-
f.StartCleanupWorker()
39-
return f
40-
}
41-
42-
func (factory *Factory) StartCleanupWorker() {
43-
go func() {
44-
for {
45-
time.Sleep(100 * time.Millisecond)
46-
47-
now := time.Now()
48-
factory.trackersToDelete.Range(func(key, value interface{}) bool {
49-
connID := key.(structs.ConnID)
50-
markedAt := value.(time.Time)
51-
52-
if now.Sub(markedAt) > time.Duration(trackerDataProcessInterval)*time.Millisecond {
53-
factory.ProcessAndStopWorker(connID)
54-
factory.forceDeleteWorker(connID)
55-
factory.trackersToDelete.Delete(connID)
56-
}
57-
return true
58-
})
59-
}
60-
}()
61-
}
62-
63-
func (factory *Factory) forceDeleteWorker(connectionID structs.ConnID) {
64-
factory.mutex.Lock()
65-
defer factory.mutex.Unlock()
66-
67-
if ch, exists := factory.processor[connectionID]; exists {
68-
close(ch)
69-
delete(factory.processor, connectionID)
70-
slog.Debug("Deleted event channel", "fd", connectionID.Fd, "id", connectionID.Id, "timestamp", connectionID.Conn_start_ns, "ip", connectionID.Ip, "port", connectionID.Port)
71-
}
72-
73-
if _, exists := factory.connections[connectionID]; exists {
74-
delete(factory.connections, connectionID)
75-
slog.Debug("Deleted connection", "fd", connectionID.Fd, "id", connectionID.Id, "timestamp", connectionID.Conn_start_ns, "ip", connectionID.Ip, "port", connectionID.Port)
76-
requestProcessCount++
77-
}
78-
79-
if (time.Now().UnixMilli())-lastMemCheck > int64(memCheckInterval) {
80-
lastMemCheck = time.Now().UnixMilli()
81-
mem := utils.LogMemoryStats()
82-
83-
utils.PrintLog("Requests processed", "count", requestProcessCount, "lastMemCheck", lastMemCheck)
84-
utils.PrintLog("connection factory size", "connections", len(factory.connections), "processors", len(factory.processor), "lastMemCheck", lastMemCheck)
85-
requestProcessCount = 0
86-
if mem >= bufferMemThreshold {
87-
trackersToDelete := make(map[structs.ConnID]struct{})
88-
for k := range factory.connections {
89-
trackersToDelete[k] = struct{}{}
90-
}
91-
for key := range trackersToDelete {
92-
if ch, exists := factory.processor[key]; exists {
93-
close(ch)
94-
delete(factory.processor, key)
95-
}
96-
delete(factory.connections, key)
97-
}
98-
}
27+
return &Factory{
28+
processor: make(map[structs.ConnID]chan interface{}),
29+
connections: make(map[structs.ConnID]*Tracker),
30+
mutex: &sync.RWMutex{},
9931
}
10032
}
10133

@@ -143,8 +75,7 @@ var (
14375
bufferMemThreshold = 400
14476

14577
// unique id of daemonset
146-
uniqueDaemonsetId = uuid.New().String()
147-
trackerDataProcessInterval = 100
78+
uniqueDaemonsetId = uuid.New().String()
14879
)
14980

15081
func init() {
@@ -153,7 +84,6 @@ func init() {
15384
utils.InitVar("TRAFFIC_INACTIVITY_THRESHOLD", &inactivityThreshold)
15485
utils.InitVar("TRAFFIC_BUFFER_THRESHOLD", &bufferMemThreshold)
15586
utils.InitVar("AKTO_MEM_SOFT_LIMIT", &bufferMemThreshold)
156-
utils.InitVar("TRACKER_DATA_PROCESS_INTERVAL", &trackerDataProcessInterval)
15787
}
15888

15989
func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool) {
@@ -185,14 +115,10 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool
185115
hostName = kafkaUtil.PodInformerInstance.GetPodNameByProcessId(int32(connID.Id>>32))
186116
}
187117

188-
if len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes)) {
189-
tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
190-
}
118+
tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
191119
if !disableEgress {
192120
// attempt to parse the egress as well by switching the recv and sent buffers.
193-
if len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes)) {
194-
tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
195-
}
121+
tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
196122
}
197123
}
198124

@@ -286,13 +212,16 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke
286212
case *structs.SocketCloseEvent:
287213
utils.LogProcessing("Received close event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
288214
tracker.AddCloseEvent(*e)
215+
factory.ProcessAndStopWorker(connID)
289216
factory.DeleteWorker(connID)
290217
utils.LogProcessing("Stopping go routine", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
218+
return
291219
}
292220

293221
case <-inactivityTimer.C:
294222
// Eat the go routine after inactive threshold, process the tracker and stop the worker
295223
utils.LogProcessing("Inactivity threshold reached, marking connection as inactive and processing", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
224+
factory.ProcessAndStopWorker(connID)
296225
factory.DeleteWorker(connID)
297226
utils.LogProcessing("Stopping go routine", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
298227
return
@@ -312,7 +241,40 @@ func (factory *Factory) ProcessAndStopWorker(connectionID structs.ConnID) {
312241
func (factory *Factory) DeleteWorker(connectionID structs.ConnID) {
313242
factory.mutex.Lock()
314243
defer factory.mutex.Unlock()
315-
factory.trackersToDelete.Store(connectionID, time.Now())
244+
245+
if ch, exists := factory.processor[connectionID]; exists {
246+
close(ch)
247+
delete(factory.processor, connectionID)
248+
utils.LogProcessing("Deleted event channel", "fd", connectionID.Fd, "id", connectionID.Id, "timestamp", connectionID.Conn_start_ns, "ip", connectionID.Ip, "port", connectionID.Port)
249+
}
250+
251+
if _, exists := factory.connections[connectionID]; exists {
252+
delete(factory.connections, connectionID)
253+
utils.LogProcessing("Deleted connection", "fd", connectionID.Fd, "id", connectionID.Id, "timestamp", connectionID.Conn_start_ns, "ip", connectionID.Ip, "port", connectionID.Port)
254+
requestProcessCount++
255+
}
256+
257+
if (time.Now().UnixMilli())-lastMemCheck > int64(memCheckInterval) {
258+
lastMemCheck = time.Now().UnixMilli()
259+
mem := utils.LogMemoryStats()
260+
utils.PrintLog("Requests processed", "count", requestProcessCount, "lastMemCheck", lastMemCheck)
261+
utils.PrintLog("connection factory size", "connections", len(factory.connections), "processors", len(factory.processor), "lastMemCheck", lastMemCheck)
262+
requestProcessCount = 0
263+
if mem >= bufferMemThreshold {
264+
trackersToDelete := make(map[structs.ConnID]struct{})
265+
utils.LogProcessing("Deleting all trackers at mem", "mem", mem)
266+
for k := range factory.connections {
267+
trackersToDelete[k] = struct{}{}
268+
}
269+
for key := range trackersToDelete {
270+
if ch, exists := factory.processor[key]; exists {
271+
close(ch)
272+
delete(factory.processor, key)
273+
}
274+
delete(factory.connections, key)
275+
}
276+
}
277+
}
316278
}
317279

318280
func (factory *Factory) getChannel(connectionID structs.ConnID) (chan interface{}, bool) {

trafficUtil/kafkaUtil/kafka.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration)
306306
WriteTimeout: batchTimeout,
307307
Async: true,
308308
Balancer: &kafka.Hash{},
309-
Compression: kafka.Zstd,
309+
Compression: kafka.Lz4,
310310
}
311311

312312
if useTLS {

trafficUtil/kafkaUtil/parser.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,14 @@ var (
4040
"TRACE": true,
4141
"TRACK": true,
4242
"PATCH": true}
43-
DebugStrings = []string{}
44-
globalReader = &bytes.Reader{}
45-
globalReaderLock sync.Mutex
46-
47-
EventChanBuffSize = 100000
43+
DebugStrings = []string{}
4844
)
4945

5046
const ONE_MINUTE = 60
5147

5248
func init() {
5349
utils.InitVar("DEBUG_MODE", &debugMode)
5450
utils.InitVar("OUTPUT_BANDWIDTH_LIMIT", &outputBandwidthLimitPerMin)
55-
utils.InitVar("EVENT_CHAN_BUFF_SIZE", &EventChanBuffSize)
5651
// convert MB to B
5752
if outputBandwidthLimitPerMin != -1 {
5853
outputBandwidthLimitPerMin = outputBandwidthLimitPerMin * 1024 * 1024
@@ -66,6 +61,10 @@ func init() {
6661

6762
// Start ticker to read debug URLs from file every 30 seconds
6863
go func() {
64+
if !utils.FileLoggingEnabled {
65+
slog.Info("File logging is not enabled, skipping debug URL file watcher")
66+
return
67+
}
6968
ticker := time.NewTicker(30 * time.Second)
7069
defer ticker.Stop()
7170
for {
@@ -131,7 +130,7 @@ func checkDebugUrlAndPrint(url string, host string, message string) {
131130
utils.PrintLogDebug(logMsg)
132131
go ProduceLogs(ctx, logMsg, LogTypeInfo)
133132
break
134-
}else if strings.Contains(host, debugString) {
133+
} else if strings.Contains(host, debugString) {
135134
ctx := context.Background()
136135
logMsg := fmt.Sprintf("%s : %s", message, host)
137136
utils.PrintLogDebug(logMsg)
@@ -140,7 +139,6 @@ func checkDebugUrlAndPrint(url string, host string, message string) {
140139
}
141140
}
142141
}
143-
slog.Debug("EventChanBuffSize value ", EventChanBuffSize)
144142
}
145143

146144
func checkAndUpdateBandwidthProcessed(sampleSize int) bool {
@@ -463,7 +461,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d
463461
}
464462
}
465463
} else {
466-
checkDebugUrlAndPrint(url,req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction))
464+
checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction))
467465
}
468466

469467
out, _ := json.Marshal(value)
@@ -509,4 +507,4 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d
509507

510508
i++
511509
}
512-
}
510+
}

trafficUtil/utils/logger.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ var (
1414
processLogs bool = false
1515
aktoLogLevel string
1616
level slog.Level = slog.LevelWarn
17+
FileLoggingEnabled bool = false
1718
)
1819

1920
func SetupLogger() {
2021
slog.Warn("Setting up logger")
2122
InitVar("INGEST_LOGS", &ingestLogs)
2223
InitVar("PROCESS_LOGS", &processLogs)
2324
InitVar("AKTO_LOG_LEVEL", &aktoLogLevel)
25+
InitVar("AKTO_FILE_LOGGING_ENABLED", &FileLoggingEnabled)
2426

2527
if aktoLogLevel != "" {
2628
switch strings.ToUpper(aktoLogLevel) {
@@ -73,10 +75,8 @@ const HOST_MAPPING_PATH = "/ebpf/logs/akto/"
7375
const LOG_ROTATE_INTERVAL = 60 * 5 // 5 minutes
7476

7577
const (
76-
OSPidLogFile = HOST_MAPPING_PATH + "ospidlog.txt"
7778
GoPidLogFile = HOST_MAPPING_PATH + "gopidlog.txt"
7879
LabelsMapLogFile = HOST_MAPPING_PATH + "labelsmaplog.txt"
79-
ResolveLabelsLogFile = HOST_MAPPING_PATH + "resolvelabels.txt"
8080
)
8181

8282
func SetupFileLogger(filePath string) {
@@ -93,6 +93,11 @@ func SetupFileLogger(filePath string) {
9393
}
9494

9595
func LogToSpecificFile(filePath string, message string, args ...any) {
96+
if !FileLoggingEnabled {
97+
slog.Warn("File logging is disabled, skipping log to file", "filePath", filePath)
98+
return
99+
}
100+
96101
textLogger, exists := fileHandlers[filePath]
97102
if !exists {
98103
slog.Error("Logger not initialized for", "filePath", filePath)
@@ -114,13 +119,15 @@ func LogToSpecificFile(filePath string, message string, args ...any) {
114119
}
115120

116121
func SetupAllFileLoggers() {
122+
if !FileLoggingEnabled {
123+
slog.Warn("File logging is disabled, skipping setup")
124+
return
125+
}
117126
if err := os.MkdirAll(HOST_MAPPING_PATH, 0755); err != nil {
118127
slog.Error("Failed to create log directory", "path", HOST_MAPPING_PATH, "error", err)
119128
}
120-
SetupFileLogger(OSPidLogFile)
121129
SetupFileLogger(GoPidLogFile)
122130
SetupFileLogger(LabelsMapLogFile)
123-
SetupFileLogger(ResolveLabelsLogFile)
124131
}
125132

126133
// TODO: Call this somewhere in the shutdown process

0 commit comments

Comments
 (0)