Skip to content

Commit 67f650e

Browse files
authored
Merge pull request #22 from amfern/mongo
add mongo as a possible sink
2 parents c765a49 + 6b37c79 commit 67f650e

File tree

4 files changed

+508
-1
lines changed

4 files changed

+508
-1
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ The sink type is configured using `$SINK_TYPE` environment variable. Valid value
7070
- `nsq`
7171
- `redis`
7272
- `kafka`
73+
- `mongo`
7374
- `stdout`
7475

7576
The `amqp` sink is configured using `$SINK_AMQP_CONNECTION` (`amqp://guest:guest@127.0.0.1:5672/`), `$SINK_AMQP_EXCHANGE` and `$SINK_AMQP_ROUTING_KEY`, `$SINK_AMQP_WORKERS` (default: `1`) environment variables.
@@ -82,6 +83,8 @@ The `redis` sink is configured using `$SINK_REDIS_URL` (`redis://[user]:[passwor
8283

8384
The `kafka` sink is configured using `$SINK_KAFKA_BROKERS` (`kafka1:9092,kafka2:9092,kafka3:9092`), and `$SINK_KAFKA_TOPIC` environment variables.
8485

86+
The `mongo` sink is configured using `$SINK_MONGODB_CONNECTION` (`mongodb://localhost:27017/`), `$SINK_MONGODB_DATABASE` and `$SINK_MONGODB_COLLECTION` environment variables.
87+
8588
The `stdout` sink does not have any configuration, it will simply output the JSON to stdout for debugging.
8689

8790
### `allocations`

sink/helper.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ func GetSink() (Sink, error) {
2525
return NewNSQ()
2626
case "redis":
2727
return NewRedis()
28+
case "mongodb":
29+
return NewMongodb()
2830
case "stdout":
2931
return NewStdout()
3032
default:
31-
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, kafka, kinesis, nsq, rabbitmq, redis or stdout", sinkType)
33+
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, kafka, kinesis, nsq, rabbitmq, redis, mongodb or stdout", sinkType)
3234
}
3335
}

sink/mongodb.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package sink
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"time"
7+
8+
"os"
9+
10+
"fmt"
11+
"encoding/json"
12+
13+
log "github.com/sirupsen/logrus"
14+
15+
mongo "github.com/mongodb/mongo-go-driver/mongo"
16+
)
17+
18+
// MongodbSink ...
19+
type MongodbSink struct {
20+
conn *mongo.Client
21+
database string
22+
collection string
23+
workerCount int
24+
stopCh chan interface{}
25+
putCh chan []byte
26+
}
27+
28+
// NewMongodb ...
29+
func NewMongodb() (*MongodbSink, error) {
30+
connStr := os.Getenv("SINK_MONGODB_CONNECTION")
31+
if connStr == "" {
32+
return nil, fmt.Errorf("[sink/mongodb] Missing SINK_MONGODB_CONNECTION (example: mongodb://foo:bar@localhost:27017)")
33+
}
34+
35+
database := os.Getenv("SINK_MONGODB_DATABASE")
36+
if database == "" {
37+
return nil, fmt.Errorf("[sink/mongodb] Mising SINK_MONGODB_DATABASE")
38+
}
39+
40+
collection := os.Getenv("SINK_MONGODB_COLLECTION")
41+
if collection == "" {
42+
return nil, fmt.Errorf("[sink/mongodb] Missing SINK_MONGODB_COLLECTION")
43+
}
44+
45+
workerCountStr := os.Getenv("SINK_MONGODB_WORKERS")
46+
if workerCountStr == "" {
47+
workerCountStr = "1"
48+
}
49+
workerCount, err := strconv.Atoi(workerCountStr)
50+
if err != nil {
51+
return nil, fmt.Errorf("Invalid SINK_MONGODB_WORKERS, must be an integer")
52+
}
53+
54+
conn, err := mongo.NewClient(connStr)
55+
if err != nil {
56+
return nil, fmt.Errorf("[sink/mongodb] Invalid to connect to string: %s", err)
57+
}
58+
59+
err = conn.Connect(context.Background())
60+
if err != nil {
61+
return nil, fmt.Errorf("[sink/mongodb] failed to connect to string: %s", err)
62+
}
63+
64+
65+
return &MongodbSink{
66+
conn: conn,
67+
database: database,
68+
collection: collection,
69+
workerCount: workerCount,
70+
stopCh: make(chan interface{}),
71+
putCh: make(chan []byte, 1000),
72+
}, nil
73+
}
74+
75+
// Start ...
76+
func (s *MongodbSink) Start() error {
77+
// Stop chan for all tasks to depend on
78+
s.stopCh = make(chan interface{})
79+
80+
for i := 0; i < s.workerCount; i++ {
81+
go s.write(i)
82+
}
83+
84+
// wait forever for a stop signal to happen
85+
for {
86+
select {
87+
case <-s.stopCh:
88+
break
89+
}
90+
break
91+
}
92+
93+
return nil
94+
}
95+
96+
// Stop ...
97+
func (s *MongodbSink) Stop() {
98+
log.Infof("[sink/mongodb] ensure writer queue is empty (%d messages left)", len(s.putCh))
99+
100+
for len(s.putCh) > 0 {
101+
log.Info("[sink/mongodb] Waiting for queue to drain - (%d messages left)", len(s.putCh))
102+
time.Sleep(1 * time.Second)
103+
}
104+
105+
close(s.stopCh)
106+
defer s.conn.Disconnect(context.Background())
107+
}
108+
109+
// Put ..
110+
func (s *MongodbSink) Put(data []byte) error {
111+
s.putCh <- data
112+
113+
return nil
114+
}
115+
116+
func (s *MongodbSink) write(id int) {
117+
log.Infof("[sink/mongodb/%d] Starting writer", id)
118+
119+
collection := s.conn.Database(s.database).Collection(s.collection)
120+
121+
for {
122+
select {
123+
case data := <-s.putCh:
124+
m := make(map[string]interface{})
125+
err := json.Unmarshal(data, &m)
126+
127+
if err != nil {
128+
log.Errorf("[sink/mongodb/%d] %s", id, err)
129+
continue
130+
}
131+
_, err = collection.InsertOne(context.Background(), m)
132+
if err != nil {
133+
log.Errorf("[sink/mongodb/%d] %s", id, err)
134+
} else {
135+
log.Debugf("[sink/mongodb/%d] publish ok", id)
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)