Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 79 additions & 15 deletions pkg/proxy/conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ type Tenant string
// EmptyTenant is an empty tenant.
var EmptyTenant Tenant = ""

// newPlaceholderTunnel creates a new placeholder tunnel object.
// Each placeholder must be a unique object so that tunnelSet can properly
// count multiple pending connections for the same CN server.
func newPlaceholderTunnel() *tunnel {
return &tunnel{}
}

// tunnelSet defines the tunnels map type. map is the container
// to contain all tunnels. It must be used within cnTunnels.
type tunnelSet map[*tunnel]struct{}
Expand Down Expand Up @@ -75,6 +82,7 @@ func newCNTunnels() cnTunnels {
}

// add adds a new tunnel to the CN server. This method needs lock in connManager.
// Note: nil is not allowed, but placeholderTunnel is allowed.
func (t cnTunnels) add(uuid string, tun *tunnel) {
if tun == nil {
return
Expand All @@ -85,7 +93,7 @@ func (t cnTunnels) add(uuid string, tun *tunnel) {
t[uuid].add(tun)
}

// add adds a new tunnel to the CN server. This method needs lock in connManager.
// del deletes a tunnel from the CN server. This method needs lock in connManager.
func (t cnTunnels) del(uuid string, tun *tunnel) {
if tun == nil {
return
Expand All @@ -99,6 +107,22 @@ func (t cnTunnels) del(uuid string, tun *tunnel) {
}
}

// delOnePlaceholder deletes one placeholder tunnel from the CN server.
// Placeholders are identified by checking if ctx is nil (real tunnels always have ctx set).
func (t cnTunnels) delOnePlaceholder(uuid string) {
tunnels, ok := t[uuid]
if !ok {
return
}
// Find and delete one placeholder tunnel (empty tunnel object with ctx == nil).
for tun := range tunnels {
if tun.ctx == nil {
tunnels.del(tun)
return
}
}
}

// count returns number of all tunnels. This method needs lock in connManager.
func (t cnTunnels) count() int {
var r int
Expand Down Expand Up @@ -157,43 +181,70 @@ func newConnManager() *connManager {

// selectOne select the most suitable CN server according the connection count
// on each CN server. The least count CN server is returned.
// This method atomically increments the connection count for the selected CN server
// by adding a placeholder tunnel, ensuring fair distribution during concurrent
// connection establishment.
func (m *connManager) selectOne(hash LabelHash, cns []*CNServer) *CNServer {
m.Lock()
defer m.Unlock()

if len(cns) == 0 {
return nil
}

ci, ok := m.conns[hash]
// There are no connections yet on all CN servers of this tenant.
// Select the first CN server and add a placeholder tunnel atomically.
if !ok {
selected := cns[0]
m.conns[hash] = newConnInfo(cns[0].reqLabel)
m.conns[hash].cnTunnels.add(selected.uuid, newPlaceholderTunnel())
return selected
}

var ret *CNServer
var minCount = math.MaxInt
for _, cn := range cns {
ci, ok := m.conns[hash]
// There are no connections yet on all CN servers of this tenant.
// Means that no CN server has been connected for this tenant.
// So return any of it.
if !ok {
return cn
}
tunnels, ok := ci.cnTunnels[cn.uuid]
// There are no connections on this CN server.
if !ok {
return cn
cnt := 0
if ok {
cnt = tunnels.count()
}

// Choose the CNServer that has the least connections on it.
if tunnels.count() < minCount {
if cnt < minCount {
ret = cn
minCount = tunnels.count()
minCount = cnt
}
}

// Atomically increment the connection count for the selected CN server
// by adding a placeholder tunnel. This will be replaced by the real
// tunnel in connect() if successful, or removed in selectOneFailed() if failed.
if ret != nil {
ci.cnTunnels.add(ret.uuid, newPlaceholderTunnel())
}

return ret
}

// connect adds a new connection to connection manager.
// This method replaces the placeholder tunnel with the real tunnel.
func (m *connManager) connect(cn *CNServer, t *tunnel) {
m.Lock()
defer m.Unlock()
_, ok := m.conns[cn.hash]
ci, ok := m.conns[cn.hash]
if !ok {
// This should not happen if selectOne was called first, but handle it gracefully.
m.conns[cn.hash] = newConnInfo(cn.reqLabel)
ci = m.conns[cn.hash]
ci.cnTunnels.add(cn.uuid, t)
} else {
// Remove one placeholder tunnel and add the real tunnel.
// The placeholder was added in selectOne to reserve the connection slot.
ci.cnTunnels.delOnePlaceholder(cn.uuid)
ci.cnTunnels.add(cn.uuid, t)
}
m.conns[cn.hash].cnTunnels.add(cn.uuid, t)
m.connIDServers[cn.connID] = cn

if _, ok := m.cnTunnels[cn.uuid]; !ok {
Expand All @@ -203,6 +254,19 @@ func (m *connManager) connect(cn *CNServer, t *tunnel) {
logutil.Infof("connect to CN server %s, the conn ID is %d", cn.uuid, cn.connID)
}

// selectOneFailed is called when a connection selected by selectOne fails to establish.
// This method removes the placeholder tunnel that was added in selectOne.
func (m *connManager) selectOneFailed(hash LabelHash, cnUUID string) {
m.Lock()
defer m.Unlock()
ci, ok := m.conns[hash]
if !ok {
return
}
// Remove one placeholder tunnel that was added in selectOne.
ci.cnTunnels.delOnePlaceholder(cnUUID)
}

// disconnect removes a connection from connection manager.
func (m *connManager) disconnect(cn *CNServer, t *tunnel) {
m.Lock()
Expand Down
Loading
Loading