Skip to content

Commit 2e105e5

Browse files
Merge pull request #13 from FireTail-io/dev
[MAIN] bug fixes
2 parents 56dbb09 + d8a17cd commit 2e105e5

File tree

7 files changed

+122
-60
lines changed

7 files changed

+122
-60
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ POC for a FireTail Kubernetes Sensor.
1010
| ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
1111
| `FIRETAIL_API_TOKEN` || `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
1212
| `BPF_EXPRESSION` || `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
13-
| `DISABLE_SERVICE_IP_FILTERING` | | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
13+
| `MAX_CONTENT_LENGTH` | | `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. |
1414
| `ENABLE_ONLY_LOG_JSON` || `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
15-
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` || `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
15+
| `DISABLE_SERVICE_IP_FILTERING` || `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
1616
| `FIRETAIL_API_URL` || `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
1717
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` || `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
1818
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` || `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |

build_setup/Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
FROM golang:1.24-bullseye
22
WORKDIR /src
33
RUN apt-get update && apt-get install -y --no-install-recommends libpcap-dev
4-
COPY ./src/ ./
4+
COPY ./src/go.* ./
55
RUN go mod download
6+
COPY ./src/ ./
67
RUN go build -o /dist/main .
78
RUN rm -rf /src/*
89
RUN chmod +x /dist/main

src/bidirectional_stream.go

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@ import (
1515
)
1616

1717
type bidirectionalStreamFactory struct {
18-
conns map[string]*bidirectionalStream
18+
conns *sync.Map
1919
requestAndResponseChannel *chan httpRequestAndResponse
20+
maxBodySize int64
2021
}
2122

2223
func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
2324
key := netFlow.FastHash() ^ tcpFlow.FastHash()
2425

2526
// The second time we see the same connection, it will be from the server to the client
26-
if conn, ok := f.conns[fmt.Sprint(key)]; ok {
27-
return &conn.serverToClient
27+
if conn, ok := f.conns.LoadAndDelete(fmt.Sprint(key)); ok {
28+
return &conn.(*bidirectionalStream).serverToClient
2829
}
2930

3031
s := &bidirectionalStream{
@@ -33,8 +34,12 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse
3334
clientToServer: tcpreader.NewReaderStream(),
3435
serverToClient: tcpreader.NewReaderStream(),
3536
requestAndResponseChannel: f.requestAndResponseChannel,
37+
closeCallback: func() {
38+
f.conns.Delete(fmt.Sprint(key))
39+
},
40+
maxBodySize: f.maxBodySize,
3641
}
37-
f.conns[fmt.Sprint(key)] = s
42+
f.conns.Store(fmt.Sprint(key), s)
3843
go s.run()
3944

4045
// The first time we see the connection, it will be from the client to the server
@@ -46,9 +51,13 @@ type bidirectionalStream struct {
4651
clientToServer tcpreader.ReaderStream
4752
serverToClient tcpreader.ReaderStream
4853
requestAndResponseChannel *chan httpRequestAndResponse
54+
closeCallback func()
55+
maxBodySize int64
4956
}
5057

5158
func (s *bidirectionalStream) run() {
59+
defer s.closeCallback()
60+
5261
wg := sync.WaitGroup{}
5362
wg.Add(2)
5463

@@ -58,56 +67,47 @@ func (s *bidirectionalStream) run() {
5867
defer close(responseChannel)
5968

6069
go func() {
70+
defer wg.Done()
6171
defer func() {
6272
if r := recover(); r != nil {
63-
slog.Warn("Recovered from panic in clientToServer reader:", "Err", r)
73+
slog.Error("Recovered from panic in clientToServer reader:", "Err", r)
6474
}
65-
wg.Done()
6675
}()
67-
reader := bufio.NewReader(&s.clientToServer)
68-
for {
69-
request, err := http.ReadRequest(reader)
70-
if err == io.EOF {
71-
return
72-
} else if err != nil {
73-
continue
74-
}
75-
// RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves
76-
request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String())
77-
responseBody := make([]byte, request.ContentLength)
78-
if request.ContentLength > 0 {
79-
io.ReadFull(request.Body, responseBody)
80-
}
81-
request.Body.Close()
82-
request.Body = io.NopCloser(bytes.NewReader(responseBody))
83-
requestChannel <- request
84-
76+
requestBytes := make([]byte, s.maxBodySize)
77+
bytesRead, err := io.ReadFull(&s.clientToServer, requestBytes)
78+
if err != nil && err != io.ErrUnexpectedEOF {
79+
slog.Debug("Failed to read request bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
80+
return
8581
}
82+
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes)))
83+
if err != nil {
84+
slog.Debug("Failed to read request bytes:", "Err", err.Error())
85+
return
86+
}
87+
// RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves
88+
request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String())
89+
requestChannel <- request
8690
}()
8791

8892
go func() {
93+
defer wg.Done()
8994
defer func() {
9095
if r := recover(); r != nil {
91-
slog.Warn("Recovered from panic in serverToClient reader:", "Err", r)
96+
slog.Error("Recovered from panic in serverToClient reader:", "Err", r)
9297
}
93-
wg.Done()
9498
}()
95-
reader := bufio.NewReader(&s.serverToClient)
96-
for {
97-
response, err := http.ReadResponse(reader, nil)
98-
if err == io.ErrUnexpectedEOF {
99-
return
100-
} else if err != nil {
101-
continue
102-
}
103-
responseBody := make([]byte, response.ContentLength)
104-
if response.ContentLength > 0 {
105-
io.ReadFull(response.Body, responseBody)
106-
}
107-
response.Body.Close()
108-
response.Body = io.NopCloser(bytes.NewReader(responseBody))
109-
responseChannel <- response
99+
responseBytes := make([]byte, s.maxBodySize)
100+
bytesRead, err := io.ReadFull(&s.serverToClient, responseBytes)
101+
if err != nil && err != io.ErrUnexpectedEOF {
102+
slog.Debug("Failed to read response bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
103+
return
104+
}
105+
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes)), nil)
106+
if err != nil {
107+
slog.Debug("Failed to read response bytes:", "Err", err.Error())
108+
return
110109
}
110+
responseChannel <- response
111111
}()
112112

113113
wg.Wait()

src/is_json.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
2727
if err != nil {
2828
return false
2929
}
30-
var v map[string]interface{}
30+
var v interface{}
3131
if json.Unmarshal(bodyBytes, &v) == nil {
3232
return true
3333
}
@@ -39,7 +39,7 @@ func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
3939
if err != nil {
4040
return false
4141
}
42-
var v map[string]interface{}
42+
var v interface{}
4343
if json.Unmarshal(bodyBytes, &v) == nil {
4444
return true
4545
}

src/is_json_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,60 @@ func TestIsJson(t *testing.T) {
143143
maxContentLength: 1024,
144144
expectedResult: true,
145145
},
146+
{
147+
name: "Request payload is an array at the root",
148+
reqContentType: "",
149+
reqBody: ``,
150+
respContentType: "",
151+
respBody: `[{"key": "value"}]`,
152+
maxContentLength: 1024,
153+
expectedResult: true,
154+
},
155+
{
156+
name: "Request payload is an array at the root",
157+
reqContentType: "",
158+
reqBody: `[{"key": "value"}]`,
159+
respContentType: "",
160+
respBody: ``,
161+
maxContentLength: 1024,
162+
expectedResult: true,
163+
},
164+
{
165+
name: "Request payload is an string at the root",
166+
reqContentType: "",
167+
reqBody: `"foo"`,
168+
respContentType: "",
169+
respBody: ``,
170+
maxContentLength: 1024,
171+
expectedResult: true,
172+
},
173+
{
174+
name: "Request payload is a number at the root",
175+
reqContentType: "",
176+
reqBody: `3.14159`,
177+
respContentType: "",
178+
respBody: ``,
179+
maxContentLength: 1024,
180+
expectedResult: true,
181+
},
182+
{
183+
name: "Request payload is a bool at the root",
184+
reqContentType: "",
185+
reqBody: `true`,
186+
respContentType: "",
187+
respBody: ``,
188+
maxContentLength: 1024,
189+
expectedResult: true,
190+
},
191+
{
192+
name: "Request payload is null",
193+
reqContentType: "",
194+
reqBody: `null`,
195+
respContentType: "",
196+
respBody: ``,
197+
maxContentLength: 1024,
198+
expectedResult: true,
199+
},
146200
}
147201

148202
for _, tt := range tests {

src/main.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,23 @@ func main() {
5555
}
5656

5757
var maxContentLength int64
58-
onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
59-
if onlyLogJson {
60-
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH")
61-
if !maxContentLengthSet {
62-
slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
63-
maxContentLength = 1048576 // 1MiB
64-
} else {
65-
maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64)
66-
if err != nil {
67-
slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
68-
maxContentLength = 1048576 // 1MiB
69-
}
70-
}
58+
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("MAX_CONTENT_LENGTH")
59+
if !maxContentLengthSet {
60+
slog.Info("MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
61+
maxContentLength = 1048576 // 1MiB
62+
} else if maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64); err != nil {
63+
slog.Error("Failed to parse MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
64+
maxContentLength = 1048576 // 1MiB
7165
}
7266

67+
onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
68+
7369
requestAndResponseChannel := make(chan httpRequestAndResponse, 1)
7470
httpRequestStreamer := &httpRequestAndResponseStreamer{
7571
bpfExpression: bpfExpression,
7672
requestAndResponseChannel: &requestAndResponseChannel,
7773
ipManager: ipManager,
74+
maxBodySize: maxContentLength,
7875
}
7976
go httpRequestStreamer.start()
8077

@@ -127,13 +124,20 @@ func main() {
127124
"SrcPort", requestAndResponse.srcPort,
128125
"DstPort", requestAndResponse.dstPort,
129126
)
127+
requestAndResponse.request.Header.Set(
128+
"Content-Length", strconv.Itoa(int(requestAndResponse.request.ContentLength)),
129+
)
130+
requestAndResponse.request.Header.Set("Host", requestAndResponse.request.Host)
130131
firetailMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
131132
w.WriteHeader(requestAndResponse.response.StatusCode)
132133
for key, values := range requestAndResponse.response.Header {
133134
for _, value := range values {
134135
w.Header().Add(key, value)
135136
}
136137
}
138+
requestAndResponse.response.Header.Set(
139+
"Content-Length", strconv.Itoa(int(requestAndResponse.response.ContentLength)),
140+
)
137141
capturedResponseBody, err := io.ReadAll(requestAndResponse.response.Body)
138142
if err != nil {
139143
slog.Error("Error reading request body:", "err", err.Error())

src/request_and_response.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"log"
55
"log/slog"
66
"net/http"
7+
"sync"
78
"time"
89

910
"github.com/google/gopacket"
@@ -25,6 +26,7 @@ type httpRequestAndResponseStreamer struct {
2526
bpfExpression string
2627
requestAndResponseChannel *chan httpRequestAndResponse
2728
ipManager *serviceIpManager
29+
maxBodySize int64
2830
}
2931

3032
func (s *httpRequestAndResponseStreamer) start() {
@@ -42,8 +44,9 @@ func (s *httpRequestAndResponseStreamer) start() {
4244
assembler := tcpassembly.NewAssembler(
4345
tcpassembly.NewStreamPool(
4446
&bidirectionalStreamFactory{
45-
conns: make(map[string]*bidirectionalStream),
47+
conns: &sync.Map{},
4648
requestAndResponseChannel: s.requestAndResponseChannel,
49+
maxBodySize: s.maxBodySize,
4750
},
4851
),
4952
)

0 commit comments

Comments
 (0)