From 307d77838651b5f133cdb245635e3a6131ceec80 Mon Sep 17 00:00:00 2001 From: Mikhail Elhimov Date: Fri, 8 Aug 2025 14:22:30 +0300 Subject: [PATCH 1/4] test: handle multiple instances in parallel --- test_helpers/pool_helper.go | 45 ++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index b67d05f0..335496ff 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -2,8 +2,10 @@ package test_helpers import ( "context" + "errors" "fmt" "reflect" + "sync" "time" "github.com/tarantool/go-tarantool/v2" @@ -179,16 +181,22 @@ func InsertOnInstances( return fmt.Errorf("fail to set roles for cluster: %s", err.Error()) } - for _, dialer := range dialers { - ctx, cancel := GetConnectContext() - err := InsertOnInstance(ctx, dialer, connOpts, space, tuple) - cancel() - if err != nil { - return err - } + ctx, cancel := GetConnectContext() + defer cancel() + + errs := make([]error, len(dialers)) + var wg sync.WaitGroup + wg.Add(len(dialers)) + for i, dialer := range dialers { + // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). + go func(i int, dialer tarantool.Dialer) { + defer wg.Done() + errs[i] = InsertOnInstance(ctx, dialer, connOpts, space, tuple) + }(i, dialer) } + wg.Wait() - return nil + return errors.Join(errs...) } func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarantool.Opts, @@ -215,16 +223,23 @@ func SetClusterRO(dialers []tarantool.Dialer, connOpts tarantool.Opts, return fmt.Errorf("number of servers should be equal to number of roles") } + ctx, cancel := GetConnectContext() + defer cancel() + + // Apply roles in parallel. + errs := make([]error, len(dialers)) + var wg sync.WaitGroup + wg.Add(len(dialers)) for i, dialer := range dialers { - ctx, cancel := GetConnectContext() - err := SetInstanceRO(ctx, dialer, connOpts, roles[i]) - cancel() - if err != nil { - return err - } + // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). + go func(i int, dialer tarantool.Dialer) { + defer wg.Done() + errs[i] = SetInstanceRO(ctx, dialer, connOpts, roles[i]) + }(i, dialer) } + wg.Wait() - return nil + return errors.Join(errs...) } func StartTarantoolInstances(instsOpts []StartOpts) ([]*TarantoolInstance, error) { From 909977d44f6b8de0dd31c61be20ce0869b3ba8ac Mon Sep 17 00:00:00 2001 From: Mikhail Elhimov Date: Fri, 8 Aug 2025 15:48:19 +0300 Subject: [PATCH 2/4] test: improve code readability General context approach is used in multi-instances functions -- context is passed as a first argument. --- pool/connection_pool_test.go | 237 +++++++++++++++----------- pool/example_test.go | 12 +- queue/example_connection_pool_test.go | 4 +- queue/queue_test.go | 4 +- test_helpers/pool_helper.go | 11 +- 5 files changed, 151 insertions(+), 117 deletions(-) diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index f3bf5f55..8120c613 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -1115,7 +1115,10 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, true} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testHandler{} @@ -1123,8 +1126,6 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1249,7 +1250,10 @@ func TestConnectionHandlerUpdateError(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, false} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testUpdateErrorHandler{} @@ -1257,8 +1261,6 @@ func TestConnectionHandlerUpdateError(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1324,7 +1326,10 @@ func TestConnectionHandlerDeactivated_on_remove(t *testing.T) { poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, false} - err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, makeDialers(poolServers), connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") h := &testDeactivatedErrorHandler{} @@ -1332,8 +1337,6 @@ func TestConnectionHandlerDeactivated_on_remove(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1413,11 +1416,12 @@ func TestRequestOnClosed(t *testing.T) { func TestCall(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1472,11 +1476,12 @@ func TestCall(t *testing.T) { func TestCall16(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1531,11 +1536,12 @@ func TestCall16(t *testing.T) { func TestCall17(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1590,11 +1596,12 @@ func TestCall17(t *testing.T) { func TestEval(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1670,11 +1677,12 @@ func TestExecute(t *testing.T) { roles := []bool{false, true, false, false, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1728,11 +1736,12 @@ func TestRoundRobinStrategy(t *testing.T) { serversNumber := len(servers) - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1807,11 +1816,12 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { servers[4]: true, } - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1880,11 +1890,12 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { servers[4]: true, } - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1965,11 +1976,12 @@ func TestUpdateInstancesRoles(t *testing.T) { serversNumber := len(servers) - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2044,7 +2056,9 @@ func TestUpdateInstancesRoles(t *testing.T) { servers[3]: true, } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctxSetRoles, cancelSetRoles := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctxSetRoles, dialers, connOpts, roles) + cancelSetRoles() require.Nilf(t, err, "fail to set roles for cluster") // ANY @@ -2111,11 +2125,12 @@ func TestUpdateInstancesRoles(t *testing.T) { func TestInsert(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2210,11 +2225,12 @@ func TestInsert(t *testing.T) { func TestDelete(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2274,11 +2290,12 @@ func TestDelete(t *testing.T) { func TestUpsert(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2346,11 +2363,12 @@ func TestUpsert(t *testing.T) { func TestUpdate(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2435,11 +2453,12 @@ func TestUpdate(t *testing.T) { func TestReplace(t *testing.T) { roles := []bool{true, true, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2520,11 +2539,12 @@ func TestReplace(t *testing.T) { func TestSelect(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2543,13 +2563,13 @@ func TestSelect(t *testing.T) { rwKey := []interface{}{"rw_select_key"} anyKey := []interface{}{"any_select_key"} - err = test_helpers.InsertOnInstances(makeDialers(roServers), connOpts, spaceNo, roTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(roServers), connOpts, spaceNo, roTpl) require.Nil(t, err) - err = test_helpers.InsertOnInstances(makeDialers(rwServers), connOpts, spaceNo, rwTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(rwServers), connOpts, spaceNo, rwTpl) require.Nil(t, err) - err = test_helpers.InsertOnInstances(makeDialers(allServers), connOpts, spaceNo, anyTpl) + err = test_helpers.InsertOnInstances(ctx, makeDialers(allServers), connOpts, spaceNo, anyTpl) require.Nil(t, err) //default: ANY @@ -2642,11 +2662,12 @@ func TestSelect(t *testing.T) { func TestPing(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2681,11 +2702,12 @@ func TestPing(t *testing.T) { func TestDo(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2717,11 +2739,12 @@ func TestDo(t *testing.T) { func TestDo_concurrent(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2766,12 +2789,12 @@ func TestDoInstance(t *testing.T) { func TestDoInstance_not_found(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, []pool.Instance{}) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2820,11 +2843,12 @@ func TestNewPrepared(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2892,11 +2916,12 @@ func TestDoWithStrangerConn(t *testing.T) { roles := []bool{true, true, false, true, false} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2922,11 +2947,12 @@ func TestStream_Commit(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3013,11 +3039,12 @@ func TestStream_Rollback(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3103,11 +3130,12 @@ func TestStream_TxnIsolationLevel(t *testing.T) { roles := []bool{true, true, false, true, true} - err = test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3214,11 +3242,12 @@ func TestConnectionPool_NewWatcher_modes(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3291,14 +3320,15 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { const initCnt = 2 roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") poolOpts := pool.Opts{ CheckTimeout: 500 * time.Millisecond, } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() pool, err := pool.ConnectWithOpts(ctx, instances, poolOpts) require.Nilf(t, err, "failed to connect") @@ -3338,7 +3368,9 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { for i, role := range roles { roles[i] = !role } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctxSetRoles, cancelSetRoles := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctxSetRoles, dialers, connOpts, roles) + cancelSetRoles() require.Nilf(t, err, "fail to set roles for cluster") // Wait for all updated events. @@ -3376,11 +3408,12 @@ func TestWatcher_Unregister(t *testing.T) { const expectedCnt = 2 roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + pool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") @@ -3433,11 +3466,12 @@ func TestConnectionPool_NewWatcher_concurrent(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -3471,11 +3505,12 @@ func TestWatcher_Unregister_concurrent(t *testing.T) { roles := []bool{true, false, false, true, true} - err := test_helpers.SetClusterRO(dialers, connOpts, roles) - require.Nilf(t, err, "fail to set roles for cluster") - ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() + + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") diff --git a/pool/example_test.go b/pool/example_test.go index a4d3d4ba..dce8bb1a 100644 --- a/pool/example_test.go +++ b/pool/example_test.go @@ -23,12 +23,12 @@ var testRoles = []bool{true, true, false, true, true} func examplePool(roles []bool, connOpts tarantool.Opts) (*pool.ConnectionPool, error) { - err := test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + err := test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) if err != nil { return nil, fmt.Errorf("ConnectionPool is not established") } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.Connect(ctx, instances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") @@ -55,12 +55,12 @@ func exampleFeaturesPool(roles []bool, connOpts tarantool.Opts, }) poolDialers = append(poolDialers, dialer) } - err := test_helpers.SetClusterRO(poolDialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + err := test_helpers.SetClusterRO(ctx, poolDialers, connOpts, roles) if err != nil { return nil, fmt.Errorf("ConnectionPool is not established") } - ctx, cancel := test_helpers.GetPoolConnectContext() - defer cancel() connPool, err := pool.Connect(ctx, poolInstances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index 355a491e..a126e13a 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -212,7 +212,9 @@ func Example_connectionPool() { // Switch a master instance in the pool. roles := []bool{true, false} for { - err := test_helpers.SetClusterRO(poolDialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + err := test_helpers.SetClusterRO(ctx, poolDialers, connOpts, roles) + cancel() if err == nil { break } diff --git a/queue/queue_test.go b/queue/queue_test.go index e43c4711..840c18b4 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -954,7 +954,9 @@ func runTestMain(m *testing.M) int { }) } - err = test_helpers.SetClusterRO(dialers, connOpts, roles) + ctx, cancel := test_helpers.GetPoolConnectContext() + err = test_helpers.SetClusterRO(ctx, dialers, connOpts, roles) + cancel() if err == nil { break } diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index 335496ff..4386596c 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -166,6 +166,7 @@ func InsertOnInstance(ctx context.Context, dialer tarantool.Dialer, connOpts tar } func InsertOnInstances( + ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, space interface{}, @@ -176,14 +177,11 @@ func InsertOnInstances( roles[i] = false } - err := SetClusterRO(dialers, connOpts, roles) + err := SetClusterRO(ctx, dialers, connOpts, roles) if err != nil { return fmt.Errorf("fail to set roles for cluster: %s", err.Error()) } - ctx, cancel := GetConnectContext() - defer cancel() - errs := make([]error, len(dialers)) var wg sync.WaitGroup wg.Add(len(dialers)) @@ -217,15 +215,12 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant return nil } -func SetClusterRO(dialers []tarantool.Dialer, connOpts tarantool.Opts, +func SetClusterRO(ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, roles []bool) error { if len(dialers) != len(roles) { return fmt.Errorf("number of servers should be equal to number of roles") } - ctx, cancel := GetConnectContext() - defer cancel() - // Apply roles in parallel. errs := make([]error, len(dialers)) var wg sync.WaitGroup From e3febce249b1c3fbdfcba7f57e8680679446c4ac Mon Sep 17 00:00:00 2001 From: Mikhail Elhimov Date: Thu, 7 Aug 2025 21:31:21 +0300 Subject: [PATCH 3/4] pool: use safe type assertion --- pool/connection_pool.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pool/connection_pool.go b/pool/connection_pool.go index a47ec19a..e62cb2b3 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -1085,7 +1085,12 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er return UnknownRole, ErrIncorrectResponse } - instanceStatus, ok := data[0].(map[interface{}]interface{})["status"] + respFields, ok := data[0].(map[interface{}]interface{}) + if !ok { + return UnknownRole, ErrIncorrectResponse + } + + instanceStatus, ok := respFields["status"] if !ok { return UnknownRole, ErrIncorrectResponse } @@ -1093,7 +1098,7 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er return UnknownRole, ErrIncorrectStatus } - replicaRole, ok := data[0].(map[interface{}]interface{})[roFieldName] + replicaRole, ok := respFields[roFieldName] if !ok { return UnknownRole, ErrIncorrectResponse } From f859fca78bfb41753fa6bcda14627b3390ea0b03 Mon Sep 17 00:00:00 2001 From: Mikhail Elhimov Date: Wed, 30 Jul 2025 17:37:05 +0300 Subject: [PATCH 4/4] test: fix flaky test Helper method that performs initial assigning of master/replica roles and is widely used in ConnectionPool tests was adjusted to wait for the roles to be applied successfully. Prior to this patch it doesn't, so sometimes subsequent test code might work unexpectedly (the problem was caught with TestConnectionHandlerOpenUpdateClose) Closes #452 --- test_helpers/pool_helper.go | 44 +++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index 4386596c..599e5659 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -212,6 +212,50 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant return err } + checkRole := func(conn *tarantool.Connection, isReplica bool) string { + data, err := conn.Do(tarantool.NewCallRequest("box.info")).Get() + switch { + case err != nil: + return fmt.Sprintf("failed to get box.info: %s", err) + case len(data) < 1: + return "box.info is empty" + } + + boxInfo, ok := data[0].(map[interface{}]interface{}) + if !ok { + return "unexpected type in box.info response" + } + + status, statusFound := boxInfo["status"] + readonly, readonlyFound := boxInfo["ro"] + switch { + case !statusFound: + return "box.info.status is missing" + case status != "running": + return fmt.Sprintf("box.info.status='%s' (waiting for 'running')", status) + case !readonlyFound: + return "box.info.ro is missing" + case readonly != isReplica: + return fmt.Sprintf("box.info.ro='%v' (waiting for '%v')", readonly, isReplica) + default: + return "" + } + } + + problem := "not checked yet" + + // Wait for the role to be applied. + for len(problem) != 0 { + select { + case <-time.After(10 * time.Millisecond): + case <-ctx.Done(): + return fmt.Errorf("%w: failed to apply role, the last problem: %s", + ctx.Err(), problem) + } + + problem = checkRole(conn, isReplica) + } + return nil }