Fix relay failover mechanism #1391
@@ -44,6 +44,15 @@ type peerCfg struct {
     enableEncryption bool
     encoder          encoder.Encoder
     cacheFactory     cacheFactory
+    relayCooldown    time.Duration
 }
 
+// WithRelayCooldown sets the cooldown duration for relay failover and retry logic.
+// If not set, defaults to 10 seconds.
+func WithRelayCooldown(d time.Duration) PeerOpt {
+    return func(cfg *peerCfg) {
+        cfg.relayCooldown = d
+    }
+}
+
 type PeerOpt func(*peerCfg)
@@ -106,18 +115,21 @@ func WithInMemoryExpiration(ttl uint64) PeerOpt {
     }
 }
 
-// Peer exposes the functionality to talk directly to an rmb relay
+// Peer exposes the functionality to talk directly to an rmb relay.
+//
+// Relay failover and retry is managed by a thread-safe CooldownRelaySet, which tracks relay health and cooldowns.
+// The cooldown duration is configurable via WithRelayCooldown. See documentation for details.
 type Peer struct {
-    source  *types.Address
-    signer  substrate.Identity
-    twinDB  TwinDB
-    privKey *secp256k1.PrivateKey
-    reader  Reader
-    cons    *WeightSlice[InnerConnection]
-    handler Handler
-    encoder encoder.Encoder
-    relays  []string
-}
+    source   *types.Address
+    signer   substrate.Identity
+    twinDB   TwinDB
+    privKey  *secp256k1.PrivateKey
+    reader   Reader
+    relayset *CooldownRelaySet[*InnerConnection] // manages relay selection and cooldown
+    handler  Handler
+    encoder  encoder.Encoder
+    relays   []string
+} // See WithRelayCooldown for cooldown configuration.
 
 func generateSecureKey(identity substrate.Identity) (*secp256k1.PrivateKey, error) {
     keyPair, err := identity.KeyPair()
@@ -189,6 +201,11 @@ func getIdentity(keytype string, mnemonics string) (substrate.Identity, error) {
 //
 // Make sure the context passed to Call() does not outlive the directClient's context.
 // Call() will panic if called while the directClient's context is canceled.
+// NewPeer creates a new RMB peer client. It connects directly to the RMB-Relay, and tries to reconnect if the connection broke.
+//
+// You can close the connection by canceling the passed context.
+//
+// The relay failover and retry logic uses a cooldown-based approach. See WithRelayCooldown for configuration.
 func NewPeer(
     ctx context.Context,
     mnemonics string,
@@ -269,16 +286,18 @@ func NewPeer(
     }
 
     reader := make(chan []byte)
-    weightCons := make([]WeightItem[InnerConnection], 0, len(conns))
-    for _, conn := range conns {
+    relayPenalties := make([]RelayPenalty[*InnerConnection], 0, len(conns))
+    for i := range conns {
+        conn := &conns[i]
         conn.Start(ctx, reader)
-        weightCons = append(weightCons, WeightItem[InnerConnection]{Item: conn, Weight: 1})
+        relayPenalties = append(relayPenalties, RelayPenalty[*InnerConnection]{Relay: conn, LastErrorAt: 0})
     }
 
-    cons, err := NewWeightSlice(weightCons)
-    if err != nil {
-        return nil, err
+    cooldown := cfg.relayCooldown
+    if cooldown == 0 {
+        cooldown = 10 * time.Second // default
     }
+    relayset := &CooldownRelaySet[*InnerConnection]{Relays: relayPenalties, Cooldown: cooldown}
 
     var sessionP *string
     if cfg.session != "" {
@@ -290,15 +309,15 @@
     }
 
     cl := &Peer{
-        source:  &source,
-        signer:  identity,
-        twinDB:  twinDB,
-        privKey: privKey,
-        reader:  reader,
-        cons:    cons,
-        handler: handler,
-        encoder: cfg.encoder,
-        relays:  relayURLs,
+        source:   &source,
+        signer:   identity,
+        twinDB:   twinDB,
+        privKey:  privKey,
+        reader:   reader,
+        relayset: relayset,
+        handler:  handler,
+        encoder:  cfg.encoder,
+        relays:   relayURLs,
     }
 
     go cl.process(ctx)
@@ -312,6 +331,10 @@ func (p *Peer) Encoder() encoder.Encoder {
 }
 
 func (d *Peer) handleIncoming(incoming *types.Envelope) error {
+    if time.Now().Unix() > int64(incoming.Timestamp+incoming.Expiration) {
+        return fmt.Errorf("received an expired envelope")
+    }
+
     errResp := incoming.GetError()
     if incoming.Source == nil {
         // an envelope received that has NO source twin
@@ -520,26 +543,28 @@ func (d *Peer) send(ctx context.Context, request *types.Envelope) error {
     if err != nil {
         return err
     }
 
     var errs error
 
-    for i := 0; i < len(d.cons.data); i++ {
-        index, con := d.cons.Choose()
-        err := con.send(ctx, bytes)
-        if err != nil {
-            errs = multierror.Append(errs, err)
-            if errors.Is(err, errTimeout) && d.cons.data[index].Weight > 0 {
-                d.cons.data[index].Weight--
-            }
-            continue
-        }
-        if d.cons.data[index].Weight < 100 {
-            d.cons.data[index].Weight++
-        }
-        return nil
-    }
+    set := d.relayset
+
+    // Determine message deadline/expiry
+    expireAt := time.Unix(int64(request.Timestamp+request.Expiration), 0)
+
+    for time.Now().Before(expireAt) {
+        items := set.Sorted(time.Now())
+        for i := range items {
+            con := items[i].Relay
+            err := con.send(ctx, bytes)
+            if err != nil {
+                errs = multierror.Append(errs, err)
+                set.MarkFailure(con, time.Now())
+                continue
+            }
+            set.MarkSuccess(con)
+            return nil
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
 
     return errs
 }

Review comment (on the time.Sleep call): Here too, I fear that affects rmb responses.

Reply: This is a very brief pause after exhausting all the relays, to prevent a busy loop. If it were removed and the connections failed immediately (e.g., the machine is not yet connected to the internet), we wouldn't leave any breathing space for the CPU to perform other tasks.
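For callers of the package, the new option plugs into the existing functional-options pattern. The following minimal sketch uses simplified stand-in copies of peerCfg, PeerOpt, and WithRelayCooldown rather than the real package (the full NewPeer argument list is deliberately not assumed), to show how a caller-supplied cooldown interacts with the 10-second default applied inside NewPeer:

package main

import (
    "fmt"
    "time"
)

// Stand-in copies that mirror the option pattern from the diff above; not the real rmb-sdk-go types.
type peerCfg struct {
    relayCooldown time.Duration
}

type PeerOpt func(*peerCfg)

func WithRelayCooldown(d time.Duration) PeerOpt {
    return func(cfg *peerCfg) {
        cfg.relayCooldown = d
    }
}

func main() {
    var cfg peerCfg
    for _, opt := range []PeerOpt{WithRelayCooldown(5 * time.Second)} {
        opt(&cfg)
    }

    // NewPeer applies the same defaulting when the option is omitted.
    cooldown := cfg.relayCooldown
    if cooldown == 0 {
        cooldown = 10 * time.Second
    }
    fmt.Println(cooldown) // prints 5s; prints 10s if WithRelayCooldown is not passed
}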
@@ -0,0 +1,53 @@
package peer

import (
    "testing"
    "time"

    "github.com/golang/mock/gomock"
    "github.com/stretchr/testify/require"
    "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer/types"
)

func TestPeer_HandleIncoming_ExpiredEnvelope(t *testing.T) {
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    twinDB := NewMockTwinDB(ctrl)

    p := Peer{twinDB: twinDB}

    envelope := &types.Envelope{
        Timestamp:  uint64(time.Now().Add(-2 * time.Minute).Unix()), // 2 minutes ago
        Expiration: 60,                                              // 60 seconds TTL
    }

    err := p.handleIncoming(envelope)
    require.Error(t, err)
    require.Equal(t, "received an expired envelope", err.Error())
}

func TestPeer_HandleIncoming_NotExpiredEnvelope(t *testing.T) {
    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    twinDB := NewMockTwinDB(ctrl)
    // Expect a call to Get during signature verification
    twinDB.EXPECT().Get(uint32(1)).Return(Twin{}, nil).AnyTimes()

    p := Peer{twinDB: twinDB}

    envelope := &types.Envelope{
        Source:     &types.Address{Twin: 1}, // a source is needed to pass validation
        Timestamp:  uint64(time.Now().Unix()),
        Expiration: 60, // not expired
    }

    // We don't care about the error here because it will fail on signature verification,
    // which happens AFTER the expiration check. So if we get that error, it means
    // the expiration check has passed.
    err := p.handleIncoming(envelope)
    if err != nil {
        require.NotEqual(t, "received an expired envelope", err.Error())
    }
}
@@ -0,0 +1,104 @@
package peer

import (
    "math/rand"
    "reflect"
    "sort"
    "sync/atomic"
    "time"
)

// RelayPenalty tracks a relay connection and its penalty (last error timestamp).
// Relay should be a pointer type for atomic safety.
type RelayPenalty[T any] struct {
    Relay       T
    LastErrorAt int64 // UnixNano timestamp of last error, 0 means healthy (must be accessed atomically)
}

// CooldownRelaySet manages a set of relays with cooldown-based penalty logic.
//
// This structure is used as the main relay manager in Peer for fair failover and retry.
// It is thread-safe for penalty updates using atomic operations, but not for concurrent mutation of the relay set itself.
//
// T MUST be a pointer type (e.g., *InnerConnection). Pointer value comparison is used for relay matching.
type CooldownRelaySet[T any] struct {
    Relays   []RelayPenalty[T] // slice of relay+penalty state, must not be mutated concurrently
    Cooldown time.Duration     // cooldown period for penalized relays
}

// Sorted returns relays sorted by penalty (lowest/oldest error first), shuffling among equals.
func (s *CooldownRelaySet[T]) Sorted(now time.Time) []RelayPenalty[T] {
    type sortableRelay struct {
        RelayPenalty[T]
        effectiveError int64
    }

    items := make([]sortableRelay, len(s.Relays))
    for i, r := range s.Relays {
        items[i] = sortableRelay{
            RelayPenalty:   r,
            effectiveError: s.effectiveError(r, now),
        }
    }

    // Sort by effective error time (ascending). Stable sort preserves original
    // order for relays with the same penalty, which is fair.
    sort.SliceStable(items, func(i, j int) bool {
        return items[i].effectiveError < items[j].effectiveError
    })

    // Shuffle among equals
    start := 0
    for start < len(items) {
        end := start + 1
        for end < len(items) && items[end].effectiveError == items[start].effectiveError {
            end++
        }
        rand.Shuffle(end-start, func(i, j int) {
            items[start+i], items[start+j] = items[start+j], items[start+i]
        })
        start = end
    }

    // Unwrap the sorted relays
    result := make([]RelayPenalty[T], len(items))
    for i, item := range items {
        result[i] = item.RelayPenalty
    }

    return result
}

// MarkFailure updates the penalty for the given relay.
// T must be a pointer type. Pointer value comparison is used.
func (s *CooldownRelaySet[T]) MarkFailure(relay T, now time.Time) {
    for i := range s.Relays {
        if reflect.ValueOf(s.Relays[i].Relay).Pointer() == reflect.ValueOf(relay).Pointer() {
            atomic.StoreInt64(&s.Relays[i].LastErrorAt, now.UnixNano())
            return
        }
    }
}

// MarkSuccess resets the penalty for the given relay.
// T must be a pointer type. Pointer value comparison is used.
func (s *CooldownRelaySet[T]) MarkSuccess(relay T) {
    for i := range s.Relays {
        if reflect.ValueOf(s.Relays[i].Relay).Pointer() == reflect.ValueOf(relay).Pointer() {
            atomic.StoreInt64(&s.Relays[i].LastErrorAt, 0)
            return
        }
    }
}

// effectiveError returns zero if the cooldown has expired, otherwise returns LastErrorAt.
func (s *CooldownRelaySet[T]) effectiveError(r RelayPenalty[T], now time.Time) int64 {
    lastErr := atomic.LoadInt64(&r.LastErrorAt)
    if lastErr == 0 {
        return 0
    }
    if now.Sub(time.Unix(0, lastErr)) > s.Cooldown {
        return 0
    }
    return lastErr
}

Review comment (on the pointer comparison in MarkFailure): I'm not a fan; I believe this could be simplified by keeping failed relays in a penalized list and the healthy ones in a healthy list, shuffling within both, and trying the healthy list first, then the penalized one.

Reply: I simplified the code by using *InnerConnection directly and removing the reflection-based comparison, to improve readability. The code should now be safer and more straightforward to understand, while the functionality remains intact. The current implementation is simpler than maintaining two separate lists, because it relies on atomic operations on the LastErrorAt field. Instead of juggling relays between healthy and penalized lists, which would require careful synchronization using a mutex, we now have a single source of truth for each relay's state. The atomic operations ensure lock-free thread safety without the overhead of locks during normal operation, and we avoid constantly reallocating slices. The beauty of this approach is its simplicity: we can mark a relay as failed or successful with a single atomic store, and Sorted takes care of deprioritizing penalized relays until their cooldown expires.
Review comment: I think 10 seconds is too much, no?

Reply: Not at all. The cooldown only affects the order in which the relays are tried.
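To make that concrete, here is a hedged sketch of the ordering behavior, written as a hypothetical test against the CooldownRelaySet API added in this PR (the test name and the *string relays are illustrative only; T just needs to be a pointer type). It shows that a failed relay is still returned by Sorted, just ordered last until the cooldown elapses:

package peer

import (
    "testing"
    "time"
)

func TestCooldownOrderingSketch(t *testing.T) {
    a, b := "relay-a", "relay-b"
    set := &CooldownRelaySet[*string]{
        Relays: []RelayPenalty[*string]{
            {Relay: &a},
            {Relay: &b},
        },
        Cooldown: 10 * time.Second,
    }

    now := time.Now()
    set.MarkFailure(&a, now)

    // Within the cooldown window the failed relay is still returned, just ordered last.
    sorted := set.Sorted(now)
    if *sorted[len(sorted)-1].Relay != "relay-a" {
        t.Fatalf("expected the failed relay to be tried last")
    }

    // Once the cooldown expires (or after MarkSuccess), both relays count as healthy
    // again and Sorted shuffles them fairly, so no fixed order can be asserted.
    _ = set.Sorted(now.Add(11 * time.Second))
}

Because MarkFailure only stamps LastErrorAt and Sorted only uses that stamp for ordering, a penalized relay is never dropped from the set; it is simply tried after the healthier ones until its cooldown expires.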