Skip to content

Commit d9d8015

Browse files
authored
Merge pull request #26 from ckong1991/tls-config
Add TLS config to kafka
2 parents c9eb9fd + 408f52c commit d9d8015

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ session "" {
6060
}
6161
```
6262

63+
### Kafka
64+
65+
To connect to Kafka with TLS, set the SINK_KAFKA_CA_CERT_PATH to the path to your CA cert file
66+
67+
6368
## Usage
6469

6570
The `nomad-firehose` binary has several helper subcommands.

sink/kafka.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package sink
22

33
import (
4+
"crypto/tls"
5+
"crypto/x509"
46
"fmt"
7+
"io/ioutil"
58
"os"
69
"strings"
710
"time"
@@ -23,6 +26,25 @@ type KafkaSink struct {
2326
putCh chan []byte
2427
}
2528

29+
func createTlsConfiguration() (t *tls.Config) {
30+
caFile := os.Getenv("SINK_KAFKA_CA_CERT_PATH")
31+
if caFile != "" {
32+
caCert, err := ioutil.ReadFile(caFile)
33+
if err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
caCertPool := x509.NewCertPool()
38+
caCertPool.AppendCertsFromPEM(caCert)
39+
40+
t = &tls.Config{
41+
RootCAs: caCertPool,
42+
}
43+
}
44+
45+
return t
46+
}
47+
2648
// NewKafka ...
2749
func NewKafka() (*KafkaSink, error) {
2850
brokers := os.Getenv("SINK_KAFKA_BROKERS")
@@ -42,6 +64,12 @@ func NewKafka() (*KafkaSink, error) {
4264
config := sarama.NewConfig()
4365
config.Producer.Return.Successes = true
4466

67+
tlsConfig := createTlsConfiguration()
68+
if tlsConfig != nil {
69+
config.Net.TLS.Config = tlsConfig
70+
config.Net.TLS.Enable = true
71+
}
72+
4573
producer, err := sarama.NewSyncProducer(brokerList, config)
4674
if err != nil {
4775
log.Fatal(err)

0 commit comments

Comments
 (0)