11package jobs
22
33import (
4- "encoding/json"
54 "fmt"
65 "time"
76
@@ -10,8 +9,12 @@ import (
109 log "github.com/sirupsen/logrus"
1110)
1211
12+
13+ type WatchJobListFunc func(job *nomad.JobListStub)
14+
15+
1316// Firehose ...
14- type Firehose struct {
17+ type FirehoseBase struct {
1518 lastChangeIndex uint64
1619 lastChangeTimeCh chan interface{}
1720 nomadClient *nomad.Client
@@ -20,7 +23,7 @@ type Firehose struct {
2023}
2124
2225// NewFirehose ...
23- func NewFirehose () (*Firehose , error) {
26+ func NewFirehoseBase () (*FirehoseBase , error) {
2427 nomadClient, err := nomad.NewClient(nomad.DefaultConfig())
2528 if err != nil {
2629 return nil, err
@@ -31,23 +34,19 @@ func NewFirehose() (*Firehose, error) {
3134 return nil, err
3235 }
3336
34- return &Firehose {
37+ return &FirehoseBase {
3538 nomadClient: nomadClient,
3639 sink: sink,
3740 stopCh: make(chan struct{}, 1),
3841 lastChangeTimeCh: make(chan interface{}, 1),
3942 }, nil
4043}
4144
42- func (f *Firehose) Name() string {
43- return "jobs"
44- }
45-
46- func (f *Firehose) UpdateCh() <-chan interface{} {
45+ func (f *FirehoseBase) UpdateCh() <-chan interface{} {
4746 return f.lastChangeTimeCh
4847}
4948
50- func (f *Firehose ) SetRestoreValue(restoreValue interface{}) error {
49+ func (f *FirehoseBase ) SetRestoreValue(restoreValue interface{}) error {
5150 switch restoreValue.(type) {
5251 case int:
5352 f.lastChangeIndex = uint64(restoreValue.(int))
@@ -60,11 +59,11 @@ func (f *Firehose) SetRestoreValue(restoreValue interface{}) error {
6059}
6160
6261// Start the firehose
63- func (f *Firehose ) Start() {
62+ func (f *FirehoseBase ) Start(w WatchJobListFunc ) {
6463 go f.sink.Start()
6564
6665 // watch for allocation changes
67- go f.watch()
66+ go f.watch(w )
6867
6968 // Save the last event time every 5s
7069 go f.persistLastChangeTime(5 * time.Second)
@@ -77,15 +76,15 @@ func (f *Firehose) Start() {
7776}
7877
7978// Stop the firehose
80- func (f *Firehose ) Stop() {
79+ func (f *FirehoseBase ) Stop() {
8180 close(f.stopCh)
8281 f.sink.Stop()
8382}
8483
8584// Write the Last Change Time to Consul so if the process restarts,
8685// it will try to resume from where it left off, not emitting tons of double events for
8786// old events
88- func (f *Firehose ) persistLastChangeTime(interval time.Duration) {
87+ func (f *FirehoseBase ) persistLastChangeTime(interval time.Duration) {
8988 ticker := time.NewTicker(interval)
9089
9190 for {
@@ -99,18 +98,8 @@ func (f *Firehose) persistLastChangeTime(interval time.Duration) {
9998 }
10099}
101100
102- // Publish an update from the firehose
103- func (f *Firehose) Publish(update *nomad.Job) {
104- b, err := json.Marshal(update)
105- if err != nil {
106- log.Error(err)
107- }
108-
109- f.sink.Put(b)
110- }
111-
112101// Continously watch for changes to the allocation list and publish it as updates
113- func (f *Firehose ) watch() {
102+ func (f *FirehoseBase ) watch(w WatchJobListFunc ) {
114103 q := &nomad.QueryOptions{
115104 WaitIndex: f.lastChangeIndex,
116105 WaitTime: 5 * time.Minute,
@@ -148,15 +137,7 @@ func (f *Firehose) watch() {
148137 newMax = job.ModifyIndex
149138 }
150139
151- go func(jobID string) {
152- fullJob, _, err := f.nomadClient.Jobs().Info(jobID, &nomad.QueryOptions{})
153- if err != nil {
154- log.Errorf("Could not read job %s: %s", jobID, err)
155- return
156- }
157-
158- f.Publish(fullJob)
159- }(job.ID)
140+ w(job)
160141 }
161142
162143 // Update WaitIndex and Last Change Time for next iteration
0 commit comments