
Commit 1c96743

feat(kafka): add new package (#148)
* feat(kafka): add new package
1 parent 3f245dc

12 files changed: +917 −0 lines changed


go.mod

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ require (
 	github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
 	github.com/bugsnag/bugsnag-go v1.5.3
 	github.com/bugsnag/panicwrap v1.2.0 // indirect
+	github.com/confluentinc/confluent-kafka-go v1.4.2
 	github.com/go-chi/chi v4.0.2+incompatible
 	github.com/gofrs/uuid v3.2.0+incompatible // indirect
 	github.com/joho/godotenv v1.3.0
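
Worth noting: confluent-kafka-go v1.4.2 ships with a bundled librdkafka (since v1.4.0) and builds through cgo, so consumers of this module need a working C toolchain. As a sketch, the requirement above would typically have been added with:

	go get github.com/confluentinc/confluent-kafka-go@v1.4.2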

go.sum

Lines changed: 2 additions & 0 deletions
@@ -29,6 +29,8 @@ github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
 github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
+github.com/confluentinc/confluent-kafka-go v1.4.2 h1:13EK9RTujF7lVkvHQ5Hbu6bM+Yfrq8L0MkJNnjHSd4Q=
+github.com/confluentinc/confluent-kafka-go v1.4.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=

kafka/README.md

Lines changed: 16 additions & 0 deletions
# github.com/netlify/netlify-commons/kafka

Package kafka provides a Consumer and a Producer for basic Kafka operations.

It relies on https://github.com/confluentinc/confluent-kafka-go, a Go wrapper on top of https://github.com/edenhill/librdkafka. This gives a reliable implementation that is fully supported both by the community and by Confluent, the company founded by the creators of Kafka.

## Docs

The generated **godoc** documentation, including some examples, can be found on [pkg.go.dev](https://pkg.go.dev/mod/github.com/netlify/netlify-commons?tab=packages).

## TODO

- Support standalone consumers, not only consumers that are members of a consumer group.
- Support seeking by timestamp (only seeking by offset is supported).
- Integration tests.
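
The Config type added in kafka/config.go below carries both json and envconfig-style split_words tags, so it can be decoded from JSON or from environment variables. A minimal sketch of JSON loading, assuming only the Config type from this commit (the field values and the inlined document are hypothetical):

	package main

	import (
		"encoding/json"
		"fmt"
		"log"

		"github.com/netlify/netlify-commons/kafka"
	)

	func main() {
		// The keys match the json tags on kafka.Config in kafka/config.go.
		raw := []byte(`{
			"brokers": ["localhost:9092"],
			"topic": "events",
			"auth": "plain",
			"user": "svc",
			"password": "secret",
			"log_level": "debug"
		}`)

		var cfg kafka.Config
		if err := json.Unmarshal(raw, &cfg); err != nil {
			log.Fatal(err)
		}
		fmt.Println(cfg.Brokers, cfg.Topic, cfg.AuthType)
	}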

kafka/config.go

Lines changed: 192 additions & 0 deletions
package kafka

import (
	"context"
	"fmt"
	"log/syslog"
	"strings"
	"time"

	kafkalib "github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// Supported auth types
const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"
)

// DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.
const DefaultLogLevel = logrus.ErrorLevel

// Config holds all the configuration for this package.
type Config struct {
	Brokers   []string       `json:"brokers"`
	Topic     string         `json:"topic"`
	Producer  ProducerConfig `json:"producer"`
	Consumer  ConsumerConfig `json:"consumer"`
	AuthType  string         `json:"auth" split_words:"true"`
	User      string         `json:"user"`
	Password  string         `json:"password"`
	CAPEMFile string         `json:"ca_pem_file"`
	LogLevel  string         `json:"log_level" split_words:"true"`
}

// baseKafkaConfig provides the base config that applies to both consumers and producers.
func (c Config) baseKafkaConfig() *kafkalib.ConfigMap {
	logrusToSyslogLevelMapping := map[logrus.Level]syslog.Priority{
		logrus.PanicLevel: syslog.LOG_EMERG, // Skipping LOG_ALERT, LOG_CRIT. LOG_EMERG has the highest priority.
		logrus.FatalLevel: syslog.LOG_EMERG, // Skipping LOG_ALERT, LOG_CRIT. LOG_EMERG has the highest priority.
		logrus.ErrorLevel: syslog.LOG_ERR,
		logrus.WarnLevel:  syslog.LOG_WARNING,
		logrus.InfoLevel:  syslog.LOG_NOTICE, // Skipping LOG_INFO. LOG_NOTICE has the higher priority.
		logrus.DebugLevel: syslog.LOG_DEBUG,
		logrus.TraceLevel: syslog.LOG_DEBUG,
	}

	logLevel, err := logrus.ParseLevel(c.LogLevel)
	if err != nil {
		logLevel = DefaultLogLevel
	}

	// See the reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	kafkaConf := &kafkalib.ConfigMap{
		"bootstrap.servers":       strings.Join(c.Brokers, ","),
		"socket.keepalive.enable": true,
		"log_level":               int(logrusToSyslogLevelMapping[logLevel]),
	}

	if logLevel == logrus.DebugLevel {
		_ = kafkaConf.SetKey("debug", "consumer,broker,topic,msg")
	}

	return kafkaConf
}

// ConsumerConfig holds the specific configuration for a consumer.
type ConsumerConfig struct {
	GroupID string `json:"group_id" split_words:"true"`
}

// Apply applies the specific configuration for a consumer.
func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap) {
	if id := c.GroupID; id != "" {
		_ = kafkaConf.SetKey("group.id", id)
	}
}

// ProducerConfig holds the specific configuration for a producer.
type ProducerConfig struct {
	FlushPeriod     time.Duration `json:"flush_period" split_words:"true"`
	BatchSize       int           `json:"batch_size" split_words:"true"`
	DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}

// Apply applies the specific configuration for a producer.
func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap) {
	if timeout := c.DeliveryTimeout; timeout > 0 {
		_ = kafkaConf.SetKey("delivery.timeout.ms", int(timeout.Milliseconds()))
	}

	if size := c.BatchSize; size > 0 {
		_ = kafkaConf.SetKey("queue.buffering.max.messages", size)
	}

	if period := c.FlushPeriod; period > 0 {
		_ = kafkaConf.SetKey("queue.buffering.max.ms", int(period.Milliseconds()))
	}
}

func (c Config) configureAuth(configMap *kafkalib.ConfigMap) error {
	switch c.AuthType {
	case "":
		// No auth mechanism.
		return nil
	case AuthTypePlain:
		// Note: librdkafka expects "sasl_plaintext" here; "sasl_plain" is not a
		// valid security.protocol value.
		_ = configMap.SetKey("security.protocol", "sasl_plaintext")
		_ = configMap.SetKey("sasl.mechanism", "PLAIN")
		_ = configMap.SetKey("sasl.username", c.User)
		_ = configMap.SetKey("sasl.password", c.Password)
	case AuthTypeSCRAM256:
		_ = configMap.SetKey("security.protocol", "sasl_ssl")
		_ = configMap.SetKey("sasl.mechanism", "SCRAM-SHA-256")
		_ = configMap.SetKey("sasl.username", c.User)
		_ = configMap.SetKey("sasl.password", c.Password)
	case AuthTypeSCRAM512:
		_ = configMap.SetKey("security.protocol", "sasl_ssl")
		_ = configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")
		_ = configMap.SetKey("sasl.username", c.User)
		_ = configMap.SetKey("sasl.password", c.Password)
	default:
		return fmt.Errorf("unknown auth type: %s", c.AuthType)
	}

	if c.CAPEMFile != "" {
		_ = configMap.SetKey("ssl.ca.location", c.CAPEMFile)
	}

	return nil
}

// ConfigOpt configures Kafka consumers and producers.
type ConfigOpt func(c *kafkalib.ConfigMap)

// WithLogger adds a logger to a Kafka consumer or producer.
func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt {
	return func(c *kafkalib.ConfigMap) {
		syslogToLogrusLevelMapping := map[syslog.Priority]logrus.Level{
			// We don't want the app to panic, so we treat ErrorLevel as the highest severity.
			syslog.LOG_EMERG:   logrus.ErrorLevel,
			syslog.LOG_ALERT:   logrus.ErrorLevel,
			syslog.LOG_CRIT:    logrus.ErrorLevel,
			syslog.LOG_ERR:     logrus.ErrorLevel,
			syslog.LOG_WARNING: logrus.WarnLevel,
			syslog.LOG_NOTICE:  logrus.InfoLevel,
			syslog.LOG_INFO:    logrus.InfoLevel,
			syslog.LOG_DEBUG:   logrus.DebugLevel,
		}

		// Forward logs to a channel.
		logsChan := make(chan kafkalib.LogEvent, 10000)
		_ = c.SetKey("go.logs.channel.enable", true)
		_ = c.SetKey("go.logs.channel", logsChan)

		// Read from the channel and print logs using the provided logger.
		go func() {
			// Note: the Kafka client is the sender on logsChan, so this receiving
			// goroutine must not close the channel; closing it here could panic
			// the client on a send to a closed channel.
			for {
				select {
				case <-ctx.Done():
					return
				case m, ok := <-logsChan:
					if !ok {
						return
					}
					l := log.WithFields(logrus.Fields{
						"kafka_context": m.Tag,
						"kafka_client":  m.Name,
					}).WithTime(m.Timestamp)

					logrusLevel := syslogToLogrusLevelMapping[syslog.Priority(m.Level)]
					switch logrusLevel {
					case logrus.ErrorLevel:
						l.WithError(errors.New(m.Message)).Error("Error in Kafka Consumer")
					default:
						l.Log(logrusLevel, m.Message)
					}
				}
			}
		}()
	}
}

// WithConsumerGroupID sets the consumer group ID for a Consumer.
func WithConsumerGroupID(groupID string) ConfigOpt {
	return func(c *kafkalib.ConfigMap) {
		_ = c.SetKey("group.id", groupID)
	}
}
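
Since baseKafkaConfig and configureAuth are unexported, code outside the package composes a kafkalib.ConfigMap through the exported Apply methods and ConfigOpt helpers. A minimal sketch, assuming only the identifiers exported above; the broker address, group ID, and tuning values are placeholders:

	package main

	import (
		"context"
		"time"

		kafkalib "github.com/confluentinc/confluent-kafka-go/kafka"
		"github.com/netlify/netlify-commons/kafka"
		"github.com/sirupsen/logrus"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Start from a plain librdkafka config map.
		conf := &kafkalib.ConfigMap{"bootstrap.servers": "localhost:9092"}

		// Producer tuning via the exported Apply method.
		pc := kafka.ProducerConfig{
			FlushPeriod:     500 * time.Millisecond,
			BatchSize:       100,
			DeliveryTimeout: 5 * time.Second,
		}
		pc.Apply(conf)

		// Options compose the same way: each ConfigOpt mutates the map.
		opts := []kafka.ConfigOpt{
			kafka.WithConsumerGroupID("example-group"),
			kafka.WithLogger(ctx, logrus.New()),
		}
		for _, opt := range opts {
			opt(conf)
		}

		// conf can now be passed to kafkalib.NewConsumer / kafkalib.NewProducer,
		// or to the package's own constructors added elsewhere in this commit
		// (not shown in this excerpt).
		_ = conf
	}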
