Skip to content

Commit 3713a2e

Browse files
committed
Add logs on kafka push
1 parent 2c2c17e commit 3713a2e

File tree

4 files changed

+31
-10
lines changed

4 files changed

+31
-10
lines changed

trafficUtil/kafkaUtil/kafka.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func InitKafka() {
8686

8787
out, _ := json.Marshal(value)
8888
ctx := context.Background()
89-
err := ProduceStr(ctx, string(out))
89+
err := ProduceStr(ctx, string(out), "testKafkaConnection", "testKafkaConnectionHost")
9090
utils.PrintLog("logging kafka stats post pushing message")
9191
LogKafkaStats()
9292
if err != nil {
@@ -274,7 +274,7 @@ func ProduceLogs(ctx context.Context, message string, logType string) error {
274274
return nil
275275
}
276276

277-
func ProduceStr(ctx context.Context, message string) error {
277+
func ProduceStr(ctx context.Context, message string, url, reqHost string ) error {
278278
// initialize the writer with the broker addresses, and the topic
279279
topic := "akto.api.logs"
280280
msg := kafka.Message{
@@ -288,6 +288,8 @@ func ProduceStr(ctx context.Context, message string) error {
288288
slog.Error("ERROR while writing messages", "topic", topic, "error", err)
289289
return err
290290
}
291+
checkDebugUrlAndPrint(url, reqHost, "Kafka write successful: " + message)
292+
291293
return nil
292294
}
293295

trafficUtil/kafkaUtil/parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d
504504
} else {
505505
// Produce to kafka
506506
// TODO : remove and use protobuf instead
507-
go ProduceStr(ctx, string(out))
507+
go ProduceStr(ctx, string(out), url, req.Host)
508508
go Produce(ctx, payload)
509509
}
510510

trafficUtil/kafkaUtil/podinformer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ func (w *PodInformer) handlePodAdd(obj interface{}) {
355355
slog.Debug("Pod added:", "namespace", pod.Namespace, "podName", pod.Name)
356356
w.podNameLabelsMap.Store(pod.Name, pod.Labels)
357357
// Build the PID to Hostname map again to ensure it is up-to-date
358+
// TODO: Optimize this ? What's the rate of pod add events?
358359
w.BuildPidHostNameMap()
359360
go ProducePodMapping(context.Background(), pod.Name)
360361
}
@@ -384,5 +385,6 @@ func (w *PodInformer) handlePodDelete(obj interface{}) {
384385
slog.Debug("Pod deleted:", "namespace", pod.Namespace, "podName", pod.Name)
385386
w.podNameLabelsMap.Delete(pod.Name)
386387
// Build the PID to Hostname map again to ensure it is up-to-date
388+
// TODO: Optimize this ? What's the rate of pod add events?
387389
w.BuildPidHostNameMap()
388390
}

trafficUtil/utils/logger.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,14 @@ func HasLogIntervalPassed() bool {
7777
return true
7878
}
7979

80-
var fileHandlers = make(map[string]*os.File)
80+
type textLogger struct {
81+
handler *os.File
82+
lastWriteTime int64
83+
}
84+
85+
var fileHandlers = make(map[string]*textLogger)
8186
const HOST_MAPPING_PATH = "/ebpf/logs/akto/"
87+
const LOG_ROTATE_INTERVAL = 60 * 5 // 5 minutes
8288

8389
const (
8490
OSPidLogFile = HOST_MAPPING_PATH + "ospidlog.txt"
@@ -89,7 +95,10 @@ const (
8995

9096
func SetupFileLogger(filePath string) {
9197
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
92-
fileHandlers[filePath] = file
98+
fileHandlers[filePath] = &textLogger{
99+
handler: file,
100+
lastWriteTime: time.Now().Unix(),
101+
}
93102
if err != nil {
94103
slog.Error("Failed to open log file", "filePath", filePath, "error", err)
95104
return
@@ -99,16 +108,24 @@ func SetupFileLogger(filePath string) {
99108
}
100109

101110
func LogToSpecificFile(filePath string, message string, args ...any) {
102-
handler, exists := fileHandlers[filePath]
111+
textLogger, exists := fileHandlers[filePath]
103112
if !exists {
104113
slog.Error("Logger not initialized for", "filePath", filePath)
105114
return
106115
}
107-
if _, err := handler.WriteString(message); err != nil {
116+
now := time.Now().Unix()
117+
if now-textLogger.lastWriteTime > LOG_ROTATE_INTERVAL {
118+
// empty the file contents
119+
if err := os.Truncate(filePath, 0); err != nil {
120+
slog.Error("Failed to truncate log file", "filePath", filePath, "error", err)
121+
}
122+
slog.Debug("Truncated log file due to rotation interval", "filePath", filePath)
123+
}
124+
textLogger.lastWriteTime = now
125+
if _, err := textLogger.handler.WriteString(message); err != nil {
108126
slog.Error("Failed to write to log file", "filePath", filePath, "error", err)
109127
return
110128
}
111-
112129
}
113130

114131
func SetupAllFileLoggers() {
@@ -122,8 +139,8 @@ func SetupAllFileLoggers() {
122139
}
123140

124141
func CloseAllFileLoggers() {
125-
for filePath, file := range fileHandlers {
126-
err := file.Close()
142+
for filePath, textLogger := range fileHandlers {
143+
err := textLogger.handler.Close()
127144
if err != nil {
128145
slog.Error("Failed to close log file", "filePath", filePath, "error", err)
129146
continue

0 commit comments

Comments
 (0)