Skip to content

Commit 37289ed

Browse files
authored
Merge pull request #27 from amfern/http-sink
add http sink
2 parents d9d8015 + fd06f8e commit 37289ed

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ The `kafka` sink is configured using `$SINK_KAFKA_BROKERS` (`kafka1:9092,kafka2:
9090

9191
The `mongo` sink is configured using `$SINK_MONGODB_CONNECTION` (`mongodb://localhost:27017/`), `$SINK_MONGODB_DATABASE` and `$SINK_MONGODB_COLLECTION` environment variables.
9292

93+
The `http` sink is configured using `$SINK_HTTP_ADDRESS` (`localhost:8080/allocations`)` environment variable.
94+
9395
The `stdout` sink does not have any configuration, it will simply output the JSON to stdout for debugging.
9496

9597
### `allocations`

sink/helper.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
func GetSink() (Sink, error) {
1010
sinkType := os.Getenv("SINK_TYPE")
1111
if sinkType == "" {
12-
return nil, fmt.Errorf("Missing SINK_TYPE: amqp, kafka, kinesis, nsq, rabbitmq, redis or stdout")
12+
return nil, fmt.Errorf("Missing SINK_TYPE: amqp, kafka, kinesis, nsq, rabbitmq, redis, mongodb, http or stdout")
1313
}
1414

1515
switch sinkType {
@@ -27,9 +27,11 @@ func GetSink() (Sink, error) {
2727
return NewRedis()
2828
case "mongodb":
2929
return NewMongodb()
30+
case "http":
31+
return NewHttp()
3032
case "stdout":
3133
return NewStdout()
3234
default:
33-
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, kafka, kinesis, nsq, rabbitmq, redis, mongodb or stdout", sinkType)
35+
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, kafka, kinesis, nsq, rabbitmq, redis, mongodb, http or stdout", sinkType)
3436
}
3537
}

sink/http.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package sink
2+
3+
import (
4+
"strconv"
5+
"time"
6+
"bytes"
7+
8+
"net/http"
9+
10+
"os"
11+
12+
"fmt"
13+
14+
log "github.com/sirupsen/logrus"
15+
)
16+
17+
// HttpSink ...
18+
type HttpSink struct {
19+
address string
20+
workerCount int
21+
stopCh chan interface{}
22+
putCh chan []byte
23+
}
24+
25+
// NewHttp ...
26+
func NewHttp() (*HttpSink, error) {
27+
address := os.Getenv("SINK_HTTP_ADDRESS")
28+
if address == "" {
29+
return nil, fmt.Errorf("[sink/http] Missing SINK_HTTP_ADDRESS (example: http://miau.com:8080/biau)")
30+
}
31+
32+
workerCountStr := os.Getenv("SINK_WORKER_COUNT")
33+
if workerCountStr == "" {
34+
workerCountStr = "1"
35+
}
36+
workerCount, err := strconv.Atoi(workerCountStr)
37+
if err != nil {
38+
return nil, fmt.Errorf("Invalid SINK_WORKER_COUNT, must be an integer")
39+
}
40+
41+
return &HttpSink{
42+
address: address,
43+
workerCount: workerCount,
44+
stopCh: make(chan interface{}),
45+
putCh: make(chan []byte, 1000),
46+
}, nil
47+
}
48+
49+
// Start ...
50+
func (s *HttpSink) Start() error {
51+
// Stop chan for all tasks to depend on
52+
s.stopCh = make(chan interface{})
53+
54+
for i := 0; i < s.workerCount; i++ {
55+
go s.send(i)
56+
}
57+
58+
// wait forever for a stop signal to happen
59+
for {
60+
select {
61+
case <-s.stopCh:
62+
break
63+
}
64+
break
65+
}
66+
67+
return nil
68+
}
69+
70+
// Stop ...
71+
func (s *HttpSink) Stop() {
72+
log.Infof("[sink/http] ensure writer queue is empty (%d messages left)", len(s.putCh))
73+
74+
for len(s.putCh) > 0 {
75+
log.Info("[sink/http] Waiting for queue to drain - (%d messages left)", len(s.putCh))
76+
time.Sleep(1 * time.Second)
77+
}
78+
79+
close(s.stopCh)
80+
}
81+
82+
// Put ..
83+
func (s *HttpSink) Put(data []byte) error {
84+
s.putCh <- data
85+
86+
return nil
87+
}
88+
89+
func (s *HttpSink) send(id int) {
90+
log.Infof("[sink/http/%d] Starting writer", id)
91+
92+
for {
93+
select {
94+
case data := <-s.putCh:
95+
_, err := http.Post(s.address, "text/json", bytes.NewReader(data))
96+
if err != nil {
97+
log.Errorf("[sink/http/%d] %s", id, err)
98+
} else {
99+
log.Debugf("[sink/http/%d] publish ok", id)
100+
}
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)