Skip to content

Commit bb5920e

Browse files
authored
Merge pull request #879 from google/logstream-context-cancellation
fix: Use context cancellation instead of `Stop` to stop streams.
2 parents ea0eccf + 3d4fdba commit bb5920e

File tree

10 files changed

+196
-184
lines changed

10 files changed

+196
-184
lines changed

internal/tailer/logstream/dgramstream.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import (
1616
)
1717

1818
type dgramStream struct {
19-
ctx context.Context
19+
cancel context.CancelFunc
20+
2021
lines chan<- *logline.LogLine
2122

2223
scheme string // Datagram scheme, either "unixgram" or "udp".
@@ -25,17 +26,15 @@ type dgramStream struct {
2526
mu sync.RWMutex // protects following fields
2627
completed bool // This pipestream is completed and can no longer be used.
2728
lastReadTime time.Time // Last time a log line was read from this named pipe
28-
29-
stopOnce sync.Once // Ensure stopChan only closed once.
30-
stopChan chan struct{} // Close to start graceful shutdown.
3129
}
3230

33-
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine) (LogStream, error) {
31+
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
3432
if address == "" {
3533
return nil, ErrEmptySocketAddress
3634
}
37-
ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})}
38-
if err := ss.stream(ctx, wg, waker); err != nil {
35+
ctx, cancel := context.WithCancel(ctx)
36+
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
37+
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
3938
return nil, err
4039
}
4140
return ss, nil
@@ -50,22 +49,22 @@ func (ss *dgramStream) LastReadTime() time.Time {
5049
// The read buffer size for datagrams.
5150
const datagramReadBufferSize = 131072
5251

53-
func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {
52+
func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
5453
c, err := net.ListenPacket(ss.scheme, ss.address)
5554
if err != nil {
5655
logErrors.Add(ss.address, 1)
5756
return err
5857
}
59-
glog.V(2).Infof("opened new datagram socket %v", c)
58+
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c)
6059
b := make([]byte, datagramReadBufferSize)
6160
partial := bytes.NewBufferString("")
6261
var total int
6362
wg.Add(1)
6463
go func() {
6564
defer wg.Done()
6665
defer func() {
67-
glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address)
68-
glog.V(2).Infof("%v: closing connection", c)
66+
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total)
67+
glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address)
6968
err := c.Close()
7069
if err != nil {
7170
logErrors.Add(ss.address, 1)
@@ -83,15 +82,25 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
8382

8483
for {
8584
n, _, err := c.ReadFrom(b)
86-
glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err)
85+
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err)
8786

8887
// This is a test-only trick that says if we've already put this
8988
// logstream in graceful shutdown, then a zero-byte read is
9089
// equivalent to an "EOF" in connection and file oriented streams.
9190
if n == 0 {
91+
if oneShot {
92+
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address)
93+
if partial.Len() > 0 {
94+
sendLine(ctx, ss.address, partial, ss.lines)
95+
}
96+
return
97+
}
9298
select {
93-
case <-ss.stopChan:
94-
glog.V(2).Infof("%v: exiting because zero byte read after Stop", c)
99+
case <-ctx.Done():
100+
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address)
101+
if partial.Len() > 0 {
102+
sendLine(ctx, ss.address, partial, ss.lines)
103+
}
95104
return
96105
default:
97106
}
@@ -100,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
100109
if n > 0 {
101110
total += n
102111
//nolint:contextcheck
103-
decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial)
112+
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
104113
ss.mu.Lock()
105114
ss.lastReadTime = time.Now()
106115
ss.mu.Unlock()
@@ -110,28 +119,23 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
110119
if partial.Len() > 0 {
111120
sendLine(ctx, ss.address, partial, ss.lines)
112121
}
113-
glog.V(2).Infof("%v: exiting, stream has error %s", c, err)
122+
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err)
114123
return
115124
}
116125

117126
// Yield and wait
118-
glog.V(2).Infof("%v: waiting", c)
127+
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
119128
select {
120-
case <-ss.stopChan:
129+
case <-ctx.Done():
121130
// We may have started waiting here when the stop signal
122131
// arrives, but since that wait the file may have been
123132
// written to. The file is not technically yet at EOF so
124133
// we need to go back and try one more read. We'll exit
125134
// the stream in the zero byte handler above.
126-
glog.V(2).Infof("%v: Stopping after next zero byte read", c)
127-
case <-ctx.Done():
128-
// Exit immediately; a cancelled context will set an immediate
129-
// deadline on the next read which will cause us to exit then,
130-
// so don't bother going around the loop again.
131-
return
135+
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address)
132136
case <-waker.Wake():
133137
// sleep until next Wake()
134-
glog.V(2).Infof("%v: Wake received", c)
138+
glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address)
135139
}
136140
}
137141
}()
@@ -145,8 +149,6 @@ func (ss *dgramStream) IsComplete() bool {
145149
}
146150

147151
func (ss *dgramStream) Stop() {
148-
glog.V(2).Infof("Stop received on datagram stream.")
149-
ss.stopOnce.Do(func() {
150-
close(ss.stopChan)
151-
})
152+
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address)
153+
ss.cancel()
152154
}

internal/tailer/logstream/dgramstream_unix_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
4343
waker, awaken := waker.NewTest(ctx, 1, "stream")
4444

4545
sockName := scheme + "://" + addr
46-
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled)
46+
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled)
4747
testutil.FatalIfErr(t, err)
4848

4949
s, err := net.Dial(scheme, addr)
@@ -54,9 +54,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
5454

5555
awaken(0, 0) // sync past read
5656

57-
ss.Stop()
58-
59-
// "Close" the socket by sending zero bytes, which after Stop tells the stream to act as if we're done.
57+
// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
6058
_, err = s.Write([]byte{})
6159
testutil.FatalIfErr(t, err)
6260

@@ -65,7 +63,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
6563

6664
received := testutil.LinesReceived(lines)
6765
expected := []*logline.LogLine{
68-
{context.TODO(), addr, "1"},
66+
{Context: context.TODO(), Filename: addr, Line: "1"},
6967
}
7068
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
7169

@@ -118,7 +116,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
118116

119117
received := testutil.LinesReceived(lines)
120118
expected := []*logline.LogLine{
121-
{context.TODO(), addr, "1"},
119+
{Context: context.TODO(), Filename: addr, Line: "1"},
122120
}
123121
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
124122

0 commit comments

Comments
 (0)