Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ab10c8b
refactor: replace weighted relay selection with cooldown-based retry …
sameh-farouk Jul 23, 2025
be36109
fix: use pointer to InnerConnection in relay set to prevent copying
sameh-farouk Jul 23, 2025
d468152
feat: add envelope expiration check with unit tests
sameh-farouk Jul 23, 2025
c090bb5
fix: use request timestamp and expiration for sending deadline inste…
sameh-farouk Jul 23, 2025
cfce3bc
fix: increase connection timeout from 2 to 5 seconds
sameh-farouk Jul 23, 2025
c211110
style: fix formatting and remove unused imports across peer package f…
sameh-farouk Jul 23, 2025
8f7d0db
refactor: remove reflection usage from CooldownRelaySet for better re…
sameh-farouk Aug 3, 2025
c810916
feat: implement TTL handling and expiration checks for RMB messages
sameh-farouk Aug 3, 2025
0f869b3
fix: refactor relay URLs validation
sameh-farouk Aug 10, 2025
8f9f910
Merge remote-tracking branch 'origin/development' into development-fi…
sameh-farouk Aug 10, 2025
6cfd642
normalize relay URL handling and add tests
sameh-farouk Aug 11, 2025
b0ec49c
feat: improve RMB peer connection handling with atomic state tracking
sameh-farouk Aug 12, 2025
8abb85b
feat: implement clean shutdown, relay connection error handling and l…
sameh-farouk Aug 12, 2025
a6c2fc8
make peer close method is intentionally unexported
sameh-farouk Aug 12, 2025
a4441f6
refine peer shutdown model
sameh-farouk Aug 15, 2025
79b97cf
docs: minor md format
sameh-farouk Aug 15, 2025
fe5da5b
peer(connection): backpressure + observability improvements
sameh-farouk Aug 14, 2025
f02c9e1
format: fix gofmt
sameh-farouk Aug 15, 2025
1d9bb25
format: fix gofmt
sameh-farouk Aug 15, 2025
463cfd1
fix unused noop observer
sameh-farouk Aug 20, 2025
1e083d2
Merge branch 'handle-back-pressure' into test-peer-release-20250820-1…
sameh-farouk Aug 20, 2025
bc8efdb
Merge remote-tracking branch 'origin/development-fix-issue-1390' into…
sameh-farouk Aug 20, 2025
4b23105
Merge branch 'development-refactor-relay-urls-validation' into test-p…
sameh-farouk Aug 20, 2025
418643d
fix: non-adjacent duplicate hosts won’t be removed
sameh-farouk Aug 20, 2025
56a0ba1
test: Add non-adjacent duplicates test
sameh-farouk Aug 20, 2025
b0364f3
fix: Handle clock skew when computing message age; avoid unsigned und…
sameh-farouk Aug 20, 2025
bdc2559
feat: Request Tagging and more clear errors distinguish twin-origin v…
sameh-farouk Aug 31, 2025
56c49e8
fmt: apply gofmt
sameh-farouk Sep 1, 2025
fb76b76
fix: improve websocket connection health monitoring with read deadlines
sameh-farouk Sep 18, 2025
3c32c68
refactor: extract connection constants and minor readability refactor
sameh-farouk Sep 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmb-sdk-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/vedhavyas/go-subkey v1.0.3 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sys v0.32.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
2 changes: 2 additions & 0 deletions rmb-sdk-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ github.com/vedhavyas/go-subkey v1.0.3/go.mod h1:CloUaFQSSTdWnINfBRFjVMkWXZANW+nd
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
80 changes: 63 additions & 17 deletions rmb-sdk-go/peer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,26 @@ Please check the [examples](examples/) directory

### Peer initialization

```
peer, err := peer.NewPeer(
```go
ctx, cancel := context.WithCancel(context.Background())

p, err := peer.NewPeer(
ctx,
mnemonics,
subManager,
relayCallback,
peer.WithRelay("wss://relay.dev.grid.tf"),
peer.WithSession("test-client"),
)
)
if err != nil {
// handle error
}

// ... use p ...

// When done, initiate shutdown then wait for clean exit:
cancel()
p.Wait()
```

1- After creating a peer like this at first it will try to get the identity from the provided `mnemonics`
Expand Down Expand Up @@ -74,31 +85,66 @@ peer, err := peer.NewPeer(
- To reply for requests you will need the following
1- Your peer needs to create `Router`

```
router := peer.NewRouter()
```go
router := peer.NewRouter()
```

2- Then you need to create a Route for example if you are providing a calculator service

```
```go
app := router.SubRoute("calculator")
```

3- Then you need to register your handlers for this `subRoute` like the following

```
```go
app.WithHandler("sub", func(ctx context.Context, payload []byte) (interface{}, error) {
var numbers []float64
var numbers []float64

if err := json.Unmarshal(payload, &numbers); err != nil {
return nil, fmt.Errorf("failed to load request payload was expecting list of float: %w", err)
}
if err := json.Unmarshal(payload, &numbers); err != nil {
return nil, fmt.Errorf("failed to load request payload was expecting list of float: %w", err)
}

var result float64
for _, v := range numbers {
result -= v
}
var result float64
for _, v := range numbers {
result -= v
}

return result, nil
})
return result, nil
})
```

### Handler expectations and recommended patterns

- __Handler callback semantics__
- The `Peer` calls the configured handler synchronously from its processing loop: `handler(ctx, peer, env, err)`.
- If you do heavy or blocking work inside your handler, you should offload it to a goroutine or a bounded worker pool to avoid stalling the peer loop.

- __Server-side: use `Router.Serve`__
- `Router.Serve` is designed for servers. It immediately spawns a goroutine per incoming request and runs middlewares/handlers there, then replies via `peer.SendResponse(...)`.
- This decouples heavy handler work from the peer loop. Register handlers with `router.SubRoute(...).WithHandler(...)`.

- __Client-side: use `RpcClient`__
- `RpcClient` wraps a `Peer` and correlates responses to callers via `uid`. Its internal handler is fast and non-blocking.
- Prefer `RpcClient.Call(ctx, twin, fn, data, &result)` over wiring your own response handler. Always pass a context with timeout/deadline.

- __Concurrency and ordering__
- With `Router.Serve`, each request is processed concurrently in its own goroutine. Requests to the same or different commands run concurrently; there are no ordering guarantees across requests.
- `RpcClient` handles responses without blocking the peer loop; each call blocks only the caller goroutine until its response arrives or `ctx` cancels.

- __Backpressure__
- Sending a response uses `peer.SendResponse(...)` which ultimately writes to a relay connection. Under sustained load, the write path applies backpressure; only the handler goroutine sending that response will block. Other requests continue.
- The peer employs bounded channel buffering on ingress and per-connection IO to improve burst tolerance while preserving backpressure.

### Shutdown

- Cancel the parent context you passed to `NewPeer(...)` (or `NewRpcClient(...)`) to request shutdown.
- Then call `p.Wait()` (or `rpc.Wait()`) to block until all goroutines have exited (including connection workers).

### Quick reference

- __Server__
- Build routes with `router := peer.NewRouter()` and `router.SubRoute(...).WithHandler(...)`.

- __Client__
- Use `rpc, _ := peer.NewRpcClient(ctx, mnemonics, subManager, opts...)` and call `rpc.Call(...)` with a timeout.
Loading
Loading