Skip to content

Commit 1af0078

Browse files
authored
WIP Implement the metadata call to retrieve the stream lead (#37)
Add support for the enviroment
1 parent bb60f9d commit 1af0078

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3305
-1898
lines changed

.github/workflows/test_and_publish_image.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010
rabbitmq-streaming:
1111
image: pivotalrabbitmq/rabbitmq-stream
1212
env:
13-
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbitmq_stream"
13+
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbitmq_stream advertised_host localhost"
1414
ports:
1515
- 5551:5551
1616
steps:

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ ENV GOPATH=/go GOOS=linux CGO_ENABLED=0
33
WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client
44
COPY go.mod go.sum VERSION ./
55
COPY pkg pkg
6+
COPY Makefile Makefile
67
COPY perfTest perfTest
78

89
RUN mkdir /stream_perf_test

Makefile

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ LDFLAGS = "-X main.Version=$(VERSION)"
1010

1111
all: test build
1212

13-
vet:
14-
go vet ./...
13+
14+
vet: $(go_sources)
15+
go vet ./pkg/stream
1516

1617
fmt:
1718
go fmt ./...
@@ -20,10 +21,13 @@ STATICCHECK ?= $(GOBIN)/staticcheck
2021
$(STATICCHECK):
2122
go get honnef.co/go/tools/cmd/staticcheck
2223
check: $(STATICCHECK)
23-
$(STATICCHECK) ./pkg/streaming
24+
$(STATICCHECK) ./pkg/stream
2425

2526
test: vet fmt check
26-
go test -v ./pkg/streaming -race -coverprofile=coverage.txt -covermode=atomic
27+
go test -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic
28+
29+
integration-test: vet fmt check
30+
go test -v ./pkg/system_integration -race -coverprofile=coverage.txt -covermode=atomic -tags debug
2731

2832
build: vet fmt check
2933
go build -ldflags=$(LDFLAGS) -v ./...
@@ -39,4 +43,4 @@ perf-test-docker-build: perf-test-build
3943
docker build -t pivotalrabbitmq/go-stream-perf-test:$(VERSION) .
4044

4145
perf-test-docker-push: perf-test-docker-build
42-
docker push pivotalrabbitmq/go-stream-perf-test:$(VERSION)
46+
docker push pivotalrabbitmq/go-stream-perf-test:$(VERSION)

README.md

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,86 +8,80 @@ Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rab
88
### Download
99
---
1010
```
11-
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.3-alpha
11+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.4-alpha
1212
```
1313

14-
### How to test
14+
### Getting started
1515
---
1616
- Run RabbitMQ docker image with streaming:
1717
```
1818
docker run -it --rm --name rabbitmq -p 5551:5551 -p 5672:5672 -p 15672:15672 \
1919
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
2020
pivotalrabbitmq/rabbitmq-stream
21-
2221
```
23-
- Run getting started example:
22+
- Run "getting started" example:
2423
```
2524
go run examples/getting_started.go
2625
```
27-
### Performance Test
28-
---
29-
The performance tool is work in progress, you can use it with docker
30-
```
31-
docker run --network host -it pivotalrabbitmq/go-stream-perf-test silent
32-
```
3326

34-
or directly
27+
### Performance test tool is an easy way to do some test:
3528
```
36-
go run perfTest/perftest.go
29+
go run perfTest/perftest.go silent
3730
```
3831

39-
40-
41-
4232
### API
4333
---
4434

35+
The API are generally composed by mandatory arguments and optional arguments
36+
the optional arguments can be set in the standard go way as:
4537
```golang
46-
client, err := streaming.NewClientCreator().Uri(uris).Connect() // Create and Connect a client
38+
env, err := stream.NewEnvironment(
39+
&stream.EnvironmentOptions{
40+
ConnectionParameters: stream.Broker{
41+
Host: "localhost",
42+
Port: 5551,
43+
User: "guest",
44+
Password: "guest",
45+
},
46+
MaxProducersPerClient: 3,
47+
MaxConsumersPerClient: 3,
48+
},
49+
)
4750
```
48-
49-
```golang
50-
err = client.StreamCreator().Stream(streamName).Create() // Create streaming queue without parameters
51-
err = client.StreamCreator().Stream(streamName).MaxAge(120 * time.Hour).Create() // Create streaming queue with Max Age
52-
err = client.StreamCreator().Stream(streamName).MaxLengthBytes(streaming.ByteCapacity{}.B(5)).Create() // Create streaming queue 5 GB max lenght
51+
or using Builders as:
5352
```
54-
55-
```golang
56-
/// Implement a consumer
57-
consumer, err := client.ConsumerCreator().
58-
Stream(streamName).
59-
Name("my_consumer").
60-
MessagesHandler(func(context streaming.ConsumerContext, message *amqp.Message) {
61-
fmt.Printf("received %d, message %s \n", context.Consumer.ID, message.Data)
62-
}).Build()
53+
env, err := stream.NewEnvironment(
54+
stream.NewEnvironmentOptions().
55+
SetHost("localhost").
56+
SetPort(5551).
57+
SetUser("guest").
58+
SetPassword("guest"))
6359
```
6460

61+
`nil` is also a valid value, default values will be provided:
6562
```golang
66-
/// get a producer
67-
producer, err := client.ProducerCreator().Stream(streamName).Build()
63+
env, err := stream.NewEnvironment(nil)
6864
```
6965

66+
The suggested way is to use builders.
67+
68+
7069
### Build from source
7170
---
7271

7372
```shell
7473
make build
7574
```
7675

76+
You need a docker image running to execute the tests in this way:
77+
```
78+
docker run -it --rm --name rabbitmq -p 5551:5551 \
79+
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
80+
pivotalrabbitmq/rabbitmq-stream
81+
```
82+
83+
7784

78-
### Methods Implemented:
79-
---
80-
- Open(vhost)
81-
- CreateStream
82-
- DeleteStream
83-
- DeclarePublisher
84-
- Close Publisher
85-
- Publish
86-
- Subscribe
87-
- Commit
88-
- UnSubscribe
89-
- HeartBeat
90-
9185
### Project status
9286
---
9387
The client is a work in progress, the API(s) could change

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.4-alpha
1+
0.5-alpha

comopose/conf/rabbitmq.config

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[ { rabbit, [
2+
{cluster_nodes, {['rabbit@rabbit_node_1', 'rabbit@rabbit_node_2'], disc}},
3+
{ loopback_users, [ ] } ] }
4+
].

comopose/docker-compose.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
version: "3"
2+
services:
3+
rabbit_node_1:
4+
environment:
5+
- RABBITMQ_ERLANG_COOKIE='secret_cookie'
6+
networks:
7+
- back
8+
hostname: rabbit_node_1
9+
image: pivotalrabbitmq/rabbitmq-stream
10+
ports:
11+
- "15672:15672"
12+
- "5551:5551"
13+
tty: true
14+
volumes:
15+
# - rabbit1:/var/lib/rabbitmq
16+
- ./conf/:/etc/rabbitmq/
17+
# command: bash -c "sleep 10; rabbitmq-server;"
18+
rabbit_node_2:
19+
environment:
20+
- RABBITMQ_ERLANG_COOKIE='secret_cookie'
21+
networks:
22+
- back
23+
hostname: rabbit_node_2
24+
depends_on:
25+
- rabbit_node_1
26+
image: pivotalrabbitmq/rabbitmq-stream
27+
ports:
28+
- "15673:15672"
29+
- "5552:5551"
30+
tty: true
31+
volumes:
32+
# - rabbit2:/var/lib/rabbitmq
33+
- ./conf/:/etc/rabbitmq/
34+
# command: bash -c "sleep 10; rabbitmq-server;"
35+
#volumes:
36+
# rabbit1:
37+
# driver: local
38+
# rabbit2:
39+
# driver: local
40+
41+
networks:
42+
back:

examples/getting_started.go

Lines changed: 62 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,96 +6,90 @@ import (
66
"fmt"
77
"github.com/google/uuid"
88
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
9-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
9+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1010
"os"
11-
"sync/atomic"
11+
"strconv"
1212
"time"
1313
)
1414

1515
func CheckErr(err error) {
1616
if err != nil {
17-
streaming.ERROR("%s ", err)
17+
fmt.Printf("%s ", err)
18+
os.Exit(1)
1819
}
1920
}
21+
22+
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
23+
var arr []*amqp.Message
24+
for z := 0; z < bacthMessages; z++ {
25+
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
26+
}
27+
return arr
28+
}
29+
2030
func main() {
2131
reader := bufio.NewReader(os.Stdin)
22-
streaming.INFO("Getting started with Streaming client for RabbitMQ")
23-
streaming.INFO("Connecting to RabbitMQ streaming ...")
24-
uris := "rabbitmq-streaming://guest:guest@localhost:5551/%2f"
25-
client, err := streaming.NewClientCreator().
26-
Uri(uris).
27-
Connect() // Create Client
28-
CheckErr(err)
29-
if err != nil {
30-
return
31-
}
32+
// Set log level, not mandatory by default is INFO
33+
stream.SetLevelInfo(stream.DEBUG)
3234

33-
streaming.INFO("Connected to: %s", uris)
34-
streamName := uuid.New().String()
35-
err = client.StreamCreator().Stream(streamName).
36-
Create() // Create the streaming queue
37-
CheckErr(err)
35+
fmt.Println("Getting started with Streaming client for RabbitMQ")
36+
fmt.Println("Connecting to RabbitMQ streaming ...")
37+
//uri := "rabbitmq-streaming://guest:guest@localhost:5551/%2f"
38+
// The environment is a wrapper around the TCP client connections
3839

39-
err = client.StreamCreator().Stream(streamName).
40-
MaxLengthBytes(streaming.ByteCapacity{}.MB(5)).
41-
Create() // Create the streaming queue
40+
env, err := stream.NewEnvironment(
41+
stream.NewEnvironmentOptions().
42+
SetHost("localhost").
43+
SetPort(5551).
44+
SetUser("guest").
45+
SetPassword("guest"))
4246
CheckErr(err)
47+
// Create a stream, you can create streams without any option like:
48+
// err = env.DeclareStream(streamName, nil)
49+
// it is a best practise to define a size, 1GB for example:
50+
streamName := uuid.New().String()
51+
err = env.DeclareStream(streamName,
52+
&stream.StreamOptions{
53+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
54+
},
55+
)
4356

44-
var count int32
45-
consumer, err := client.ConsumerCreator().
46-
Stream(streamName).
47-
Name(uuid.NewString()).
48-
MessagesHandler(func(context streaming.ConsumerContext, message *amqp.Message) {
49-
streaming.INFO("Message number:%d consumer id:%d data:%s \n",
50-
atomic.AddInt32(&count, 1), context.Consumer.ID,
51-
message.Data)
52-
err := context.Consumer.Commit()
53-
CheckErr(err)
54-
}).Build()
5557
CheckErr(err)
5658

57-
// Get a new producer to publish the messages
58-
clientProducer, err := streaming.NewClientCreator().Uri(uris).
59-
PublishErrorHandler(func(publisherId uint8, publishingId int64, code uint16) {
60-
streaming.ERROR("Publish Error, publisherId %d, code: %s", publisherId, streaming.LookErrorCode(code))
61-
}).
62-
Connect()
63-
CheckErr(err)
64-
producer, err := clientProducer.ProducerCreator().Stream(streamName).Build()
65-
CheckErr(err)
59+
//Define a producer to a stream, optional publish confirmation
60+
producer, err := env.NewProducer(streamName,
61+
stream.NewProducerOptions().SetPublishConfirmHandler(func(ch <-chan []int64) {
62+
messagesIds := <-ch
63+
fmt.Printf("Confirmed %d messages \n \n ", len(messagesIds))
6664

67-
//
68-
numberOfSend := 10
69-
batchSize := 10
65+
}))
66+
CheckErr(err)
7067

71-
// Create AMQP 1.0 messages, see:https://github.com/Azure/go-amqp
72-
// message aggregation
73-
countM := 0
74-
start := time.Now()
75-
for z := 0; z < numberOfSend; z++ {
76-
var arr []*amqp.Message
77-
for f := 0; f < batchSize; f++ {
78-
countM++
79-
arr = append(arr, amqp.NewMessage([]byte(fmt.Sprintf("test_%d", countM))))
80-
}
81-
_, err = producer.BatchPublish(context.Background(), arr) // batch send
82-
if err != nil {
83-
streaming.ERROR("%s", err)
84-
}
68+
// each publish sends a number of messages, the batchMessages should be around 100 messages for send
69+
for i := 0; i < 2; i++ {
70+
_, err := producer.BatchPublish(context.Background(), CreateArrayMessagesForTesting(10))
71+
CheckErr(err)
8572
}
8673

87-
elapsed := time.Since(start)
88-
streaming.INFO("%d messages, published in: %s\n", numberOfSend*batchSize, elapsed)
89-
90-
fmt.Println("Press any key to stop ")
91-
_, _ = reader.ReadString('\n')
74+
// this sleep is not mandatory, just to show the confirmed messages
75+
time.Sleep(1 * time.Second)
9276
err = producer.Close()
9377
CheckErr(err)
94-
err = consumer.UnSubscribe()
95-
CheckErr(err)
96-
err = client.DeleteStream(streamName) // Remove the streaming queue and the data
78+
79+
// Define a consumer per stream, there are different offset options to define a consumer, default is
80+
//env.NewConsumer(streamName, func(Context streaming.ConsumerContext, message *amqp.Message) {
81+
//
82+
//}, nil)
83+
// if you need to track the offset you need a consumer name like:
84+
consumer, err := env.NewConsumer(streamName, func(Context stream.ConsumerContext, message *amqp.Message) {
85+
fmt.Printf("consumer id: %d, text: %s \n ", Context.Consumer.ID, message.Data)
86+
}, stream.NewConsumerOptions().
87+
SetConsumerName("my_consumer"). // gives a name
88+
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
9789
CheckErr(err)
98-
err = client.Close()
90+
91+
fmt.Println("Press any key to stop ")
92+
err = consumer.UnSubscribe()
9993
CheckErr(err)
100-
fmt.Println("Bye bye")
94+
_, _ = reader.ReadString('\n')
10195
}

0 commit comments

Comments
 (0)