Skip to content

Commit 427efdd

Browse files
authored
Consumer replica conn (#42)
* Add consumer broker replica * add URIs configuration * ha producer - first implementatio
1 parent a8eab98 commit 427efdd

38 files changed

+1387
-311
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ examples/examples
2020
vet
2121
.DS_Store
2222
perfTest/
23+
.vagrant/

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ test: vet fmt check
2626
go test -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic -tags debug
2727

2828
integration-test: vet fmt check
29-
go test -v ./pkg/system_integration -race -coverprofile=coverage.txt -covermode=atomic -tags debug
29+
cd ./pkg/system_integration && go test -v . -race -coverprofile=coverage.txt -covermode=atomic -tags debug -timeout 99999s
3030

3131
build: vet fmt check
3232
go build -ldflags=$(LDFLAGS) -v ./...

README.md

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,19 @@
33
![Build](https://github.com/rabbitmq/rabbitmq-stream-go-client/workflows/Build/badge.svg)
44
[![codecov](https://codecov.io/gh/Gsantomaggio/go-stream-client/branch/main/graph/badge.svg?token=HZD4S71QIM)](https://codecov.io/gh/Gsantomaggio/go-stream-client)
55

6-
Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
6+
Experimental client
7+
for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
78

89
### Download
910
---
11+
1012
```
1113
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.5-alpha
1214
```
1315

1416
### Getting started
1517
---
18+
1619
- Run RabbitMQ docker image with streaming:
1720
```
1821
docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \
@@ -25,15 +28,17 @@ go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.5-alpha
2528
```
2629

2730
### Performance test tool is an easy way to do some test:
31+
2832
```
2933
go run perfTest/perftest.go silent
3034
```
3135

3236
### API
3337
---
3438

35-
The API are generally composed by mandatory arguments and optional arguments
36-
the optional arguments can be set in the standard go way as:
39+
The API are generally composed by mandatory arguments and optional arguments the optional arguments can be set in the
40+
standard go way as:
41+
3742
```golang
3843
env, err := stream.NewEnvironment(
3944
&stream.EnvironmentOptions{
@@ -48,7 +53,9 @@ env, err := stream.NewEnvironment(
4853
},
4954
)
5055
```
56+
5157
or using Builders as:
58+
5259
```golang
5360
env, err := stream.NewEnvironment(
5461
stream.NewEnvironmentOptions().
@@ -59,6 +66,7 @@ env, err := stream.NewEnvironment(
5966
```
6067

6168
`nil` is also a valid value, default values will be provided:
69+
6270
```golang
6371
env, err := stream.NewEnvironment(nil)
6472
```
@@ -74,14 +82,13 @@ make build
7482
```
7583

7684
You need a docker image running to execute the tests in this way:
85+
7786
```
7887
docker run -it --rm --name rabbitmq -p 5552:5552 \
7988
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \
8089
pivotalrabbitmq/rabbitmq-stream
8190
```
8291

83-
84-
85-
### Project status
86-
---
87-
The client is a work in progress, the API(s) could change
92+
### Project status
93+
---
94+
The client is a work in progress, the API(s) could change

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.5-alpha
1+
0.6-alpha

comopose/conf/enabled_plugins

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[rabbitmq_management, rabbitmq_stream, rabbitmq_stream_management].

comopose/conf/rabbitmq.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
2+
3+
cluster_formation.classic_config.nodes.1 = rabbit@node0
4+
cluster_formation.classic_config.nodes.2 = rabbit@node1
5+
cluster_formation.classic_config.nodes.3 = rabbit@node2
6+
loopback_users.guest = false

comopose/conf/rabbitmq.config

Lines changed: 0 additions & 4 deletions
This file was deleted.

comopose/docker-compose.yml

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,51 @@
11
version: "3"
22
services:
3-
rabbit_node_1:
3+
rabbit_node0:
44
environment:
55
- RABBITMQ_ERLANG_COOKIE='secret_cookie'
6+
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbitmq_stream advertised_host localhost advertised_port 5552
67
networks:
78
- back
8-
hostname: rabbit_node_1
9+
hostname: node0
910
image: pivotalrabbitmq/rabbitmq-stream
1011
ports:
1112
- "15672:15672"
1213
- "5552:5552"
1314
tty: true
1415
volumes:
15-
# - rabbit1:/var/lib/rabbitmq
16+
- ./conf/:/etc/rabbitmq/
17+
rabbit_node1:
18+
environment:
19+
- RABBITMQ_ERLANG_COOKIE='secret_cookie'
20+
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbitmq_stream advertised_host localhost advertised_port 5553
21+
networks:
22+
- back
23+
hostname: node1
24+
depends_on:
25+
- rabbit_node_0
26+
image: pivotalrabbitmq/rabbitmq-stream
27+
ports:
28+
- "15673:15672"
29+
- "5553:5552"
30+
tty: true
31+
volumes:
1632
- ./conf/:/etc/rabbitmq/
17-
# command: bash -c "sleep 10; rabbitmq-server;"
18-
rabbit_node_2:
33+
rabbit_node2:
1934
environment:
2035
- RABBITMQ_ERLANG_COOKIE='secret_cookie'
36+
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbitmq_stream advertised_host localhost advertised_port 5554
2137
networks:
2238
- back
23-
hostname: rabbit_node_2
39+
hostname: node2
2440
depends_on:
2541
- rabbit_node_1
2642
image: pivotalrabbitmq/rabbitmq-stream
2743
ports:
28-
- "15673:15672"
44+
- "15674:15672"
2945
- "5554:5552"
3046
tty: true
3147
volumes:
32-
# - rabbit2:/var/lib/rabbitmq
3348
- ./conf/:/etc/rabbitmq/
34-
# command: bash -c "sleep 10; rabbitmq-server;"
35-
#volumes:
36-
# rabbit1:
37-
# driver: local
38-
# rabbit2:
39-
# driver: local
4049

4150
networks:
4251
back:

examples/getting_started.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
2929

3030
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
3131
go func() {
32-
messagesIds := <-confirms
33-
for _, m := range messagesIds {
34-
fmt.Printf("Confirmed %s message \n ", m.Message.Data)
32+
for messagesIds := range confirms {
33+
for _, m := range messagesIds {
34+
fmt.Printf("Confirmed %s message \n ", m.Message.Data)
35+
}
3536
}
3637
}()
3738
}
@@ -59,6 +60,7 @@ func main() {
5960
// Create a stream, you can create streams without any option like:
6061
// err = env.DeclareStream(streamName, nil)
6162
// it is a best practise to define a size, 1GB for example:
63+
6264
streamName := uuid.New().String()
6365
err = env.DeclareStream(streamName,
6466
&stream.StreamOptions{

examples/haProducer/producer.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
9+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
10+
"os"
11+
"strconv"
12+
"sync/atomic"
13+
"time"
14+
)
15+
16+
func CheckErr(err error) {
17+
if err != nil {
18+
fmt.Printf("%s ", err)
19+
os.Exit(1)
20+
}
21+
}
22+
23+
var idx = 0
24+
25+
func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
26+
var arr []*amqp.Message
27+
for z := 0; z < bacthMessages; z++ {
28+
idx++
29+
arr = append(arr, amqp.NewMessage([]byte(strconv.Itoa(idx))))
30+
}
31+
return arr
32+
}
33+
34+
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
35+
var counter int32 = 0
36+
go func() {
37+
for messagesIds := range confirms {
38+
for _, m := range messagesIds {
39+
if !m.Confirmed {
40+
if atomic.AddInt32(&counter, 1)%10 == 0 {
41+
fmt.Printf("Confirmed %s message - status %t - %d \n ", m.Message.Data, m.Confirmed, atomic.LoadInt32(&counter))
42+
}
43+
}
44+
}
45+
}
46+
}()
47+
}
48+
49+
func main() {
50+
reader := bufio.NewReader(os.Stdin)
51+
// Set log level, not mandatory by default is INFO
52+
//stream.SetLevelInfo(stream.DEBUG)
53+
54+
fmt.Println("Getting started with Streaming client for RabbitMQ")
55+
fmt.Println("Connecting to RabbitMQ streaming ...")
56+
57+
env, err := stream.NewEnvironment(
58+
stream.NewEnvironmentOptions().
59+
SetHost("localhost").
60+
SetPort(5552).
61+
SetUser("guest").
62+
SetPassword("guest"))
63+
CheckErr(err)
64+
// Create a stream, you can create streams without any option like:
65+
// err = env.DeclareStream(streamName, nil)
66+
// it is a best practise to define a size, 1GB for example:
67+
68+
streamName := "test"
69+
err = env.DeclareStream(streamName,
70+
&stream.StreamOptions{
71+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
72+
},
73+
)
74+
75+
rProducer, err := ha.NewHAProducer(env, streamName, "producer-ha")
76+
CheckErr(err)
77+
78+
chPublishConfirm := rProducer.NotifyPublishConfirmation()
79+
handlePublishConfirm(chPublishConfirm)
80+
time.Sleep(4 * time.Second)
81+
for i := 0; i < 1000000; i++ {
82+
err := rProducer.BatchPublish(CreateArrayMessagesForTesting(10))
83+
time.Sleep(10 * time.Millisecond)
84+
if i%1000 == 0 {
85+
fmt.Println("sent.. " + strconv.Itoa(i))
86+
}
87+
CheckErr(err)
88+
}
89+
90+
fmt.Println("Press any key to start consuming ")
91+
_, _ = reader.ReadString('\n')
92+
93+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
94+
fmt.Printf("messages consumed: %s \n ", message.Data)
95+
}
96+
97+
consumer, err := env.NewConsumer(context.TODO(), streamName,
98+
handleMessages,
99+
stream.NewConsumerOptions().
100+
SetConsumerName("my_consumer"))
101+
CheckErr(err)
102+
103+
fmt.Println("Press any key to stop ")
104+
_, _ = reader.ReadString('\n')
105+
time.Sleep(200 * time.Millisecond)
106+
err = rProducer.Close()
107+
CheckErr(err)
108+
err = consumer.Close()
109+
CheckErr(err)
110+
err = env.DeleteStream(streamName)
111+
CheckErr(err)
112+
err = env.Close()
113+
CheckErr(err)
114+
}

0 commit comments

Comments
 (0)