Skip to content

Commit 8ed32be

Browse files
authored
Handle Consumer closing (#58)
fixes: #57 fixes: #60
1 parent 89ba0a8 commit 8ed32be

18 files changed

+257
-80
lines changed

Makefile

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ check: $(STATICCHECK)
2323
$(STATICCHECK) ./pkg/stream
2424

2525
test: vet fmt check
26-
go test --tags=debug -v ./pkg/stream -race -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
26+
go test --tags=debug -v ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
27+
28+
build-all: vet fmt check build-darwin build-windows build-linux
29+
go test --tags=debug -v -race ./pkg/stream -coverprofile=coverage.txt -covermode=atomic #-ginkgo.v
2730

2831
integration-test: vet fmt check
2932
cd ./pkg/system_integration && go test -v . -race -coverprofile=coverage.txt -covermode=atomic -tags debug -timeout 99999s
3033

34+
build-%: vet fmt check
35+
GOOS=$(*) GOARCH=amd64 go build -ldflags=$(LDFLAGS) -v ./...
36+
3137
build: vet fmt check
3238
go build -ldflags=$(LDFLAGS) -v ./...
3339

34-
PERFTEST_FLAGS ?= --producers 1 --consumers 1
40+
PERFTEST_FLAGS ?= --publishers 1 --consumers 1
3541
perf-test-run: perf-test-build
3642
go run perfTest/perftest.go silent $(PERFTEST_FLAGS)
3743

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
# GO stream client for RabbitMQ streaming queues
22
---
33
![Build](https://github.com/rabbitmq/rabbitmq-stream-go-client/workflows/Build/badge.svg)
4+
[![codecov](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client/branch/main/graph/badge.svg?token=HZD4S71QIM)](https://codecov.io/gh/rabbitmq/rabbitmq-stream-go-client)
45

56
Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream)
67

78
### Download
89
---
910

1011
```
11-
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.9-alpha
12+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v0.10-alpha
1213
```
1314

1415
### Getting started

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.9-alpha
1+
0.10-alpha

examples/publishersError/publisherError.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bufio"
55
"fmt"
66
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
7-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
87
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
98

109
//"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
@@ -20,14 +19,6 @@ func CheckErr(err error) {
2019
}
2120
}
2221

23-
func CreateArrayMessagesForTesting(bacthMessages int) []message.StreamMessage {
24-
var arr []message.StreamMessage
25-
for z := 0; z < bacthMessages; z++ {
26-
arr = append(arr, amqp.NewMessage([]byte("hello_world_"+strconv.Itoa(z))))
27-
}
28-
return arr
29-
}
30-
3122
func main() {
3223
reader := bufio.NewReader(os.Stdin)
3324

@@ -41,45 +32,43 @@ func main() {
4132
SetUser("test").
4233
SetPassword("test"))
4334
CheckErr(err)
44-
streamName := "no"
45-
//err = env.DeclareStream(streamName,
46-
// &stream.StreamOptions{
47-
// MaxLengthBytes: stream.ByteCapacity{}.GB(2),
48-
// },
49-
//)
50-
//CheckErr(err)
35+
streamName := "pub-error"
36+
err = env.DeclareStream(streamName,
37+
&stream.StreamOptions{
38+
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
39+
},
40+
)
5141

52-
producer, err := env.NewProducer(streamName, &stream.ProducerOptions{Name: "myProducer"})
42+
CheckErr(err)
43+
44+
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetProducerName("myProducer"))
5345
CheckErr(err)
5446

5547
// This channel receives the callback in case the is some error during the
5648
// publisher.
5749
chPublishError := producer.NotifyPublishError()
5850
handlePublishError(chPublishError)
5951

60-
go func() {
61-
for i := 0; i < 100; i++ {
62-
err := producer.BatchSend(CreateArrayMessagesForTesting(2))
63-
CheckErr(err)
64-
}
65-
}()
66-
// Here we close the producer during the publishing
67-
// so an publish error is raised
68-
69-
err = producer.Close()
70-
CheckErr(err)
52+
for i := 0; i < 100; i++ {
53+
msg := amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))
54+
err := producer.Send(msg)
55+
CheckErr(err)
56+
}
7157

7258
fmt.Println("Press any key to stop ")
7359
_, _ = reader.ReadString('\n')
7460
CheckErr(err)
61+
err = env.DeleteStream(streamName)
62+
CheckErr(err)
63+
err = env.Close()
64+
CheckErr(err)
7565

7666
}
7767

7868
func handlePublishError(publishError stream.ChannelPublishError) {
7969
go func() {
8070
var totalMessages int32
81-
for {
82-
pError := <-publishError
71+
for pError := range publishError {
8372
atomic.AddInt32(&totalMessages, 1)
8473
var data [][]byte
8574
if pError.UnConfirmedMessage != nil {

pkg/ha/ha_publisher.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,6 @@ func NewHAProducer(env *stream.Environment, streamName string, producerOptions *
7373

7474
func (p *ReliableProducer) newProducer() error {
7575

76-
//if p.producer != nil && len(p.producer.GetUnConfirmed()) > 0 {
77-
// for _, msg := range p.producer.GetUnConfirmed() {
78-
// msg.Confirmed = false
79-
// p.channelPublishConfirm <- []*stream.UnConfirmedMessage{msg}
80-
// }
81-
//
82-
//}
83-
8476
producer, err := p.env.NewProducer(p.streamName, p.producerOptions)
8577
if err != nil {
8678
return err

pkg/stream/brokers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (br *Broker) mergeWithDefault() {
5252
if br.Password == "" {
5353
br.Password = broker.Password
5454
}
55-
if br.Port == "" {
55+
if br.Port == "" || br.Port == "0" {
5656
br.Port = broker.Port
5757
}
5858
if br.Scheme == "" {

pkg/stream/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,17 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
440440
if options == nil {
441441
options = NewProducerOptions()
442442
}
443+
444+
if options.QueueSize < minQueuePublisherSize || options.QueueSize > maxQueuePublisherSize {
445+
return nil, fmt.Errorf("QueueSize values must be between %d and %d",
446+
minQueuePublisherSize, maxQueuePublisherSize)
447+
}
448+
449+
if options.BatchSize < minBatchSize || options.BatchSize > maxBatchSize {
450+
return nil, fmt.Errorf("BatchSize values must be between %d and %d",
451+
minBatchSize, maxBatchSize)
452+
}
453+
443454
producer, err := c.coordinator.NewProducer(&ProducerOptions{
444455
client: c,
445456
streamName: streamName,
@@ -621,6 +632,10 @@ func (c *Client) DeclareSubscriber(streamName string,
621632
options = NewConsumerOptions()
622633
}
623634

635+
if options.Offset.typeOfs <= 0 || options.Offset.typeOfs > 6 {
636+
return nil, fmt.Errorf("specify a valid Offset")
637+
}
638+
624639
options.client = c
625640
options.streamName = streamName
626641
consumer := c.coordinator.NewConsumer(messagesHandler, options)

pkg/stream/constants.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
// for example the producer status
1212

1313
const (
14-
running = iota
15-
closed = iota
14+
open = iota
15+
closed = iota
1616
)
1717

1818
const initBufferPublishSize = 2 + 2 + 1 + 4
@@ -76,14 +76,22 @@ const (
7676
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
7777

7878
///
79-
defaultSocketBuffer = 4096
80-
79+
defaultSocketBuffer = 4096
80+
defaultQueuePublisherSize = 10000
81+
minQueuePublisherSize = 100
82+
maxQueuePublisherSize = 1_000_000
83+
84+
minBatchSize = 1
85+
maxBatchSize = 10_000
86+
defaultBatchSize = 100
8187
//
82-
ClientVersion = "0.9-alpha"
88+
ClientVersion = "0.10-alpha"
8389

8490
StreamTcpPort = "5552"
8591
)
8692

93+
var AlreadyClosed = errors.New("Already Closed")
94+
8795
var PreconditionFailed = errors.New("Precondition Failed")
8896
var AuthenticationFailure = errors.New("Authentication Failure")
8997
var StreamDoesNotExist = errors.New("Stream Does Not Exist")

pkg/stream/consumer.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,21 @@ type Consumer struct {
1818
// different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration
1919
// and won't change. currentOffset is the status of the offset
2020
currentOffset int64
21-
CloseHandler chan Event
21+
closeHandler chan Event
22+
23+
status int
24+
}
25+
26+
func (consumer *Consumer) setStatus(status int) {
27+
consumer.mutex.Lock()
28+
defer consumer.mutex.Unlock()
29+
consumer.status = status
30+
}
31+
32+
func (consumer *Consumer) getStatus() int {
33+
consumer.mutex.Lock()
34+
defer consumer.mutex.Unlock()
35+
return consumer.status
2236
}
2337

2438
func (consumer *Consumer) GetStreamName() string {
@@ -50,7 +64,7 @@ func (consumer *Consumer) GetOffset() int64 {
5064

5165
func (consumer *Consumer) NotifyClose() ChannelClose {
5266
ch := make(chan Event, 1)
53-
consumer.CloseHandler = ch
67+
consumer.closeHandler = ch
5468
return ch
5569
}
5670

@@ -60,7 +74,7 @@ type ConsumerContext struct {
6074

6175
type MessagesHandler func(consumerContext ConsumerContext, message *amqp.Message)
6276

63-
type ConsumerOptions struct {
77+
type /**/ ConsumerOptions struct {
6478
client *Client
6579
ConsumerName string
6680
streamName string
@@ -105,6 +119,10 @@ func (c *Client) credit(subscriptionId byte, credit int16) {
105119
}
106120

107121
func (consumer *Consumer) Close() error {
122+
if consumer.getStatus() == closed {
123+
return AlreadyClosed
124+
}
125+
consumer.setStatus(closed)
108126
_, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID)
109127
if errGet != nil {
110128
return nil

pkg/stream/consumer_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,41 @@ var _ = Describe("Streaming Consumers", func() {
239239
Expect(err).NotTo(HaveOccurred())
240240
})
241241

242+
It("Check already closed", func() {
243+
producer, err := env.NewProducer(streamName, nil)
244+
Expect(err).NotTo(HaveOccurred())
245+
err = producer.BatchSend(CreateArrayMessagesForTesting(500)) // batch send
246+
Expect(err).NotTo(HaveOccurred())
247+
defer func(producer *Producer) {
248+
err := producer.Close()
249+
Expect(err).NotTo(HaveOccurred())
250+
}(producer)
251+
252+
var messagesCount int32 = 0
253+
consumer, err := env.NewConsumer(streamName,
254+
func(consumerContext ConsumerContext, message *amqp.Message) {
255+
if atomic.AddInt32(&messagesCount, 1) >= 250 {
256+
err := consumerContext.Consumer.Close()
257+
if err != nil {
258+
return
259+
}
260+
}
261+
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).SetConsumerName("consumer_test"))
262+
Expect(err).NotTo(HaveOccurred())
263+
time.Sleep(500 * time.Millisecond)
264+
err = consumer.Close()
265+
Expect(err).To(Equal(AlreadyClosed))
266+
267+
})
268+
269+
It("Validation", func() {
270+
_, err := env.NewConsumer(streamName,
271+
func(consumerContext ConsumerContext, message *amqp.Message) {
272+
}, &ConsumerOptions{
273+
Offset: OffsetSpecification{},
274+
})
275+
Expect(err).To(HaveOccurred())
276+
277+
})
278+
242279
})

0 commit comments

Comments
 (0)