Skip to content

Commit b8df61a

Browse files
authored
Merge pull request #893 from google/logstream-shutdown
Fix: Use channel semantics to manage `LogStream` lifecycle in `Tailer`.
2 parents 256ab63 + f39cf7b commit b8df61a

19 files changed

+107
-358
lines changed

cmd/mtail/main.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ var (
6565
pollInterval = flag.Duration("poll_interval", 250*time.Millisecond, "Set the interval to poll each log file for data; must be positive, or zero to disable polling. With polling mode, only the files found at mtail startup will be polled.")
6666
pollLogInterval = flag.Duration("poll_log_interval", 250*time.Millisecond, "Set the interval to find all matched log files for polling; must be positive, or zero to disable polling. With polling mode, only the files found at mtail startup will be polled.")
6767
expiredMetricGcTickInterval = flag.Duration("expired_metrics_gc_interval", time.Hour, "interval between expired metric garbage collection runs")
68-
staleLogGcTickInterval = flag.Duration("stale_log_gc_interval", time.Hour, "interval between stale log garbage collection runs")
6968
metricPushInterval = flag.Duration("metric_push_interval", time.Minute, "interval between metric pushes to passive collectors")
7069
maxRegexpLength = flag.Int("max_regexp_length", 1024, "The maximum length a mtail regexp expression can have. Excessively long patterns are likely to cause compilation and runtime performance problems.")
7170
maxRecursionDepth = flag.Int("max_recursion_depth", 100, "The maximum length a mtail statement can be, as measured by parsed tokens. Excessively long mtail expressions are likely to cause compilation and runtime performance problems.")
@@ -83,6 +82,7 @@ var (
8382
// Deprecated.
8483
_ = flag.Bool("disable_fsnotify", true, "DEPRECATED: this flag is no longer in use.")
8584
_ = flag.Int("metric_push_interval_seconds", 0, "DEPRECATED: use --metric_push_interval instead")
85+
_ = flag.Duration("stale_log_gc_interval", time.Hour, "DEPRECATED: this flag is no longer in use")
8686
)
8787

8888
func init() {
@@ -180,10 +180,6 @@ func main() {
180180
if *logRuntimeErrors {
181181
opts = append(opts, mtail.LogRuntimeErrors)
182182
}
183-
if *staleLogGcTickInterval > 0 {
184-
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
185-
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
186-
}
187183
if *pollInterval > 0 {
188184
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
189185
logPatternPollWaker := waker.NewTimed(ctx, *pollLogInterval)

internal/mtail/log_deletion_integration_unix_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,5 @@ func TestLogDeletion(t *testing.T) {
3939

4040
m.AwakenLogStreams(1, 0) // run stream to observe it's missing
4141
logCloseCheck()
42-
m.AwakenGcPoller(1, 1)
4342
logCountCheck()
4443
}

internal/mtail/log_glob_integration_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestGlobBeforeStart(t *testing.T) {
4646
log.Close()
4747
}
4848
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
49-
stopM()
49+
defer stopM()
5050

5151
if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
5252
t.Errorf("Expecting log count of %d, received %d", count, r)
@@ -142,8 +142,7 @@ func TestGlobIgnoreFolder(t *testing.T) {
142142
}
143143

144144
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
145-
146-
stopM()
145+
defer stopM()
147146

148147
if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
149148
t.Errorf("Expecting log count of %d, received %v", count, r)
@@ -184,8 +183,7 @@ func TestFilenameRegexIgnore(t *testing.T) {
184183
}
185184

186185
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
187-
188-
stopM()
186+
defer stopM()
189187

190188
if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
191189
t.Errorf("Log count not matching, expected: %d received: %v", count, r)

internal/mtail/log_rotation_integration_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
5454
defer trueLog2.Close()
5555
m.AwakenPatternPollers(1, 1)
5656
m.AwakenLogStreams(1, 1)
57-
m.AwakenGcPoller(1, 1)
5857
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
5958
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
6059
testutil.FatalIfErr(t, os.Remove(logFilepath))
@@ -63,8 +62,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
6362
// existing stream.
6463
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
6564
m.AwakenLogStreams(1, 0)
66-
logClosedCheck() // barrier until filestream closes fd
67-
m.AwakenGcPoller(1, 1)
65+
logClosedCheck() // barrier until filestream closes fd
6866
logCompletedCheck() // barrier until the logstream is removed from tailer
6967
}
7068
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))

internal/mtail/log_rotation_integration_unix_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ func TestLogRotationByRename(t *testing.T) {
6868
// existing stream.
6969
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
7070
m.AwakenLogStreams(1, 0)
71-
logClosedCheck() // barrier until filestream closes fd
72-
m.AwakenGcPoller(1, 1)
71+
logClosedCheck() // barrier until filestream closes fd
7372
logCompletedCheck() // barrier until the logstream is removed from tailer
7473
}
7574
glog.Info("create")

internal/mtail/options.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,6 @@ func (opt overrideLocation) apply(m *Server) error {
109109
return nil
110110
}
111111

112-
// GcWaker triggers garbage collection runs for stale logs in the tailer.
113-
func GcWaker(w waker.Waker) Option {
114-
return &gcWaker{w}
115-
}
116-
117-
type gcWaker struct {
118-
waker.Waker
119-
}
120-
121-
func (opt gcWaker) apply(m *Server) error {
122-
m.tOpts = append(m.tOpts, tailer.GcWaker(opt.Waker))
123-
return nil
124-
}
125-
126112
// LogPatternPollWaker triggers polls on the filesystem for new logs that match the log glob patterns.
127113
func LogPatternPollWaker(w waker.Waker) Option {
128114
return &logPatternPollWaker{w}

internal/mtail/testing.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ type TestServer struct {
3434
// method, synchronising the pattern poll with the test.
3535
AwakenPatternPollers waker.WakeFunc // the glob awakens
3636

37-
gcWaker waker.Waker // activate the cleanup routines
38-
AwakenGcPoller waker.WakeFunc
39-
4037
tb testing.TB
4138

4239
cancel context.CancelFunc
@@ -68,11 +65,9 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options
6865
}
6966
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
7067
ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns")
71-
ts.gcWaker, ts.AwakenGcPoller = waker.NewTest(ctx, 1, "gc")
7268
options = append(options,
7369
LogstreamPollWaker(ts.streamWaker),
7470
LogPatternPollWaker(ts.patternWaker),
75-
GcWaker(ts.gcWaker),
7671
)
7772
var err error
7873
ts.Server, err = New(ctx, metrics.NewStore(), options...)

internal/tailer/logstream/dgramstream.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ type dgramStream struct {
2424
address string // Given name for the underlying socket path on the filesystem or hostport.
2525

2626
mu sync.RWMutex // protects following fields
27-
completed bool // This pipestream is completed and can no longer be used.
2827
lastReadTime time.Time // Last time a log line was read from this named pipe
28+
29+
staleTimer *time.Timer // Expire the stream if no read in 24h
2930
}
3031

3132
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
@@ -40,12 +41,6 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker,
4041
return ss, nil
4142
}
4243

43-
func (ds *dgramStream) LastReadTime() time.Time {
44-
ds.mu.RLock()
45-
defer ds.mu.RUnlock()
46-
return ds.lastReadTime
47-
}
48-
4944
// The read buffer size for datagrams.
5045
const datagramReadBufferSize = 131072
5146

@@ -71,11 +66,8 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
7166
glog.Info(err)
7267
}
7368
logCloses.Add(ds.address, 1)
74-
ds.mu.Lock()
75-
ds.completed = true
7669
close(ds.lines)
77-
ds.mu.Unlock()
78-
ds.Stop()
70+
ds.cancel()
7971
}()
8072
ctx, cancel := context.WithCancel(ctx)
8173
defer cancel()
@@ -85,6 +77,10 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
8577
n, _, err := c.ReadFrom(b)
8678
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)
8779

80+
if ds.staleTimer != nil {
81+
ds.staleTimer.Stop()
82+
}
83+
8884
// This is a test-only trick that says if we've already put this
8985
// logstream in graceful shutdown, then a zero-byte read is
9086
// equivalent to an "EOF" in connection and file oriented streams.
@@ -114,6 +110,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
114110
ds.mu.Lock()
115111
ds.lastReadTime = time.Now()
116112
ds.mu.Unlock()
113+
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)
117114
}
118115

119116
if err != nil && IsEndOrCancel(err) {
@@ -143,17 +140,6 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
143140
return nil
144141
}
145142

146-
func (ds *dgramStream) IsComplete() bool {
147-
ds.mu.RLock()
148-
defer ds.mu.RUnlock()
149-
return ds.completed
150-
}
151-
152-
func (ds *dgramStream) Stop() {
153-
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ds.scheme, ds.address)
154-
ds.cancel()
155-
}
156-
157143
// Lines implements the LogStream interface, returning the output lines channel.
158144
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
159145
return ds.lines

internal/tailer/logstream/dgramstream_unix_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
4040
}
4141

4242
ctx, cancel := context.WithCancel(context.Background())
43+
// Stream is not shut down with cancel in this test
44+
defer cancel()
4345
waker, awaken := waker.NewTest(ctx, 1, "stream")
4446

4547
sockName := scheme + "://" + addr
@@ -67,10 +69,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
6769

6870
checkLineDiff()
6971

70-
cancel()
71-
wg.Wait()
72-
73-
if !ds.IsComplete() {
72+
if v := <-ds.Lines(); v != nil {
7473
t.Errorf("expecting dgramstream to be complete because socket closed")
7574
}
7675
}))
@@ -115,12 +114,11 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
115114
awaken(0, 0) // Synchronise past read.
116115

117116
cancel() // This cancellation should cause the stream to shut down.
118-
119117
wg.Wait()
120118

121119
checkLineDiff()
122120

123-
if !ds.IsComplete() {
121+
if v := <-ds.Lines(); v != nil {
124122
t.Errorf("expecting dgramstream to be complete because cancel")
125123
}
126124
}))

internal/tailer/logstream/filestream.go

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ type fileStream struct {
4242

4343
mu sync.RWMutex // protects following fields.
4444
lastReadTime time.Time // Last time a log line was read from this file
45-
completed bool // The filestream is completed and can no longer be used.
45+
46+
staleTimer *time.Timer // Expire the stream if no read in 24h
4647
}
4748

4849
// newFileStream creates a new log stream from a regular file.
@@ -57,12 +58,6 @@ func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p
5758
return fs, nil
5859
}
5960

60-
func (fs *fileStream) LastReadTime() time.Time {
61-
fs.mu.RLock()
62-
defer fs.mu.RUnlock()
63-
return fs.lastReadTime
64-
}
65-
6661
func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, oneShot OneShotMode, streamFromStart bool) error {
6762
fd, err := os.OpenFile(fs.pathname, os.O_RDONLY, 0o600)
6863
if err != nil {
@@ -107,6 +102,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
107102
count, err := fd.Read(b)
108103
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.pathname, count, err)
109104

105+
if fs.staleTimer != nil {
106+
fs.staleTimer.Stop()
107+
}
108+
110109
if count > 0 {
111110
total += count
112111
glog.V(2).Infof("stream(%s): decode and send", fs.pathname)
@@ -121,6 +120,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
121120
fs.mu.Lock()
122121
fs.lastReadTime = time.Now()
123122
fs.mu.Unlock()
123+
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)
124124
}
125125

126126
if err != nil && err != io.EOF {
@@ -154,17 +154,13 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
154154
// file is in the middle of a rotation and gets recreated
155155
// in the next moment. We can't rely on the Tailer to tell
156156
// us we're deleted because the tailer can only tell us to
157-
// cancel, which ends up causing us to race here against
158-
// detection of IsCompleted.
157+
// cancel.
159158
if os.IsNotExist(serr) {
160159
glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname)
161160
if partial.Len() > 0 {
162161
sendLine(ctx, fs.pathname, partial, fs.lines)
163162
}
164-
fs.mu.Lock()
165-
fs.completed = true
166163
close(fs.lines)
167-
fs.mu.Unlock()
168164
return
169165
}
170166
logErrors.Add(fs.pathname, 1)
@@ -226,10 +222,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
226222
if partial.Len() > 0 {
227223
sendLine(ctx, fs.pathname, partial, fs.lines)
228224
}
229-
fs.mu.Lock()
230-
fs.completed = true
231225
close(fs.lines)
232-
fs.mu.Unlock()
233226
return
234227
}
235228
select {
@@ -238,10 +231,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
238231
if partial.Len() > 0 {
239232
sendLine(ctx, fs.pathname, partial, fs.lines)
240233
}
241-
fs.mu.Lock()
242-
fs.completed = true
243234
close(fs.lines)
244-
fs.mu.Unlock()
245235
return
246236
default:
247237
// keep going
@@ -273,17 +263,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
273263
return nil
274264
}
275265

276-
func (fs *fileStream) IsComplete() bool {
277-
fs.mu.RLock()
278-
defer fs.mu.RUnlock()
279-
return fs.completed
280-
}
281-
282-
// Stop implements the LogStream interface.
283-
func (fs *fileStream) Stop() {
284-
fs.cancel()
285-
}
286-
287266
// Lines implements the LogStream interface, returning the output lines channel.
288267
func (fs *fileStream) Lines() <-chan *logline.LogLine {
289268
return fs.lines

0 commit comments

Comments
 (0)