From 5f51212fd6150dcb9af2e2a2088deed6808377b9 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Fri, 23 May 2025 21:05:55 -0700 Subject: [PATCH 01/51] fix race condition in driver closure --- rpc/driver.go | 48 +++++++++++++++++++++++++++--------------------- rpc/rpc_test.go | 2 +- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index ac0d7f4..03b94f3 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -121,21 +121,27 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { // Create a channel for sending messages through a Transport. Creates // a thread that won't exit until the returned channel is closed. // Does not close the underlying Transport. -func SendChan(t Transport, onErr func(uint32, error)) chan<- *Message { - ret := make(chan *Message, 1) - go func(c <-chan *Message) { +func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- struct{}) { + ch := make(chan *Message, 1) + chClose := make(chan struct{}) + go func(ch <-chan *Message, cancel <-chan struct{}) { for { - if m, ok := <-c; !ok { + select { + case <-cancel: return - } else { - xid := m.Xid() - if err := t.Send(m); err != nil && onErr != nil { - onErr(xid, err) + case m, ok := <-ch: + if !ok { + return + } else { + xid := m.Xid() + if err := t.Send(m); err != nil && onErr != nil { + onErr(xid, err) + } } } } - }(ret) - return ret + }(ch, chClose) + return ch, chClose } // RPC driver implements all Transport-agnostic logic for handling @@ -184,13 +190,14 @@ type Driver struct { // If non-nil, all panics arising from service method implementations are passed to PanicHandle. PanicHandler PanicHandler - srv RpcSrv - ctx context.Context - cancel context.CancelFunc - out chan<- *Message - in <-chan *Message - cs CallSet - started int32 + srv RpcSrv + ctx context.Context + cancel context.CancelFunc + out chan<- *Message + outClose chan<- struct{} + in <-chan *Message + cs CallSet + started int32 } // PanicHandler defines a handler for panics arising from service method implementations. @@ -213,7 +220,7 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { var out bytes.Buffer fmt.Fprintf(&out, f, args...) out.WriteByte('\n') - t.XdrMarshal(xdr.XdrPrint{&out}, "") + t.XdrMarshal(xdr.XdrPrint{Out: &out}, "") r.Log.Write(out.Bytes()) } @@ -235,13 +242,13 @@ func NewDriver(ctx context.Context, t Transport) *Driver { cancel: cancel, in: ReceiveChan(ctx, t), } - ret.out = SendChan(t, func(xid uint32, _ error) { + ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) }) go func() { <-ctx.Done() t.Close() - close(ret.out) + close(ret.outClose) }() return &ret @@ -260,7 +267,6 @@ func (r *Driver) Close() { } func (r *Driver) safeSend(ctx context.Context, m *Message) (ok bool) { - defer func() { recover() }() select { case r.out <- m: return true diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index b3f8aeb..8e1ee58 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -69,7 +69,7 @@ func TestChannels(t *testing.T) { tx1, tx2 := rpc.NewStreamTransport(cs[0]), rpc.NewStreamTransport(cs[1]) r := rpc.ReceiveChan(ctx, tx1) defer tx1.Close() - s := rpc.SendChan(tx2, nil) + s, _ := rpc.SendChan(tx2, nil) go func() { defer close(s) defer tx2.Close() From db0ac80aeb0aa8d295384659d831006c4606ecf7 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 5 Jun 2025 09:38:48 -0700 Subject: [PATCH 02/51] avoid goroutine when processing server calls in sequence --- rpc/driver.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 03b94f3..e853777 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -373,7 +373,7 @@ loop: unlock: unlock, })) - go func() { + q := func() { defer func() { unlock() if i := recover(); i != nil { @@ -396,7 +396,12 @@ loop: r.safeSend(r.ctx, &reply) }() proc.Do() - }() + } + if r.Lock == nil { + go q() + } else { + q() + } } r.Close() r.cs.CancelAll() From 9d896cd84e42573944999ccf3d562a78435f239c Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 11:11:10 -0700 Subject: [PATCH 03/51] use a sync.Pool for Messages in rpc.Transport and rpc.Driver --- rpc/driver.go | 25 ++++++++++++++----- rpc/rpc_test.go | 2 +- rpc/transport.go | 62 ++++++++++++++++++++++++++++++++++-------------- 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index e853777..79c52e3 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -110,6 +110,7 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { select { case c <- m: case <-ctx.Done(): + m.Reclaim() close(c) return } @@ -134,7 +135,8 @@ func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- s return } else { xid := m.Xid() - if err := t.Send(m); err != nil && onErr != nil { + err := t.Send(m) + if err != nil && onErr != nil { onErr(xid, err) } } @@ -190,6 +192,7 @@ type Driver struct { // If non-nil, all panics arising from service method implementations are passed to PanicHandle. PanicHandler PanicHandler + t Transport srv RpcSrv ctx context.Context cancel context.CancelFunc @@ -238,6 +241,7 @@ func NewDriver(ctx context.Context, t Transport) *Driver { ret := Driver{ Log: DefaultLog, Lock: &sync.Mutex{}, + t: t, ctx: ctx, cancel: cancel, in: ReceiveChan(ctx, t), @@ -254,6 +258,10 @@ func NewDriver(ctx context.Context, t Transport) *Driver { return &ret } +func (r *Driver) newMessage(peer string) *Message { + return r.t.NewMessage(peer) +} + // Register an RPC server. func (r *Driver) Register(srv xdr.XdrSrv) { r.srv.Register(srv) @@ -271,6 +279,7 @@ func (r *Driver) safeSend(ctx context.Context, m *Message) (ok bool) { case r.out <- m: return true case <-ctx.Done(): + m.Reclaim() return false } } @@ -287,11 +296,11 @@ func (r *Driver) SendCall(ctx context.Context, proc xdr.XdrProc) (err error) { c <- rmsg close(c) }) - m := Message{Peer: peer} + m := r.newMessage(peer) r.logXdr(proc.GetArg(), "->%s CALL(xid=%d) %s", peer, cmsg.Xid, proc.ProcName()) m.Serialize(cmsg, proc.GetArg()) - if !r.safeSend(ctx, &m) { + if !r.safeSend(ctx, m) { r.cs.Delete(cmsg.Xid) return ErrTransportClosed } @@ -345,6 +354,7 @@ loop: } msg, err := GetMsg(m.In()) if err != nil { + m.Reclaim() fmt.Fprintf(os.Stderr, "GetMsg failed: %s\n", err) break } @@ -352,16 +362,19 @@ loop: if pc := r.cs.GetReply(m.Peer, msg, m.In()); pc != nil { r.logXdr(pc.Proc.GetRes(), "<-%s REPLY(xid=%d) %s", m.Peer, msg.Xid, pc.Proc.ProcName()) + m.Reclaim() pc.Cb(msg) continue } rmsg, proc := r.srv.GetProc(msg, m.In()) + m.Reclaim() if rmsg == nil { continue } else if proc == nil { - reply := Message{Peer: m.Peer} + reply := r.newMessage(m.Peer) reply.Serialize(rmsg) + reply.Reclaim() //XXX continue } @@ -386,14 +399,14 @@ loop: SetStat(rmsg, SYSTEM_ERR) } } - reply := Message{Peer: m.Peer} + reply := r.newMessage(m.Peer) reply.Serialize(rmsg) if IsSuccess(rmsg) { reply.Serialize(proc.GetRes()) r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", m.Peer, msg.Xid, proc.ProcName()) } - r.safeSend(r.ctx, &reply) + r.safeSend(r.ctx, reply) }() proc.Do() } diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 8e1ee58..763fd36 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -60,7 +60,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message for _, msg := range contents { - m := &rpc.Message{} + m := rpc.NewMessage("", &bytes.Buffer{}, func() {}) m.WriteString(msg) ms = append(ms, m) } diff --git a/rpc/transport.go b/rpc/transport.go index 3d0a2f4..8b2378c 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -1,27 +1,38 @@ - package rpc import ( "bytes" "encoding/binary" "fmt" - "github.com/xdrpp/goxdr/xdr" "io" "net" + "sync" "sync/atomic" + + "github.com/xdrpp/goxdr/xdr" ) type Message struct { - bytes.Buffer - Peer string + *bytes.Buffer + Peer string + reclaim func() +} + +func NewMessage(peer string, buf *bytes.Buffer, reclaim func()) *Message { + return &Message{Buffer: buf, Peer: peer, reclaim: reclaim} +} + +func (m *Message) Reclaim() { + m.reclaim() + m.reclaim = nil } func (m *Message) In() xdr.XDR { - return xdr.XdrIn{m} + return xdr.XdrIn{In: m} } func (m *Message) Out() xdr.XDR { - return xdr.XdrOut{m} + return xdr.XdrOut{Out: m} } func (m *Message) Xid() uint32 { @@ -32,7 +43,7 @@ func (m *Message) Xid() uint32 { return 0 } -func (m *Message) Serialize(vs...xdr.XdrType) { +func (m *Message) Serialize(vs ...xdr.XdrType) { out := m.Out() for i := range vs { vs[i].XdrMarshal(out, "") @@ -61,6 +72,8 @@ type Transport interface { // because each instance of the transport is connected to a single // endpoint (like TCP and unlike an unconnected UDP socket). IsConnected() bool + + NewMessage(peer string) *Message } var ErrTransportClosed = fmt.Errorf("Transport is closed") @@ -68,7 +81,7 @@ var ErrTransportClosed = fmt.Errorf("Transport is closed") // Implements RFC5531 record-marking protocol for stream sockets type StreamTransport struct { MaxMsgSize int - Conn net.Conn + Conn net.Conn // Since a StreamTransport is connected, we ignore Peer in sent // messages, but can hardcode a value for incoming messages. @@ -79,19 +92,28 @@ type StreamTransport struct { // err are synchronized through okay. Shared read-only access to // err is allowed only when okay == 0. Value -1 is used as an // exclusive lock to ensure only one thread updates err. - okay int32 // 1 = okay, 0 = failed, -1 = failing - err error + okay int32 // 1 = okay, 0 = failed, -1 = failing + err error + + mpool sync.Pool } // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. func NewStreamTransport(c net.Conn) *StreamTransport { - return &StreamTransport{ + r := &StreamTransport{ MaxMsgSize: 0x100000, - Conn: c, - okay: 1, + Conn: c, + okay: 1, } + r.mpool.New = func() any { return &bytes.Buffer{} } + return r +} + +func (tx *StreamTransport) NewMessage(peer string) *Message { + buf := tx.mpool.Get().(*bytes.Buffer) + return NewMessage(peer, buf, func() { tx.mpool.Put(buf) }) } func (tx *StreamTransport) fail(err error) { @@ -122,6 +144,7 @@ func (tx *StreamTransport) Close() { const maxSegment = 0x7fffffff func (tx *StreamTransport) Send(m *Message) error { + defer m.Reclaim() if tx.failed() { return tx.err } @@ -152,26 +175,29 @@ func (tx *StreamTransport) Receive() (*Message, error) { if tx.failed() { return nil, tx.err } - ret := Message{ Peer: tx.Peer } + ret := tx.NewMessage(tx.Peer) b := make([]byte, 4) for b[0]&0x80 == 0 { if n, err := tx.Conn.Read(b); n != 4 || err != nil { + ret.Reclaim() tx.fail(err) return nil, err } n := binary.BigEndian.Uint32(b) & 0x7fffffff - if int(n) > tx.MaxMsgSize - ret.Len() { + if int(n) > tx.MaxMsgSize-ret.Len() { err := fmt.Errorf("Message length %d exceeds maximum %d", - int(n) + ret.Len(), tx.MaxMsgSize) + int(n)+ret.Len(), tx.MaxMsgSize) + ret.Reclaim() tx.fail(err) return nil, err } - if _, err := io.CopyN(&ret, tx.Conn, int64(n)); err != nil { + if _, err := io.CopyN(ret, tx.Conn, int64(n)); err != nil { + ret.Reclaim() tx.fail(err) return nil, err } } - return &ret, nil + return ret, nil } func (tx *StreamTransport) IsConnected() bool { From 9f3dc84c8d5e2cecf02a2489a044397cc684c6ca Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 11:24:10 -0700 Subject: [PATCH 04/51] fix --- rpc/transport.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rpc/transport.go b/rpc/transport.go index 8b2378c..1e44979 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -113,7 +113,10 @@ func NewStreamTransport(c net.Conn) *StreamTransport { func (tx *StreamTransport) NewMessage(peer string) *Message { buf := tx.mpool.Get().(*bytes.Buffer) - return NewMessage(peer, buf, func() { tx.mpool.Put(buf) }) + return NewMessage(peer, buf, func() { + buf.Reset() + tx.mpool.Put(buf) + }) } func (tx *StreamTransport) fail(err error) { From cbe282b6e70091beb1447c340cf945235ed76136 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 11:26:35 -0700 Subject: [PATCH 05/51] fix --- rpc/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/transport.go b/rpc/transport.go index 1e44979..13c0980 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -113,8 +113,8 @@ func NewStreamTransport(c net.Conn) *StreamTransport { func (tx *StreamTransport) NewMessage(peer string) *Message { buf := tx.mpool.Get().(*bytes.Buffer) + buf.Reset() return NewMessage(peer, buf, func() { - buf.Reset() tx.mpool.Put(buf) }) } From 970518847457a9247bca6d20b6d1f6475967d218 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 15:03:19 -0700 Subject: [PATCH 06/51] cosmetic --- rpc/driver.go | 12 ++++++------ rpc/transport.go | 21 +++++++++++---------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 79c52e3..2980f50 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -110,7 +110,7 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { select { case c <- m: case <-ctx.Done(): - m.Reclaim() + m.Recycle() close(c) return } @@ -279,7 +279,7 @@ func (r *Driver) safeSend(ctx context.Context, m *Message) (ok bool) { case r.out <- m: return true case <-ctx.Done(): - m.Reclaim() + m.Recycle() return false } } @@ -354,7 +354,7 @@ loop: } msg, err := GetMsg(m.In()) if err != nil { - m.Reclaim() + m.Recycle() fmt.Fprintf(os.Stderr, "GetMsg failed: %s\n", err) break } @@ -362,19 +362,19 @@ loop: if pc := r.cs.GetReply(m.Peer, msg, m.In()); pc != nil { r.logXdr(pc.Proc.GetRes(), "<-%s REPLY(xid=%d) %s", m.Peer, msg.Xid, pc.Proc.ProcName()) - m.Reclaim() + m.Recycle() pc.Cb(msg) continue } rmsg, proc := r.srv.GetProc(msg, m.In()) - m.Reclaim() + m.Recycle() if rmsg == nil { continue } else if proc == nil { reply := r.newMessage(m.Peer) reply.Serialize(rmsg) - reply.Reclaim() //XXX + reply.Recycle() //XXX continue } diff --git a/rpc/transport.go b/rpc/transport.go index 13c0980..edf3fb6 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -15,16 +15,17 @@ import ( type Message struct { *bytes.Buffer Peer string - reclaim func() + recycle func() } -func NewMessage(peer string, buf *bytes.Buffer, reclaim func()) *Message { - return &Message{Buffer: buf, Peer: peer, reclaim: reclaim} +func NewMessage(peer string, buf *bytes.Buffer, recycle func()) *Message { + return &Message{Buffer: buf, Peer: peer, recycle: recycle} } -func (m *Message) Reclaim() { - m.reclaim() - m.reclaim = nil +func (m *Message) Recycle() { + m.recycle() + m.Buffer = nil + m.recycle = nil } func (m *Message) In() xdr.XDR { @@ -147,7 +148,7 @@ func (tx *StreamTransport) Close() { const maxSegment = 0x7fffffff func (tx *StreamTransport) Send(m *Message) error { - defer m.Reclaim() + defer m.Recycle() if tx.failed() { return tx.err } @@ -182,7 +183,7 @@ func (tx *StreamTransport) Receive() (*Message, error) { b := make([]byte, 4) for b[0]&0x80 == 0 { if n, err := tx.Conn.Read(b); n != 4 || err != nil { - ret.Reclaim() + ret.Recycle() tx.fail(err) return nil, err } @@ -190,12 +191,12 @@ func (tx *StreamTransport) Receive() (*Message, error) { if int(n) > tx.MaxMsgSize-ret.Len() { err := fmt.Errorf("Message length %d exceeds maximum %d", int(n)+ret.Len(), tx.MaxMsgSize) - ret.Reclaim() + ret.Recycle() tx.fail(err) return nil, err } if _, err := io.CopyN(ret, tx.Conn, int64(n)); err != nil { - ret.Reclaim() + ret.Recycle() tx.fail(err) return nil, err } From 5b659fb9c3a9bd3655570d3d3a5d72ce70d12827 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 15:27:49 -0700 Subject: [PATCH 07/51] move to a global pool for rpc.Message buffers --- rpc/driver.go | 12 +++--------- rpc/rpc_test.go | 2 +- rpc/transport.go | 36 ++++++++++++++---------------------- 3 files changed, 18 insertions(+), 32 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 2980f50..db5c6af 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -192,7 +192,6 @@ type Driver struct { // If non-nil, all panics arising from service method implementations are passed to PanicHandle. PanicHandler PanicHandler - t Transport srv RpcSrv ctx context.Context cancel context.CancelFunc @@ -241,7 +240,6 @@ func NewDriver(ctx context.Context, t Transport) *Driver { ret := Driver{ Log: DefaultLog, Lock: &sync.Mutex{}, - t: t, ctx: ctx, cancel: cancel, in: ReceiveChan(ctx, t), @@ -258,10 +256,6 @@ func NewDriver(ctx context.Context, t Transport) *Driver { return &ret } -func (r *Driver) newMessage(peer string) *Message { - return r.t.NewMessage(peer) -} - // Register an RPC server. func (r *Driver) Register(srv xdr.XdrSrv) { r.srv.Register(srv) @@ -296,7 +290,7 @@ func (r *Driver) SendCall(ctx context.Context, proc xdr.XdrProc) (err error) { c <- rmsg close(c) }) - m := r.newMessage(peer) + m := NewMessage(peer) r.logXdr(proc.GetArg(), "->%s CALL(xid=%d) %s", peer, cmsg.Xid, proc.ProcName()) m.Serialize(cmsg, proc.GetArg()) @@ -372,7 +366,7 @@ loop: if rmsg == nil { continue } else if proc == nil { - reply := r.newMessage(m.Peer) + reply := NewMessage(m.Peer) reply.Serialize(rmsg) reply.Recycle() //XXX continue @@ -399,7 +393,7 @@ loop: SetStat(rmsg, SYSTEM_ERR) } } - reply := r.newMessage(m.Peer) + reply := NewMessage(m.Peer) reply.Serialize(rmsg) if IsSuccess(rmsg) { reply.Serialize(proc.GetRes()) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 763fd36..1e02708 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -60,7 +60,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message for _, msg := range contents { - m := rpc.NewMessage("", &bytes.Buffer{}, func() {}) + m := rpc.NewMessage("") m.WriteString(msg) ms = append(ms, m) } diff --git a/rpc/transport.go b/rpc/transport.go index edf3fb6..374a347 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -14,18 +14,24 @@ import ( type Message struct { *bytes.Buffer - Peer string - recycle func() + Peer string } -func NewMessage(peer string, buf *bytes.Buffer, recycle func()) *Message { - return &Message{Buffer: buf, Peer: peer, recycle: recycle} +var mpool sync.Pool + +func init() { + mpool.New = func() any { return &bytes.Buffer{} } +} + +func NewMessage(peer string) *Message { + buf := mpool.Get().(*bytes.Buffer) + buf.Reset() + return &Message{Buffer: buf, Peer: peer} } func (m *Message) Recycle() { - m.recycle() + mpool.Put(m.Buffer) m.Buffer = nil - m.recycle = nil } func (m *Message) In() xdr.XDR { @@ -73,8 +79,6 @@ type Transport interface { // because each instance of the transport is connected to a single // endpoint (like TCP and unlike an unconnected UDP socket). IsConnected() bool - - NewMessage(peer string) *Message } var ErrTransportClosed = fmt.Errorf("Transport is closed") @@ -95,29 +99,17 @@ type StreamTransport struct { // exclusive lock to ensure only one thread updates err. okay int32 // 1 = okay, 0 = failed, -1 = failing err error - - mpool sync.Pool } // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. func NewStreamTransport(c net.Conn) *StreamTransport { - r := &StreamTransport{ + return &StreamTransport{ MaxMsgSize: 0x100000, Conn: c, okay: 1, } - r.mpool.New = func() any { return &bytes.Buffer{} } - return r -} - -func (tx *StreamTransport) NewMessage(peer string) *Message { - buf := tx.mpool.Get().(*bytes.Buffer) - buf.Reset() - return NewMessage(peer, buf, func() { - tx.mpool.Put(buf) - }) } func (tx *StreamTransport) fail(err error) { @@ -179,7 +171,7 @@ func (tx *StreamTransport) Receive() (*Message, error) { if tx.failed() { return nil, tx.err } - ret := tx.NewMessage(tx.Peer) + ret := NewMessage(tx.Peer) b := make([]byte, 4) for b[0]&0x80 == 0 { if n, err := tx.Conn.Read(b); n != 4 || err != nil { From 0c045a932515c9b401152d23aa66e6e74db04e3d Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 17:57:18 -0700 Subject: [PATCH 08/51] maintain message cache stats --- rpc/transport.go | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/rpc/transport.go b/rpc/transport.go index 374a347..2be9f5e 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -17,15 +17,43 @@ type Message struct { Peer string } -var mpool sync.Pool +var ( + mpool sync.Pool + mpoolLk sync.Mutex + mpoolNumGet int + mpoolNumMiss int + mpoolCap [256]int + mpoolCapPtr int +) + +func MpoolStats() string { + mpoolLk.Lock() + defer mpoolLk.Unlock() + return fmt.Sprintf("rpc.mpool: num_get=%d num_miss=%d miss_ratio=%v%% caps=%v", + mpoolNumGet, mpoolNumMiss, float64(mpoolNumMiss)/float64(mpoolNumGet), mpoolCap) +} func init() { - mpool.New = func() any { return &bytes.Buffer{} } + mpool.New = func() any { + mpoolLk.Lock() + mpoolNumMiss += 1 + mpoolLk.Unlock() + return &bytes.Buffer{} + } } func NewMessage(peer string) *Message { buf := mpool.Get().(*bytes.Buffer) buf.Reset() + + mpoolLk.Lock() + mpoolNumGet += 1 + if buf.Cap() > 0 { + mpoolCap[mpoolCapPtr%len(mpoolCap)] = buf.Cap() + mpoolCapPtr += 1 + } + mpoolLk.Unlock() + return &Message{Buffer: buf, Peer: peer} } From 81c42df7ffd2f340de2baf8dfcf847fd002f4e13 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 18:11:34 -0700 Subject: [PATCH 09/51] add forgotten safeSend --- rpc/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index db5c6af..e44fd7c 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -368,7 +368,7 @@ loop: } else if proc == nil { reply := NewMessage(m.Peer) reply.Serialize(rmsg) - reply.Recycle() //XXX + r.safeSend(r.ctx, reply) continue } From 69ebab7c1a284466ec215b3e876be35958d39988 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 18:33:03 -0700 Subject: [PATCH 10/51] improve xdrReadN; results in massive speedup --- cmd/goxdr/goxdr.go | 3 +-- cmd/goxdr/header.go.in | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/goxdr/goxdr.go b/cmd/goxdr/goxdr.go index f2e5d82..0157199 100644 --- a/cmd/goxdr/goxdr.go +++ b/cmd/goxdr/goxdr.go @@ -494,7 +494,7 @@ func (r *rpc_const) emit(e *emitter) { if r.comment != "" { e.printf("%s\n", r.comment) } - // Replace the character '\'' with '_' in r.val before printing constants; + // Replace the character '\'' with '_' in r.val before printing constants; // golang supports _ but not ' valStr := strings.ReplaceAll(r.val.String(), "'", "_") e.printf("const %s = %s\n", r.id, valStr) @@ -1386,7 +1386,6 @@ func main() { if *opt_emitbp { fmt.Fprintf(out, `import( - "bytes" "context" "encoding/binary" "fmt" diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index 66bb1d8..db2540e 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -665,11 +665,11 @@ func (xo XdrOut) Marshal(name string, i XdrType) { } func xdrReadN(in io.Reader, n uint32) []byte { - var b bytes.Buffer - if _, err := io.CopyN(&b, in, int64(n)); err != nil { + p := make([]byte, n) + if k, err := in.Read(p); err != nil || k != int(n){ XdrPanic("%s", err.Error()) } - return b.Bytes() + return p } func xdrReadPad(in io.Reader, n uint32) { From 9f4cadbb1069b227151a0a5742483637b6e55bcd Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 8 Jun 2025 20:27:09 -0700 Subject: [PATCH 11/51] fix stats display --- rpc/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/transport.go b/rpc/transport.go index 2be9f5e..123a06f 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -30,7 +30,7 @@ func MpoolStats() string { mpoolLk.Lock() defer mpoolLk.Unlock() return fmt.Sprintf("rpc.mpool: num_get=%d num_miss=%d miss_ratio=%v%% caps=%v", - mpoolNumGet, mpoolNumMiss, float64(mpoolNumMiss)/float64(mpoolNumGet), mpoolCap) + mpoolNumGet, mpoolNumMiss, 100*float64(mpoolNumMiss)/float64(mpoolNumGet), mpoolCap) } func init() { From 4d9eb656c62cba950e0d730d190378abbd579ca5 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 15 Jul 2025 12:21:08 -0700 Subject: [PATCH 12/51] add struct pool and arena --- xdr/apool.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++ xdr/bpool.go | 31 +++++++++++++++++++++ xdr/spool.go | 48 +++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) create mode 100644 xdr/apool.go create mode 100644 xdr/bpool.go create mode 100644 xdr/spool.go diff --git a/xdr/apool.go b/xdr/apool.go new file mode 100644 index 0000000..b760573 --- /dev/null +++ b/xdr/apool.go @@ -0,0 +1,79 @@ +package xdr + +import ( + "fmt" + "sync" + "unsafe" +) + +type Arena[T any] struct { + objects []T + lk sync.Mutex + free []*T + used []*T + numGet int + numMiss int +} + +func NewArena[T any](n int) *Arena[T] { + a := &Arena[T]{ + objects: make([]T, n), + free: make([]*T, n), + } + for i := range a.objects { + a.free[i] = &a.objects[i] + } + return a +} + +func (a *Arena[T]) Get() *T { + a.lk.Lock() + defer a.lk.Unlock() + + if len(a.free) == 0 { + a.numMiss += 1 + var z T + return &z + } + + a.numGet += 1 + + ptr := a.free[len(a.free)-1] + a.free = a.free[:len(a.free)-1] + + var z T // reset + *ptr = z + return ptr +} + +func (a *Arena[T]) Recycle(x *T) { + if !contains(a.objects, x) { + return + } + + a.lk.Lock() + defer a.lk.Unlock() + a.free = append(a.free, x) +} + +func (a *Arena[T]) StatString() string { + a.lk.Lock() + defer a.lk.Unlock() + return fmt.Sprintf("num_get=%d num_miss=%d miss_ratio=%.3f%%", + a.numGet, a.numMiss, 100*float64(a.numMiss)/float64(a.numGet)) +} + +func contains[T any](slice []T, ptr *T) bool { + if len(slice) == 0 || ptr == nil { + return false + } + + // Get pointer to first element + first := unsafe.Pointer(&slice[0]) + // Get pointer to one past the last element + last := unsafe.Pointer(uintptr(first) + uintptr(len(slice))*unsafe.Sizeof(slice[0])) + + // Check if ptr is within the range [first, last) + ptrAddr := unsafe.Pointer(ptr) + return uintptr(ptrAddr) >= uintptr(first) && uintptr(ptrAddr) < uintptr(last) +} diff --git a/xdr/bpool.go b/xdr/bpool.go new file mode 100644 index 0000000..75cb7a7 --- /dev/null +++ b/xdr/bpool.go @@ -0,0 +1,31 @@ +package xdr + +import "sync" + +type SmallBytesPool struct { + size [8]sync.Pool +} + +func NewSmallBytesPool() *SmallBytesPool { + return &SmallBytesPool{ + size: [8]sync.Pool{ + {New: func() any { return []byte{0} }}, + {New: func() any { return []byte{0, 0} }}, + {New: func() any { return []byte{0, 0, 0} }}, + {New: func() any { return []byte{0, 0, 0, 0} }}, + {New: func() any { return []byte{0, 0, 0, 0, 0} }}, + {New: func() any { return []byte{0, 0, 0, 0, 0, 0} }}, + {New: func() any { return []byte{0, 0, 0, 0, 0, 0, 0} }}, + {New: func() any { return []byte{0, 0, 0, 0, 0, 0, 0, 0} }}, + }, + } +} + +func (x *SmallBytesPool) Get(size int) []byte { + // XXX: must zero out + return x.size[size-1].Get().([]byte) +} + +func (x *SmallBytesPool) Recycle(b []byte) { + x.size[len(b)-1].Put(b) +} diff --git a/xdr/spool.go b/xdr/spool.go new file mode 100644 index 0000000..3ac670e --- /dev/null +++ b/xdr/spool.go @@ -0,0 +1,48 @@ +package xdr + +import ( + "fmt" + "sync" +) + +type StructPool[T any] struct { + pool sync.Pool + // stats + lk sync.Mutex + numGet int + numMiss int +} + +func NewStructPool[T any]() *StructPool[T] { + x := &StructPool[T]{} + x.pool.New = func() any { + x.lk.Lock() + x.numMiss += 1 + x.lk.Unlock() + var z T + return &z + } + return x +} + +func (x *StructPool[T]) Get() *T { + buf := x.pool.Get().(*T) + var z T + *buf = z + + x.lk.Lock() + x.numGet += 1 + x.lk.Unlock() + return buf +} + +func (x *StructPool[T]) Recycle(b *T) { + x.pool.Put(b) +} + +func (x *StructPool[T]) StatString() string { + x.lk.Lock() + defer x.lk.Unlock() + return fmt.Sprintf("num_get=%d num_miss=%d miss_ratio=%.3f%%", + x.numGet, x.numMiss, 100*float64(x.numMiss)/float64(x.numGet)) +} From f433905e58a5f8a50dfb527537c7e66f743db6e0 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 17 Jul 2025 09:56:19 -0700 Subject: [PATCH 13/51] fix bug; message pool --- rpc/driver.go | 23 ++++++++----- rpc/rpc_test.go | 3 +- rpc/transport.go | 68 +++++++++++++++++--------------------- xdr/{apool.go => arena.go} | 0 xdr/{spool.go => pool.go} | 28 ++++++++-------- 5 files changed, 60 insertions(+), 62 deletions(-) rename xdr/{apool.go => arena.go} (100%) rename xdr/{spool.go => pool.go} (53%) diff --git a/rpc/driver.go b/rpc/driver.go index e44fd7c..14d011e 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -200,6 +200,8 @@ type Driver struct { in <-chan *Message cs CallSet started int32 + + msgPool *MsgPool } // PanicHandler defines a handler for panics arising from service method implementations. @@ -238,11 +240,12 @@ func NewDriver(ctx context.Context, t Transport) *Driver { } ctx, cancel := context.WithCancel(ctx) ret := Driver{ - Log: DefaultLog, - Lock: &sync.Mutex{}, - ctx: ctx, - cancel: cancel, - in: ReceiveChan(ctx, t), + Log: DefaultLog, + Lock: &sync.Mutex{}, + ctx: ctx, + cancel: cancel, + in: ReceiveChan(ctx, t), + msgPool: NewMsgPool(), } ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) @@ -290,7 +293,7 @@ func (r *Driver) SendCall(ctx context.Context, proc xdr.XdrProc) (err error) { c <- rmsg close(c) }) - m := NewMessage(peer) + m := r.msgPool.NewMessage(peer) r.logXdr(proc.GetArg(), "->%s CALL(xid=%d) %s", peer, cmsg.Xid, proc.ProcName()) m.Serialize(cmsg, proc.GetArg()) @@ -362,11 +365,12 @@ loop: } rmsg, proc := r.srv.GetProc(msg, m.In()) - m.Recycle() if rmsg == nil { + m.Recycle() continue } else if proc == nil { - reply := NewMessage(m.Peer) + reply := r.msgPool.NewMessage(m.Peer) + m.Recycle() reply.Serialize(rmsg) r.safeSend(r.ctx, reply) continue @@ -393,13 +397,14 @@ loop: SetStat(rmsg, SYSTEM_ERR) } } - reply := NewMessage(m.Peer) + reply := r.msgPool.NewMessage(m.Peer) reply.Serialize(rmsg) if IsSuccess(rmsg) { reply.Serialize(proc.GetRes()) r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", m.Peer, msg.Xid, proc.ProcName()) } + m.Recycle() r.safeSend(r.ctx, reply) }() proc.Do() diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 1e02708..7eb02ec 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -59,8 +59,9 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message + msgPool := rpc.NewMsgPool() for _, msg := range contents { - m := rpc.NewMessage("") + m := msgPool.NewMessage("") m.WriteString(msg) ms = append(ms, m) } diff --git a/rpc/transport.go b/rpc/transport.go index 123a06f..02b42ac 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net" - "sync" "sync/atomic" "github.com/xdrpp/goxdr/xdr" @@ -15,51 +14,40 @@ import ( type Message struct { *bytes.Buffer Peer string + pool *MsgPool } -var ( - mpool sync.Pool - mpoolLk sync.Mutex - mpoolNumGet int - mpoolNumMiss int - mpoolCap [256]int - mpoolCapPtr int -) +type MsgPool xdr.Pool[*Message] -func MpoolStats() string { - mpoolLk.Lock() - defer mpoolLk.Unlock() - return fmt.Sprintf("rpc.mpool: num_get=%d num_miss=%d miss_ratio=%v%% caps=%v", - mpoolNumGet, mpoolNumMiss, 100*float64(mpoolNumMiss)/float64(mpoolNumGet), mpoolCap) +func NewMsgPool() *MsgPool { + pool := &xdr.Pool[*Message]{} + pool.SetMkReset( + func() *Message { + return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} + }, + func(m *Message) { + m.Peer = "" + m.Buffer.Reset() + }, + ) + return (*MsgPool)(pool) } -func init() { - mpool.New = func() any { - mpoolLk.Lock() - mpoolNumMiss += 1 - mpoolLk.Unlock() - return &bytes.Buffer{} - } +func (msgPool *MsgPool) NewMessage(peer string) *Message { + msg := (*xdr.Pool[*Message])(msgPool).Get() + msg.Peer = peer + return msg } -func NewMessage(peer string) *Message { - buf := mpool.Get().(*bytes.Buffer) - buf.Reset() - - mpoolLk.Lock() - mpoolNumGet += 1 - if buf.Cap() > 0 { - mpoolCap[mpoolCapPtr%len(mpoolCap)] = buf.Cap() - mpoolCapPtr += 1 - } - mpoolLk.Unlock() - - return &Message{Buffer: buf, Peer: peer} +func (msgPool *MsgPool) Reycle(msg *Message) { + (*xdr.Pool[*Message])(msgPool).Recycle(msg) } func (m *Message) Recycle() { - mpool.Put(m.Buffer) - m.Buffer = nil + if m == nil { + panic("XXX m=nil") + } + m.pool.Reycle(m) } func (m *Message) In() xdr.XDR { @@ -127,6 +115,8 @@ type StreamTransport struct { // exclusive lock to ensure only one thread updates err. okay int32 // 1 = okay, 0 = failed, -1 = failing err error + + msgPool *MsgPool } // Create a stream transport from a connected stream socket. This is @@ -137,6 +127,7 @@ func NewStreamTransport(c net.Conn) *StreamTransport { MaxMsgSize: 0x100000, Conn: c, okay: 1, + msgPool: NewMsgPool(), } } @@ -199,11 +190,14 @@ func (tx *StreamTransport) Receive() (*Message, error) { if tx.failed() { return nil, tx.err } - ret := NewMessage(tx.Peer) + ret := tx.msgPool.NewMessage(tx.Peer) b := make([]byte, 4) for b[0]&0x80 == 0 { if n, err := tx.Conn.Read(b); n != 4 || err != nil { ret.Recycle() + if err == nil { + err = fmt.Errorf("short read") + } tx.fail(err) return nil, err } diff --git a/xdr/apool.go b/xdr/arena.go similarity index 100% rename from xdr/apool.go rename to xdr/arena.go diff --git a/xdr/spool.go b/xdr/pool.go similarity index 53% rename from xdr/spool.go rename to xdr/pool.go index 3ac670e..a122df5 100644 --- a/xdr/spool.go +++ b/xdr/pool.go @@ -5,42 +5,40 @@ import ( "sync" ) -type StructPool[T any] struct { - pool sync.Pool +type Pool[T any] struct { + pool sync.Pool + reset func(T) // stats lk sync.Mutex numGet int numMiss int } -func NewStructPool[T any]() *StructPool[T] { - x := &StructPool[T]{} +func (x *Pool[T]) SetMkReset(mk func() T, reset func(T)) { x.pool.New = func() any { x.lk.Lock() x.numMiss += 1 x.lk.Unlock() - var z T - return &z + return mk() } - return x + x.reset = reset } -func (x *StructPool[T]) Get() *T { - buf := x.pool.Get().(*T) - var z T - *buf = z +func (x *Pool[T]) Get() T { + o := x.pool.Get().(T) + x.reset(o) x.lk.Lock() x.numGet += 1 x.lk.Unlock() - return buf + return o } -func (x *StructPool[T]) Recycle(b *T) { - x.pool.Put(b) +func (x *Pool[T]) Recycle(o T) { + x.pool.Put(o) } -func (x *StructPool[T]) StatString() string { +func (x *Pool[T]) StatString() string { x.lk.Lock() defer x.lk.Unlock() return fmt.Sprintf("num_get=%d num_miss=%d miss_ratio=%.3f%%", From 8b19b475450d76fd6f4f08ab49564986c0f90f16 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 17 Jul 2025 10:41:25 -0700 Subject: [PATCH 14/51] msgpool as init param --- rpc/driver.go | 4 ++-- rpc/rpc_test.go | 8 +++++--- rpc/transport.go | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 14d011e..f64fb47 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -234,7 +234,7 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // // If you will never need to cancel the driver, or plan to do so by // calling Close(), then you may supply a nil ctx. -func NewDriver(ctx context.Context, t Transport) *Driver { +func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { if ctx == nil { ctx = context.Background() } @@ -245,7 +245,7 @@ func NewDriver(ctx context.Context, t Transport) *Driver { ctx: ctx, cancel: cancel, in: ReceiveChan(ctx, t), - msgPool: NewMsgPool(), + msgPool: mp, } ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 7eb02ec..3cfa9ad 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -67,7 +67,7 @@ func TestChannels(t *testing.T) { } cs := streampair() - tx1, tx2 := rpc.NewStreamTransport(cs[0]), rpc.NewStreamTransport(cs[1]) + tx1, tx2 := rpc.NewStreamTransport(cs[0], msgPool), rpc.NewStreamTransport(cs[1], msgPool) r := rpc.ReceiveChan(ctx, tx1) defer tx1.Close() s, _ := rpc.SendChan(tx2, nil) @@ -95,14 +95,16 @@ func TestRPC(t *testing.T) { defer cancel() cs := streampair() - r1 := rpc.NewDriver(ctx, rpc.NewStreamTransport(cs[0])) + mp1 := rpc.NewMsgPool() + r1 := rpc.NewDriver(ctx, mp1, rpc.NewStreamTransport(cs[0], mp1)) r1.Register(TEST_V1_Server{&Server{}}) go func() { r1.Go() fmt.Println("loop1 returned") }() - r2 := rpc.NewDriver(ctx, rpc.NewStreamTransport(cs[1])) + mp2 := rpc.NewMsgPool() + r2 := rpc.NewDriver(ctx, mp2, rpc.NewStreamTransport(cs[1], mp2)) r2.Log = os.Stderr go func() { r2.Go() diff --git a/rpc/transport.go b/rpc/transport.go index 02b42ac..1aa9b1e 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -122,12 +122,12 @@ type StreamTransport struct { // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. -func NewStreamTransport(c net.Conn) *StreamTransport { +func NewStreamTransport(c net.Conn, mp *MsgPool) *StreamTransport { return &StreamTransport{ MaxMsgSize: 0x100000, Conn: c, okay: 1, - msgPool: NewMsgPool(), + msgPool: mp, } } From 10fbd2f7fbae8999586e86f66f016f69df48b387 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 11:55:20 -0700 Subject: [PATCH 15/51] process incoming messages in parallel --- rpc/driver.go | 157 +++++++++++++++++++++++++++++--------------------- 1 file changed, 91 insertions(+), 66 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index f64fb47..c29d31b 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -202,6 +202,7 @@ type Driver struct { started int32 msgPool *MsgPool + msgCh chan *Message } // PanicHandler defines a handler for panics arising from service method implementations. @@ -246,6 +247,7 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { cancel: cancel, in: ReceiveChan(ctx, t), msgPool: mp, + msgCh: make(chan *Message), } ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) @@ -255,6 +257,9 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { t.Close() close(ret.outClose) }() + for i := 0; i < 5; i++ { //XXX + go ret.doMsgs() + } return &ret } @@ -338,83 +343,103 @@ func (r *Driver) Go() { } loop: for { - // check whether the context was cancelled or timed out select { case <-r.ctx.Done(): break loop - default: + case m := <-r.in: + select { + case <-r.ctx.Done(): + break loop + case r.msgCh <- m: + } } + } + r.Close() + r.cs.CancelAll() +} - m := <-r.in - if m == nil { - break - } - msg, err := GetMsg(m.In()) - if err != nil { - m.Recycle() - fmt.Fprintf(os.Stderr, "GetMsg failed: %s\n", err) - break +func (r *Driver) doMsgs() { + for { + select { + case <-r.ctx.Done(): + return + case m := <-r.msgCh: + r.doMsg(m) } + } +} - if pc := r.cs.GetReply(m.Peer, msg, m.In()); pc != nil { - r.logXdr(pc.Proc.GetRes(), "<-%s REPLY(xid=%d) %s", - m.Peer, msg.Xid, pc.Proc.ProcName()) - m.Recycle() - pc.Cb(msg) - continue - } +func (r *Driver) doMsg(m *Message) { - rmsg, proc := r.srv.GetProc(msg, m.In()) - if rmsg == nil { - m.Recycle() - continue - } else if proc == nil { - reply := r.msgPool.NewMessage(m.Peer) - m.Recycle() - reply.Serialize(rmsg) - r.safeSend(r.ctx, reply) - continue - } + if m == nil { + r.cancel() + return + } - unlock := mkUnlocker(r.Lock) - r.logXdr(proc.GetArg(), "<-%s CALL(xid=%d) %s", - m.Peer, msg.Xid, proc.ProcName()) - proc.SetContext(context.WithValue(r.ctx, ctxKey, &srvCtx{ - peerCtx: peerCtx{Peer: m.Peer}, - unlock: unlock, - })) - - q := func() { - defer func() { - unlock() - if i := recover(); i != nil { - if r.PanicHandler != nil { - r.PanicHandler.PanicHandle(i) - } else { - fmt.Fprintf(os.Stderr, "%s\n", i) - } - if IsSuccess(rmsg) { - SetStat(rmsg, SYSTEM_ERR) - } + msg, err := GetMsg(m.In()) + if err != nil { + m.Recycle() + fmt.Fprintf(os.Stderr, "GetMsg failed: %s\n", err) + r.cancel() + return + } + + if pc := r.cs.GetReply(m.Peer, msg, m.In()); pc != nil { + r.logXdr(pc.Proc.GetRes(), "<-%s REPLY(xid=%d) %s", m.Peer, msg.Xid, pc.Proc.ProcName()) + m.Recycle() + pc.Cb(msg) + return + } + + rmsg, proc := r.srv.GetProc(msg, m.In()) + if rmsg == nil { + m.Recycle() + return + } else if proc == nil { + reply := r.msgPool.NewMessage(m.Peer) + m.Recycle() + reply.Serialize(rmsg) + r.safeSend(r.ctx, reply) + return + } + + unlock := mkUnlocker(r.Lock) + r.logXdr(proc.GetArg(), "<-%s CALL(xid=%d) %s", m.Peer, msg.Xid, proc.ProcName()) + proc.SetContext(context.WithValue(r.ctx, ctxKey, &srvCtx{ + peerCtx: peerCtx{Peer: m.Peer}, + unlock: unlock, + })) + + q := func() { + defer func() { + unlock() + if i := recover(); i != nil { + if r.PanicHandler != nil { + r.PanicHandler.PanicHandle(i) + } else { + fmt.Fprintf(os.Stderr, "%s\n", i) } - reply := r.msgPool.NewMessage(m.Peer) - reply.Serialize(rmsg) if IsSuccess(rmsg) { - reply.Serialize(proc.GetRes()) - r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", - m.Peer, msg.Xid, proc.ProcName()) + SetStat(rmsg, SYSTEM_ERR) } - m.Recycle() - r.safeSend(r.ctx, reply) - }() - proc.Do() - } - if r.Lock == nil { - go q() - } else { - q() - } + } + reply := r.msgPool.NewMessage(m.Peer) + reply.Serialize(rmsg) + if IsSuccess(rmsg) { + reply.Serialize(proc.GetRes()) + r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", + m.Peer, msg.Xid, proc.ProcName()) + } + m.Recycle() + r.safeSend(r.ctx, reply) + }() + proc.Do() } - r.Close() - r.cs.CancelAll() + + if r.Lock == nil { + go q() + } else { + q() + } + } From 5128944ae9400e13116a66266507e75a5800555b Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 11:59:31 -0700 Subject: [PATCH 16/51] adjust par to 10 --- rpc/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index c29d31b..1f1e157 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -257,7 +257,7 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { t.Close() close(ret.outClose) }() - for i := 0; i < 5; i++ { //XXX + for i := 0; i < 10; i++ { //XXX go ret.doMsgs() } From 04234aac9c2a8f4dca517119641ed3f500affcef Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 12:04:48 -0700 Subject: [PATCH 17/51] add a buffer to the message processing channel --- rpc/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index 1f1e157..b20b577 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -247,7 +247,7 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { cancel: cancel, in: ReceiveChan(ctx, t), msgPool: mp, - msgCh: make(chan *Message), + msgCh: make(chan *Message, 100), //XXX } ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) From 8deae8c9f3779000924a170d538929c665afcf4f Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 12:10:31 -0700 Subject: [PATCH 18/51] add buffer to send/receive msg chans --- rpc/driver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index b20b577..7dccd30 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -96,7 +96,7 @@ func Detach(ctx context.Context) { // is Done or when the transport returns an error. Does not close the // Transport. func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { - ret := make(chan *Message) + ret := make(chan *Message, 100) //XXX: used to be 0 go func(c chan<- *Message) { for { m, err := t.Receive() @@ -123,7 +123,7 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { // a thread that won't exit until the returned channel is closed. // Does not close the underlying Transport. func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- struct{}) { - ch := make(chan *Message, 1) + ch := make(chan *Message, 100) //XXX: used to be 1 chClose := make(chan struct{}) go func(ch <-chan *Message, cancel <-chan struct{}) { for { From c2515280b35823a61ee495b27277df02633e31df Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 14:03:50 -0700 Subject: [PATCH 19/51] simplify par msg processing --- rpc/driver.go | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 7dccd30..67d5521 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -202,7 +202,6 @@ type Driver struct { started int32 msgPool *MsgPool - msgCh chan *Message } // PanicHandler defines a handler for panics arising from service method implementations. @@ -247,7 +246,6 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { cancel: cancel, in: ReceiveChan(ctx, t), msgPool: mp, - msgCh: make(chan *Message, 100), //XXX } ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { ret.cs.Cancel(xid, SEND_ERR) @@ -257,9 +255,6 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { t.Close() close(ret.outClose) }() - for i := 0; i < 10; i++ { //XXX - go ret.doMsgs() - } return &ret } @@ -341,29 +336,37 @@ func (r *Driver) Go() { if atomic.SwapInt32(&r.started, 1) == 1 { panic("rpc.Driver.Go called multiple times") } -loop: - for { - select { - case <-r.ctx.Done(): - break loop - case m := <-r.in: - select { - case <-r.ctx.Done(): - break loop - case r.msgCh <- m: - } - } + for i := 0; i < 10; i++ { //XXX + go r.doMsgs() } - r.Close() - r.cs.CancelAll() + // loop: + // + // for { + // select { + // case <-r.ctx.Done(): + // break loop + // case m := <-r.in: + // select { + // case <-r.ctx.Done(): + // break loop + // case r.msgCh <- m: + // } + // } + // } + // r.Close() + // r.cs.CancelAll() } func (r *Driver) doMsgs() { + defer func() { + r.Close() + r.cs.CancelAll() + }() for { select { case <-r.ctx.Done(): return - case m := <-r.msgCh: + case m := <-r.in: r.doMsg(m) } } From 18190a9a935f2f5de1470a076fedb1808b95ede1 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 14:43:43 -0700 Subject: [PATCH 20/51] implement msg pool with arena --- rpc/driver.go | 16 -------- rpc/msg.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++ rpc/transport.go | 65 ----------------------------- xdr/arena.go | 18 +++++--- 4 files changed, 118 insertions(+), 86 deletions(-) create mode 100644 rpc/msg.go diff --git a/rpc/driver.go b/rpc/driver.go index 67d5521..2e4c26d 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -339,22 +339,6 @@ func (r *Driver) Go() { for i := 0; i < 10; i++ { //XXX go r.doMsgs() } - // loop: - // - // for { - // select { - // case <-r.ctx.Done(): - // break loop - // case m := <-r.in: - // select { - // case <-r.ctx.Done(): - // break loop - // case r.msgCh <- m: - // } - // } - // } - // r.Close() - // r.cs.CancelAll() } func (r *Driver) doMsgs() { diff --git a/rpc/msg.go b/rpc/msg.go new file mode 100644 index 0000000..74a1d6c --- /dev/null +++ b/rpc/msg.go @@ -0,0 +1,105 @@ +package rpc + +import ( + "bytes" + + "github.com/xdrpp/goxdr/xdr" +) + +type Message struct { + *bytes.Buffer + Peer string + pool *MsgPool +} + +type MsgPool struct { + arena *xdr.Arena[Message] +} + +func NewMsgPool() *MsgPool { + return NewMsgPoolCap(10000) +} + +func NewMsgPoolCap(cap int) *MsgPool { + msgPool := &MsgPool{} + msgPool.arena = xdr.NewArena( + cap, + func(m *Message) { + m.Buffer = &bytes.Buffer{} + m.pool = msgPool + }, + func(m *Message) { + m.Peer = "" + m.Buffer.Reset() + }, + ) + return msgPool +} + +func (msgPool *MsgPool) NewMessage(peer string) *Message { + msg := msgPool.arena.Get() + msg.Peer = peer + return msg +} + +func (msgPool *MsgPool) Reycle(msg *Message) { + msgPool.arena.Recycle(msg) +} + +/* +type MsgPool xdr.Pool[*Message] + +func NewMsgPool() *MsgPool { + pool := &xdr.Pool[*Message]{} + pool.SetMkReset( + func() *Message { + return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} + }, + func(m *Message) { + m.Peer = "" + m.Buffer.Reset() + }, + ) + return (*MsgPool)(pool) +} + +func (msgPool *MsgPool) NewMessage(peer string) *Message { + msg := (*xdr.Pool[*Message])(msgPool).Get() + msg.Peer = peer + return msg +} + +func (msgPool *MsgPool) Reycle(msg *Message) { + (*xdr.Pool[*Message])(msgPool).Recycle(msg) +} +*/ + +func (m *Message) Recycle() { + if m == nil { + panic("XXX m=nil") + } + m.pool.Reycle(m) +} + +func (m *Message) In() xdr.XDR { + return xdr.XdrIn{In: m} +} + +func (m *Message) Out() xdr.XDR { + return xdr.XdrOut{Out: m} +} + +func (m *Message) Xid() uint32 { + if bs := m.Bytes(); len(bs) >= 4 { + return uint32(bs[0])<<24 | uint32(bs[1])<<16 | + uint32(bs[2])<<8 | uint32(bs[3]) + } + return 0 +} + +func (m *Message) Serialize(vs ...xdr.XdrType) { + out := m.Out() + for i := range vs { + vs[i].XdrMarshal(out, "") + } +} diff --git a/rpc/transport.go b/rpc/transport.go index 1aa9b1e..3e5af8c 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -1,78 +1,13 @@ package rpc import ( - "bytes" "encoding/binary" "fmt" "io" "net" "sync/atomic" - - "github.com/xdrpp/goxdr/xdr" ) -type Message struct { - *bytes.Buffer - Peer string - pool *MsgPool -} - -type MsgPool xdr.Pool[*Message] - -func NewMsgPool() *MsgPool { - pool := &xdr.Pool[*Message]{} - pool.SetMkReset( - func() *Message { - return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} - }, - func(m *Message) { - m.Peer = "" - m.Buffer.Reset() - }, - ) - return (*MsgPool)(pool) -} - -func (msgPool *MsgPool) NewMessage(peer string) *Message { - msg := (*xdr.Pool[*Message])(msgPool).Get() - msg.Peer = peer - return msg -} - -func (msgPool *MsgPool) Reycle(msg *Message) { - (*xdr.Pool[*Message])(msgPool).Recycle(msg) -} - -func (m *Message) Recycle() { - if m == nil { - panic("XXX m=nil") - } - m.pool.Reycle(m) -} - -func (m *Message) In() xdr.XDR { - return xdr.XdrIn{In: m} -} - -func (m *Message) Out() xdr.XDR { - return xdr.XdrOut{Out: m} -} - -func (m *Message) Xid() uint32 { - if bs := m.Bytes(); len(bs) >= 4 { - return uint32(bs[0])<<24 | uint32(bs[1])<<16 | - uint32(bs[2])<<8 | uint32(bs[3]) - } - return 0 -} - -func (m *Message) Serialize(vs ...xdr.XdrType) { - out := m.Out() - for i := range vs { - vs[i].XdrMarshal(out, "") - } -} - // Abstraction for an RPC transport. Note that Send() should not be // called multiple times concurrently and Receive() should not be // called multiple times concurrently, but it is okay to call Send() diff --git a/xdr/arena.go b/xdr/arena.go index b760573..74e8b9c 100644 --- a/xdr/arena.go +++ b/xdr/arena.go @@ -2,12 +2,16 @@ package xdr import ( "fmt" + "os" "sync" "unsafe" ) type Arena[T any] struct { + init func(*T) + reset func(*T) objects []T + // lk sync.Mutex free []*T used []*T @@ -15,12 +19,15 @@ type Arena[T any] struct { numMiss int } -func NewArena[T any](n int) *Arena[T] { +func NewArena[T any](n int, init func(*T), reset func(*T)) *Arena[T] { a := &Arena[T]{ + init: init, + reset: reset, objects: make([]T, n), free: make([]*T, n), } for i := range a.objects { + init(&a.objects[i]) a.free[i] = &a.objects[i] } return a @@ -31,9 +38,11 @@ func (a *Arena[T]) Get() *T { defer a.lk.Unlock() if len(a.free) == 0 { + fmt.Fprintf(os.Stderr, "xdr rpc arena miss\n") a.numMiss += 1 - var z T - return &z + var obj T + a.init(&obj) + return &obj } a.numGet += 1 @@ -41,8 +50,7 @@ func (a *Arena[T]) Get() *T { ptr := a.free[len(a.free)-1] a.free = a.free[:len(a.free)-1] - var z T // reset - *ptr = z + a.reset(ptr) return ptr } From f15d546e2f4e8f0722cadbf3c88cb5b65e3e18c8 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 15:26:21 -0700 Subject: [PATCH 21/51] install latency meters in sendchan and receivechan --- rpc/driver.go | 25 +++++++++++ stat/counter.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++ stat/latency.go | 95 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 236 insertions(+) create mode 100644 stat/counter.go create mode 100644 stat/latency.go diff --git a/rpc/driver.go b/rpc/driver.go index 2e4c26d..89ef258 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" + "github.com/xdrpp/goxdr/stat" "github.com/xdrpp/goxdr/xdr" ) @@ -98,8 +99,20 @@ func Detach(ctx context.Context) { func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { ret := make(chan *Message, 100) //XXX: used to be 0 go func(c chan<- *Message) { + + var receiveLat stat.LatencyMeter + var betweenLat stat.LatencyMeter + defer func() { + fmt.Printf("ReceiveChan: receive-latency=%v between-latency=%v\n", &receiveLat, &betweenLat) + }() + + bspan := betweenLat.Start() for { + bspan.Stop() + rspan := receiveLat.Start() m, err := t.Receive() + rspan.Stop() + bspan = betweenLat.Start() if err != nil { if err != io.EOF { fmt.Fprintf(os.Stderr, "ReceiveChan: %s\n", err) @@ -126,6 +139,14 @@ func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- s ch := make(chan *Message, 100) //XXX: used to be 1 chClose := make(chan struct{}) go func(ch <-chan *Message, cancel <-chan struct{}) { + + var sendLat stat.LatencyMeter + var betweenLat stat.LatencyMeter + defer func() { + fmt.Printf("SendChan: send-latency=%v between-latency=%v\n", &sendLat, &betweenLat) + }() + + bspan := betweenLat.Start() for { select { case <-cancel: @@ -135,7 +156,11 @@ func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- s return } else { xid := m.Xid() + bspan.Stop() + sspan := sendLat.Start() err := t.Send(m) + sspan.Stop() + bspan = betweenLat.Start() if err != nil && onErr != nil { onErr(xid, err) } diff --git a/stat/counter.go b/stat/counter.go new file mode 100644 index 0000000..fdc6a2b --- /dev/null +++ b/stat/counter.go @@ -0,0 +1,116 @@ +package stat + +import ( + "math" + "sync" + "time" +) + +type Counter struct { + lk sync.Mutex + first time.Time + last time.Time + sum float64 + sum2 float64 + min float64 + max float64 + count int +} + +func (x *Counter) Add(v float64) int { + x.lk.Lock() + defer x.lk.Unlock() + + if x.first.IsZero() { + x.first = time.Now() + x.min = v + x.max = v + // discard first v for purposes of sum, average, stddev + } else { + x.min = min(x.min, v) + x.max = max(x.max, v) + x.sum += v + x.sum2 += v * v + x.count += 1 + x.last = time.Now() + } + return x.count +} + +func (x *Counter) Count() int { + x.lk.Lock() + defer x.lk.Unlock() + return x.count +} + +func (x *Counter) Min() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.min +} + +func (x *Counter) Max() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.max +} + +func (x *Counter) Sum() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.sum +} + +func (x *Counter) Average() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.average() +} + +func (x *Counter) average() float64 { + return x.sum / float64(x.count) +} + +func (x *Counter) Stddev() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.stddev() +} + +func (x *Counter) stddev() float64 { + avg := x.sum / float64(x.count) + avg2 := x.sum2 / float64(x.count) + return math.Sqrt(avg2 - avg*avg) +} + +func (x *Counter) SpeedPerSec() float64 { + x.lk.Lock() + defer x.lk.Unlock() + return x.speedPerSec() +} + +func (x *Counter) speedPerSec() float64 { + return x.sum / x.last.Sub(x.first).Seconds() +} + +type CounterStats struct { + Min float64 + Max float64 + Average float64 + Stddev float64 + Count int + SpeedPerSec float64 +} + +func (x *Counter) Stats() CounterStats { + x.lk.Lock() + defer x.lk.Unlock() + return CounterStats{ + Min: x.min, + Max: x.max, + Average: x.average(), + Stddev: x.stddev(), + Count: x.count, + SpeedPerSec: x.speedPerSec(), + } +} diff --git a/stat/latency.go b/stat/latency.go new file mode 100644 index 0000000..6f2a364 --- /dev/null +++ b/stat/latency.go @@ -0,0 +1,95 @@ +package stat + +import ( + "fmt" + "time" +) + +type LatencyMeter struct { + ctr Counter +} + +type LatencyMeterSpan struct { + meter *LatencyMeter + start time.Time +} + +func (x *LatencyMeter) Start() LatencyMeterSpan { + return LatencyMeterSpan{ + meter: x, + start: time.Now(), + } +} + +func (x *LatencyMeter) StartAt(s time.Time) LatencyMeterSpan { + return LatencyMeterSpan{ + meter: x, + start: s, + } +} + +func (x LatencyMeterSpan) Stop() { + x.meter.Add(time.Since(x.start)) +} + +func (x *LatencyMeter) Add(d time.Duration) int { + return x.ctr.Add(float64(d.Nanoseconds())) +} + +func (x *LatencyMeter) Average() time.Duration { + return time.Duration(x.ctr.Average()) +} + +func (x *LatencyMeter) Min() time.Duration { + return time.Duration(x.ctr.Min()) +} + +func (x *LatencyMeter) Max() time.Duration { + return time.Duration(x.ctr.Max()) +} + +func (x *LatencyMeter) Stddev() time.Duration { + return time.Duration(x.ctr.Stddev()) +} + +func (x *LatencyMeter) String() string { + return fmt.Sprintf("%s±%s [%s:%s]", + fmtDur(x.Average()), fmtDur(x.Stddev()), fmtDur(x.Min()), fmtDur(x.Max())) +} + +type LatencyStats struct { + Summary string + NSec CounterStats +} + +func (x LatencyStats) Summarize() string { + return fmt.Sprintf("%s±%s [%s:%s]", + fmtDur(time.Duration(x.NSec.Average)), + fmtDur(time.Duration(x.NSec.Stddev)), + fmtDur(time.Duration(x.NSec.Min)), + fmtDur(time.Duration(x.NSec.Max)), + ) +} + +func (x *LatencyMeter) Stats() LatencyStats { + s := LatencyStats{NSec: x.ctr.Stats()} + s.Summary = s.Summarize() + return s +} + +func fmtDur(d time.Duration) string { + switch { + case d >= time.Hour: + return fmt.Sprintf("%.1fh", d.Hours()) + case d >= time.Minute: + return fmt.Sprintf("%dm", int64(d.Minutes())) + case d >= time.Second: + return fmt.Sprintf("%ds", int64(d.Seconds())) + case d >= time.Millisecond: + return fmt.Sprintf("%dms", int64(d.Milliseconds())) + case d >= time.Microsecond: + return fmt.Sprintf("%dµs", int64(d.Microseconds())) + default: + return fmt.Sprintf("%dns", int64(d.Nanoseconds())) + } +} From 082365eff3edf4cf16d1317a6d26acc79e3c5381 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 15:34:19 -0700 Subject: [PATCH 22/51] adjust stat printout --- rpc/driver.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 89ef258..6b7073a 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -102,12 +102,14 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { var receiveLat stat.LatencyMeter var betweenLat stat.LatencyMeter - defer func() { - fmt.Printf("ReceiveChan: receive-latency=%v between-latency=%v\n", &receiveLat, &betweenLat) - }() bspan := betweenLat.Start() + i := 0 for { + i += 1 + if i%100000 == 0 { + fmt.Printf("ReceiveChan: receive-latency=%v between-latency=%v\n", &receiveLat, &betweenLat) + } bspan.Stop() rspan := receiveLat.Start() m, err := t.Receive() @@ -142,12 +144,14 @@ func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- s var sendLat stat.LatencyMeter var betweenLat stat.LatencyMeter - defer func() { - fmt.Printf("SendChan: send-latency=%v between-latency=%v\n", &sendLat, &betweenLat) - }() bspan := betweenLat.Start() + i := 0 for { + i += 1 + if i%100000 == 0 { + fmt.Printf("SendChan: send-latency=%v between-latency=%v\n", &sendLat, &betweenLat) + } select { case <-cancel: return From 145daf0e4c2ca2bfc16857243af36a03a9838873 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 22:44:50 -0700 Subject: [PATCH 23/51] fix small bytes pool; use small bytes pool for xdrGet32 and xdrGet64 --- cmd/goxdr/header.go.in | 26 +++++++++++++++++++++++--- xdr/bpool.go | 7 ++++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index db2540e..d816043 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -683,17 +683,37 @@ func xdrReadPad(in io.Reader, n uint32) { } } +// func xdrGet32(in io.Reader) uint32 { +// b := xdrReadN(in, 4) +// return binary.BigEndian.Uint32(b) +// } +// +// func xdrGet64(in io.Reader) uint64 { +// b := xdrReadN(in, 8) +// return binary.BigEndian.Uint64(b) +// } + +func xdrReadInto(in io.Reader, p []byte) []byte { + if k, err := in.Read(p); err != nil || k != len(p) { + XdrPanic("%s", err.Error()) + } + return p +} + +var smallPool = NewSmallBytesPool() + func xdrGet32(in io.Reader) uint32 { - b := xdrReadN(in, 4) + b := xdrReadInto(in, smallPool.Get(4)) + defer smallPool.Recycle(b) return binary.BigEndian.Uint32(b) } func xdrGet64(in io.Reader) uint64 { - b := xdrReadN(in, 8) + b := xdrReadInto(in, smallPool.Get(8)) + defer smallPool.Recycle(b) return binary.BigEndian.Uint64(b) } - // XDR that unmarshals from canonical binary format type XdrIn struct { In io.Reader diff --git a/xdr/bpool.go b/xdr/bpool.go index 75cb7a7..ddd409c 100644 --- a/xdr/bpool.go +++ b/xdr/bpool.go @@ -21,9 +21,10 @@ func NewSmallBytesPool() *SmallBytesPool { } } -func (x *SmallBytesPool) Get(size int) []byte { - // XXX: must zero out - return x.size[size-1].Get().([]byte) +func (x *SmallBytesPool) Get(len int) []byte { + p := x.size[len-1].Get().([]byte) + clear(p) + return p } func (x *SmallBytesPool) Recycle(b []byte) { From 0a3a9ef9517ee208ba60f459a3b69aa7b4639c23 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Thu, 24 Jul 2025 22:47:20 -0700 Subject: [PATCH 24/51] revert to original xdrGet32/64 --- cmd/goxdr/header.go.in | 46 +++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index d816043..78c7eda 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -683,37 +683,37 @@ func xdrReadPad(in io.Reader, n uint32) { } } -// func xdrGet32(in io.Reader) uint32 { -// b := xdrReadN(in, 4) -// return binary.BigEndian.Uint32(b) -// } -// -// func xdrGet64(in io.Reader) uint64 { -// b := xdrReadN(in, 8) -// return binary.BigEndian.Uint64(b) -// } - -func xdrReadInto(in io.Reader, p []byte) []byte { - if k, err := in.Read(p); err != nil || k != len(p) { - XdrPanic("%s", err.Error()) - } - return p -} - -var smallPool = NewSmallBytesPool() - func xdrGet32(in io.Reader) uint32 { - b := xdrReadInto(in, smallPool.Get(4)) - defer smallPool.Recycle(b) + b := xdrReadN(in, 4) return binary.BigEndian.Uint32(b) } func xdrGet64(in io.Reader) uint64 { - b := xdrReadInto(in, smallPool.Get(8)) - defer smallPool.Recycle(b) + b := xdrReadN(in, 8) return binary.BigEndian.Uint64(b) } +// func xdrReadInto(in io.Reader, p []byte) []byte { +// if k, err := in.Read(p); err != nil || k != len(p) { +// XdrPanic("%s", err.Error()) +// } +// return p +// } +// +// var smallPool = NewSmallBytesPool() +// +// func xdrGet32(in io.Reader) uint32 { +// b := xdrReadInto(in, smallPool.Get(4)) +// defer smallPool.Recycle(b) +// return binary.BigEndian.Uint32(b) +// } +// +// func xdrGet64(in io.Reader) uint64 { +// b := xdrReadInto(in, smallPool.Get(8)) +// defer smallPool.Recycle(b) +// return binary.BigEndian.Uint64(b) +// } + // XDR that unmarshals from canonical binary format type XdrIn struct { In io.Reader From d215576fbe794bb47169f5bbcf8dd87d1becb3cf Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Fri, 25 Jul 2025 15:31:45 -0700 Subject: [PATCH 25/51] make operation parameter of driver customizable --- rpc/driver.go | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 6b7073a..369817f 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -96,8 +96,8 @@ func Detach(ctx context.Context) { // Creates a thread that closes the channel and exits when the Context // is Done or when the transport returns an error. Does not close the // Transport. -func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { - ret := make(chan *Message, 100) //XXX: used to be 0 +func ReceiveChan(ctx context.Context, t Transport, recvQueueLen int) <-chan *Message { + ret := make(chan *Message, recvQueueLen) go func(c chan<- *Message) { var receiveLat stat.LatencyMeter @@ -137,8 +137,8 @@ func ReceiveChan(ctx context.Context, t Transport) <-chan *Message { // Create a channel for sending messages through a Transport. Creates // a thread that won't exit until the returned channel is closed. // Does not close the underlying Transport. -func SendChan(t Transport, onErr func(uint32, error)) (chan<- *Message, chan<- struct{}) { - ch := make(chan *Message, 100) //XXX: used to be 1 +func SendChan(t Transport, onErr func(uint32, error), sendQueueLen int) (chan<- *Message, chan<- struct{}) { + ch := make(chan *Message, sendQueueLen) // queue len must be at least 1 chClose := make(chan struct{}) go func(ch <-chan *Message, cancel <-chan struct{}) { @@ -231,6 +231,7 @@ type Driver struct { started int32 msgPool *MsgPool + numProc int } // PanicHandler defines a handler for panics arising from service method implementations. @@ -263,7 +264,22 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // // If you will never need to cancel the driver, or plan to do so by // calling Close(), then you may supply a nil ctx. -func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { +func NewDriver( + ctx context.Context, + mp *MsgPool, + t Transport, +) *Driver { + return NewDriverTuned(ctx, mp, t, 100, 100, 5) +} + +func NewDriverTuned( + ctx context.Context, + mp *MsgPool, + t Transport, + recvQueueLen int, + sendQueueLen int, + numProc int, +) *Driver { if ctx == nil { ctx = context.Background() } @@ -273,12 +289,17 @@ func NewDriver(ctx context.Context, mp *MsgPool, t Transport) *Driver { Lock: &sync.Mutex{}, ctx: ctx, cancel: cancel, - in: ReceiveChan(ctx, t), + in: ReceiveChan(ctx, t, recvQueueLen), msgPool: mp, + numProc: numProc, } - ret.out, ret.outClose = SendChan(t, func(xid uint32, _ error) { - ret.cs.Cancel(xid, SEND_ERR) - }) + ret.out, ret.outClose = SendChan( + t, + func(xid uint32, _ error) { + ret.cs.Cancel(xid, SEND_ERR) + }, + sendQueueLen, + ) go func() { <-ctx.Done() t.Close() @@ -365,7 +386,7 @@ func (r *Driver) Go() { if atomic.SwapInt32(&r.started, 1) == 1 { panic("rpc.Driver.Go called multiple times") } - for i := 0; i < 10; i++ { //XXX + for i := 0; i < r.numProc; i++ { go r.doMsgs() } } From 68cf81aca59634815d6c52a5e117110ff21ff26d Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Fri, 25 Jul 2025 15:38:58 -0700 Subject: [PATCH 26/51] fix test --- rpc/rpc_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 3cfa9ad..f77bd42 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -68,9 +68,9 @@ func TestChannels(t *testing.T) { cs := streampair() tx1, tx2 := rpc.NewStreamTransport(cs[0], msgPool), rpc.NewStreamTransport(cs[1], msgPool) - r := rpc.ReceiveChan(ctx, tx1) + r := rpc.ReceiveChan(ctx, tx1, 10) defer tx1.Close() - s, _ := rpc.SendChan(tx2, nil) + s, _ := rpc.SendChan(tx2, nil, 10) go func() { defer close(s) defer tx2.Close() From bfd977dc9616f5d6c8bc1dd184bccc4a8d328955 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 27 Jul 2025 08:15:31 -0700 Subject: [PATCH 27/51] adjust params --- rpc/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index 369817f..0a3140e 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -269,7 +269,7 @@ func NewDriver( mp *MsgPool, t Transport, ) *Driver { - return NewDriverTuned(ctx, mp, t, 100, 100, 5) + return NewDriverTuned(ctx, mp, t, 10, 10, 3) } func NewDriverTuned( From 5bef373e12de584dd81cecf778b0e1a984fb5b9e Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Sun, 27 Jul 2025 14:05:18 -0700 Subject: [PATCH 28/51] avoid bytes.Buffer allocation --- rpc/msg.go | 7 +++++-- xdr/arena.go | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/rpc/msg.go b/rpc/msg.go index 74a1d6c..d32f99e 100644 --- a/rpc/msg.go +++ b/rpc/msg.go @@ -7,7 +7,7 @@ import ( ) type Message struct { - *bytes.Buffer + bytes.Buffer Peer string pool *MsgPool } @@ -25,7 +25,6 @@ func NewMsgPoolCap(cap int) *MsgPool { msgPool.arena = xdr.NewArena( cap, func(m *Message) { - m.Buffer = &bytes.Buffer{} m.pool = msgPool }, func(m *Message) { @@ -36,6 +35,10 @@ func NewMsgPoolCap(cap int) *MsgPool { return msgPool } +func (msgPool *MsgPool) StatString() string { + return msgPool.arena.StatString() +} + func (msgPool *MsgPool) NewMessage(peer string) *Message { msg := msgPool.arena.Get() msg.Peer = peer diff --git a/xdr/arena.go b/xdr/arena.go index 74e8b9c..ce367b1 100644 --- a/xdr/arena.go +++ b/xdr/arena.go @@ -14,7 +14,6 @@ type Arena[T any] struct { // lk sync.Mutex free []*T - used []*T numGet int numMiss int } @@ -34,6 +33,7 @@ func NewArena[T any](n int, init func(*T), reset func(*T)) *Arena[T] { } func (a *Arena[T]) Get() *T { + a.lk.Lock() defer a.lk.Unlock() @@ -67,6 +67,10 @@ func (a *Arena[T]) Recycle(x *T) { func (a *Arena[T]) StatString() string { a.lk.Lock() defer a.lk.Unlock() + return a.statString() +} + +func (a *Arena[T]) statString() string { return fmt.Sprintf("num_get=%d num_miss=%d miss_ratio=%.3f%%", a.numGet, a.numMiss, 100*float64(a.numMiss)/float64(a.numGet)) } From 32e3fb6ff08fda684fe85765b03afc511c4b918f Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 11:26:12 -0700 Subject: [PATCH 29/51] put msgpool behind an interface --- rpc/driver.go | 6 ++-- rpc/msg.go | 68 -------------------------------------------- rpc/msgpool.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++++ rpc/transport.go | 4 +-- 4 files changed, 79 insertions(+), 73 deletions(-) create mode 100644 rpc/msgpool.go diff --git a/rpc/driver.go b/rpc/driver.go index 0a3140e..aaa2a30 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -230,7 +230,7 @@ type Driver struct { cs CallSet started int32 - msgPool *MsgPool + msgPool MessagePool numProc int } @@ -266,7 +266,7 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // calling Close(), then you may supply a nil ctx. func NewDriver( ctx context.Context, - mp *MsgPool, + mp MessagePool, t Transport, ) *Driver { return NewDriverTuned(ctx, mp, t, 10, 10, 3) @@ -274,7 +274,7 @@ func NewDriver( func NewDriverTuned( ctx context.Context, - mp *MsgPool, + mp MessagePool, t Transport, recvQueueLen int, sendQueueLen int, diff --git a/rpc/msg.go b/rpc/msg.go index d32f99e..0d83a74 100644 --- a/rpc/msg.go +++ b/rpc/msg.go @@ -12,75 +12,7 @@ type Message struct { pool *MsgPool } -type MsgPool struct { - arena *xdr.Arena[Message] -} - -func NewMsgPool() *MsgPool { - return NewMsgPoolCap(10000) -} - -func NewMsgPoolCap(cap int) *MsgPool { - msgPool := &MsgPool{} - msgPool.arena = xdr.NewArena( - cap, - func(m *Message) { - m.pool = msgPool - }, - func(m *Message) { - m.Peer = "" - m.Buffer.Reset() - }, - ) - return msgPool -} - -func (msgPool *MsgPool) StatString() string { - return msgPool.arena.StatString() -} - -func (msgPool *MsgPool) NewMessage(peer string) *Message { - msg := msgPool.arena.Get() - msg.Peer = peer - return msg -} - -func (msgPool *MsgPool) Reycle(msg *Message) { - msgPool.arena.Recycle(msg) -} - -/* -type MsgPool xdr.Pool[*Message] - -func NewMsgPool() *MsgPool { - pool := &xdr.Pool[*Message]{} - pool.SetMkReset( - func() *Message { - return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} - }, - func(m *Message) { - m.Peer = "" - m.Buffer.Reset() - }, - ) - return (*MsgPool)(pool) -} - -func (msgPool *MsgPool) NewMessage(peer string) *Message { - msg := (*xdr.Pool[*Message])(msgPool).Get() - msg.Peer = peer - return msg -} - -func (msgPool *MsgPool) Reycle(msg *Message) { - (*xdr.Pool[*Message])(msgPool).Recycle(msg) -} -*/ - func (m *Message) Recycle() { - if m == nil { - panic("XXX m=nil") - } m.pool.Reycle(m) } diff --git a/rpc/msgpool.go b/rpc/msgpool.go new file mode 100644 index 0000000..2b35a4b --- /dev/null +++ b/rpc/msgpool.go @@ -0,0 +1,74 @@ +package rpc + +import "github.com/xdrpp/goxdr/xdr" + +type MessagePool interface { + NewMessage(peer string) *Message + Reycle(msg *Message) + StatString() string +} + +type MsgPool struct { + arena *xdr.Arena[Message] +} + +func NewMsgPool() MessagePool { + return NewMsgPoolCap(5000) +} + +func NewMsgPoolCap(cap int) MessagePool { + msgPool := &MsgPool{} + msgPool.arena = xdr.NewArena( + cap, + func(m *Message) { + m.pool = msgPool + }, + func(m *Message) { + m.Peer = "" + m.Buffer.Reset() + }, + ) + return msgPool +} + +func (msgPool *MsgPool) StatString() string { + return msgPool.arena.StatString() +} + +func (msgPool *MsgPool) NewMessage(peer string) *Message { + msg := msgPool.arena.Get() + msg.Peer = peer + return msg +} + +func (msgPool *MsgPool) Reycle(msg *Message) { + msgPool.arena.Recycle(msg) +} + +/* +type MsgPool xdr.Pool[*Message] + +func NewMsgPool() *MsgPool { + pool := &xdr.Pool[*Message]{} + pool.SetMkReset( + func() *Message { + return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} + }, + func(m *Message) { + m.Peer = "" + m.Buffer.Reset() + }, + ) + return (*MsgPool)(pool) +} + +func (msgPool *MsgPool) NewMessage(peer string) *Message { + msg := (*xdr.Pool[*Message])(msgPool).Get() + msg.Peer = peer + return msg +} + +func (msgPool *MsgPool) Reycle(msg *Message) { + (*xdr.Pool[*Message])(msgPool).Recycle(msg) +} +*/ diff --git a/rpc/transport.go b/rpc/transport.go index 3e5af8c..a7969c1 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -51,13 +51,13 @@ type StreamTransport struct { okay int32 // 1 = okay, 0 = failed, -1 = failing err error - msgPool *MsgPool + msgPool MessagePool } // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. -func NewStreamTransport(c net.Conn, mp *MsgPool) *StreamTransport { +func NewStreamTransport(c net.Conn, mp MessagePool) *StreamTransport { return &StreamTransport{ MaxMsgSize: 0x100000, Conn: c, From fb2a5d7f4d6d71a89c1aee5d85c49f986813fdf9 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 11:37:06 -0700 Subject: [PATCH 30/51] remove init from arena --- rpc/msg.go | 2 +- rpc/msgpool.go | 18 ++++++++---------- rpc/rpc_test.go | 6 +++--- xdr/arena.go | 10 ++-------- 4 files changed, 14 insertions(+), 22 deletions(-) diff --git a/rpc/msg.go b/rpc/msg.go index 0d83a74..0c4fe40 100644 --- a/rpc/msg.go +++ b/rpc/msg.go @@ -9,7 +9,7 @@ import ( type Message struct { bytes.Buffer Peer string - pool *MsgPool + pool *msgArena } func (m *Message) Recycle() { diff --git a/rpc/msgpool.go b/rpc/msgpool.go index 2b35a4b..b22fe91 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -8,22 +8,20 @@ type MessagePool interface { StatString() string } -type MsgPool struct { +type msgArena struct { arena *xdr.Arena[Message] } -func NewMsgPool() MessagePool { - return NewMsgPoolCap(5000) +func NewMsgArena() MessagePool { + return NewMsgArenaCap(5000) } -func NewMsgPoolCap(cap int) MessagePool { - msgPool := &MsgPool{} +func NewMsgArenaCap(cap int) MessagePool { + msgPool := &msgArena{} msgPool.arena = xdr.NewArena( cap, func(m *Message) { m.pool = msgPool - }, - func(m *Message) { m.Peer = "" m.Buffer.Reset() }, @@ -31,17 +29,17 @@ func NewMsgPoolCap(cap int) MessagePool { return msgPool } -func (msgPool *MsgPool) StatString() string { +func (msgPool *msgArena) StatString() string { return msgPool.arena.StatString() } -func (msgPool *MsgPool) NewMessage(peer string) *Message { +func (msgPool *msgArena) NewMessage(peer string) *Message { msg := msgPool.arena.Get() msg.Peer = peer return msg } -func (msgPool *MsgPool) Reycle(msg *Message) { +func (msgPool *msgArena) Reycle(msg *Message) { msgPool.arena.Recycle(msg) } diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index f77bd42..a621684 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -59,7 +59,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message - msgPool := rpc.NewMsgPool() + msgPool := rpc.NewMsgArena() for _, msg := range contents { m := msgPool.NewMessage("") m.WriteString(msg) @@ -95,7 +95,7 @@ func TestRPC(t *testing.T) { defer cancel() cs := streampair() - mp1 := rpc.NewMsgPool() + mp1 := rpc.NewMsgArena() r1 := rpc.NewDriver(ctx, mp1, rpc.NewStreamTransport(cs[0], mp1)) r1.Register(TEST_V1_Server{&Server{}}) go func() { @@ -103,7 +103,7 @@ func TestRPC(t *testing.T) { fmt.Println("loop1 returned") }() - mp2 := rpc.NewMsgPool() + mp2 := rpc.NewMsgArena() r2 := rpc.NewDriver(ctx, mp2, rpc.NewStreamTransport(cs[1], mp2)) r2.Log = os.Stderr go func() { diff --git a/xdr/arena.go b/xdr/arena.go index ce367b1..3be1ee9 100644 --- a/xdr/arena.go +++ b/xdr/arena.go @@ -8,7 +8,6 @@ import ( ) type Arena[T any] struct { - init func(*T) reset func(*T) objects []T // @@ -18,15 +17,13 @@ type Arena[T any] struct { numMiss int } -func NewArena[T any](n int, init func(*T), reset func(*T)) *Arena[T] { +func NewArena[T any](n int, reset func(*T)) *Arena[T] { a := &Arena[T]{ - init: init, reset: reset, objects: make([]T, n), free: make([]*T, n), } for i := range a.objects { - init(&a.objects[i]) a.free[i] = &a.objects[i] } return a @@ -41,7 +38,7 @@ func (a *Arena[T]) Get() *T { fmt.Fprintf(os.Stderr, "xdr rpc arena miss\n") a.numMiss += 1 var obj T - a.init(&obj) + a.reset(&obj) return &obj } @@ -80,12 +77,9 @@ func contains[T any](slice []T, ptr *T) bool { return false } - // Get pointer to first element first := unsafe.Pointer(&slice[0]) - // Get pointer to one past the last element last := unsafe.Pointer(uintptr(first) + uintptr(len(slice))*unsafe.Sizeof(slice[0])) - // Check if ptr is within the range [first, last) ptrAddr := unsafe.Pointer(ptr) return uintptr(ptrAddr) >= uintptr(first) && uintptr(ptrAddr) < uintptr(last) } From 6cacee547cbe90d3c8047202319802c47ca54f36 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:05:41 -0700 Subject: [PATCH 31/51] swap in message pool --- rpc/msg.go | 2 +- rpc/msgpool.go | 67 ++++++++++++++++++++++++++++--------------------- rpc/rpc_test.go | 6 ++--- xdr/pool.go | 17 +++++++------ 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/rpc/msg.go b/rpc/msg.go index 0c4fe40..182051b 100644 --- a/rpc/msg.go +++ b/rpc/msg.go @@ -9,7 +9,7 @@ import ( type Message struct { bytes.Buffer Peer string - pool *msgArena + pool MessagePool } func (m *Message) Recycle() { diff --git a/rpc/msgpool.go b/rpc/msgpool.go index b22fe91..cd97bf0 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -1,6 +1,8 @@ package rpc -import "github.com/xdrpp/goxdr/xdr" +import ( + "github.com/xdrpp/goxdr/xdr" +) type MessagePool interface { NewMessage(peer string) *Message @@ -8,65 +10,72 @@ type MessagePool interface { StatString() string } -type msgArena struct { - arena *xdr.Arena[Message] +func NewMessagePool() MessagePool { + return NewMsgPool() + // return NewMsgArenaCap(5000) } -func NewMsgArena() MessagePool { - return NewMsgArenaCap(5000) +// msg arena + +type msgArena struct { + arena *xdr.Arena[Message] } func NewMsgArenaCap(cap int) MessagePool { - msgPool := &msgArena{} - msgPool.arena = xdr.NewArena( + msgArena := &msgArena{} + msgArena.arena = xdr.NewArena( cap, func(m *Message) { - m.pool = msgPool + m.pool = msgArena m.Peer = "" m.Buffer.Reset() }, ) - return msgPool + return msgArena } -func (msgPool *msgArena) StatString() string { - return msgPool.arena.StatString() +func (msgArena *msgArena) StatString() string { + return msgArena.arena.StatString() } -func (msgPool *msgArena) NewMessage(peer string) *Message { - msg := msgPool.arena.Get() +func (msgArena *msgArena) NewMessage(peer string) *Message { + msg := msgArena.arena.Get() msg.Peer = peer return msg } -func (msgPool *msgArena) Reycle(msg *Message) { - msgPool.arena.Recycle(msg) +func (msgArena *msgArena) Reycle(msg *Message) { + msgArena.arena.Recycle(msg) } -/* -type MsgPool xdr.Pool[*Message] +// msg pool -func NewMsgPool() *MsgPool { - pool := &xdr.Pool[*Message]{} - pool.SetMkReset( - func() *Message { - return &Message{Buffer: &bytes.Buffer{}, pool: (*MsgPool)(pool)} - }, +type msgPool struct { + pool *xdr.Pool[Message] +} + +func NewMsgPool() MessagePool { + x := &msgPool{} + x.pool = xdr.NewPool( func(m *Message) { + m.pool = x m.Peer = "" m.Buffer.Reset() }, ) - return (*MsgPool)(pool) + return x } -func (msgPool *MsgPool) NewMessage(peer string) *Message { - msg := (*xdr.Pool[*Message])(msgPool).Get() +func (msgPool *msgPool) NewMessage(peer string) *Message { + msg := msgPool.pool.Get() msg.Peer = peer return msg } -func (msgPool *MsgPool) Reycle(msg *Message) { - (*xdr.Pool[*Message])(msgPool).Recycle(msg) +func (msgPool *msgPool) Reycle(msg *Message) { + msgPool.pool.Recycle(msg) +} + +func (msgPool *msgPool) StatString() string { + return msgPool.pool.StatString() } -*/ diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index a621684..9a211f2 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -59,7 +59,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message - msgPool := rpc.NewMsgArena() + msgPool := rpc.NewMessagePool() for _, msg := range contents { m := msgPool.NewMessage("") m.WriteString(msg) @@ -95,7 +95,7 @@ func TestRPC(t *testing.T) { defer cancel() cs := streampair() - mp1 := rpc.NewMsgArena() + mp1 := rpc.NewMessagePool() r1 := rpc.NewDriver(ctx, mp1, rpc.NewStreamTransport(cs[0], mp1)) r1.Register(TEST_V1_Server{&Server{}}) go func() { @@ -103,7 +103,7 @@ func TestRPC(t *testing.T) { fmt.Println("loop1 returned") }() - mp2 := rpc.NewMsgArena() + mp2 := rpc.NewMessagePool() r2 := rpc.NewDriver(ctx, mp2, rpc.NewStreamTransport(cs[1], mp2)) r2.Log = os.Stderr go func() { diff --git a/xdr/pool.go b/xdr/pool.go index a122df5..982cf45 100644 --- a/xdr/pool.go +++ b/xdr/pool.go @@ -7,25 +7,28 @@ import ( type Pool[T any] struct { pool sync.Pool - reset func(T) + reset func(*T) // stats lk sync.Mutex numGet int numMiss int } -func (x *Pool[T]) SetMkReset(mk func() T, reset func(T)) { +func NewPool[T any](reset func(*T)) *Pool[T] { + x := &Pool[T]{reset: reset} x.pool.New = func() any { x.lk.Lock() x.numMiss += 1 x.lk.Unlock() - return mk() + var obj T + reset(&obj) + return &obj } - x.reset = reset + return x } -func (x *Pool[T]) Get() T { - o := x.pool.Get().(T) +func (x *Pool[T]) Get() *T { + o := x.pool.Get().(*T) x.reset(o) x.lk.Lock() @@ -34,7 +37,7 @@ func (x *Pool[T]) Get() T { return o } -func (x *Pool[T]) Recycle(o T) { +func (x *Pool[T]) Recycle(o *T) { x.pool.Put(o) } From 1651d875fdd41d22dfbb8f1354e527e36b345bdb Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:34:50 -0700 Subject: [PATCH 32/51] prealloc 200 bytes for msg buffer --- rpc/msgpool.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index cd97bf0..5a8bd46 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -11,7 +11,7 @@ type MessagePool interface { } func NewMessagePool() MessagePool { - return NewMsgPool() + return NewMsgPool(200) // return NewMsgArenaCap(5000) } @@ -54,13 +54,16 @@ type msgPool struct { pool *xdr.Pool[Message] } -func NewMsgPool() MessagePool { +func NewMsgPool(preAlloc int) MessagePool { x := &msgPool{} x.pool = xdr.NewPool( func(m *Message) { m.pool = x m.Peer = "" - m.Buffer.Reset() + if m.Buffer.Cap() == 0 { + m.Buffer.Grow(preAlloc) + m.Buffer.Reset() + } }, ) return x From 99a8c97f4f292979756673d2bbf7dc5c5c75d4e5 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:37:06 -0700 Subject: [PATCH 33/51] rm prealloc --- rpc/msgpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index 5a8bd46..76bbfea 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -11,7 +11,7 @@ type MessagePool interface { } func NewMessagePool() MessagePool { - return NewMsgPool(200) + return NewMsgPool(0) // return NewMsgArenaCap(5000) } From 32f59fee3300cd2a5c3e122d5dfae4e2833ba32e Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:39:27 -0700 Subject: [PATCH 34/51] fix bug in prealloc --- rpc/msgpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index 76bbfea..8b4d8ef 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -11,7 +11,7 @@ type MessagePool interface { } func NewMessagePool() MessagePool { - return NewMsgPool(0) + return NewMsgPool(200) // return NewMsgArenaCap(5000) } @@ -62,8 +62,8 @@ func NewMsgPool(preAlloc int) MessagePool { m.Peer = "" if m.Buffer.Cap() == 0 { m.Buffer.Grow(preAlloc) - m.Buffer.Reset() } + m.Buffer.Reset() }, ) return x From 5b687d9c05d3e0e32aa4f1b3ff83ab39fa273655 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:42:12 -0700 Subject: [PATCH 35/51] wipe prealloc --- rpc/msgpool.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index 8b4d8ef..cd97bf0 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -11,7 +11,7 @@ type MessagePool interface { } func NewMessagePool() MessagePool { - return NewMsgPool(200) + return NewMsgPool() // return NewMsgArenaCap(5000) } @@ -54,15 +54,12 @@ type msgPool struct { pool *xdr.Pool[Message] } -func NewMsgPool(preAlloc int) MessagePool { +func NewMsgPool() MessagePool { x := &msgPool{} x.pool = xdr.NewPool( func(m *Message) { m.pool = x m.Peer = "" - if m.Buffer.Cap() == 0 { - m.Buffer.Grow(preAlloc) - } m.Buffer.Reset() }, ) From 151017a392253c805ca9d2483d38f7e36dbecdbb Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:55:01 -0700 Subject: [PATCH 36/51] add rmsgChanPool --- rpc/chanpool.go | 27 +++++++++++++++++++++++++++ rpc/driver.go | 26 +++++++++++++++----------- 2 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 rpc/chanpool.go diff --git a/rpc/chanpool.go b/rpc/chanpool.go new file mode 100644 index 0000000..b5955cb --- /dev/null +++ b/rpc/chanpool.go @@ -0,0 +1,27 @@ +package rpc + +import "sync" + +type RmsgChanPool struct { + pool sync.Pool +} + +func NewRmsgChanPool() *RmsgChanPool { + x := &RmsgChanPool{} + x.pool.New = func() any { + return make(chan *Rpc_msg, 1) + } + return x +} + +func (x *RmsgChanPool) Get() chan *Rpc_msg { + return x.pool.Get().(chan *Rpc_msg) +} + +func (x *RmsgChanPool) Recycle(c chan *Rpc_msg) { + select { // drain at most one message + case <-c: + default: + } + x.pool.Put(c) +} diff --git a/rpc/driver.go b/rpc/driver.go index aaa2a30..0acf530 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -230,8 +230,9 @@ type Driver struct { cs CallSet started int32 - msgPool MessagePool - numProc int + rmsgChanPool *RmsgChanPool + msgPool MessagePool + numProc int } // PanicHandler defines a handler for panics arising from service method implementations. @@ -285,13 +286,14 @@ func NewDriverTuned( } ctx, cancel := context.WithCancel(ctx) ret := Driver{ - Log: DefaultLog, - Lock: &sync.Mutex{}, - ctx: ctx, - cancel: cancel, - in: ReceiveChan(ctx, t, recvQueueLen), - msgPool: mp, - numProc: numProc, + Log: DefaultLog, + Lock: &sync.Mutex{}, + ctx: ctx, + cancel: cancel, + in: ReceiveChan(ctx, t, recvQueueLen), + rmsgChanPool: NewRmsgChanPool(), + msgPool: mp, + numProc: numProc, } ret.out, ret.outClose = SendChan( t, @@ -334,14 +336,16 @@ func (r *Driver) safeSend(ctx context.Context, m *Message) (ok bool) { // Driver implements the XdrSendCall interface, allowing it to be // used as the Send field of generated RPC client structures. func (r *Driver) SendCall(ctx context.Context, proc xdr.XdrProc) (err error) { - c := make(chan *Rpc_msg, 1) + // c := make(chan *Rpc_msg, 1) + c := r.rmsgChanPool.Get() + defer r.rmsgChanPool.Recycle(c) if ctx == nil { ctx = r.ctx } peer := GetPeer(ctx) cmsg := r.cs.NewCall(peer, proc, func(rmsg *Rpc_msg) { c <- rmsg - close(c) + // close(c) }) m := r.msgPool.NewMessage(peer) r.logXdr(proc.GetArg(), "->%s CALL(xid=%d) %s", peer, cmsg.Xid, From a421d3000f4c39f4235d0fce7dff3334a832d233 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 12:59:57 -0700 Subject: [PATCH 37/51] Revert "add rmsgChanPool" This reverts commit 151017a392253c805ca9d2483d38f7e36dbecdbb. --- rpc/chanpool.go | 27 --------------------------- rpc/driver.go | 26 +++++++++++--------------- 2 files changed, 11 insertions(+), 42 deletions(-) delete mode 100644 rpc/chanpool.go diff --git a/rpc/chanpool.go b/rpc/chanpool.go deleted file mode 100644 index b5955cb..0000000 --- a/rpc/chanpool.go +++ /dev/null @@ -1,27 +0,0 @@ -package rpc - -import "sync" - -type RmsgChanPool struct { - pool sync.Pool -} - -func NewRmsgChanPool() *RmsgChanPool { - x := &RmsgChanPool{} - x.pool.New = func() any { - return make(chan *Rpc_msg, 1) - } - return x -} - -func (x *RmsgChanPool) Get() chan *Rpc_msg { - return x.pool.Get().(chan *Rpc_msg) -} - -func (x *RmsgChanPool) Recycle(c chan *Rpc_msg) { - select { // drain at most one message - case <-c: - default: - } - x.pool.Put(c) -} diff --git a/rpc/driver.go b/rpc/driver.go index 0acf530..aaa2a30 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -230,9 +230,8 @@ type Driver struct { cs CallSet started int32 - rmsgChanPool *RmsgChanPool - msgPool MessagePool - numProc int + msgPool MessagePool + numProc int } // PanicHandler defines a handler for panics arising from service method implementations. @@ -286,14 +285,13 @@ func NewDriverTuned( } ctx, cancel := context.WithCancel(ctx) ret := Driver{ - Log: DefaultLog, - Lock: &sync.Mutex{}, - ctx: ctx, - cancel: cancel, - in: ReceiveChan(ctx, t, recvQueueLen), - rmsgChanPool: NewRmsgChanPool(), - msgPool: mp, - numProc: numProc, + Log: DefaultLog, + Lock: &sync.Mutex{}, + ctx: ctx, + cancel: cancel, + in: ReceiveChan(ctx, t, recvQueueLen), + msgPool: mp, + numProc: numProc, } ret.out, ret.outClose = SendChan( t, @@ -336,16 +334,14 @@ func (r *Driver) safeSend(ctx context.Context, m *Message) (ok bool) { // Driver implements the XdrSendCall interface, allowing it to be // used as the Send field of generated RPC client structures. func (r *Driver) SendCall(ctx context.Context, proc xdr.XdrProc) (err error) { - // c := make(chan *Rpc_msg, 1) - c := r.rmsgChanPool.Get() - defer r.rmsgChanPool.Recycle(c) + c := make(chan *Rpc_msg, 1) if ctx == nil { ctx = r.ctx } peer := GetPeer(ctx) cmsg := r.cs.NewCall(peer, proc, func(rmsg *Rpc_msg) { c <- rmsg - // close(c) + close(c) }) m := r.msgPool.NewMessage(peer) r.logXdr(proc.GetArg(), "->%s CALL(xid=%d) %s", peer, cmsg.Xid, From ac068bb9cff95e0638684f371d2a792359981f16 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 13:02:36 -0700 Subject: [PATCH 38/51] test doMsg par = 1 --- rpc/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index aaa2a30..859061f 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -269,7 +269,7 @@ func NewDriver( mp MessagePool, t Transport, ) *Driver { - return NewDriverTuned(ctx, mp, t, 10, 10, 3) + return NewDriverTuned(ctx, mp, t, 10, 10, 1) } func NewDriverTuned( From 17d19dcc0a54901308ee66b11a2ab151ce9bb7ee Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 13:08:32 -0700 Subject: [PATCH 39/51] remove latency meters from driver --- rpc/driver.go | 31 +------------------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 859061f..f042561 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -9,7 +9,6 @@ import ( "sync" "sync/atomic" - "github.com/xdrpp/goxdr/stat" "github.com/xdrpp/goxdr/xdr" ) @@ -99,22 +98,8 @@ func Detach(ctx context.Context) { func ReceiveChan(ctx context.Context, t Transport, recvQueueLen int) <-chan *Message { ret := make(chan *Message, recvQueueLen) go func(c chan<- *Message) { - - var receiveLat stat.LatencyMeter - var betweenLat stat.LatencyMeter - - bspan := betweenLat.Start() - i := 0 for { - i += 1 - if i%100000 == 0 { - fmt.Printf("ReceiveChan: receive-latency=%v between-latency=%v\n", &receiveLat, &betweenLat) - } - bspan.Stop() - rspan := receiveLat.Start() m, err := t.Receive() - rspan.Stop() - bspan = betweenLat.Start() if err != nil { if err != io.EOF { fmt.Fprintf(os.Stderr, "ReceiveChan: %s\n", err) @@ -141,17 +126,7 @@ func SendChan(t Transport, onErr func(uint32, error), sendQueueLen int) (chan<- ch := make(chan *Message, sendQueueLen) // queue len must be at least 1 chClose := make(chan struct{}) go func(ch <-chan *Message, cancel <-chan struct{}) { - - var sendLat stat.LatencyMeter - var betweenLat stat.LatencyMeter - - bspan := betweenLat.Start() - i := 0 for { - i += 1 - if i%100000 == 0 { - fmt.Printf("SendChan: send-latency=%v between-latency=%v\n", &sendLat, &betweenLat) - } select { case <-cancel: return @@ -160,11 +135,7 @@ func SendChan(t Transport, onErr func(uint32, error), sendQueueLen int) (chan<- return } else { xid := m.Xid() - bspan.Stop() - sspan := sendLat.Start() err := t.Send(m) - sspan.Stop() - bspan = betweenLat.Start() if err != nil && onErr != nil { onErr(xid, err) } @@ -269,7 +240,7 @@ func NewDriver( mp MessagePool, t Transport, ) *Driver { - return NewDriverTuned(ctx, mp, t, 10, 10, 1) + return NewDriverTuned(ctx, mp, t, 10, 10, 3) } func NewDriverTuned( From df6ca7c41191e919702c8e90b0388e64acaece10 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 16:10:38 -0700 Subject: [PATCH 40/51] switch to arena --- rpc/msgpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index cd97bf0..a4ea1d0 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -11,8 +11,8 @@ type MessagePool interface { } func NewMessagePool() MessagePool { - return NewMsgPool() - // return NewMsgArenaCap(5000) + // return NewMsgPool() + return NewMsgArenaCap(5000) } // msg arena From da8dc1b584b655dcd737142bdb5d7039727f43cf Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 16:18:06 -0700 Subject: [PATCH 41/51] turn off receive parallelism --- rpc/driver.go | 10 +++++++--- rpc/msgpool.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index f042561..e9e6194 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -240,7 +240,7 @@ func NewDriver( mp MessagePool, t Transport, ) *Driver { - return NewDriverTuned(ctx, mp, t, 10, 10, 3) + return NewDriverTuned(ctx, mp, t, 10, 10, 1) } func NewDriverTuned( @@ -357,8 +357,12 @@ func (r *Driver) Go() { if atomic.SwapInt32(&r.started, 1) == 1 { panic("rpc.Driver.Go called multiple times") } - for i := 0; i < r.numProc; i++ { - go r.doMsgs() + if r.numProc == 1 { + r.doMsgs() + } else { + for i := 0; i < r.numProc; i++ { + go r.doMsgs() + } } } diff --git a/rpc/msgpool.go b/rpc/msgpool.go index a4ea1d0..7b5793f 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -12,7 +12,7 @@ type MessagePool interface { func NewMessagePool() MessagePool { // return NewMsgPool() - return NewMsgArenaCap(5000) + return NewMsgArenaCap(5000) // dogwood benchmark performs better } // msg arena From b50289e690c377d9ee96ae36c6a7179e267503dd Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 29 Jul 2025 16:25:38 -0700 Subject: [PATCH 42/51] receive par 3; reset objects on returning to pool as well --- rpc/driver.go | 2 +- xdr/arena.go | 1 + xdr/pool.go | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/driver.go b/rpc/driver.go index e9e6194..6d34e52 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -240,7 +240,7 @@ func NewDriver( mp MessagePool, t Transport, ) *Driver { - return NewDriverTuned(ctx, mp, t, 10, 10, 1) + return NewDriverTuned(ctx, mp, t, 10, 10, 3) } func NewDriverTuned( diff --git a/xdr/arena.go b/xdr/arena.go index 3be1ee9..9d989d7 100644 --- a/xdr/arena.go +++ b/xdr/arena.go @@ -55,6 +55,7 @@ func (a *Arena[T]) Recycle(x *T) { if !contains(a.objects, x) { return } + a.reset(x) a.lk.Lock() defer a.lk.Unlock() diff --git a/xdr/pool.go b/xdr/pool.go index 982cf45..8894279 100644 --- a/xdr/pool.go +++ b/xdr/pool.go @@ -38,6 +38,7 @@ func (x *Pool[T]) Get() *T { } func (x *Pool[T]) Recycle(o *T) { + x.reset(o) x.pool.Put(o) } From ec0acb0e6b7b56e2f96ce06d388006a86edb2c83 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 09:30:39 -0700 Subject: [PATCH 43/51] remove default msg pool allocator --- rpc/msgpool.go | 5 ----- rpc/rpc_test.go | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/rpc/msgpool.go b/rpc/msgpool.go index 7b5793f..ec97a1d 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -10,11 +10,6 @@ type MessagePool interface { StatString() string } -func NewMessagePool() MessagePool { - // return NewMsgPool() - return NewMsgArenaCap(5000) // dogwood benchmark performs better -} - // msg arena type msgArena struct { diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 9a211f2..5d1d5ae 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -59,7 +59,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message - msgPool := rpc.NewMessagePool() + msgPool := rpc.NewMsgArenaCap(5000) for _, msg := range contents { m := msgPool.NewMessage("") m.WriteString(msg) @@ -95,7 +95,7 @@ func TestRPC(t *testing.T) { defer cancel() cs := streampair() - mp1 := rpc.NewMessagePool() + mp1 := rpc.NewMsgArenaCap(5000) r1 := rpc.NewDriver(ctx, mp1, rpc.NewStreamTransport(cs[0], mp1)) r1.Register(TEST_V1_Server{&Server{}}) go func() { @@ -103,7 +103,7 @@ func TestRPC(t *testing.T) { fmt.Println("loop1 returned") }() - mp2 := rpc.NewMessagePool() + mp2 := rpc.NewMsgArenaCap(5000) r2 := rpc.NewDriver(ctx, mp2, rpc.NewStreamTransport(cs[1], mp2)) r2.Log = os.Stderr go func() { From 8dcb5154ed816817bbaf9ae9de0f5c91fc8bbc5a Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 09:40:48 -0700 Subject: [PATCH 44/51] improve doMsg --- rpc/driver.go | 52 ++++++++++++++++++++++----------------------------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 6d34e52..5df340c 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -237,7 +237,7 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // calling Close(), then you may supply a nil ctx. func NewDriver( ctx context.Context, - mp MessagePool, + mp MessagePool, //XXX: remove t Transport, ) *Driver { return NewDriverTuned(ctx, mp, t, 10, 10, 3) @@ -422,36 +422,28 @@ func (r *Driver) doMsg(m *Message) { unlock: unlock, })) - q := func() { - defer func() { - unlock() - if i := recover(); i != nil { - if r.PanicHandler != nil { - r.PanicHandler.PanicHandle(i) - } else { - fmt.Fprintf(os.Stderr, "%s\n", i) - } - if IsSuccess(rmsg) { - SetStat(rmsg, SYSTEM_ERR) - } + // process call + defer func() { + unlock() + if i := recover(); i != nil { + if r.PanicHandler != nil { + r.PanicHandler.PanicHandle(i) + } else { + fmt.Fprintf(os.Stderr, "%s\n", i) } - reply := r.msgPool.NewMessage(m.Peer) - reply.Serialize(rmsg) if IsSuccess(rmsg) { - reply.Serialize(proc.GetRes()) - r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", - m.Peer, msg.Xid, proc.ProcName()) + SetStat(rmsg, SYSTEM_ERR) } - m.Recycle() - r.safeSend(r.ctx, reply) - }() - proc.Do() - } - - if r.Lock == nil { - go q() - } else { - q() - } - + } + reply := r.msgPool.NewMessage(m.Peer) + reply.Serialize(rmsg) + if IsSuccess(rmsg) { + reply.Serialize(proc.GetRes()) + r.logXdr(proc.GetRes(), "->%s REPLY(xid=%d) %s", + m.Peer, msg.Xid, proc.ProcName()) + } + m.Recycle() + r.safeSend(r.ctx, reply) + }() + proc.Do() } From 70514800e9d87fa2e68cacaa0219cccaf197db3a Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 10:54:36 -0700 Subject: [PATCH 45/51] compat with old interface --- rpc/driver.go | 8 ++------ rpc/rpc_test.go | 8 +++----- rpc/transport.go | 6 +++++- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/rpc/driver.go b/rpc/driver.go index 5df340c..759ec7e 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -235,12 +235,8 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // // If you will never need to cancel the driver, or plan to do so by // calling Close(), then you may supply a nil ctx. -func NewDriver( - ctx context.Context, - mp MessagePool, //XXX: remove - t Transport, -) *Driver { - return NewDriverTuned(ctx, mp, t, 10, 10, 3) +func NewDriver(ctx context.Context, t Transport) *Driver { + return NewDriverTuned(ctx, NewMsgPool(), t, 10, 10, 3) } func NewDriverTuned( diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 5d1d5ae..792723d 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -67,7 +67,7 @@ func TestChannels(t *testing.T) { } cs := streampair() - tx1, tx2 := rpc.NewStreamTransport(cs[0], msgPool), rpc.NewStreamTransport(cs[1], msgPool) + tx1, tx2 := rpc.NewStreamTransportWithPool(cs[0], msgPool), rpc.NewStreamTransportWithPool(cs[1], msgPool) r := rpc.ReceiveChan(ctx, tx1, 10) defer tx1.Close() s, _ := rpc.SendChan(tx2, nil, 10) @@ -95,16 +95,14 @@ func TestRPC(t *testing.T) { defer cancel() cs := streampair() - mp1 := rpc.NewMsgArenaCap(5000) - r1 := rpc.NewDriver(ctx, mp1, rpc.NewStreamTransport(cs[0], mp1)) + r1 := rpc.NewDriver(ctx, rpc.NewStreamTransport(cs[0])) r1.Register(TEST_V1_Server{&Server{}}) go func() { r1.Go() fmt.Println("loop1 returned") }() - mp2 := rpc.NewMsgArenaCap(5000) - r2 := rpc.NewDriver(ctx, mp2, rpc.NewStreamTransport(cs[1], mp2)) + r2 := rpc.NewDriver(ctx, rpc.NewStreamTransport(cs[1])) r2.Log = os.Stderr go func() { r2.Go() diff --git a/rpc/transport.go b/rpc/transport.go index a7969c1..b6b466d 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -57,7 +57,11 @@ type StreamTransport struct { // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. -func NewStreamTransport(c net.Conn, mp MessagePool) *StreamTransport { +func NewStreamTransport(c net.Conn) *StreamTransport { + return NewStreamTransportWithPool(c, NewMsgPool()) +} + +func NewStreamTransportWithPool(c net.Conn, mp MessagePool) *StreamTransport { return &StreamTransport{ MaxMsgSize: 0x100000, Conn: c, From b2fbdfa1c6a3ae787ddfa6fb380c5599305da3ed Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 11:26:56 -0700 Subject: [PATCH 46/51] intro call pool --- rpc/callpool.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ rpc/driver.go | 10 +++++--- rpc/msg.go | 2 +- rpc/msgpool.go | 8 +++--- rpc/rpc_test.go | 2 +- rpc/util.go | 64 ++++++++++++++++++++++++++++++----------------- 6 files changed, 120 insertions(+), 32 deletions(-) create mode 100644 rpc/callpool.go diff --git a/rpc/callpool.go b/rpc/callpool.go new file mode 100644 index 0000000..e18722d --- /dev/null +++ b/rpc/callpool.go @@ -0,0 +1,66 @@ +package rpc + +import "github.com/xdrpp/goxdr/xdr" + +type CallPool interface { + NewCall() *PendingCall + Recycle(*PendingCall) + StatString() string +} + +// arena + +type callArena struct { + arena *xdr.Arena[PendingCall] +} + +func NewCallArena(cap int) CallPool { + callArena := &callArena{} + callArena.arena = xdr.NewArena( + cap, + func(p *PendingCall) { + *p = PendingCall{} + }, + ) + return callArena +} + +func (callArena *callArena) StatString() string { + return callArena.arena.StatString() +} + +func (callArena *callArena) NewCall() *PendingCall { + return callArena.arena.Get() +} + +func (callArena *callArena) Recycle(call *PendingCall) { + callArena.arena.Recycle(call) +} + +// pool + +type callPool struct { + pool *xdr.Pool[PendingCall] +} + +func NewCallPool() CallPool { + x := &callPool{} + x.pool = xdr.NewPool( + func(call *PendingCall) { + *call = PendingCall{} + }, + ) + return x +} + +func (callPool *callPool) NewCall() *PendingCall { + return callPool.pool.Get() +} + +func (callPool *callPool) Recycle(call *PendingCall) { + callPool.pool.Recycle(call) +} + +func (callPool *callPool) StatString() string { + return callPool.pool.StatString() +} diff --git a/rpc/driver.go b/rpc/driver.go index 759ec7e..3961c1b 100644 --- a/rpc/driver.go +++ b/rpc/driver.go @@ -236,12 +236,13 @@ func (r *Driver) logXdr(t xdr.XdrType, f string, args ...interface{}) { // If you will never need to cancel the driver, or plan to do so by // calling Close(), then you may supply a nil ctx. func NewDriver(ctx context.Context, t Transport) *Driver { - return NewDriverTuned(ctx, NewMsgPool(), t, 10, 10, 3) + return NewDriverTuned(ctx, NewMsgPool(), NewCallPool(), t, 10, 10, 3) } func NewDriverTuned( ctx context.Context, mp MessagePool, + cp CallPool, t Transport, recvQueueLen int, sendQueueLen int, @@ -260,6 +261,7 @@ func NewDriverTuned( msgPool: mp, numProc: numProc, } + ret.cs.pool = cp ret.out, ret.outClose = SendChan( t, func(xid uint32, _ error) { @@ -392,10 +394,10 @@ func (r *Driver) doMsg(m *Message) { return } - if pc := r.cs.GetReply(m.Peer, msg, m.In()); pc != nil { - r.logXdr(pc.Proc.GetRes(), "<-%s REPLY(xid=%d) %s", m.Peer, msg.Xid, pc.Proc.ProcName()) + if proc, cb, ok := r.cs.GetReply(m.Peer, msg, m.In()); ok { + r.logXdr(proc.GetRes(), "<-%s REPLY(xid=%d) %s", m.Peer, msg.Xid, proc.ProcName()) m.Recycle() - pc.Cb(msg) + cb(msg) return } diff --git a/rpc/msg.go b/rpc/msg.go index 182051b..e027876 100644 --- a/rpc/msg.go +++ b/rpc/msg.go @@ -13,7 +13,7 @@ type Message struct { } func (m *Message) Recycle() { - m.pool.Reycle(m) + m.pool.Recycle(m) } func (m *Message) In() xdr.XDR { diff --git a/rpc/msgpool.go b/rpc/msgpool.go index ec97a1d..7e87f0c 100644 --- a/rpc/msgpool.go +++ b/rpc/msgpool.go @@ -6,7 +6,7 @@ import ( type MessagePool interface { NewMessage(peer string) *Message - Reycle(msg *Message) + Recycle(msg *Message) StatString() string } @@ -16,7 +16,7 @@ type msgArena struct { arena *xdr.Arena[Message] } -func NewMsgArenaCap(cap int) MessagePool { +func NewMsgArena(cap int) MessagePool { msgArena := &msgArena{} msgArena.arena = xdr.NewArena( cap, @@ -39,7 +39,7 @@ func (msgArena *msgArena) NewMessage(peer string) *Message { return msg } -func (msgArena *msgArena) Reycle(msg *Message) { +func (msgArena *msgArena) Recycle(msg *Message) { msgArena.arena.Recycle(msg) } @@ -67,7 +67,7 @@ func (msgPool *msgPool) NewMessage(peer string) *Message { return msg } -func (msgPool *msgPool) Reycle(msg *Message) { +func (msgPool *msgPool) Recycle(msg *Message) { msgPool.pool.Recycle(msg) } diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 792723d..55bf3fd 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -59,7 +59,7 @@ func TestChannels(t *testing.T) { contents := []string{"one\n", "two\n", "three\n"} var ms []*rpc.Message - msgPool := rpc.NewMsgArenaCap(5000) + msgPool := rpc.NewMsgArena(5000) for _, msg := range contents { m := msgPool.NewMessage("") m.WriteString(msg) diff --git a/rpc/util.go b/rpc/util.go index 10069c4..075219d 100644 --- a/rpc/util.go +++ b/rpc/util.go @@ -1,21 +1,28 @@ - // Utilities for implementing RFC5531 RPC. package rpc import ( "fmt" - "github.com/xdrpp/goxdr/xdr" "sync" + + "github.com/xdrpp/goxdr/xdr" ) // Fake accept stat for when we can't send a message const SEND_ERR Accept_stat = 97 + // Fake accept stat to represent a cancelled call const CANCELED Accept_stat = 98 + // Fake accept stat to represent a response we can't unmarshal const GARBAGE_RET Accept_stat = 99 + func init() { - pseudo_states := []struct{stat Accept_stat; name string; comment string}{ + pseudo_states := []struct { + stat Accept_stat + name string + comment string + }{ {SEND_ERR, "SEND_ERR", "Transport-level error when sending call"}, {CANCELED, "CANCELED", "Call context canceled"}, {GARBAGE_RET, "GARBAGE_RET", "Unable to decode return value"}, @@ -68,17 +75,18 @@ func GetMsg(in xdr.XDR) (*Rpc_msg, error) { } type PendingCall struct { - Proc xdr.XdrProc - Cb func(*Rpc_msg) + Proc xdr.XdrProc + Cb func(*Rpc_msg) Server string } // A CallSet represents a set of pending calls. Its methods can be // called concurrently from multiple threads. type CallSet struct { - lock sync.Mutex + pool CallPool + lock sync.Mutex lastXid uint32 - calls map[uint32]*PendingCall + calls map[uint32]*PendingCall } // Allocate a new XID and create a message header for an outgoing RPC @@ -97,12 +105,14 @@ func (cs *CallSet) NewCall(server string, proc xdr.XdrProc, cs.lastXid++ _, ok = cs.calls[cs.lastXid] } - cs.calls[cs.lastXid] = &PendingCall{ - Proc: proc, - Cb: cb, - Server: server, - } - cmsg := Rpc_msg { Xid: cs.lastXid } + + pc := cs.pool.NewCall() + pc.Proc = proc + pc.Cb = cb + pc.Server = server + cs.calls[cs.lastXid] = pc + + cmsg := Rpc_msg{Xid: cs.lastXid} cmsg.Body.Mtype = CALL cmsg.Body.Cbody().Rpcvers = 2 cmsg.Body.Cbody().Prog = proc.Prog() @@ -115,6 +125,14 @@ func (cs *CallSet) NewCall(server string, proc xdr.XdrProc, func (cs *CallSet) Delete(xid uint32) { cs.lock.Lock() defer cs.lock.Unlock() + cs.delete(xid) +} + +func (cs *CallSet) delete(xid uint32) { + pc, ok := cs.calls[xid] + if ok { + cs.pool.Recycle(pc) + } delete(cs.calls, xid) } @@ -122,18 +140,19 @@ func (cs *CallSet) Delete(xid uint32) { // CANCELED pseudo-error (by default) or another pseudo-error if // reason is supplied. The only lengths that make sense for reason // are 0 (use default) and 1 (supply Accept_stat). -func (cs *CallSet) Cancel(xid uint32, reason...Accept_stat) { +func (cs *CallSet) Cancel(xid uint32, reason ...Accept_stat) { cs.lock.Lock() defer cs.lock.Unlock() if pc, ok := cs.calls[xid]; ok { delete(cs.calls, xid) - rmsg := Rpc_msg{ Xid: xid } + rmsg := Rpc_msg{Xid: xid} if len(reason) == 0 || reason[0] == SUCCESS { SetStat(&rmsg, CANCELED) } else { SetStat(&rmsg, reason[0]) } pc.Cb(&rmsg) + cs.pool.Recycle(pc) } } @@ -143,24 +162,24 @@ func (cs *CallSet) CancelAll() { calls := cs.calls cs.calls = nil for xid, pc := range calls { - rmsg := Rpc_msg{ Xid: xid } + rmsg := Rpc_msg{Xid: xid} SetStat(&rmsg, CANCELED) pc.Cb(&rmsg) + cs.pool.Recycle(pc) } } // Attempt to match a reply with a pending call. If it returns // a non-nil pc, you should call pc.Cb(rmsg). -func (cs *CallSet) GetReply(server string, rmsg *Rpc_msg, - in xdr.XDR) *PendingCall { +func (cs *CallSet) GetReply(server string, rmsg *Rpc_msg, in xdr.XDR) (xdr.XdrProc, func(*Rpc_msg), bool) { if rmsg.Body.Mtype != REPLY { - return nil + return nil, nil, false } cs.lock.Lock() defer cs.lock.Unlock() pc, ok := cs.calls[rmsg.Xid] if !ok || pc.Server != server { - return nil + return nil, nil, false } delete(cs.calls, rmsg.Xid) if IsSuccess(rmsg) { @@ -168,7 +187,8 @@ func (cs *CallSet) GetReply(server string, rmsg *Rpc_msg, rmsg.Body.Rbody().Areply().Reply_data.Stat = GARBAGE_RET } } - return pc + defer cs.pool.Recycle(pc) + return pc.Proc, pc.Cb, true } // Container for a server implementing a set of program/version @@ -201,7 +221,7 @@ func (s RpcSrv) GetProc(cmsg *Rpc_msg, in xdr.XDR) ( if cmsg.Body.Mtype != CALL { return nil, nil } - rmsg = &Rpc_msg { Xid: cmsg.Xid } + rmsg = &Rpc_msg{Xid: cmsg.Xid} if cmsg.Body.Cbody().Rpcvers != 2 { rmsg.Body.Mtype = REPLY From a7173c3fb0dd2d048bbd85c3f9c1d946ce0c6950 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 12:37:05 -0700 Subject: [PATCH 47/51] use stack alloc for xdrGet32/64 --- cmd/goxdr/header.go.in | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index 78c7eda..8e98e5d 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -683,36 +683,23 @@ func xdrReadPad(in io.Reader, n uint32) { } } +func xdrReadInto(in io.Reader, p []byte) { + if k, err := in.Read(p); err != nil || k != len(p) { + XdrPanic("%s", err.Error()) + } +} + func xdrGet32(in io.Reader) uint32 { - b := xdrReadN(in, 4) - return binary.BigEndian.Uint32(b) + var b [4]byte + xdrReadInto(in, b[:]) + return binary.BigEndian.Uint32(b[:]) } func xdrGet64(in io.Reader) uint64 { - b := xdrReadN(in, 8) - return binary.BigEndian.Uint64(b) -} - -// func xdrReadInto(in io.Reader, p []byte) []byte { -// if k, err := in.Read(p); err != nil || k != len(p) { -// XdrPanic("%s", err.Error()) -// } -// return p -// } -// -// var smallPool = NewSmallBytesPool() -// -// func xdrGet32(in io.Reader) uint32 { -// b := xdrReadInto(in, smallPool.Get(4)) -// defer smallPool.Recycle(b) -// return binary.BigEndian.Uint32(b) -// } -// -// func xdrGet64(in io.Reader) uint64 { -// b := xdrReadInto(in, smallPool.Get(8)) -// defer smallPool.Recycle(b) -// return binary.BigEndian.Uint64(b) -// } + var b [8]byte + xdrReadInto(in, b[:]) + return binary.BigEndian.Uint64(b[:]) +} // XDR that unmarshals from canonical binary format type XdrIn struct { From 7ca143f1aa4e01707fcc4fca663500b29a78809b Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 12:44:19 -0700 Subject: [PATCH 48/51] stack alloc xdrReadPad --- cmd/goxdr/header.go.in | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index 8e98e5d..ef0905d 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -674,7 +674,9 @@ func xdrReadN(in io.Reader, n uint32) []byte { func xdrReadPad(in io.Reader, n uint32) { if n & 3 != 0 { - got := xdrReadN(in, 4-(n&3)) + var buf [4]byte + got := buf[:4-(n&3)] + xdrReadInto(in, got) for _, b := range got { if b != 0 { XdrPanic("padding contained non-zero bytes") From 8b5c5afe20df69f78a3c290e4c68979a77ebfab4 Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 30 Jul 2025 12:47:34 -0700 Subject: [PATCH 49/51] cleanup --- xdr/bpool.go | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 xdr/bpool.go diff --git a/xdr/bpool.go b/xdr/bpool.go deleted file mode 100644 index ddd409c..0000000 --- a/xdr/bpool.go +++ /dev/null @@ -1,32 +0,0 @@ -package xdr - -import "sync" - -type SmallBytesPool struct { - size [8]sync.Pool -} - -func NewSmallBytesPool() *SmallBytesPool { - return &SmallBytesPool{ - size: [8]sync.Pool{ - {New: func() any { return []byte{0} }}, - {New: func() any { return []byte{0, 0} }}, - {New: func() any { return []byte{0, 0, 0} }}, - {New: func() any { return []byte{0, 0, 0, 0} }}, - {New: func() any { return []byte{0, 0, 0, 0, 0} }}, - {New: func() any { return []byte{0, 0, 0, 0, 0, 0} }}, - {New: func() any { return []byte{0, 0, 0, 0, 0, 0, 0} }}, - {New: func() any { return []byte{0, 0, 0, 0, 0, 0, 0, 0} }}, - }, - } -} - -func (x *SmallBytesPool) Get(len int) []byte { - p := x.size[len-1].Get().([]byte) - clear(p) - return p -} - -func (x *SmallBytesPool) Recycle(b []byte) { - x.size[len(b)-1].Put(b) -} From 330db3aca5038fae6c7c278f2892882e884474ab Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 3 Sep 2025 22:18:11 -0700 Subject: [PATCH 50/51] tunable defalt max message size --- rpc/transport.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rpc/transport.go b/rpc/transport.go index b6b466d..e57c01a 100644 --- a/rpc/transport.go +++ b/rpc/transport.go @@ -54,6 +54,9 @@ type StreamTransport struct { msgPool MessagePool } +// DefaultMaxMsgSize is the default maximum message size for StreamTransports. +var DefaultMaxMsgSize int = 10e6 + // Create a stream transport from a connected stream socket. This is // the only valid way to initialize a StreamTransport. You can // manually adjust MaxMsgSize after calling this function. @@ -63,7 +66,7 @@ func NewStreamTransport(c net.Conn) *StreamTransport { func NewStreamTransportWithPool(c net.Conn, mp MessagePool) *StreamTransport { return &StreamTransport{ - MaxMsgSize: 0x100000, + MaxMsgSize: DefaultMaxMsgSize, Conn: c, okay: 1, msgPool: mp, From b3774c16fab1a948c1325b995b6c3f61273355bf Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Tue, 9 Sep 2025 21:07:14 -0700 Subject: [PATCH 51/51] fix bug in xdrReadN --- cmd/goxdr/header.go.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/goxdr/header.go.in b/cmd/goxdr/header.go.in index ef0905d..f7fb255 100644 --- a/cmd/goxdr/header.go.in +++ b/cmd/goxdr/header.go.in @@ -666,8 +666,8 @@ func (xo XdrOut) Marshal(name string, i XdrType) { func xdrReadN(in io.Reader, n uint32) []byte { p := make([]byte, n) - if k, err := in.Read(p); err != nil || k != int(n){ - XdrPanic("%s", err.Error()) + if k, err := io.ReadFull(in, p); err != nil { + XdrPanic("short read; %d/%d bytes (%s)", k, n, err.Error()) } return p }