Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
# - rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger

steps:
- name: Checkout the repo
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ jobs:
- rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ jobs:
- rmb-sdk-go
- user-contracts-mon
- tfrobot
- messenger
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
DIRS := "activation-service" "farmerbot" "grid-cli" "grid-client" "grid-proxy" "gridify" "monitoring-bot" "rmb-sdk-go" "user-contracts-mon" "tfrobot" "node-registrar"
DIRS := activation-service \
farmerbot \
grid-cli \
grid-client \
grid-proxy \
gridify \
monitoring-bot \
rmb-sdk-go \
user-contracts-mon \
tfrobot \
messenger

mainnet-release:
cd grid-client && go get github.com/threefoldtech/tfchain/clients/tfchain-client-go@5d6a2dd
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This repo contains the go clients for Threefold grid.
- [user contracts mon](./user-contracts-mon/README.md)
- [activation service](./activation-service/README.md)
- [farmerbot](./farmerbot/README.md)
- [messenger](./messenger/README.md)

## Release

Expand Down
27 changes: 0 additions & 27 deletions \

This file was deleted.

1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use (
./grid-client
./grid-proxy
./gridify
./messenger
./monitoring-bot
./rmb-sdk-go
./tfrobot
Expand Down
51 changes: 29 additions & 22 deletions go.work.sum

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions messenger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Messenger Package

The Messenger package provides a Go SDK for building distributed messaging applications on top of the Mycelium network infrastructure. It offers a topic-based protocol registration system that enables developers to create custom server/client implementations with optional blockchain identity integration.

## Overview

The Messenger package serves as a high-level abstraction over the Mycelium messaging infrastructure, providing:

- **Topic-based message routing**: Register handlers for specific message topics
- **Bidirectional communication**: Send messages and receive replies
- **Optional blockchain identity**: Integrate with ThreeFold Chain for identity management
- **JSON-RPC support**: Built-in JSON-RPC server/client implementation

## Mycelium Infrastructure

The Mycelium daemon provides distinct communication methods:

1. HTTP REST server `:8989`
2. RPC server `:9090`
3. CLI `mycelium message` calling the reset server

For more info check [mycelium docs](https://github.com/threefoldtech/mycelium/tree/master/docs)


### Usage Patterns

- **Sending Messages**: Uses CLI command `mycelium message send <destination> <payload> [--topic <topic>] [--wait] [--timeout <seconds>]`
- **Receiving Messages**: Uses CLI command `mycelium message receive` in a polling loop
- **Sending Replies**: Uses HTTP POST to `/api/v1/messages/reply/{messageId}`
- **Getting Identity**: Uses HTTP GET to `/api/v1/admin`

## Messenger Core Component

The Messenger serves as the main orchestration component with the following key features:

### Topic-Based Protocol Registration

The Messenger implements a topic-based routing system where different message handlers can be registered for specific topics:

```go
// Register a handler for a specific topic
messenger.RegisterHandler("my-topic", func(ctx context.Context, message *Message) ([]byte, error) {
// Handle the message
return response, nil
})
```

### Message Structure

```go
type Message struct {
ID string `json:"id,omitempty"` // Unique message identifier
Topic string `json:"topic,omitempty"` // Message topic for routing
SrcIP string `json:"srcIp,omitempty"` // Source IP address
SrcPK string `json:"srcPk,omitempty"` // Source public key
DstIP string `json:"dstIp,omitempty"` // Destination IP address
DstPK string `json:"dstPk,omitempty"` // Destination public key
Payload string `json:"payload,omitempty"` // Message payload (raw string)
}
```

### Core Operations

- **Send Message**: Send a message to a destination with optional reply waiting
- **Register Handler**: Register topic-specific message handlers
- **Start/Stop Receiver**: Control the message listening loop
- **Send Reply**: Reply to a received message

## Chain Identity Management

The package functions independently without chain identity integration by default. However, it offers optional blockchain identity features:

### Enabling Chain Identity

To enable chain identity management, use the `WithEnableTwinIdentity(true)` configuration option:

```go
messenger, err := messenger.NewMessenger(
messenger.WithSubstrateManager(manager),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true), // Enable chain identity
)
```

### Required Configuration

When chain identity is enabled, the following are required:
- **Substrate Manager**: Connection to ThreeFold Chain
- **Identity or Mnemonic**: Either a substrate identity or mnemonic phrase

### Identity Lifecycle

1. **Startup**: Automatic identity updates on chain during messenger initialization
- Retrieves Mycelium node information via HTTP API
- Updates the MyceliumTwin mapping on the blockchain

2. **Message Processing**: Identity retrieval and context storage
- For each incoming message, retrieves the sender's twin ID from the blockchain
- Stores twin ID in the message context for handler use
- Accessible via `TwinIdContextKey` context key

### MyceliumTwin Storage Mapping

The chain identity feature leverages the MyceliumTwin storage mapping on the ThreeFold blockchain, which:
- Maps Mycelium public keys to Twin IDs
- Enables identity verification and authorization
- Provides a bridge between Mycelium network and blockchain identity

## JSON-RPC Implementation

The package includes a complete JSON-RPC server/client implementation registered under the 'rpc' topic key:

### JSON-RPC Server

```go
server := messenger.NewJSONRPCServer(msgr)
server.RegisterHandler("calculator.add", func(ctx context.Context, params json.RawMessage) (interface{}, error) {
// Handle RPC method
return result, nil
})
server.Start(ctx)
```

### JSON-RPC Client

```go
client := messenger.NewJSONRPCClient(msgr)
var result float64
err := client.Call(ctx, destination, "calculator.add", []float64{10, 20}, &result)
```
## Configuration

### Configuration Options

The Messenger supports various configuration options through functional options:

```go
type MessengerOpt func(*Messenger)

// Available options:
messenger.WithMnemonic(mnemonic) // Set mnemonic phrase
messenger.WithIdentity(identity) // Set substrate identity directly
messenger.WithEnableTwinIdentity(true) // Enable chain identity management
messenger.WithSubstrateManager(manager) // Set substrate manager
messenger.WithBinaryPath("/path/to/mycelium") // Set custom mycelium binary path
messenger.WithAPIAddress("http://localhost:8989") // Set custom API address
```

### Default Values

```go
const (
DefaultMessengerBinary = "mycelium" // Mycelium binary path
DefaultAPIAddress = "http://127.0.0.1:8989" // Mycelium HTTP API address
DefaultTimeout = 60 // Default timeout in seconds
DefaultRetryListenerInterval = 100 * time.Millisecond // Retry interval for message listener
)
```
51 changes: 51 additions & 0 deletions messenger/examples/jsonrpc/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/messenger"
)

const (
chainUrl = "ws://192.168.1.10:9944"

// destination is mycelium pk or ip
destination = "22b45ca2c6c40650fa4c739942a7c863deeb4a88a6a2cb38b8c9b273f4ad7b0c"
)

func main() {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04"}).With().Logger()
mnemonic := os.Getenv("MNEMONIC")

man := substrate.NewManager(chainUrl)

msgr, err := messenger.NewMessenger(
messenger.WithSubstrateManager(man),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true),
)
if err != nil {
fmt.Printf("Failed to create Mycelium client: %v\n", err)
os.Exit(1)
}
defer msgr.Close()

rpcClient := messenger.NewJSONRPCClient(msgr)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

var addResult float64
// should timeout if no response
err = rpcClient.Call(ctx, destination, "calculator.add", []float64{10, 20}, &addResult)
if err != nil {
fmt.Printf("Failed to call calculator.add: %v\n", err)
os.Exit(1)
}
fmt.Printf("10 + 20 = %f\n", addResult)
}
84 changes: 84 additions & 0 deletions messenger/examples/jsonrpc/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/tfgrid-sdk-go/messenger"
)

// API
type Calculator struct{}

func (c *Calculator) Add(a, b float64) float64 {
return a + b
}

// HANDLERS
func addHandler(ctx context.Context, calc *Calculator, params json.RawMessage) (interface{}, error) {
var args []float64
if err := json.Unmarshal(params, &args); err != nil {
return nil, fmt.Errorf("invalid parameters: %w", err)
}

if len(args) != 2 {
return nil, fmt.Errorf("expected 2 parameters, got %d", len(args))
}

result := calc.Add(args[0], args[1])
return result, nil
}

const (
chainUrl = "ws://192.168.1.10:9944"
)

func main() {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04"}).With().Timestamp().Logger()
mnemonic := os.Getenv("MNEMONIC")

manager := substrate.NewManager(chainUrl)

msgr, err := messenger.NewMessenger(
messenger.WithSubstrateManager(manager),
messenger.WithMnemonic(mnemonic),
messenger.WithEnableTwinIdentity(true),
)

if err != nil {
fmt.Printf("Failed to create messenger: %v\n", err)
os.Exit(1)
}
defer msgr.Close()

server := messenger.NewJSONRPCServer(msgr)
calc := &Calculator{}

server.RegisterHandler("calculator.add", func(ctx context.Context, params json.RawMessage) (interface{}, error) {
return addHandler(ctx, calc, params)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := server.Start(ctx); err != nil {
fmt.Printf("Failed to start server: %v\n", err)
os.Exit(1)
}

fmt.Println("Server started. Press Ctrl+C to stop.")

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh

server.Stop()
fmt.Println("Server stopped.")
}
Loading