Skip to content

Commit 9f8a06d

Browse files
authored
Merge pull request #291 from redis/perf-reduce-domulti-ch-send
perf: improve DoMulti by reducing chansends
2 parents 47c3d7c + 1aba42b commit 9f8a06d

File tree

6 files changed

+89
-51
lines changed

6 files changed

+89
-51
lines changed

mux.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rueidis
22

33
import (
44
"context"
5+
"math/rand"
56
"net"
67
"runtime"
78
"sync"
@@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
223224
}
224225

225226
func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
226-
slot := cmd.Slot() & uint16(len(m.wire)-1)
227+
slot := slotfn(cmd.Slot(), len(m.wire))
227228
wire := m.pipe(slot)
228229
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
229230
m.wire[slot].CompareAndSwap(wire, m.init)
@@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
232233
}
233234

234235
func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
235-
slot := cmd[0].Slot() & uint16(len(m.wire)-1)
236+
slot := slotfn(cmd[0].Slot(), len(m.wire))
236237
wire := m.pipe(slot)
237238
resp = wire.DoMulti(ctx, cmd...)
238239
for _, r := range resp.s {
@@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
311312
}
312313

313314
func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
314-
slot := subscribe.Slot() & uint16(len(m.wire)-1)
315+
slot := slotfn(subscribe.Slot(), len(m.wire))
315316
wire := m.pipe(slot)
316317
err := wire.Receive(ctx, subscribe, fn)
317318
if isBroken(err, wire) {
@@ -346,3 +347,23 @@ func (m *mux) Addr() string {
346347
func isBroken(err error, w wire) bool {
347348
return err != nil && err != ErrClosing && w.Error() != nil
348349
}
350+
351+
var rngPool = sync.Pool{
352+
New: func() any {
353+
return rand.New(rand.NewSource(time.Now().UnixNano()))
354+
},
355+
}
356+
357+
func fastrand(n int) (r int) {
358+
s := rngPool.Get().(*rand.Rand)
359+
r = s.Intn(n)
360+
rngPool.Put(s)
361+
return
362+
}
363+
364+
func slotfn(ks uint16, n int) uint16 {
365+
if n == 1 || ks == cmds.NoSlot {
366+
return 0
367+
}
368+
return uint16(fastrand(n))
369+
}

pipe.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,7 @@ func (p *pipe) _background() {
313313
}
314314

315315
var (
316-
ones = make([]Completed, 1)
317-
multi []Completed
316+
resps []RedisResult
318317
ch chan RedisResult
319318
cond *sync.Cond
320319
)
@@ -332,13 +331,12 @@ func (p *pipe) _background() {
332331
_, _, _ = p.queue.NextWriteCmd()
333332
default:
334333
}
335-
if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil {
336-
if multi == nil {
337-
multi = ones
338-
}
339-
for range multi {
340-
ch <- newErrResult(p.Error())
334+
if _, _, ch, resps, cond = p.queue.NextResultCh(); ch != nil {
335+
err := newErrResult(p.Error())
336+
for i := range resps {
337+
resps[i] = err
341338
}
339+
ch <- err
342340
cond.L.Unlock()
343341
cond.Signal()
344342
} else {
@@ -405,6 +403,7 @@ func (p *pipe) _backgroundRead() (err error) {
405403
cond *sync.Cond
406404
ones = make([]Completed, 1)
407405
multi []Completed
406+
resps []RedisResult
408407
ch chan RedisResult
409408
ff int // fulfilled count
410409
skip int // skip rest push messages
@@ -415,10 +414,12 @@ func (p *pipe) _backgroundRead() (err error) {
415414
)
416415

417416
defer func() {
417+
resp := newErrResult(err)
418418
if err != nil && ff < len(multi) {
419-
for ; ff < len(multi); ff++ {
420-
ch <- newErrResult(err)
419+
for ; ff < len(resps); ff++ {
420+
resps[ff] = resp
421421
}
422+
ch <- resp
422423
cond.L.Unlock()
423424
cond.Signal()
424425
}
@@ -462,7 +463,7 @@ func (p *pipe) _backgroundRead() (err error) {
462463
}
463464
if ff == len(multi) {
464465
ff = 0
465-
ones[0], multi, ch, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
466+
ones[0], multi, ch, resps, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
466467
if ch == nil {
467468
cond.L.Unlock()
468469
// Redis will send sunsubscribe notification proactively in the event of slot migration.
@@ -523,8 +524,12 @@ func (p *pipe) _backgroundRead() (err error) {
523524
} else if multi[ff].NoReply() && msg.string == "QUEUED" {
524525
panic(multiexecsub)
525526
}
526-
ch <- newResult(msg, err)
527+
resp := newResult(msg, err)
528+
if resps != nil {
529+
resps[ff] = resp
530+
}
527531
if ff++; ff == len(multi) {
532+
ch <- resp
528533
cond.L.Unlock()
529534
cond.Signal()
530535
}
@@ -911,32 +916,28 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
911916
return resp
912917

913918
queue:
914-
ch := p.queue.PutMulti(multi)
919+
ch := p.queue.PutMulti(multi, resp.s)
915920
var i int
916921
if ctxCh := ctx.Done(); ctxCh == nil {
917-
for ; i < len(resp.s); i++ {
918-
resp.s[i] = <-ch
919-
}
922+
<-ch
920923
} else {
921-
for ; i < len(resp.s); i++ {
922-
select {
923-
case resp.s[i] = <-ch:
924-
case <-ctxCh:
925-
goto abort
926-
}
924+
select {
925+
case <-ch:
926+
case <-ctxCh:
927+
goto abort
927928
}
928929
}
929930
atomic.AddInt32(&p.waits, -1)
930931
atomic.AddInt32(&p.recvs, 1)
931932
return resp
932933
abort:
933-
go func(i int) {
934-
for ; i < len(resp.s); i++ {
935-
<-ch
936-
}
934+
go func(i int, resp *redisresults) {
935+
<-ch
936+
resultsp.Put(resp)
937937
atomic.AddInt32(&p.waits, -1)
938938
atomic.AddInt32(&p.recvs, 1)
939-
}(i)
939+
}(i, resp)
940+
resp = resultsp.Get(len(multi), len(multi))
940941
err := newErrResult(ctx.Err())
941942
for ; i < len(resp.s); i++ {
942943
resp.s[i] = err

redis_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) {
1818
wg.Add(p)
1919
for i := 0; i < p; i++ {
2020
go func() {
21+
defer func() {
22+
recover()
23+
wg.Done()
24+
}()
2125
for fn := range ch {
2226
fn()
2327
}
24-
wg.Done()
2528
}()
2629
}
2730
return ch, func() {
@@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
120123
jobs <- func() {
121124
val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString()
122125
if err != nil || val != "OK" {
123-
t.Fatalf("unexpected set response %v %v", val, err)
126+
t.Errorf("unexpected set response %v %v", val, err)
124127
}
125128
}
126129
}
@@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
133136
jobs <- func() {
134137
val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
135138
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
136-
t.Fatalf("unexpected get response %v %v %v", val, err, ok)
139+
t.Errorf("unexpected get response %v %v %v", val, err, ok)
137140
}
138141
}
139142
}
@@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
148151
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
149152
val, err := resp.ToString()
150153
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
151-
t.Fatalf("unexpected csc get response %v %v %v", val, err, ok)
154+
t.Errorf("unexpected csc get response %v %v %v", val, err, ok)
152155
}
153156
if resp.IsCacheHit() {
154157
atomic.AddInt64(&hits, 1)
@@ -175,23 +178,25 @@ func testSETGET(t *testing.T, client Client, csc bool) {
175178
jobs <- func() {
176179
val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64()
177180
if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) {
178-
t.Fatalf("unexpected del response %v %v %v", val, err, ok)
181+
t.Errorf("unexpected del response %v %v %v", val, err, ok)
179182
}
180183
}
181184
}
182185
wait()
183186

187+
time.Sleep(time.Second)
188+
184189
t.Logf("testing client side caching after delete\n")
185190
jobs, wait = parallel(para)
186191
for i := 0; i < keys/100; i++ {
187192
key := strconv.Itoa(i)
188193
jobs <- func() {
189194
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
190195
if !IsRedisNil(resp.Error()) {
191-
t.Fatalf("unexpected csc get response after delete %v", resp)
196+
t.Errorf("unexpected csc get response after delete %v", resp)
192197
}
193198
if resp.IsCacheHit() {
194-
t.Fatalf("unexpected csc cache hit after delete")
199+
t.Errorf("unexpected csc cache hit after delete")
195200
}
196201
}
197202
}
@@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) {
320325
}
321326
wait()
322327

328+
time.Sleep(time.Second)
329+
323330
t.Logf("testing client side caching after delete\n")
324331
jobs, wait = parallel(para)
325332
for i := 0; i < keys/100; i += batch {
@@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) {
415422
t.Fatalf("unexpecetd err %v\n", err)
416423
}
417424
}
425+
418426
time.Sleep(time.Second)
427+
419428
t.Logf("testing client side caching after delete\n")
420429
resp, err = MGetCache(client, ctx, time.Minute, cmdKeys)
421430
if err != nil {

ring.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77

88
type queue interface {
99
PutOne(m Completed) chan RedisResult
10-
PutMulti(m []Completed) chan RedisResult
10+
PutMulti(m []Completed, resps []RedisResult) chan RedisResult
1111
NextWriteCmd() (Completed, []Completed, chan RedisResult)
1212
WaitForWrite() (Completed, []Completed, chan RedisResult)
13-
NextResultCh() (Completed, []Completed, chan RedisResult, *sync.Cond)
13+
NextResultCh() (Completed, []Completed, chan RedisResult, []RedisResult, *sync.Cond)
1414
}
1515

1616
var _ queue = (*ring)(nil)
@@ -46,6 +46,7 @@ type node struct {
4646
ch chan RedisResult
4747
one Completed
4848
multi []Completed
49+
resps []RedisResult
4950
mark uint32
5051
slept bool
5152
}
@@ -58,6 +59,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
5859
}
5960
n.one = m
6061
n.multi = nil
62+
n.resps = nil
6163
n.mark = 1
6264
s := n.slept
6365
n.c1.L.Unlock()
@@ -67,14 +69,15 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
6769
return n.ch
6870
}
6971

70-
func (r *ring) PutMulti(m []Completed) chan RedisResult {
72+
func (r *ring) PutMulti(m []Completed, resps []RedisResult) chan RedisResult {
7173
n := &r.store[atomic.AddUint64(&r.write, 1)&r.mask]
7274
n.c1.L.Lock()
7375
for n.mark != 0 {
7476
n.c1.Wait()
7577
}
7678
n.one = Completed{}
7779
n.multi = m
80+
n.resps = resps
7881
n.mark = 1
7982
s := n.slept
8083
n.c1.L.Unlock()
@@ -118,14 +121,14 @@ func (r *ring) WaitForWrite() (one Completed, multi []Completed, ch chan RedisRe
118121
}
119122

120123
// NextResultCh should be only called by one dedicated thread
121-
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, cond *sync.Cond) {
124+
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, resps []RedisResult, cond *sync.Cond) {
122125
r.read2++
123126
p := r.read2 & r.mask
124127
n := &r.store[p]
125128
cond = n.c1
126129
n.c1.L.Lock()
127130
if n.mark == 2 {
128-
one, multi, ch = n.one, n.multi, n.ch
131+
one, multi, ch, resps = n.one, n.multi, n.ch, n.resps
129132
n.mark = 0
130133
} else {
131134
r.read2--

ring_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func TestRing(t *testing.T) {
3030
runtime.Gosched()
3131
continue
3232
}
33-
cmd2, _, ch, cond := ring.NextResultCh()
33+
cmd2, _, ch, _, cond := ring.NextResultCh()
3434
cond.L.Unlock()
3535
cond.Signal()
3636
if cmd1.Commands()[0] != cmd2.Commands()[0] {
@@ -53,7 +53,7 @@ func TestRing(t *testing.T) {
5353

5454
base := [][]string{{"a"}, {"b"}, {"c"}, {"d"}}
5555
for cmd := range fixture {
56-
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)))
56+
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)), nil)
5757
}
5858

5959
for len(fixture) != 0 {
@@ -62,7 +62,7 @@ func TestRing(t *testing.T) {
6262
runtime.Gosched()
6363
continue
6464
}
65-
_, cmd2, ch, cond := ring.NextResultCh()
65+
_, cmd2, ch, _, cond := ring.NextResultCh()
6666
cond.L.Unlock()
6767
cond.Signal()
6868
for j := 0; j < len(cmd1); j++ {
@@ -82,7 +82,7 @@ func TestRing(t *testing.T) {
8282
if one, multi, _ := ring.NextWriteCmd(); !one.IsEmpty() || multi != nil {
8383
t.Fatalf("NextWriteCmd should returns nil if empty")
8484
}
85-
if one, multi, ch, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
85+
if one, multi, ch, _, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
8686
t.Fatalf("NextResultCh should returns nil if not NextWriteCmd yet")
8787
} else {
8888
cond.L.Unlock()
@@ -93,18 +93,18 @@ func TestRing(t *testing.T) {
9393
if one, _, _ := ring.NextWriteCmd(); len(one.Commands()) == 0 || one.Commands()[0] != "0" {
9494
t.Fatalf("NextWriteCmd should returns next cmd")
9595
}
96-
if one, _, ch, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
96+
if one, _, ch, _, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
9797
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
9898
} else {
9999
cond.L.Unlock()
100100
cond.Signal()
101101
}
102102

103-
ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}))
103+
ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}), nil)
104104
if _, multi, _ := ring.NextWriteCmd(); len(multi) == 0 || multi[0].Commands()[0] != "0" {
105105
t.Fatalf("NextWriteCmd should returns next cmd")
106106
}
107-
if _, multi, ch, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
107+
if _, multi, ch, _, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
108108
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
109109
} else {
110110
cond.L.Unlock()
@@ -131,7 +131,7 @@ func TestRing(t *testing.T) {
131131
if _, multi, ch := ring.NextWriteCmd(); ch == nil {
132132
go func() {
133133
time.Sleep(time.Millisecond * 100)
134-
ring.PutMulti([]Completed{cmds.QuitCmd})
134+
ring.PutMulti([]Completed{cmds.QuitCmd}, nil)
135135
}()
136136
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] {
137137
return

0 commit comments

Comments
 (0)