Skip to content

Commit 291f5a1

Browse files
authored
Merge pull request #868 from google/tailer-shutdown
feat: Handle `stdin` closure and shut down the tailer.
2 parents d8f9c09 + e99f7e5 commit 291f5a1

34 files changed

+487
-432
lines changed

.github/workflows/automerge.yml

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@ jobs:
1717
enable-automerge:
1818
if: github.event.pull_request.user.login == 'dependabot[bot]' && contains(github.event.pull_request.labels.*.name, 'dependencies')
1919
runs-on: ubuntu-latest
20+
# https://github.com/orgs/community/discussions/24686
2021
permissions:
21-
# enable-automerge is a graphql query, not REST, so isn't documented,
22-
# except in a mention in
23-
# https://github.blog/changelog/2021-02-04-pull-request-auto-merge-is-now-generally-available/
24-
# which says "can only be enabled by users with permissino to merge"; the
25-
# REST documentation says you need contents: write to perform a merge.
26-
# https://github.community/t/what-permission-does-a-github-action-need-to-call-graphql-enablepullrequestautomerge/197708
27-
# says this is it
2822
contents: write
23+
pull-requests: write
2924
steps:
30-
# Enable auto-merge *before* issuing an approval.
31-
- uses: alexwilson/enable-github-automerge-action@main
32-
with:
33-
github-token: "${{ secrets.GITHUB_TOKEN }}"
25+
- run: |
26+
gh api graphql -f pullRequestId="${{ github.event.pull_request.node_id }}" -f query='
27+
mutation EnablePullRequestAutoMerge($pullRequestId: ID!) {
28+
enablePullRequestAutoMerge(input: {pullRequestId: $pullRequestId}) {
29+
clientMutationId
30+
}
31+
}
32+
'
33+
env:
34+
GH_TOKEN: ${{ github.token }}
3435
3536
wait-on-checks:
3637
needs: enable-automerge

cmd/mtail/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func main() {
182182
}
183183
if *staleLogGcTickInterval > 0 {
184184
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
185-
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
185+
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
186186
}
187187
if *pollInterval > 0 {
188188
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)

internal/mtail/basic_tail_integration_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ import (
1717
func TestBasicTail(t *testing.T) {
1818
testutil.SkipIfShort(t)
1919
if testing.Verbose() {
20-
testutil.SetFlag(t, "vmodule", "tail=2,log_watcher=2")
20+
testutil.SetFlag(t, "vmodule", "tail=2,filestream=2")
2121
}
2222
logDir := testutil.TestTempDir(t)
2323

24-
m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
24+
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
2525
defer stopM()
2626

2727
logFile := filepath.Join(logDir, "log")
@@ -31,12 +31,13 @@ func TestBasicTail(t *testing.T) {
3131

3232
f := testutil.TestOpenFile(t, logFile)
3333
defer f.Close()
34-
m.PollWatched(1) // Force sync to EOF
34+
m.AwakenPatternPollers(1, 1) // Find `logFile`
35+
m.AwakenLogStreams(1, 1) // Force a sync to EOF
3536

3637
for i := 1; i <= 3; i++ {
3738
testutil.WriteString(t, f, fmt.Sprintf("%d\n", i))
3839
}
39-
m.PollWatched(1) // Expect to read 3 lines here.
40+
m.AwakenLogStreams(1, 1) // Expect to read 3 lines here.
4041

4142
var wg sync.WaitGroup
4243
wg.Add(2)
@@ -57,7 +58,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {
5758

5859
// Start mtail
5960
logFilepath := filepath.Join(workdir, "log")
60-
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(logFilepath))
61+
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(logFilepath))
6162
defer stopM()
6263

6364
logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 0)
@@ -68,7 +69,7 @@ func TestNewLogDoesNotMatchIsIgnored(t *testing.T) {
6869
logFile, err := os.Create(newLogFilepath)
6970
testutil.FatalIfErr(t, err)
7071
defer logFile.Close()
71-
m.PollWatched(0) // No streams so don't wait for any.
72+
// No streams so don't wait for any.
7273

7374
logCountCheck()
7475
}

internal/mtail/examples_integration_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestExamplePrograms(t *testing.T) {
9292
t.Run(fmt.Sprintf("%s on %s", tc.programfile, tc.logfile),
9393
testutil.TimeoutTest(exampleTimeout, func(t *testing.T) { //nolint:thelper
9494
ctx, cancel := context.WithCancel(context.Background())
95-
waker, _ := waker.NewTest(ctx, 0) // oneshot means we should never need to wake the stream
95+
waker, _ := waker.NewTest(ctx, 0, "waker") // oneshot means we should never need to wake the stream
9696
store := metrics.NewStore()
9797
programFile := filepath.Join("../..", tc.programfile)
9898
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(tc.logfile), mtail.OneShot, mtail.OmitMetricSource, mtail.DumpAstTypes, mtail.DumpBytecode, mtail.LogPatternPollWaker(waker), mtail.LogstreamPollWaker(waker))
@@ -155,7 +155,7 @@ func BenchmarkProgram(b *testing.B) {
155155
logFile := filepath.Join(logDir, "test.log")
156156
log := testutil.TestOpenFile(b, logFile)
157157
ctx, cancel := context.WithCancel(context.Background())
158-
waker, awaken := waker.NewTest(ctx, 1)
158+
waker, awaken := waker.NewTest(ctx, 1, "streams")
159159
store := metrics.NewStore()
160160
programFile := filepath.Join("../..", bm.programfile)
161161
mtail, err := mtail.New(ctx, store, mtail.ProgramPath(programFile), mtail.LogPathPatterns(log.Name()), mtail.LogstreamPollWaker(waker))
@@ -176,7 +176,7 @@ func BenchmarkProgram(b *testing.B) {
176176
count, err := io.Copy(log, l)
177177
testutil.FatalIfErr(b, err)
178178
total += count
179-
awaken(1)
179+
awaken(1, 1)
180180
}
181181
cancel()
182182
wg.Wait()

internal/mtail/examples_integration_unix_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func TestFileSocketStreamComparison(t *testing.T) {
149149
defer wg.Done()
150150
source, err := os.OpenFile(tc.logfile, os.O_RDONLY, 0)
151151
testutil.FatalIfErr(t, err)
152-
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{sockName, scheme})
152+
s, err := net.DialUnix(scheme, nil, &net.UnixAddr{Name: sockName, Net: scheme})
153153
testutil.FatalIfErr(t, err)
154154
n, err := io.Copy(s, source)
155155
testutil.FatalIfErr(t, err)

internal/mtail/log_deletion_integration_unix_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@ func TestLogDeletion(t *testing.T) {
2525
logFile := testutil.TestOpenFile(t, logFilepath)
2626
defer logFile.Close()
2727

28-
m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
28+
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
2929
defer stopM()
3030

3131
logCloseCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
3232
logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
3333

34-
m.PollWatched(1) // Force sync to EOF
34+
m.AwakenPatternPollers(1, 1)
35+
m.AwakenLogStreams(1, 1) // Force read to EOF
36+
3537
glog.Info("remove")
3638
testutil.FatalIfErr(t, os.Remove(logFilepath))
3739

38-
m.PollWatched(0) // one pass to stop
40+
m.AwakenLogStreams(1, 0) // run stream to observe it's missing
3941
logCloseCheck()
40-
m.PollWatched(0) // one pass to remove completed stream
42+
m.AwakenGcPoller(1, 1)
4143
logCountCheck()
4244
}

internal/mtail/log_glob_integration_test.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestGlobBeforeStart(t *testing.T) {
4545
testutil.WriteString(t, log, "\n")
4646
log.Close()
4747
}
48-
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
48+
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
4949
stopM()
5050

5151
if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
@@ -75,10 +75,10 @@ func TestGlobAfterStart(t *testing.T) {
7575
false,
7676
},
7777
}
78-
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
78+
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
7979
defer stopM()
8080

81-
m.PollWatched(0) // Force sync to EOF
81+
m.AwakenPatternPollers(1, 1)
8282

8383
var count int64
8484
for _, tt := range globTests {
@@ -90,9 +90,8 @@ func TestGlobAfterStart(t *testing.T) {
9090
for _, tt := range globTests {
9191
log := testutil.TestOpenFile(t, tt.name)
9292
defer log.Close()
93-
m.PollWatched(0) // Force sync to EOF
93+
m.AwakenPatternPollers(1, 1)
9494
}
95-
// m.PollWatched(2)
9695
logCountCheck()
9796
}
9897

@@ -142,7 +141,7 @@ func TestGlobIgnoreFolder(t *testing.T) {
142141
testutil.WriteString(t, log, "\n")
143142
}
144143

145-
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
144+
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
146145

147146
stopM()
148147

@@ -184,7 +183,7 @@ func TestFilenameRegexIgnore(t *testing.T) {
184183
testutil.WriteString(t, log, "\n")
185184
}
186185

187-
m, stopM := mtail.TestStartServer(t, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
186+
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))
188187

189188
stopM()
190189

@@ -207,7 +206,7 @@ func TestGlobRelativeAfterStart(t *testing.T) {
207206
// Move to logdir to make relative paths
208207
testutil.Chdir(t, logDir)
209208

210-
m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
209+
m, stopM := mtail.TestStartServer(t, 1, 0, mtail.ProgramPath(progDir), mtail.LogPathPatterns("log.*"))
211210
defer stopM()
212211

213212
{
@@ -217,9 +216,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
217216
f := testutil.TestOpenFile(t, logFile)
218217
defer f.Close()
219218

220-
m.PollWatched(1) // Force sync to EOF
219+
m.AwakenPatternPollers(1, 1)
220+
m.AwakenLogStreams(0, 1) // Force read to EOF
221+
221222
testutil.WriteString(t, f, "line 1\n")
222-
m.PollWatched(1)
223+
m.AwakenLogStreams(1, 1)
223224

224225
logCountCheck()
225226
}
@@ -232,9 +233,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
232233
f := testutil.TestOpenFile(t, logFile)
233234
defer f.Close()
234235

235-
m.PollWatched(2)
236+
m.AwakenPatternPollers(1, 1)
237+
m.AwakenLogStreams(1, 2) // Force read to EOF
238+
236239
testutil.WriteString(t, f, "line 1\n")
237-
m.PollWatched(2)
240+
m.AwakenLogStreams(2, 2)
238241

239242
logCountCheck()
240243
}
@@ -245,9 +248,11 @@ func TestGlobRelativeAfterStart(t *testing.T) {
245248
f := testutil.TestOpenFile(t, logFile)
246249
defer f.Close()
247250

248-
m.PollWatched(2)
251+
m.AwakenPatternPollers(1, 1)
252+
m.AwakenLogStreams(2, 2) // Force read to EOF
253+
249254
testutil.WriteString(t, f, "line 2\n")
250-
m.PollWatched(2)
255+
m.AwakenLogStreams(2, 2)
251256

252257
logCountCheck()
253258
}

internal/mtail/log_rotation_integration_test.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/google/mtail/internal/testutil"
1616
)
1717

18-
func TestLogSoftLinkChange(t *testing.T) {
18+
func TestLogRotationBySoftLinkChange(t *testing.T) {
1919
testutil.SkipIfShort(t)
2020

2121
for _, tc := range []bool{false, true} {
@@ -29,7 +29,7 @@ func TestLogSoftLinkChange(t *testing.T) {
2929

3030
logFilepath := filepath.Join(workdir, "log")
3131

32-
m, stopM := mtail.TestStartServer(t, 1, mtail.LogPathPatterns(logFilepath))
32+
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logFilepath))
3333
defer stopM()
3434

3535
logCountCheck := m.ExpectExpvarDeltaWithDeadline("log_count", 1)
@@ -40,33 +40,42 @@ func TestLogSoftLinkChange(t *testing.T) {
4040

4141
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true1", logFilepath))
4242
glog.Info("symlinked")
43-
m.PollWatched(1)
43+
m.AwakenPatternPollers(1, 1)
44+
m.AwakenLogStreams(1, 1)
4445

4546
inputLines := []string{"hi1", "hi2", "hi3"}
4647
for _, x := range inputLines {
4748
testutil.WriteString(t, trueLog1, x+"\n")
4849
}
49-
m.PollWatched(1)
50+
m.AwakenPatternPollers(1, 1)
51+
m.AwakenLogStreams(1, 1)
5052

5153
trueLog2 := testutil.TestOpenFile(t, logFilepath+".true2")
5254
defer trueLog2.Close()
53-
m.PollWatched(1)
55+
m.AwakenPatternPollers(1, 1)
56+
m.AwakenLogStreams(1, 1)
57+
m.AwakenGcPoller(1, 1)
5458
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
5559
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
5660
testutil.FatalIfErr(t, os.Remove(logFilepath))
5761
if tc {
58-
m.PollWatched(0) // simulate race condition with this poll.
59-
logClosedCheck() // sync when filestream closes fd
60-
m.PollWatched(0) // invoke the GC
61-
logCompletedCheck() // sync to when the logstream is removed from tailer
62+
// Simulate a race where we poll for a pattern and remove the
63+
// existing stream.
64+
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
65+
m.AwakenLogStreams(1, 0)
66+
logClosedCheck() // barrier until filestream closes fd
67+
m.AwakenGcPoller(1, 1)
68+
logCompletedCheck() // barrier until the logstream is removed from tailer
6269
}
6370
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))
64-
m.PollWatched(1)
71+
m.AwakenPatternPollers(1, 1)
72+
m.AwakenLogStreams(0, 1)
6573

6674
for _, x := range inputLines {
6775
testutil.WriteString(t, trueLog2, x+"\n")
6876
}
69-
m.PollWatched(1)
77+
m.AwakenPatternPollers(1, 1)
78+
m.AwakenLogStreams(1, 1)
7079

7180
var wg sync.WaitGroup
7281
wg.Add(2)

internal/mtail/log_rotation_integration_unix_test.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ import (
1717
"github.com/google/mtail/internal/testutil"
1818
)
1919

20-
// TestLogRotation is a unix-specific test because on Windows, files cannot be removed
21-
// or renamed while there is an open read handle on them. Instead, log rotation would
22-
// have to be implemented by copying and then truncating the original file. That test
23-
// case is already covered by TestLogTruncation.
24-
func TestLogRotation(t *testing.T) {
20+
// TestLogRotationByRename is a unix-specific test because on Windows, files
21+
// cannot be removed or renamed while there is an open read handle on
22+
// them. Instead, log rotation would have to be implemented by copying and then
23+
// truncating the original file. That test case is already covered by
24+
// TestLogTruncation.
25+
func TestLogRotationByRename(t *testing.T) {
2526
testutil.SkipIfShort(t)
2627

2728
for _, tc := range []bool{false, true} {
@@ -45,34 +46,38 @@ func TestLogRotation(t *testing.T) {
4546
f := testutil.TestOpenFile(t, logFile)
4647
defer f.Close()
4748

48-
m, stopM := mtail.TestStartServer(t, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
49+
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.ProgramPath(progDir), mtail.LogPathPatterns(logDir+"/log"))
4950
defer stopM()
5051

5152
logOpensTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_opens_total", logFile, 1)
5253
logLinesTotalCheck := m.ExpectMapExpvarDeltaWithDeadline("log_lines_total", logFile, 3)
5354

5455
testutil.WriteString(t, f, "line 1\n")
55-
m.PollWatched(1)
56+
m.AwakenLogStreams(1, 1)
5657

5758
testutil.WriteString(t, f, "line 2\n")
58-
m.PollWatched(1)
59+
m.AwakenLogStreams(1, 1)
5960

6061
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFile, 1)
6162
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
6263
glog.Info("rename")
6364
err = os.Rename(logFile, logFile+".1")
6465
testutil.FatalIfErr(t, err)
6566
if tc {
66-
m.PollWatched(0) // simulate race condition with this poll.
67-
logClosedCheck() // sync when filestream closes fd
68-
m.PollWatched(0) // invoke the GC
69-
logCompletedCheck() // sync to when the logstream is removed from tailer
67+
// Simulate a race where we poll for a pattern and remove the
68+
// existing stream.
69+
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
70+
m.AwakenLogStreams(1, 0)
71+
logClosedCheck() // barrier until filestream closes fd
72+
m.AwakenGcPoller(1, 1)
73+
logCompletedCheck() // barrier until the logstream is removed from tailer
7074
}
7175
glog.Info("create")
7276
f = testutil.TestOpenFile(t, logFile)
73-
m.PollWatched(1)
77+
m.AwakenPatternPollers(1, 1)
78+
m.AwakenLogStreams(0, 1)
7479
testutil.WriteString(t, f, "line 1\n")
75-
m.PollWatched(1)
80+
m.AwakenLogStreams(1, 1)
7681

7782
var wg sync.WaitGroup
7883
wg.Add(2)

0 commit comments

Comments
 (0)