Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
5f51212
fix race condition in driver closure
petar May 24, 2025
db0ac80
avoid goroutine when processing server calls in sequence
petar Jun 5, 2025
9d896cd
use a sync.Pool for Messages in rpc.Transport and rpc.Driver
petar Jun 8, 2025
9f3dc84
fix
petar Jun 8, 2025
cbe282b
fix
petar Jun 8, 2025
9705188
cosmetic
petar Jun 8, 2025
5b659fb
move to a global pool for rpc.Message buffers
petar Jun 8, 2025
0c045a9
maintain message cache stats
petar Jun 9, 2025
81c42df
add forgotten safeSend
petar Jun 9, 2025
69ebab7
improve xdrReadN; results in massive speedup
petar Jun 9, 2025
9f4cadb
fix stats display
petar Jun 9, 2025
4d9eb65
add struct pool and arena
petar Jul 15, 2025
f433905
fix bug; message pool
petar Jul 17, 2025
8b19b47
msgpool as init param
petar Jul 17, 2025
10fbd2f
process incoming messages in parallel
petar Jul 24, 2025
5128944
adjust par to 10
petar Jul 24, 2025
04234aa
add a buffer to the message processing channel
petar Jul 24, 2025
8deae8c
add buffer to send/receive msg chans
petar Jul 24, 2025
c251528
simplify par msg processing
petar Jul 24, 2025
18190a9
implement msg pool with arena
petar Jul 24, 2025
f15d546
install latency meters in sendchan and receivechan
petar Jul 24, 2025
082365e
adjust stat printout
petar Jul 24, 2025
145daf0
fix small bytes pool; use small bytes pool for xdrGet32 and xdrGet64
petar Jul 25, 2025
0a3a9ef
revert to original xdrGet32/64
petar Jul 25, 2025
d215576
make operation parameter of driver customizable
petar Jul 25, 2025
68cf81a
fix test
petar Jul 25, 2025
bfd977d
adjust params
petar Jul 27, 2025
5bef373
avoid bytes.Buffer allocation
petar Jul 27, 2025
32e3fb6
put msgpool behind an interface
petar Jul 29, 2025
fb2a5d7
remove init from arena
petar Jul 29, 2025
6cacee5
swap in message pool
petar Jul 29, 2025
1651d87
prealloc 200 bytes for msg buffer
petar Jul 29, 2025
99a8c97
rm prealloc
petar Jul 29, 2025
32f59fe
fix bug in prealloc
petar Jul 29, 2025
5b687d9
wipe prealloc
petar Jul 29, 2025
151017a
add rmsgChanPool
petar Jul 29, 2025
a421d30
Revert "add rmsgChanPool"
petar Jul 29, 2025
ac068bb
test doMsg par = 1
petar Jul 29, 2025
17d19dc
remove latency meters from driver
petar Jul 29, 2025
df6ca7c
switch to arena
petar Jul 29, 2025
da8dc1b
turn off receive parallelism
petar Jul 29, 2025
b50289e
receive par 3; reset objects on returning to pool as well
petar Jul 29, 2025
ec0acb0
remove default msg pool allocator
petar Jul 30, 2025
8dcb515
improve doMsg
petar Jul 30, 2025
7051480
compat with old interface
petar Jul 30, 2025
b2fbdfa
intro call pool
petar Jul 30, 2025
a7173c3
use stack alloc for xdrGet32/64
petar Jul 30, 2025
7ca143f
stack alloc xdrReadPad
petar Jul 30, 2025
8b5c5af
cleanup
petar Jul 30, 2025
330db3a
tunable defalt max message size
petar Sep 4, 2025
b3774c1
fix bug in xdrReadN
petar Sep 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/goxdr/goxdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (r *rpc_const) emit(e *emitter) {
if r.comment != "" {
e.printf("%s\n", r.comment)
}
// Replace the character '\'' with '_' in r.val before printing constants;
// Replace the character '\'' with '_' in r.val before printing constants;
// golang supports _ but not '
valStr := strings.ReplaceAll(r.val.String(), "'", "_")
e.printf("const %s = %s\n", r.id, valStr)
Expand Down Expand Up @@ -1386,7 +1386,6 @@ func main() {

if *opt_emitbp {
fmt.Fprintf(out, `import(
"bytes"
"context"
"encoding/binary"
"fmt"
Expand Down
29 changes: 19 additions & 10 deletions cmd/goxdr/header.go.in
Original file line number Diff line number Diff line change
Expand Up @@ -665,16 +665,18 @@ func (xo XdrOut) Marshal(name string, i XdrType) {
}

func xdrReadN(in io.Reader, n uint32) []byte {
var b bytes.Buffer
if _, err := io.CopyN(&b, in, int64(n)); err != nil {
XdrPanic("%s", err.Error())
p := make([]byte, n)
if k, err := io.ReadFull(in, p); err != nil {
XdrPanic("short read; %d/%d bytes (%s)", k, n, err.Error())
}
return b.Bytes()
return p
}

func xdrReadPad(in io.Reader, n uint32) {
if n & 3 != 0 {
got := xdrReadN(in, 4-(n&3))
var buf [4]byte
got := buf[:4-(n&3)]
xdrReadInto(in, got)
for _, b := range got {
if b != 0 {
XdrPanic("padding contained non-zero bytes")
Expand All @@ -683,17 +685,24 @@ func xdrReadPad(in io.Reader, n uint32) {
}
}

func xdrReadInto(in io.Reader, p []byte) {
if k, err := in.Read(p); err != nil || k != len(p) {
XdrPanic("%s", err.Error())
}
}

func xdrGet32(in io.Reader) uint32 {
b := xdrReadN(in, 4)
return binary.BigEndian.Uint32(b)
var b [4]byte
xdrReadInto(in, b[:])
return binary.BigEndian.Uint32(b[:])
}

func xdrGet64(in io.Reader) uint64 {
b := xdrReadN(in, 8)
return binary.BigEndian.Uint64(b)
var b [8]byte
xdrReadInto(in, b[:])
return binary.BigEndian.Uint64(b[:])
}


// XDR that unmarshals from canonical binary format
type XdrIn struct {
In io.Reader
Expand Down
66 changes: 66 additions & 0 deletions rpc/callpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package rpc

import "github.com/xdrpp/goxdr/xdr"

type CallPool interface {
NewCall() *PendingCall
Recycle(*PendingCall)
StatString() string
}

// arena

type callArena struct {
arena *xdr.Arena[PendingCall]
}

func NewCallArena(cap int) CallPool {
callArena := &callArena{}
callArena.arena = xdr.NewArena(
cap,
func(p *PendingCall) {
*p = PendingCall{}
},
)
return callArena
}

func (callArena *callArena) StatString() string {
return callArena.arena.StatString()
}

func (callArena *callArena) NewCall() *PendingCall {
return callArena.arena.Get()
}

func (callArena *callArena) Recycle(call *PendingCall) {
callArena.arena.Recycle(call)
}

// pool

type callPool struct {
pool *xdr.Pool[PendingCall]
}

func NewCallPool() CallPool {
x := &callPool{}
x.pool = xdr.NewPool(
func(call *PendingCall) {
*call = PendingCall{}
},
)
return x
}

func (callPool *callPool) NewCall() *PendingCall {
return callPool.pool.Get()
}

func (callPool *callPool) Recycle(call *PendingCall) {
callPool.pool.Recycle(call)
}

func (callPool *callPool) StatString() string {
return callPool.pool.StatString()
}
Loading