Skip to content

Commit c4ee972

Browse files
authored
Amqp codec (#34)
* Extract the AMQP codec fixes:#27
1 parent cf8491c commit c4ee972

24 files changed

+4499
-43
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
.idea
1717
coverage.txt
1818
bin/
19-
vet
19+
vet
20+
.DS_Store

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
FROM golang:1.16 as builder
22
ENV GOPATH=/go GOOS=linux CGO_ENABLED=0
3-
WORKDIR /go/src/github.com/Gsantomaggio/go-stream-client
3+
WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client
44
COPY go.mod go.sum VERSION ./
55
COPY pkg pkg
66
COPY perfTest perfTest

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# GO stream client for RabbitMQ streaming queues
22
---
3-
![Build](https://github.com/Gsantomaggio/go-stream-client/workflows/Build/badge.svg)
3+
![Build](https://github.com/rabbitmq/rabbitmq-stream-go-client/workflows/Build/badge.svg)
44
[![codecov](https://codecov.io/gh/Gsantomaggio/go-stream-client/branch/main/graph/badge.svg?token=HZD4S71QIM)](https://codecov.io/gh/Gsantomaggio/go-stream-client)
55

66
Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
77

88
### Download
99
---
1010
```
11-
go get -u github.com/gsantomaggio/rabbitmq-stream-go-client@v0.3-alpha
11+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.3-alpha
1212
```
1313

1414
### How to test

examples/getting_started.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package main
33
import (
44
"bufio"
55
"fmt"
6-
"github.com/Azure/go-amqp"
76
"github.com/google/uuid"
8-
"github.com/gsantomaggio/go-stream-client/pkg/streaming"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
99
"os"
1010
"sync/atomic"
1111
"time"
@@ -38,7 +38,6 @@ func main() {
3838
Create() // Create the streaming queue
3939
CheckErr(err)
4040

41-
4241
var count int32
4342
consumer, err := client.ConsumerCreator().
4443
Stream(streamName).
@@ -55,7 +54,7 @@ func main() {
5554
// Get a new producer to publish the messages
5655
clientProducer, err := streaming.NewClientCreator().Uri(uris).
5756
PublishErrorHandler(func(publisherId uint8, publishingId int64, code uint16) {
58-
streaming.ERROR("Publish Error, publisherId %d, code: %s", publisherId, streaming.LookErrorCode(code))
57+
streaming.ERROR("Publish Error, publisherId %d, code: %s", publisherId, streaming.LookErrorCode(code))
5958
}).
6059
Connect()
6160
CheckErr(err)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
module github.com/gsantomaggio/go-stream-client
1+
module github.com/rabbitmq/rabbitmq-stream-go-client
22

33
go 1.15
44

55
require (
6-
github.com/Azure/go-amqp v0.13.1
76
github.com/google/uuid v1.2.0
87
github.com/onsi/ginkgo v1.15.1
98
github.com/onsi/gomega v1.11.0
109
github.com/pkg/errors v0.9.1
1110
github.com/spf13/cobra v1.1.3
11+
honnef.co/go/tools v0.1.3 // indirect
1212
)

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiy
1313
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
1414
github.com/Azure/go-amqp v0.13.1 h1:dXnEJ89Hf7wMkcBbLqvocZlM4a3uiX9uCxJIvU77+Oo=
1515
github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
16+
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
1617
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
1718
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
1819
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -232,6 +233,7 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
232233
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
233234
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
234235
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
236+
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
235237
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
236238
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
237239
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -285,6 +287,8 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
285287
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
286288
golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw=
287289
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
290+
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
291+
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
288292
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
289293
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
290294
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
@@ -311,6 +315,8 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
311315
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
312316
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
313317
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
318+
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
319+
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
314320
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
315321
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
316322
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -365,4 +371,6 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
365371
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
366372
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
367373
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
374+
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
375+
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
368376
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=

perfTest/cmd/commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package cmd
33
import (
44
"fmt"
55
"github.com/google/uuid"
6-
"github.com/gsantomaggio/go-stream-client/pkg/streaming"
6+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
77
"github.com/spf13/cobra"
88
"os"
99
)

perfTest/cmd/silent.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package cmd
22

33
import (
44
"fmt"
5-
"github.com/Azure/go-amqp"
65
"github.com/google/uuid"
7-
"github.com/gsantomaggio/go-stream-client/pkg/streaming"
6+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
88
"github.com/spf13/cobra"
99
"math/rand"
1010
"sync/atomic"

perfTest/perftest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package main
22

33
import (
4-
"github.com/gsantomaggio/go-stream-client/perfTest/cmd"
4+
"github.com/rabbitmq/rabbitmq-stream-go-client/perfTest/cmd"
55
"sync"
66
)
77

pkg/amqp/buffer.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package amqp
2+
3+
import (
4+
"encoding/binary"
5+
"io"
6+
)
7+
8+
// buffer is similar to bytes.Buffer but specialized for this package
9+
type buffer struct {
10+
b []byte
11+
i int
12+
}
13+
14+
func (b *buffer) next(n int64) ([]byte, bool) {
15+
if b.readCheck(n) {
16+
buf := b.b[b.i:len(b.b)]
17+
b.i = len(b.b)
18+
return buf, false
19+
}
20+
21+
buf := b.b[b.i : b.i+int(n)]
22+
b.i += int(n)
23+
return buf, true
24+
}
25+
26+
func (b *buffer) skip(n int) {
27+
b.i += n
28+
}
29+
30+
31+
32+
func (b *buffer) readCheck(n int64) bool {
33+
return int64(b.i)+n > int64(len(b.b))
34+
}
35+
36+
func (b *buffer) readByte() (byte, error) {
37+
if b.readCheck(1) {
38+
return 0, io.EOF
39+
}
40+
41+
byte_ := b.b[b.i]
42+
b.i++
43+
return byte_, nil
44+
}
45+
46+
func (b *buffer) readType() (amqpType, error) {
47+
n, err := b.readByte()
48+
return amqpType(n), err
49+
}
50+
51+
func (b *buffer) peekType() (amqpType, error) {
52+
if b.readCheck(1) {
53+
return 0, io.EOF
54+
}
55+
56+
return amqpType(b.b[b.i]), nil
57+
}
58+
59+
func (b *buffer) readUint16() (uint16, error) {
60+
if b.readCheck(2) {
61+
return 0, io.EOF
62+
}
63+
64+
n := binary.BigEndian.Uint16(b.b[b.i:])
65+
b.i += 2
66+
return n, nil
67+
}
68+
69+
func (b *buffer) readUint32() (uint32, error) {
70+
if b.readCheck(4) {
71+
return 0, io.EOF
72+
}
73+
74+
n := binary.BigEndian.Uint32(b.b[b.i:])
75+
b.i += 4
76+
return n, nil
77+
}
78+
79+
func (b *buffer) readUint64() (uint64, error) {
80+
if b.readCheck(8) {
81+
return 0, io.EOF
82+
}
83+
84+
n := binary.BigEndian.Uint64(b.b[b.i : b.i+8])
85+
b.i += 8
86+
return n, nil
87+
}
88+
89+
func (b *buffer) write(p []byte) {
90+
b.b = append(b.b, p...)
91+
}
92+
93+
func (b *buffer) writeByte(byte_ byte) {
94+
b.b = append(b.b, byte_)
95+
}
96+
97+
func (b *buffer) writeString(s string) {
98+
b.b = append(b.b, s...)
99+
}
100+
101+
func (b *buffer) len() int {
102+
return len(b.b) - b.i
103+
}
104+
105+
func (b *buffer) bytes() []byte {
106+
return b.b[b.i:]
107+
}
108+
109+
func (b *buffer) writeUint16(n uint16) {
110+
b.b = append(b.b,
111+
byte(n>>8),
112+
byte(n),
113+
)
114+
}
115+
116+
func (b *buffer) writeUint32(n uint32) {
117+
b.b = append(b.b,
118+
byte(n>>24),
119+
byte(n>>16),
120+
byte(n>>8),
121+
byte(n),
122+
)
123+
}
124+
125+
func (b *buffer) writeUint64(n uint64) {
126+
b.b = append(b.b,
127+
byte(n>>56),
128+
byte(n>>48),
129+
byte(n>>40),
130+
byte(n>>32),
131+
byte(n>>24),
132+
byte(n>>16),
133+
byte(n>>8),
134+
byte(n),
135+
)
136+
}

0 commit comments

Comments
 (0)