Skip to content

Commit 0758427

Browse files
committed
perf: move flush delay after WaitForWrite() to be more reactive
1 parent 4be3a21 commit 0758427

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

pipe.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,21 +286,20 @@ func (p *pipe) _backgroundWrite() (err error) {
286286
ones = make([]cmds.Completed, 1)
287287
multi []cmds.Completed
288288
ch chan RedisResult
289-
delay = p.maxFlushDelay
289+
290+
flushDelay = p.maxFlushDelay
291+
flushStart = time.Time{}
290292
)
291293

292294
for atomic.LoadInt32(&p.state) < 3 {
293295
if ones[0], multi, ch = p.queue.NextWriteCmd(); ch == nil {
296+
if flushDelay != 0 {
297+
flushStart = time.Now()
298+
}
294299
if p.w.Buffered() == 0 {
295300
err = p.Error()
296301
} else {
297-
if delay == 0 || atomic.LoadInt32(&p.waits) == 1 { // do not delay for sequential usage
298-
err = p.w.Flush()
299-
} else {
300-
ts := time.Now()
301-
err = p.w.Flush()
302-
time.Sleep(delay - time.Since(ts)) // ref: https://github.com/rueian/rueidis/issues/156
303-
}
302+
err = p.w.Flush()
304303
}
305304
if err == nil {
306305
if atomic.LoadInt32(&p.state) == 1 {
@@ -309,6 +308,9 @@ func (p *pipe) _backgroundWrite() (err error) {
309308
runtime.Gosched()
310309
continue
311310
}
311+
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
312+
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/rueian/rueidis/issues/156
313+
}
312314
}
313315
}
314316
if ch != nil && multi == nil {

0 commit comments

Comments
 (0)