Skip to content

Commit 5196546

Browse files
authored
Merge pull request #894 from google/read-after-read
fix: Return to read immediately after a successful read.
2 parents b8df61a + afd77e4 commit 5196546

File tree

7 files changed

+40
-31
lines changed

7 files changed

+40
-31
lines changed

internal/tailer/logstream/dgramstream.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
111111
ds.lastReadTime = time.Now()
112112
ds.mu.Unlock()
113113
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)
114+
115+
// No error implies more to read, so restart the loop.
116+
if err == nil && ctx.Err() == nil {
117+
continue
118+
}
114119
}
115120

116121
if err != nil && IsEndOrCancel(err) {
@@ -125,12 +130,13 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
125130
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
126131
select {
127132
case <-ctx.Done():
133+
// Exit after next read attempt.
128134
// We may have started waiting here when the stop signal
129135
// arrives, but since that wait the file may have been
130136
// written to. The file is not technically yet at EOF so
131137
// we need to go back and try one more read. We'll exit
132138
// the stream in the zero byte handler above.
133-
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address)
139+
glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address)
134140
case <-waker.Wake():
135141
// sleep until next Wake()
136142
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)

internal/tailer/logstream/dgramstream_unix_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
4242
ctx, cancel := context.WithCancel(context.Background())
4343
// Stream is not shut down with cancel in this test
4444
defer cancel()
45-
waker, awaken := waker.NewTest(ctx, 1, "stream")
45+
waker := waker.NewTestAlways()
4646

4747
sockName := scheme + "://" + addr
4848
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
@@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
5959
_, err = s.Write([]byte("1\n"))
6060
testutil.FatalIfErr(t, err)
6161

62-
awaken(0, 0) // sync past read
63-
6462
// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
6563
_, err = s.Write([]byte{})
6664
testutil.FatalIfErr(t, err)
@@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
9492
}
9593

9694
ctx, cancel := context.WithCancel(context.Background())
97-
waker, awaken := waker.NewTest(ctx, 1, "stream")
95+
waker := waker.NewTestAlways()
9896

9997
sockName := scheme + "://" + addr
10098
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
@@ -111,7 +109,8 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
111109
_, err = s.Write([]byte("1\n"))
112110
testutil.FatalIfErr(t, err)
113111

114-
awaken(0, 0) // Synchronise past read.
112+
// Yield to give the stream a chance to read.
113+
time.Sleep(10 * time.Millisecond)
115114

116115
cancel() // This cancellation should cause the stream to shut down.
117116
wg.Wait()

internal/tailer/logstream/filestream.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
121121
fs.lastReadTime = time.Now()
122122
fs.mu.Unlock()
123123
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)
124+
125+
// No error implies there is more to read so restart the loop.
126+
if err == nil && ctx.Err() == nil {
127+
continue
128+
}
124129
}
125130

126131
if err != nil && err != io.EOF {
@@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
206211
}
207212
}
208213

209-
// No error implies there is more to read in this file so go
210-
// straight back to read unless it looks like context is Done.
211-
if err == nil && ctx.Err() == nil {
212-
continue
213-
}
214-
215214
Sleep:
216215
// If we get here it's because we've stalled. First test to see if it's
217216
// time to exit.
@@ -243,6 +242,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
243242
glog.V(2).Infof("stream(%s): waiting", fs.pathname)
244243
select {
245244
case <-ctx.Done():
245+
// Exit after next read attempt.
246246
// We may have started waiting here when the cancellation
247247
// arrives, but since that wait the file may have been
248248
// written to. The file is not technically yet at EOF so
@@ -251,7 +251,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
251251
// could argue exiting immediately is less surprising.
252252
// Assumption is that this doesn't make a difference in
253253
// production.
254-
glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname)
254+
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", fs.pathname)
255255
case <-waker.Wake():
256256
// sleep until next Wake()
257257
glog.V(2).Infof("stream(%s): Wake received", fs.pathname)

internal/tailer/logstream/pipestream.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
8484
}
8585
logCloses.Add(ps.pathname, 1)
8686
close(ps.lines)
87+
ps.cancel()
8788
}()
88-
ctx, cancel := context.WithCancel(ctx)
89-
defer cancel()
9089
SetReadDeadlineOnDone(ctx, fd)
9190

9291
for {
@@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
105104
ps.lastReadTime = time.Now()
106105
ps.mu.Unlock()
107106
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)
107+
108+
// No error implies there is more to read so restart the loop.
109+
if err == nil && ctx.Err() == nil {
110+
continue
111+
}
108112
}
109113

110114
// Test to see if we should exit.
@@ -120,10 +124,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
120124
glog.V(2).Infof("stream(%s): waiting", ps.pathname)
121125
select {
122126
case <-ctx.Done():
123-
// Exit immediately; cancelled context is going to cause the
124-
// next read to be interrupted and exit, so don't bother going
125-
// around the loop again.
126-
return
127+
// Exit after next read attempt.
128+
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", ps.pathname)
127129
case <-waker.Wake():
128130
// sleep until next Wake()
129131
glog.V(2).Infof("stream(%s): Wake received", ps.pathname)

internal/tailer/logstream/pipestream_unix_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
7373
testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666))
7474

7575
ctx, cancel := context.WithCancel(context.Background())
76-
waker, awaken := waker.NewTest(ctx, 1, "stream")
76+
waker := waker.NewTestAlways()
7777

7878
f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
7979
testutil.FatalIfErr(t, err)
@@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
8787

8888
testutil.WriteString(t, f, "1\n")
8989

90-
// Avoid a race with cancellation if we can synchronise with waker.Wake()
91-
awaken(0, 0)
92-
9390
cancel() // Cancellation here should cause the stream to shut down.
9491
wg.Wait()
9592

@@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
155152
ctx, cancel := context.WithCancel(context.Background())
156153
// The stream is not shut down by cancel in this test.
157154
defer cancel()
158-
waker, awaken := waker.NewTest(ctx, 1, "stream")
155+
waker := waker.NewTestAlways()
159156

160157
ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled)
161158
testutil.FatalIfErr(t, err)
@@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) {
165162
}
166163
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())
167164

168-
awaken(0, 0)
165+
// Give the stream a chance to wake and read
166+
time.Sleep(10 * time.Millisecond)
169167

170168
testutil.FatalIfErr(t, f.Close())
171169

internal/tailer/logstream/socketstream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
135135
ss.lastReadTime = time.Now()
136136
ss.mu.Unlock()
137137
ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel)
138+
139+
// No error implies more to read, so restart the loop.
140+
if err == nil && ctx.Err() == nil {
141+
continue
142+
}
138143
}
139144

140145
if err != nil && IsEndOrCancel(err) {
@@ -150,7 +155,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
150155
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
151156
select {
152157
case <-ctx.Done():
153-
// Cancelled context will cause the next read to be interrupted and exit.
158+
// Exit after next read attempt.
154159
glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address)
155160
case <-waker.Wake():
156161
// sleep until next Wake()

internal/tailer/logstream/socketstream_unix_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
4040
ctx, cancel := context.WithCancel(context.Background())
4141
// The stream is not shut down with cancel in this test.
4242
defer cancel()
43-
waker, awaken := waker.NewTest(ctx, 1, "stream")
43+
waker := waker.NewTestAlways()
4444

4545
sockName := scheme + "://" + addr
4646
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
@@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
5757
_, err = s.Write([]byte("1\n"))
5858
testutil.FatalIfErr(t, err)
5959

60-
awaken(0, 0) // Sync past read
61-
6260
// Close the socket to signal to the socketStream to shut down.
6361
testutil.FatalIfErr(t, s.Close())
6462

@@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
9189
}
9290

9391
ctx, cancel := context.WithCancel(context.Background())
94-
waker, awaken := waker.NewTest(ctx, 1, "stream")
92+
waker := waker.NewTestAlways()
9593

9694
sockName := scheme + "://" + addr
9795
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
@@ -108,7 +106,8 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
108106
_, err = s.Write([]byte("1\n"))
109107
testutil.FatalIfErr(t, err)
110108

111-
awaken(0, 0) // Sync past read to ensure we read
109+
// Yield to give the stream a chance to read.
110+
time.Sleep(10 * time.Millisecond)
112111

113112
cancel() // This cancellation should cause the stream to shut down immediately.
114113
wg.Wait()

0 commit comments

Comments
 (0)