Skip to content

Commit a595a0d

Browse files
Shared pool among clients and fixed pool resources release (#51)
* [add] added option to create clients with shared pool. added option to releases the resources used by the pool ( single, and multihost ) * [add] added Close() test on multi-host pool * [add] extended test for Close on multihost pool * [add] swapped deprecated redis.NewPool to redis.Pool constructor. preserving all errors on multihost pool CLose() * [fix] fixed CI invalid memory address or nil pointer dereference on TestNewMultiHostPool/multi-host_single_address
1 parent 7ccd0b5 commit a595a0d

File tree

8 files changed

+321
-194
lines changed

8 files changed

+321
-194
lines changed

redisearch/aggregate_test.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@ import (
1616
"testing"
1717
)
1818

19-
func createClient(indexName string) *Client {
20-
value, exists := os.LookupEnv("REDISEARCH_TEST_HOST")
21-
host := "localhost:6379"
22-
if exists && value != "" {
23-
host = value
24-
}
25-
return NewClient(host, indexName)
26-
}
2719

2820
// Game struct which contains a Asin, a Description, a Title, a Price, and a list of categories
2921
// a type and a list of social links
@@ -38,6 +30,68 @@ type Game struct {
3830
Categories []string `json:"categories"`
3931
}
4032

33+
34+
func init() {
35+
/* load test data */
36+
value, exists := os.LookupEnv("REDISEARCH_RDB_LOADED")
37+
requiresDatagen := true
38+
if exists && value != "" {
39+
requiresDatagen = false
40+
}
41+
if requiresDatagen {
42+
c := createClient("bench.ft.aggregate")
43+
44+
sc := NewSchema(DefaultOptions).
45+
AddField(NewTextField("foo"))
46+
c.Drop()
47+
if err := c.CreateIndex(sc); err != nil {
48+
log.Fatal(err)
49+
}
50+
ndocs := 10000
51+
docs := make([]Document, ndocs)
52+
for i := 0; i < ndocs; i++ {
53+
docs[i] = NewDocument(fmt.Sprintf("doc%d", i), 1).Set("foo", "hello world")
54+
}
55+
56+
if err := c.IndexOptions(DefaultIndexingOptions, docs...); err != nil {
57+
log.Fatal(err)
58+
}
59+
}
60+
61+
}
62+
63+
func benchmarkAggregate(c *Client, q *AggregateQuery, b *testing.B) {
64+
for n := 0; n < b.N; n++ {
65+
c.Aggregate(q)
66+
}
67+
}
68+
69+
func benchmarkAggregateCursor(c *Client, q *AggregateQuery, b *testing.B) {
70+
for n := 0; n < b.N; n++ {
71+
c.Aggregate(q)
72+
for q.CursorHasResults() {
73+
c.Aggregate(q)
74+
}
75+
}
76+
}
77+
78+
func BenchmarkAgg_1(b *testing.B) {
79+
c := createClient("bench.ft.aggregate")
80+
q := NewAggregateQuery().
81+
SetQuery(NewQuery("*"))
82+
b.ResetTimer()
83+
benchmarkAggregate(c, q, b)
84+
}
85+
86+
func BenchmarkAggCursor_1(b *testing.B) {
87+
c := createClient("bench.ft.aggregate")
88+
q := NewAggregateQuery().
89+
SetQuery(NewQuery("*")).
90+
SetCursor(NewCursor())
91+
b.ResetTimer()
92+
benchmarkAggregateCursor(c, q, b)
93+
}
94+
4195
func AddValues(c *Client) {
4296
// Open our jsonFile
4397
bzipfile := "../tests/games.json.bz2"

redisearch/autocomplete_test.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,21 @@
1-
package redisearch_test
1+
package redisearch
22

33
import (
44
"fmt"
5-
"github.com/RediSearch/redisearch-go/redisearch"
65
"github.com/gomodule/redigo/redis"
76
"github.com/stretchr/testify/assert"
8-
"os"
97
"reflect"
108
"testing"
119
)
1210

13-
func createAutocompleter(dictName string) *redisearch.Autocompleter {
14-
value, exists := os.LookupEnv("REDISEARCH_TEST_HOST")
15-
host := "localhost:6379"
16-
if exists && value != "" {
17-
host = value
18-
}
19-
return redisearch.NewAutocompleter(host, dictName)
20-
}
21-
2211
func TestAutocompleter_Serialize(t *testing.T) {
23-
fuzzy := redisearch.DefaultSuggestOptions
12+
fuzzy := DefaultSuggestOptions
2413
fuzzy.Fuzzy = true
25-
withscores := redisearch.DefaultSuggestOptions
14+
withscores := DefaultSuggestOptions
2615
withscores.WithScores = true
27-
withpayloads := redisearch.DefaultSuggestOptions
16+
withpayloads := DefaultSuggestOptions
2817
withpayloads.WithPayloads = true
29-
all := redisearch.DefaultSuggestOptions
18+
all := DefaultSuggestOptions
3019
all.Fuzzy = true
3120
all.WithScores = true
3221
all.WithPayloads = true
@@ -36,7 +25,7 @@ func TestAutocompleter_Serialize(t *testing.T) {
3625
}
3726
type args struct {
3827
prefix string
39-
opts redisearch.SuggestOptions
28+
opts SuggestOptions
4029
}
4130
tests := []struct {
4231
name string
@@ -45,15 +34,15 @@ func TestAutocompleter_Serialize(t *testing.T) {
4534
want redis.Args
4635
want1 int
4736
}{
48-
{"default options", fields{"key1"}, args{"ab", redisearch.DefaultSuggestOptions,}, redis.Args{"key1", "ab", "MAX", 5}, 1},
37+
{"default options", fields{"key1"}, args{"ab", DefaultSuggestOptions,}, redis.Args{"key1", "ab", "MAX", 5}, 1},
4938
{"FUZZY", fields{"key1"}, args{"ab", fuzzy,}, redis.Args{"key1", "ab", "MAX", 5, "FUZZY"}, 1},
5039
{"WITHSCORES", fields{"key1"}, args{"ab", withscores,}, redis.Args{"key1", "ab", "MAX", 5, "WITHSCORES"}, 2},
5140
{"WITHPAYLOADS", fields{"key1"}, args{"ab", withpayloads,}, redis.Args{"key1", "ab", "MAX", 5, "WITHPAYLOADS"}, 2},
5241
{"all", fields{"key1"}, args{"ab", all,}, redis.Args{"key1", "ab", "MAX", 5, "FUZZY", "WITHSCORES", "WITHPAYLOADS"}, 3},
5342
}
5443
for _, tt := range tests {
5544
t.Run(tt.name, func(t *testing.T) {
56-
a := redisearch.NewAutocompleterFromPool(nil, tt.fields.name)
45+
a := NewAutocompleterFromPool(nil, tt.fields.name)
5746
got, got1 := a.Serialize(tt.args.prefix, tt.args.opts)
5847
if !reflect.DeepEqual(got, tt.want) {
5948
t.Errorf("serialize() got = %v, want %v", got, tt.want)
@@ -69,9 +58,9 @@ func TestSuggest(t *testing.T) {
6958
a := createAutocompleter("testing")
7059

7160
// Add Terms to the Autocompleter
72-
terms := make([]redisearch.Suggestion, 10)
61+
terms := make([]Suggestion, 10)
7362
for i := 0; i < 10; i++ {
74-
terms[i] = redisearch.Suggestion{Term: fmt.Sprintf("foo %d", i),
63+
terms[i] = Suggestion{Term: fmt.Sprintf("foo %d", i),
7564
Score: 1.0, Payload: fmt.Sprintf("bar %d", i)}
7665
}
7766
err := a.AddTerms(terms...)
@@ -80,7 +69,7 @@ func TestSuggest(t *testing.T) {
8069
assert.Nil(t, err)
8170
assert.Equal(t, int64(10), suglen)
8271
// Retrieve Terms From Autocompleter - Without Payloads / Scores
83-
suggestions, err := a.SuggestOpts("f", redisearch.SuggestOptions{Num: 10})
72+
suggestions, err := a.SuggestOpts("f", SuggestOptions{Num: 10})
8473
assert.Nil(t, err)
8574
assert.Equal(t, 10, len(suggestions))
8675
for _, suggestion := range suggestions {
@@ -90,7 +79,7 @@ func TestSuggest(t *testing.T) {
9079
}
9180

9281
// Retrieve Terms From Autocompleter - With Payloads & Scores
93-
suggestions, err = a.SuggestOpts("f", redisearch.SuggestOptions{Num: 10, WithScores: true, WithPayloads: true})
82+
suggestions, err = a.SuggestOpts("f", SuggestOptions{Num: 10, WithScores: true, WithPayloads: true})
9483
assert.Nil(t, err)
9584
assert.Equal(t, 10, len(suggestions))
9685
for _, suggestion := range suggestions {

redisearch/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ func NewClient(addr, name string) *Client {
3838
return ret
3939
}
4040

41+
// NewAutocompleter creates a new Autocompleter with the given pool and index name
42+
func NewClientFromPool(pool *redis.Pool, name string) *Client {
43+
ret := &Client{
44+
pool: pool,
45+
name: name,
46+
}
47+
return ret
48+
}
49+
4150
// CreateIndex configues the index and creates it on redis
4251
func (i *Client) CreateIndex(s *Schema) (err error) {
4352
args := redis.Args{i.name}

redisearch/client_test.go

Lines changed: 17 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,73 +2,12 @@ package redisearch
22

33
import (
44
"fmt"
5-
"github.com/stretchr/testify/assert"
6-
"log"
7-
"os"
85
"reflect"
96
"testing"
10-
)
11-
12-
func init() {
13-
/* load test data */
14-
value, exists := os.LookupEnv("REDISEARCH_RDB_LOADED")
15-
requiresDatagen := true
16-
if exists && value != "" {
17-
requiresDatagen = false
18-
}
19-
if requiresDatagen {
20-
c := createClient("bench.ft.aggregate")
21-
22-
sc := NewSchema(DefaultOptions).
23-
AddField(NewTextField("foo"))
24-
c.Drop()
25-
if err := c.CreateIndex(sc); err != nil {
26-
log.Fatal(err)
27-
}
28-
ndocs := 10000
29-
docs := make([]Document, ndocs)
30-
for i := 0; i < ndocs; i++ {
31-
docs[i] = NewDocument(fmt.Sprintf("doc%d", i), 1).Set("foo", "hello world")
32-
}
33-
34-
if err := c.IndexOptions(DefaultIndexingOptions, docs...); err != nil {
35-
log.Fatal(err)
36-
}
37-
}
38-
39-
}
407

41-
func benchmarkAggregate(c *Client, q *AggregateQuery, b *testing.B) {
42-
for n := 0; n < b.N; n++ {
43-
c.Aggregate(q)
44-
}
45-
}
46-
47-
func benchmarkAggregateCursor(c *Client, q *AggregateQuery, b *testing.B) {
48-
for n := 0; n < b.N; n++ {
49-
c.Aggregate(q)
50-
for q.CursorHasResults() {
51-
c.Aggregate(q)
52-
}
53-
}
54-
}
55-
56-
func BenchmarkAgg_1(b *testing.B) {
57-
c := createClient("bench.ft.aggregate")
58-
q := NewAggregateQuery().
59-
SetQuery(NewQuery("*"))
60-
b.ResetTimer()
61-
benchmarkAggregate(c, q, b)
62-
}
63-
64-
func BenchmarkAggCursor_1(b *testing.B) {
65-
c := createClient("bench.ft.aggregate")
66-
q := NewAggregateQuery().
67-
SetQuery(NewQuery("*")).
68-
SetCursor(NewCursor())
69-
b.ResetTimer()
70-
benchmarkAggregateCursor(c, q, b)
71-
}
8+
"github.com/gomodule/redigo/redis"
9+
"github.com/stretchr/testify/assert"
10+
)
7211

7312
func TestClient_Get(t *testing.T) {
7413

@@ -492,6 +431,20 @@ func TestClient_Config(t *testing.T) {
492431
assert.Equal(t, "100", kvs["TIMEOUT"])
493432
}
494433

434+
func TestNewClientFromPool(t *testing.T) {
435+
host, password := getTestConnectionDetails()
436+
pool := &redis.Pool{Dial: func() (redis.Conn, error) {
437+
return redis.Dial("tcp", host, redis.DialPassword(password))
438+
}, MaxIdle: maxConns}
439+
client1 := NewClientFromPool(pool, "index1")
440+
client2 := NewClientFromPool(pool, "index2")
441+
assert.Equal(t, client1.pool, client2.pool)
442+
err1 := client1.pool.Close()
443+
err2 := client2.pool.Close()
444+
assert.Nil(t, err1)
445+
assert.Nil(t, err2)
446+
}
447+
495448
func TestClient_GetTagVals(t *testing.T) {
496449
c := createClient("testgettagvals")
497450

redisearch/document_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
package redisearch_test
1+
package redisearch
22

33
import (
4-
"github.com/RediSearch/redisearch-go/redisearch"
54
"reflect"
65
"testing"
76
)
@@ -24,7 +23,7 @@ func TestEscapeTextFileString(t *testing.T) {
2423
}
2524
for _, tt := range tests {
2625
t.Run(tt.name, func(t *testing.T) {
27-
if got := redisearch.EscapeTextFileString(tt.args.value); got != tt.want {
26+
if got := EscapeTextFileString(tt.args.value); got != tt.want {
2827
t.Errorf("EscapeTextFileString() = %v, want %v", got, tt.want)
2928
}
3029
})
@@ -55,7 +54,7 @@ func TestDocument_EstimateSize(t *testing.T) {
5554
}
5655
for _, tt := range tests {
5756
t.Run(tt.name, func(t *testing.T) {
58-
d := &redisearch.Document{
57+
d := &Document{
5958
Id: tt.fields.Id,
6059
Score: tt.fields.Score,
6160
Payload: tt.fields.Payload,
@@ -90,7 +89,7 @@ func TestDocument_SetPayload(t *testing.T) {
9089
}
9190
for _, tt := range tests {
9291
t.Run(tt.name, func(t *testing.T) {
93-
d := &redisearch.Document{
92+
d := &Document{
9493
Id: tt.fields.Id,
9594
Score: tt.fields.Score,
9695
Payload: tt.fields.Payload,

redisearch/pool.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,33 @@
11
package redisearch
22

33
import (
4+
"fmt"
5+
"github.com/gomodule/redigo/redis"
46
"math/rand"
57
"sync"
68
"time"
7-
"github.com/gomodule/redigo/redis"
89
)
910

1011
type ConnPool interface {
1112
Get() redis.Conn
13+
Close() error
1214
}
1315

1416
type SingleHostPool struct {
1517
*redis.Pool
1618
}
1719

1820
func NewSingleHostPool(host string) *SingleHostPool {
19-
ret := redis.NewPool(func() (redis.Conn, error) {
20-
// TODO: Add timeouts. and 2 separate pools for indexing and querying, with different timeouts
21+
pool := &redis.Pool{Dial: func() (redis.Conn, error) {
2122
return redis.Dial("tcp", host)
22-
}, maxConns)
23-
ret.TestOnBorrow = func(c redis.Conn, t time.Time) (err error) {
23+
}, MaxIdle: maxConns}
24+
pool.TestOnBorrow = func(c redis.Conn, t time.Time) (err error) {
2425
if time.Since(t) > time.Second {
2526
_, err = c.Do("PING")
2627
}
2728
return err
2829
}
29-
return &SingleHostPool{ret}
30+
return &SingleHostPool{pool}
3031
}
3132

3233
type MultiHostPool struct {
@@ -65,3 +66,20 @@ func (p *MultiHostPool) Get() redis.Conn {
6566
return pool.Get()
6667

6768
}
69+
70+
func (p *MultiHostPool) Close() (err error) {
71+
p.Lock()
72+
defer p.Unlock()
73+
for host, pool := range p.pools {
74+
poolErr := pool.Close()
75+
//preserve pool error if not nil but continue
76+
if poolErr != nil {
77+
if err == nil {
78+
err = fmt.Errorf("Error closing pool for host %s. Got %v.", host, poolErr)
79+
} else {
80+
err = fmt.Errorf("%v Error closing pool for host %s. Got %v.", err, host, poolErr)
81+
}
82+
}
83+
}
84+
return
85+
}

0 commit comments

Comments
 (0)