Skip to content

Commit 1aba42b

Browse files
committed
perf: evenly dispatch requests to conns and limit the default conns count in single client
1 parent 2911b22 commit 1aba42b

File tree

3 files changed

+45
-11
lines changed

3 files changed

+45
-11
lines changed

mux.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rueidis
22

33
import (
44
"context"
5+
"math/rand"
56
"net"
67
"runtime"
78
"sync"
@@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
223224
}
224225

225226
func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
226-
slot := cmd.Slot() & uint16(len(m.wire)-1)
227+
slot := slotfn(cmd.Slot(), len(m.wire))
227228
wire := m.pipe(slot)
228229
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
229230
m.wire[slot].CompareAndSwap(wire, m.init)
@@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
232233
}
233234

234235
func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
235-
slot := cmd[0].Slot() & uint16(len(m.wire)-1)
236+
slot := slotfn(cmd[0].Slot(), len(m.wire))
236237
wire := m.pipe(slot)
237238
resp = wire.DoMulti(ctx, cmd...)
238239
for _, r := range resp.s {
@@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
311312
}
312313

313314
func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
314-
slot := subscribe.Slot() & uint16(len(m.wire)-1)
315+
slot := slotfn(subscribe.Slot(), len(m.wire))
315316
wire := m.pipe(slot)
316317
err := wire.Receive(ctx, subscribe, fn)
317318
if isBroken(err, wire) {
@@ -346,3 +347,23 @@ func (m *mux) Addr() string {
346347
func isBroken(err error, w wire) bool {
347348
return err != nil && err != ErrClosing && w.Error() != nil
348349
}
350+
351+
var rngPool = sync.Pool{
352+
New: func() any {
353+
return rand.New(rand.NewSource(time.Now().UnixNano()))
354+
},
355+
}
356+
357+
func fastrand(n int) (r int) {
358+
s := rngPool.Get().(*rand.Rand)
359+
r = s.Intn(n)
360+
rngPool.Put(s)
361+
return
362+
}
363+
364+
func slotfn(ks uint16, n int) uint16 {
365+
if n == 1 || ks == cmds.NoSlot {
366+
return 0
367+
}
368+
return uint16(fastrand(n))
369+
}

redis_test.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) {
1818
wg.Add(p)
1919
for i := 0; i < p; i++ {
2020
go func() {
21+
defer func() {
22+
recover()
23+
wg.Done()
24+
}()
2125
for fn := range ch {
2226
fn()
2327
}
24-
wg.Done()
2528
}()
2629
}
2730
return ch, func() {
@@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
120123
jobs <- func() {
121124
val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString()
122125
if err != nil || val != "OK" {
123-
t.Fatalf("unexpected set response %v %v", val, err)
126+
t.Errorf("unexpected set response %v %v", val, err)
124127
}
125128
}
126129
}
@@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
133136
jobs <- func() {
134137
val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
135138
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
136-
t.Fatalf("unexpected get response %v %v %v", val, err, ok)
139+
t.Errorf("unexpected get response %v %v %v", val, err, ok)
137140
}
138141
}
139142
}
@@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
148151
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
149152
val, err := resp.ToString()
150153
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
151-
t.Fatalf("unexpected csc get response %v %v %v", val, err, ok)
154+
t.Errorf("unexpected csc get response %v %v %v", val, err, ok)
152155
}
153156
if resp.IsCacheHit() {
154157
atomic.AddInt64(&hits, 1)
@@ -175,23 +178,25 @@ func testSETGET(t *testing.T, client Client, csc bool) {
175178
jobs <- func() {
176179
val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64()
177180
if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) {
178-
t.Fatalf("unexpected del response %v %v %v", val, err, ok)
181+
t.Errorf("unexpected del response %v %v %v", val, err, ok)
179182
}
180183
}
181184
}
182185
wait()
183186

187+
time.Sleep(time.Second)
188+
184189
t.Logf("testing client side caching after delete\n")
185190
jobs, wait = parallel(para)
186191
for i := 0; i < keys/100; i++ {
187192
key := strconv.Itoa(i)
188193
jobs <- func() {
189194
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
190195
if !IsRedisNil(resp.Error()) {
191-
t.Fatalf("unexpected csc get response after delete %v", resp)
196+
t.Errorf("unexpected csc get response after delete %v", resp)
192197
}
193198
if resp.IsCacheHit() {
194-
t.Fatalf("unexpected csc cache hit after delete")
199+
t.Errorf("unexpected csc cache hit after delete")
195200
}
196201
}
197202
}
@@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) {
320325
}
321326
wait()
322327

328+
time.Sleep(time.Second)
329+
323330
t.Logf("testing client side caching after delete\n")
324331
jobs, wait = parallel(para)
325332
for i := 0; i < keys/100; i += batch {
@@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) {
415422
t.Fatalf("unexpecetd err %v\n", err)
416423
}
417424
}
425+
418426
time.Sleep(time.Second)
427+
419428
t.Logf("testing client side caching after delete\n")
420429
resp, err = MGetCache(client, ctx, time.Minute, cmdKeys)
421430
if err != nil {

rueidis.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"context"
66
"crypto/tls"
77
"errors"
8+
"math"
89
"math/rand"
910
"net"
11+
"runtime"
1012
"strings"
1113
"time"
1214

@@ -319,7 +321,9 @@ func NewClient(option ClientOption) (client Client, err error) {
319321

320322
func singleClientMultiplex(multiplex int) int {
321323
if multiplex == 0 {
322-
multiplex = 2
324+
if multiplex = int(math.Log2(float64(runtime.GOMAXPROCS(0)))); multiplex >= 2 {
325+
multiplex = 2
326+
}
323327
}
324328
if multiplex < 0 {
325329
multiplex = 0

0 commit comments

Comments
 (0)