Skip to content

Commit ffd70f7

Browse files
authored
add auto-commit to the super stream consumer (#305)
* add auto-commit to the super stream consumer * closes: #303 --------- Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent 0284451 commit ffd70f7

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ Use `StreamExists` to check if a stream exists.
221221

222222
### Streams Statistics
223223

224-
To get stream statistics you need to use the the `environment.StreamStats` method.
224+
To get stream statistics you need to use the `environment.StreamStats` method.
225225

226226
```golang
227227
stats, err := environment.StreamStats(testStreamName)

pkg/stream/super_stream_consumer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type SuperStreamConsumerOptions struct {
1414
Filter *ConsumerFilter
1515
SingleActiveConsumer *SingleActiveConsumer
1616
ConsumerName string
17+
AutoCommitStrategy *AutoCommitStrategy
1718
}
1819

1920
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions {
@@ -47,6 +48,11 @@ func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *Super
4748
return s
4849
}
4950

51+
func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions {
52+
s.AutoCommitStrategy = autoCommitStrategy
53+
return s
54+
}
55+
5056
// CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed
5157
// The user can use the NotifyPartitionClose to get the channel
5258
type CPartitionClose struct {
@@ -161,6 +167,9 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp
161167
}
162168

163169
options = options.SetFilter(s.SuperStreamConsumerOptions.Filter)
170+
if s.SuperStreamConsumerOptions.AutoCommitStrategy != nil {
171+
options = options.SetAutoCommit(s.SuperStreamConsumerOptions.AutoCommitStrategy)
172+
}
164173

165174
if s.SuperStreamConsumerOptions.SingleActiveConsumer != nil {
166175
// mandatory to enable the super stream consumer

pkg/stream/super_stream_consumer_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
88
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
99
test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper"
10+
"strconv"
1011
"sync"
1112
"sync/atomic"
1213
"time"
@@ -509,4 +510,72 @@ var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func()
509510
Expect(env.Close()).NotTo(HaveOccurred())
510511
})
511512

513+
It("Super Stream Consumer AutoCommit", func() {
514+
// test the auto commit
515+
env, err := NewEnvironment(nil)
516+
Expect(err).NotTo(HaveOccurred())
517+
518+
superStream := "super-stream-consumer-with-autocommit"
519+
Expect(env.DeclareSuperStream(superStream,
520+
NewPartitionsOptions(2))).NotTo(HaveOccurred())
521+
522+
superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions(
523+
NewHashRoutingStrategy(func(message message.StreamMessage) string {
524+
return message.GetMessageProperties().GroupID
525+
})))
526+
Expect(err).NotTo(HaveOccurred())
527+
528+
for i := 0; i < 20; i++ {
529+
msg := amqp.NewMessage(make([]byte, 0))
530+
msg.Properties = &amqp.MessageProperties{
531+
GroupID: strconv.Itoa(i % 2),
532+
}
533+
Expect(superProducer.Send(msg)).NotTo(HaveOccurred())
534+
}
535+
536+
var receivedMessages int32
537+
handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) {
538+
atomic.AddInt32(&receivedMessages, 1)
539+
}
540+
541+
superStreamConsumer, err := env.NewSuperStreamConsumer(superStream, handleMessages,
542+
NewSuperStreamConsumerOptions().
543+
SetOffset(OffsetSpecification{}.First()).
544+
SetConsumerName("auto-commit-consumer").
545+
// the setting is to trigger the auto commit based on the message count
546+
// the consumer will commit the offset after 9 messages
547+
SetAutoCommit(&AutoCommitStrategy{
548+
messageCountBeforeStorage: 9,
549+
// flushInterval is set to 50 seconds. So it will be ignored
550+
// messageCountBeforeStorage will be triggered first
551+
flushInterval: 50 * time.Second,
552+
}))
553+
Expect(err).NotTo(HaveOccurred())
554+
555+
time.Sleep(1 * time.Second)
556+
Eventually(func() int32 {
557+
return atomic.LoadInt32(&receivedMessages)
558+
}).
559+
WithPolling(300 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(20)))
560+
561+
// Given the partition routing strategy, the consumer will receive 10 messages from each partition
562+
// the consumer triggers the auto-commit after 9 messages.
563+
// So the query offset should return 8 for each partition
564+
offset0, err := env.QueryOffset("auto-commit-consumer", fmt.Sprintf("%s-0", superStream))
565+
Expect(err).NotTo(HaveOccurred())
566+
offset1, err := env.QueryOffset("auto-commit-consumer", fmt.Sprintf("%s-1", superStream))
567+
Expect(err).NotTo(HaveOccurred())
568+
569+
Expect(offset0).NotTo(BeNil())
570+
Expect(offset0).To(Equal(int64(8)))
571+
572+
Expect(offset1).NotTo(BeNil())
573+
Expect(offset1).To(Equal(int64(8)))
574+
575+
Expect(superProducer.Close()).NotTo(HaveOccurred())
576+
Expect(superStreamConsumer.Close()).NotTo(HaveOccurred())
577+
Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())
578+
Expect(env.Close()).NotTo(HaveOccurred())
579+
})
580+
512581
})

0 commit comments

Comments
 (0)