Skip to content

Commit 4e8528c

Browse files
authored
Merge pull request #860 from google/exporter-shutdown
Explicitly stop the `Exporter` when main routines are done.
2 parents c1e84ca + 64c82f4 commit 4e8528c

File tree

8 files changed

+53
-64
lines changed

8 files changed

+53
-64
lines changed

cmd/mtail/main.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"os/signal"
1212
"runtime"
1313
"strings"
14-
"sync"
1514
"syscall"
1615
"time"
1716

@@ -249,23 +248,22 @@ func main() {
249248
if *oneShot {
250249
switch *oneShotFormat {
251250
case "prometheus":
252-
var wg sync.WaitGroup
253-
e, err := exporter.New(ctx, &wg, store, eOpts...)
251+
e, err := exporter.New(ctx, store, eOpts...)
254252
if err != nil {
255253
glog.Error(err)
256254
cancel()
257-
wg.Wait()
255+
e.Stop()
258256
os.Exit(1) //nolint:gocritic // false positive
259257
}
260258
err = e.Write(os.Stdout)
261259
if err != nil {
262260
glog.Error(err)
263261
cancel()
264-
wg.Wait()
262+
e.Stop()
265263
os.Exit(1) //nolint:gocritic // false positive
266264
}
267265
cancel()
268-
wg.Wait()
266+
e.Stop()
269267
os.Exit(0) //nolint:gocritic // false positive
270268
case "json":
271269
err = store.WriteMetrics(os.Stdout)

internal/exporter/export.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ var (
3131
// Exporter manages the export of metrics to passive and active collectors.
3232
type Exporter struct {
3333
ctx context.Context
34+
cancelFunc context.CancelFunc
3435
wg sync.WaitGroup
3536
store *metrics.Store
3637
pushInterval time.Duration
@@ -40,6 +41,7 @@ type Exporter struct {
4041
exportDisabled bool
4142
pushTargets []pushOptions
4243
initDone chan struct{}
44+
shutdownDone chan struct{}
4345
}
4446

4547
// Option configures a new Exporter.
@@ -83,24 +85,19 @@ func DisableExport() Option {
8385
}
8486
}
8587

86-
var (
87-
ErrNeedsStore = errors.New("exporter needs a Store")
88-
ErrNeedsWaitgroup = errors.New("exporter needs a WaitGroup")
89-
)
88+
var ErrNeedsStore = errors.New("exporter needs a Store")
9089

9190
// New creates a new Exporter.
92-
func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options ...Option) (*Exporter, error) {
91+
func New(ctx context.Context, store *metrics.Store, options ...Option) (*Exporter, error) {
9392
if store == nil {
9493
return nil, ErrNeedsStore
9594
}
96-
if wg == nil {
97-
return nil, ErrNeedsWaitgroup
98-
}
9995
e := &Exporter{
100-
ctx: ctx,
101-
store: store,
102-
initDone: make(chan struct{}),
96+
store: store,
97+
initDone: make(chan struct{}),
98+
shutdownDone: make(chan struct{}),
10399
}
100+
e.ctx, e.cancelFunc = context.WithCancel(ctx)
104101
defer close(e.initDone)
105102
if err := e.SetOption(options...); err != nil {
106103
return nil, err
@@ -128,20 +125,25 @@ func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options
128125
}
129126
e.StartMetricPush()
130127

131-
wg.Add(1)
132128
// This routine manages shutdown of the Exporter.
133129
go func() {
134-
defer wg.Done()
135130
<-e.initDone
136131
// Wait for the context to be completed before waiting for subroutines.
137132
if !e.exportDisabled {
138133
<-e.ctx.Done()
139134
}
140135
e.wg.Wait()
136+
close(e.shutdownDone)
141137
}()
142138
return e, nil
143139
}
144140

141+
// Stop instructs the exporter to shut down. The function returns once the exporter has finished.
142+
func (e *Exporter) Stop() {
143+
e.cancelFunc()
144+
<-e.shutdownDone
145+
}
146+
145147
// SetOption takes one or more option functions and applies them in order to Exporter.
146148
func (e *Exporter) SetOption(options ...Option) error {
147149
for _, option := range options {

internal/exporter/export_test.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"reflect"
1010
"sort"
1111
"strings"
12-
"sync"
1312
"testing"
1413
"time"
1514

@@ -22,37 +21,26 @@ const prefix = "prefix"
2221

2322
func TestCreateExporter(t *testing.T) {
2423
ctx, cancel := context.WithCancel(context.Background())
25-
var wg sync.WaitGroup
24+
defer cancel()
2625
store := metrics.NewStore()
27-
_, err := New(ctx, &wg, store)
26+
27+
e, err := New(ctx, store)
2828
if err != nil {
29-
t.Errorf("New(ctx, wg, store) unexpected error: %v", err)
29+
t.Errorf("New(ctx, store) unexpected error: %v", err)
3030
}
31-
cancel()
32-
wg.Wait()
33-
ctx, cancel = context.WithCancel(context.Background())
31+
e.Stop()
32+
3433
failopt := func(*Exporter) error {
3534
return errors.New("busted") // nolint:goerr113
3635
}
37-
_, err = New(ctx, &wg, store, failopt)
36+
_, err = New(ctx, store, failopt)
3837
if err == nil {
39-
t.Errorf("unexpected success")
38+
t.Error("New(ctx, store, fail) -> unexpected success")
4039
}
41-
cancel()
42-
wg.Wait()
43-
}
4440

45-
func TestNewErrors(t *testing.T) {
46-
ctx := context.Background()
47-
store := metrics.NewStore()
48-
var wg sync.WaitGroup
49-
_, err := New(ctx, nil, store)
50-
if err == nil {
51-
t.Error("New(ctx, nil, store) expecting error, received nil")
52-
}
53-
_, err = New(ctx, &wg, nil)
41+
_, err = New(ctx, nil)
5442
if err == nil {
55-
t.Error("New(ctx, wg, nil) expecting error, received nil")
43+
t.Error("New(ctx, nil) -> nil, expecting error")
5644
}
5745
}
5846

internal/exporter/graphite_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,13 @@ func TestHandleGraphite(t *testing.T) {
4747
tc := tc
4848
t.Run(tc.name, func(t *testing.T) {
4949
ctx, cancel := context.WithCancel(context.Background())
50-
var wg sync.WaitGroup
50+
defer cancel()
51+
5152
ms := metrics.NewStore()
5253
for _, metric := range tc.metrics {
5354
testutil.FatalIfErr(t, ms.Add(metric))
5455
}
55-
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
56+
e, err := New(ctx, ms, Hostname("gunstar"))
5657
testutil.FatalIfErr(t, err)
5758
response := httptest.NewRecorder()
5859
e.HandleGraphite(response, &http.Request{})
@@ -64,8 +65,7 @@ func TestHandleGraphite(t *testing.T) {
6465
t.Errorf("failed to read response %s", err)
6566
}
6667
testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{}))
67-
cancel()
68-
wg.Wait()
68+
e.Stop()
6969
})
7070
}
7171
}

internal/exporter/json_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,13 @@ func TestHandleJSON(t *testing.T) {
141141
tc := tc
142142
t.Run(tc.name, func(t *testing.T) {
143143
ctx, cancel := context.WithCancel(context.Background())
144-
var wg sync.WaitGroup
144+
defer cancel()
145+
145146
ms := metrics.NewStore()
146147
for _, metric := range tc.metrics {
147148
testutil.FatalIfErr(t, ms.Add(metric))
148149
}
149-
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
150+
e, err := New(ctx, ms, Hostname("gunstar"))
150151
testutil.FatalIfErr(t, err)
151152
response := httptest.NewRecorder()
152153
e.HandleJSON(response, &http.Request{})
@@ -158,8 +159,8 @@ func TestHandleJSON(t *testing.T) {
158159
t.Errorf("failed to read response: %s", err)
159160
}
160161
testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{}))
161-
cancel()
162-
wg.Wait()
162+
163+
e.Stop()
163164
})
164165
}
165166
}

internal/exporter/prometheus_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"context"
99
"math"
1010
"strings"
11-
"sync"
1211
"testing"
1312
"time"
1413

@@ -255,8 +254,9 @@ func TestHandlePrometheus(t *testing.T) {
255254
for _, tc := range handlePrometheusTests {
256255
tc := tc
257256
t.Run(tc.name, func(t *testing.T) {
258-
var wg sync.WaitGroup
259257
ctx, cancel := context.WithCancel(context.Background())
258+
defer cancel()
259+
260260
ms := metrics.NewStore()
261261
for _, metric := range tc.metrics {
262262
testutil.FatalIfErr(t, ms.Add(metric))
@@ -267,14 +267,13 @@ func TestHandlePrometheus(t *testing.T) {
267267
if !tc.progLabel {
268268
opts = append(opts, OmitProgLabel())
269269
}
270-
e, err := New(ctx, &wg, ms, opts...)
270+
e, err := New(ctx, ms, opts...)
271271
testutil.FatalIfErr(t, err)
272272
r := strings.NewReader(tc.expected)
273273
if err = promtest.CollectAndCompare(e, r); err != nil {
274274
t.Error(err)
275275
}
276-
cancel()
277-
wg.Wait()
276+
e.Stop()
278277
})
279278
}
280279
}
@@ -334,8 +333,9 @@ func TestWritePrometheus(t *testing.T) {
334333
for _, tc := range writePrometheusTests {
335334
tc := tc
336335
t.Run(tc.name, func(t *testing.T) {
337-
var wg sync.WaitGroup
338336
ctx, cancel := context.WithCancel(context.Background())
337+
defer cancel()
338+
339339
ms := metrics.NewStore()
340340
for _, metric := range tc.metrics {
341341
testutil.FatalIfErr(t, ms.Add(metric))
@@ -344,16 +344,15 @@ func TestWritePrometheus(t *testing.T) {
344344
Hostname("gunstar"),
345345
OmitProgLabel(),
346346
}
347-
e, err := New(ctx, &wg, ms, opts...)
347+
e, err := New(ctx, ms, opts...)
348348
testutil.FatalIfErr(t, err)
349349

350350
var buf bytes.Buffer
351351
err = e.Write(&buf)
352352
testutil.FatalIfErr(t, err)
353353
testutil.ExpectNoDiff(t, tc.expected, buf.String())
354354

355-
cancel()
356-
wg.Wait()
355+
e.Stop()
357356
})
358357
}
359358
}

internal/exporter/varz_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"io"
99
"net/http"
1010
"net/http/httptest"
11-
"sync"
1211
"testing"
1312
"time"
1413

@@ -73,13 +72,14 @@ func TestHandleVarz(t *testing.T) {
7372
for _, tc := range handleVarzTests {
7473
tc := tc
7574
t.Run(tc.name, func(t *testing.T) {
76-
var wg sync.WaitGroup
7775
ctx, cancel := context.WithCancel(context.Background())
76+
defer cancel()
77+
7878
ms := metrics.NewStore()
7979
for _, metric := range tc.metrics {
8080
testutil.FatalIfErr(t, ms.Add(metric))
8181
}
82-
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
82+
e, err := New(ctx, ms, Hostname("gunstar"))
8383
testutil.FatalIfErr(t, err)
8484
response := httptest.NewRecorder()
8585
e.HandleVarz(response, &http.Request{})
@@ -91,8 +91,8 @@ func TestHandleVarz(t *testing.T) {
9191
t.Errorf("failed to read response: %s", err)
9292
}
9393
testutil.ExpectNoDiff(t, tc.expected, string(b))
94-
cancel()
95-
wg.Wait()
94+
95+
e.Stop()
9696
})
9797
}
9898
}

internal/mtail/mtail.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (m *Server) initRuntime() (err error) {
6666

6767
// initExporter sets up an Exporter for this Server.
6868
func (m *Server) initExporter() (err error) {
69-
m.e, err = exporter.New(m.ctx, &m.wg, m.store, m.eOpts...)
69+
m.e, err = exporter.New(m.ctx, m.store, m.eOpts...)
7070
if err != nil {
7171
return err
7272
}
@@ -234,6 +234,7 @@ func (m *Server) SetOption(options ...Option) error {
234234
// TODO(jaq): remove this once the test server is able to trigger polls on the components.
235235
func (m *Server) Run() error {
236236
m.wg.Wait()
237+
m.e.Stop()
237238
if m.compileOnly {
238239
glog.Info("compile-only is set, exiting")
239240
return nil

0 commit comments

Comments
 (0)