Skip to content

Commit 101e3b3

Browse files
authored
fix(kafkaclient): do not close logs channel (#180)
1 parent 40ea6fa commit 101e3b3

File tree

2 files changed

+17
-25
lines changed

2 files changed

+17
-25
lines changed

kafka/config.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kafka
22

33
import (
4-
"context"
54
"fmt"
65
"log/syslog"
76
"strings"
@@ -153,7 +152,7 @@ func (c Config) configureAuth(configMap *kafkalib.ConfigMap) error {
153152
type ConfigOpt func(c *kafkalib.ConfigMap)
154153

155154
// WithLogger adds a logger to a Kafka consumer or producer.
156-
func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt {
155+
func WithLogger(log logrus.FieldLogger) ConfigOpt {
157156
return func(c *kafkalib.ConfigMap) {
158157

159158
syslogToLogrusLevelMapping := map[syslog.Priority]logrus.Level{
@@ -175,27 +174,20 @@ func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt {
175174

176175
// Read from channel and print logs using the provided logger.
177176
go func() {
178-
defer close(logsChan)
179-
for {
180-
select {
181-
case <-ctx.Done():
182-
return
183-
case m, ok := <-logsChan:
184-
if !ok {
185-
return
186-
}
187-
l := log.WithFields(logrus.Fields{
188-
"kafka_context": m.Tag,
189-
"kafka_client": m.Name,
190-
}).WithTime(m.Timestamp)
191-
192-
logrusLevel := syslogToLogrusLevelMapping[syslog.Priority(m.Level)]
193-
switch logrusLevel {
194-
case logrus.ErrorLevel:
195-
l.WithError(errors.New(m.Message)).Error("Error in Kafka Consumer")
196-
default:
197-
l.Log(logrusLevel, m.Message)
198-
}
177+
// Do not close logsChan because confluent-kafka-go will send logs until we close the client.
178+
// Otherwise it will panic trying to send messages to a closed channel.
179+
for m := range logsChan {
180+
l := log.WithFields(logrus.Fields{
181+
"kafka_context": m.Tag,
182+
"kafka_client": m.Name,
183+
}).WithTime(m.Timestamp)
184+
185+
logrusLevel := syslogToLogrusLevelMapping[syslog.Priority(m.Level)]
186+
switch logrusLevel {
187+
case logrus.ErrorLevel:
188+
l.WithError(errors.New(m.Message)).Error("Error in Kafka Consumer")
189+
default:
190+
l.Log(logrusLevel, m.Message)
199191
}
200192
}
201193
}()

kafka/integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestIntegration(t *testing.T) {
5050
}
5151

5252
// create the producer
53-
p, err := NewProducer(conf, WithLogger(ctx, log), WithPartitionerAlgorithm(PartitionerMurMur2))
53+
p, err := NewProducer(conf, WithLogger(log), WithPartitionerAlgorithm(PartitionerMurMur2))
5454
assert.NoError(err)
5555
assert.NotNil(p)
5656

@@ -131,7 +131,7 @@ func TestIntegration(t *testing.T) {
131131
val := "gotestval"
132132

133133
// create the producer
134-
p, err := NewProducer(conf, WithLogger(ctx, log), WithPartitionerAlgorithm(PartitionerMurMur2))
134+
p, err := NewProducer(conf, WithLogger(log), WithPartitionerAlgorithm(PartitionerMurMur2))
135135
assert.NoError(err)
136136
assert.NotNil(p)
137137

0 commit comments

Comments
 (0)