2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/connection.go
@@ -103,7 +103,7 @@ func (c *InnerConnection) send(ctx context.Context, data []byte) error {
case c.writer <- s:
case <-ctx.Done():
return ctx.Err()
-	case <-time.After(2 * time.Second):
+	case <-time.After(5 * time.Second):
return errTimeout
}

107 changes: 66 additions & 41 deletions rmb-sdk-go/peer/peer.go
@@ -44,6 +44,15 @@ type peerCfg struct {
enableEncryption bool
encoder encoder.Encoder
cacheFactory cacheFactory
+	relayCooldown    time.Duration
}

+// WithRelayCooldown sets the cooldown duration for relay failover and retry logic.
+// If not set, defaults to 10 seconds.
Collaborator: I think 10 seconds is too much, no?

Member Author: Not at all. The cooldown only affects the order in which relays are tried.

+func WithRelayCooldown(d time.Duration) PeerOpt {
+	return func(cfg *peerCfg) {
+		cfg.relayCooldown = d
+	}
+}

type PeerOpt func(*peerCfg)
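For readers unfamiliar with the functional-options pattern used by PeerOpt, here is a minimal, self-contained sketch of how WithRelayCooldown composes with a config struct. The types below are simplified stand-ins, not the package's actual unexported peerCfg/PeerOpt:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the peerCfg / PeerOpt pair in this diff.
type config struct{ relayCooldown time.Duration }
type option func(*config)

func withRelayCooldown(d time.Duration) option {
	return func(c *config) { c.relayCooldown = d }
}

func main() {
	c := config{}
	// Apply options the way NewPeer applies its PeerOpt arguments.
	for _, o := range []option{withRelayCooldown(3 * time.Second)} {
		o(&c)
	}
	fmt.Println(c.relayCooldown) // 3s
}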
@@ -106,18 +115,21 @@ func WithInMemoryExpiration(ttl uint64) PeerOpt {
}
}

-// Peer exposes the functionality to talk directly to an rmb relay
+// Peer exposes the functionality to talk directly to an rmb relay.
+//
+// Relay failover and retry are managed by a thread-safe CooldownRelaySet, which tracks relay health and cooldowns.
+// The cooldown duration is configurable via WithRelayCooldown. See documentation for details.
type Peer struct {
-	source  *types.Address
-	signer  substrate.Identity
-	twinDB  TwinDB
-	privKey *secp256k1.PrivateKey
-	reader  Reader
-	cons    *WeightSlice[InnerConnection]
-	handler Handler
-	encoder encoder.Encoder
-	relays  []string
-}
+	source   *types.Address
+	signer   substrate.Identity
+	twinDB   TwinDB
+	privKey  *secp256k1.PrivateKey
+	reader   Reader
+	relayset *CooldownRelaySet[*InnerConnection] // manages relay selection and cooldown
+	handler  Handler
+	encoder  encoder.Encoder
+	relays   []string
+} // See WithRelayCooldown for cooldown configuration.

func generateSecureKey(identity substrate.Identity) (*secp256k1.PrivateKey, error) {
keyPair, err := identity.KeyPair()
@@ -189,6 +201,11 @@ func getIdentity(keytype string, mnemonics string) (substrate.Identity, error) {
//
// Make sure the context passed to Call() does not outlive the directClient's context.
// Call() will panic if called while the directClient's context is canceled.
+// NewPeer creates a new RMB peer client. It connects directly to the RMB-Relay and tries to reconnect if the connection breaks.
+//
+// You can close the connection by canceling the passed context.
+//
+// The relay failover and retry logic uses a cooldown-based approach. See WithRelayCooldown for configuration.
func NewPeer(
ctx context.Context,
mnemonics string,
@@ -269,16 +286,18 @@ func NewPeer(
}

reader := make(chan []byte)
-	weightCons := make([]WeightItem[InnerConnection], 0, len(conns))
-	for _, conn := range conns {
+	relayPenalties := make([]RelayPenalty[*InnerConnection], 0, len(conns))
+	for i := range conns {
+		conn := &conns[i]
		conn.Start(ctx, reader)
-		weightCons = append(weightCons, WeightItem[InnerConnection]{Item: conn, Weight: 1})
+		relayPenalties = append(relayPenalties, RelayPenalty[*InnerConnection]{Relay: conn, LastErrorAt: 0})
	}

-	cons, err := NewWeightSlice(weightCons)
-	if err != nil {
-		return nil, err
-	}
+	cooldown := cfg.relayCooldown
+	if cooldown == 0 {
+		cooldown = 10 * time.Second // default
+	}
+	relayset := &CooldownRelaySet[*InnerConnection]{Relays: relayPenalties, Cooldown: cooldown}

var sessionP *string
if cfg.session != "" {
@@ -290,15 +309,15 @@
}

cl := &Peer{
-		source:  &source,
-		signer:  identity,
-		twinDB:  twinDB,
-		privKey: privKey,
-		reader:  reader,
-		cons:    cons,
-		handler: handler,
-		encoder: cfg.encoder,
-		relays:  relayURLs,
+		source:   &source,
+		signer:   identity,
+		twinDB:   twinDB,
+		privKey:  privKey,
+		reader:   reader,
+		relayset: relayset,
+		handler:  handler,
+		encoder:  cfg.encoder,
+		relays:   relayURLs,
}

go cl.process(ctx)
@@ -312,6 +331,10 @@ func (p *Peer) Encoder() encoder.Encoder {
}

func (d *Peer) handleIncoming(incoming *types.Envelope) error {
+	if time.Now().Unix() > int64(incoming.Timestamp+incoming.Expiration) {
+		return fmt.Errorf("received an expired envelope")
+	}

errResp := incoming.GetError()
if incoming.Source == nil {
// an envelope received that has NO source twin
@@ -520,26 +543,28 @@ func (d *Peer) send(ctx context.Context, request *types.Envelope) error {
if err != nil {
return err
}

var errs error

-	for i := 0; i < len(d.cons.data); i++ {
-		index, con := d.cons.Choose()
-		err := con.send(ctx, bytes)
-		if err != nil {
-			errs = multierror.Append(errs, err)
-			if errors.Is(err, errTimeout) && d.cons.data[index].Weight > 0 {
-				d.cons.data[index].Weight--
-			}
-			continue
-		}
-
-		if d.cons.data[index].Weight < 100 {
-			d.cons.data[index].Weight++
-		}
-		return nil
+	set := d.relayset
+
+	// Determine message deadline/expiry
+	expireAt := time.Unix(int64(request.Timestamp+request.Expiration), 0)
+
+	for time.Now().Before(expireAt) {
+		items := set.Sorted(time.Now())
+		for i := range items {
+			con := items[i].Relay
+			err := con.send(ctx, bytes)
+			if err != nil {
+				errs = multierror.Append(errs, err)
+				set.MarkFailure(con, time.Now())
+				continue
+			}
+			set.MarkSuccess(con)
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
Collaborator: Here too, I fear this affects rmb responses.

Member Author: This is a very brief pause after exhausting all the relays, to prevent a busy loop. If removed, and the connections fail immediately (e.g., the machine is not yet connected to the internet), we won't leave any breathing space for the CPU to perform other tasks.

}

return errs
}
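As a side note on the 100ms pause discussed above: a hedged sketch of a variant that would also honor context cancellation during the pause. sendWithPacing and trySendAll are hypothetical names, not part of this PR; trySendAll stands in for one full pass over the sorted relays, returning true on the first successful send:

func sendWithPacing(ctx context.Context, expireAt time.Time, trySendAll func(context.Context) bool) error {
	for time.Now().Before(expireAt) {
		if trySendAll(ctx) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // stop retrying once the caller gives up
		case <-time.After(100 * time.Millisecond):
			// breathing space between full passes; avoids a busy loop
		}
	}
	return errTimeout // every pass failed until the envelope expired
}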

53 changes: 53 additions & 0 deletions rmb-sdk-go/peer/peer_test.go
@@ -0,0 +1,53 @@
package peer

import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer/types"
)

func TestPeer_HandleIncoming_ExpiredEnvelope(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

twinDB := NewMockTwinDB(ctrl)

p := Peer{twinDB: twinDB}

envelope := &types.Envelope{
Timestamp: uint64(time.Now().Add(-2 * time.Minute).Unix()), // 2 minutes ago
Expiration: 60, // 60 seconds TTL
}

err := p.handleIncoming(envelope)
require.Error(t, err)
require.Equal(t, "received an expired envelope", err.Error())
}

func TestPeer_HandleIncoming_NotExpiredEnvelope(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

twinDB := NewMockTwinDB(ctrl)
// Expect a call to Get during signature verification
twinDB.EXPECT().Get(uint32(1)).Return(Twin{}, nil).AnyTimes()

p := Peer{twinDB: twinDB}

envelope := &types.Envelope{
Source: &types.Address{Twin: 1}, // a source is needed to pass validation
Timestamp: uint64(time.Now().Unix()),
Expiration: 60, // not expired
}

// We don't care about the error here because it will fail on signature verification,
// which happens AFTER the expiration check. So if we get that error, it means
// the expiration check has passed.
err := p.handleIncoming(envelope)
if err != nil {
require.NotEqual(t, "received an expired envelope", err.Error())
}
}
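A possible companion test for the ordering behavior itself, sketched against the CooldownRelaySet API introduced in relay_retry_set.go below (this test is not part of the PR):

func TestCooldownRelaySet_PenalizedRelaySortsLast(t *testing.T) {
	healthy := &InnerConnection{}
	failed := &InnerConnection{}

	set := &CooldownRelaySet[*InnerConnection]{
		Relays: []RelayPenalty[*InnerConnection]{
			{Relay: failed},
			{Relay: healthy},
		},
		Cooldown: time.Second,
	}

	now := time.Now()
	set.MarkFailure(failed, now)

	// Within the cooldown window, the failed relay must sort last.
	sorted := set.Sorted(now)
	require.Equal(t, healthy, sorted[0].Relay)
	require.Equal(t, failed, sorted[1].Relay)

	// After the cooldown lapses, the penalty no longer counts.
	require.Zero(t, set.effectiveError(sorted[1], now.Add(2*time.Second)))
}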
104 changes: 104 additions & 0 deletions rmb-sdk-go/peer/relay_retry_set.go
@@ -0,0 +1,104 @@
package peer

import (
"math/rand"
"reflect"
"sort"
"sync/atomic"
"time"
)

// RelayPenalty tracks a relay connection and its penalty (the timestamp of its last error).
// Relay should be a pointer type for atomic safety.
type RelayPenalty[T any] struct {
Relay T
LastErrorAt int64 // UnixNano timestamp of last error, 0 means healthy (must be accessed atomically)
}

// CooldownRelaySet manages a set of relays with cooldown-based penalty logic.
//
// This structure is used as the main relay manager in Peer for fair failover and retry.
// It is thread-safe for penalty updates using atomic operations, but not for concurrent mutation of the relay set itself.
//
// T MUST be a pointer type (e.g., *InnerConnection). Pointer value comparison is used for relay matching.
type CooldownRelaySet[T any] struct {
Relays []RelayPenalty[T] // slice of relay+penalty state, must not be mutated concurrently
Cooldown time.Duration // cooldown period for penalized relays
}

// Sorted returns relays sorted by penalty (lowest/oldest error first), shuffling among equals.
func (s *CooldownRelaySet[T]) Sorted(now time.Time) []RelayPenalty[T] {
type sortableRelay struct {
RelayPenalty[T]
effectiveError int64
}

items := make([]sortableRelay, len(s.Relays))
for i, r := range s.Relays {
items[i] = sortableRelay{
RelayPenalty: r,
effectiveError: s.effectiveError(r, now),
}
}

// Sort by effective error time (ascending). Stable sort preserves original
// order for relays with the same penalty, which is fair.
sort.SliceStable(items, func(i, j int) bool {
return items[i].effectiveError < items[j].effectiveError
})

// Shuffle among equals
start := 0
for start < len(items) {
end := start + 1
for end < len(items) && items[end].effectiveError == items[start].effectiveError {
end++
}
rand.Shuffle(end-start, func(i, j int) {
items[start+i], items[start+j] = items[start+j], items[start+i]
})
start = end
}

// Unwrap the sorted relays
result := make([]RelayPenalty[T], len(items))
for i, item := range items {
result[i] = item.RelayPenalty
}

return result
}

// MarkFailure updates the penalty for the given relay.
// T must be a pointer type. Pointer value comparison is used.
func (s *CooldownRelaySet[T]) MarkFailure(relay T, now time.Time) {
for i := range s.Relays {
if reflect.ValueOf(s.Relays[i].Relay).Pointer() == reflect.ValueOf(relay).Pointer() {
Contributor: I'm not a fan; I believe this could be simplified by keeping the penalized relays in one list and the healthy ones in another, shuffling over both, and trying the healthy list first, then the penalized one.

Member Author: I simplified the code by using *InnerConnection directly and removing the reflection-based comparison to improve readability. The code should now be safer and more straightforward to understand, while functionality remains intact.

The current implementation is simpler than maintaining two separate lists. By using atomic operations on the LastErrorAt field, we've simplified the code while making it more robust.

Instead of juggling relays between healthy and penalized lists, which would require careful synchronization with a mutex, we now have a single source of truth for each relay's state. The atomic operations give lock-free thread safety without the overhead of locks during normal operation, and we avoid constantly reallocating slices.

The beauty of this approach is its simplicity: we can mark a relay as failed or successful with a single atomic store, and the Sorted() method efficiently organizes the relays based on their current effective state. IMO this makes the code easier to reason about while being more performant and simpler than the two-list approach.

atomic.StoreInt64(&s.Relays[i].LastErrorAt, now.UnixNano())
return
}
}
}

// MarkSuccess resets the penalty for the given relay.
// T must be a pointer type. Pointer value comparison is used.
func (s *CooldownRelaySet[T]) MarkSuccess(relay T) {
for i := range s.Relays {
if reflect.ValueOf(s.Relays[i].Relay).Pointer() == reflect.ValueOf(relay).Pointer() {
atomic.StoreInt64(&s.Relays[i].LastErrorAt, 0)
return
}
}
}

// effectiveError returns zero if cooldown expired, otherwise returns LastErrorAt.
func (s *CooldownRelaySet[T]) effectiveError(r RelayPenalty[T], now time.Time) int64 {
lastErr := atomic.LoadInt64(&r.LastErrorAt)
if lastErr == 0 {
return 0
}
if now.Sub(time.Unix(0, lastErr)) > s.Cooldown {
return 0
}
return lastErr
}
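To tie the pieces together, a hedged walkthrough of the lock-free cooldown lifecycle the author describes above. connA and connB are hypothetical *InnerConnection values; this is a sketch, not code from the PR:

func exampleCooldownLifecycle(connA, connB *InnerConnection) {
	set := &CooldownRelaySet[*InnerConnection]{
		Relays: []RelayPenalty[*InnerConnection]{
			{Relay: connA},
			{Relay: connB},
		},
		Cooldown: 10 * time.Second,
	}

	// One atomic store penalizes connA; no mutex, no list juggling.
	set.MarkFailure(connA, time.Now())

	// While connA cools down, connB sorts first.
	first := set.Sorted(time.Now())[0].Relay // connB

	// Once the cooldown lapses, both count as healthy and are shuffled as equals.
	_ = set.Sorted(time.Now().Add(11 * time.Second))

	// A successful send clears the penalty immediately.
	set.MarkSuccess(connA)
	_ = first
}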