Skip to content

Commit 676ccf7

Browse files
authored
Add manual commit for super stream consumet (#323)
* Add manual commit to super stream consumer --------- Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent 0324edc commit 676ccf7

File tree

4 files changed

+35
-2
lines changed

4 files changed

+35
-2
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,26 @@ You can read also the java stream-client blog post: https://rabbitmq.github.io/r
570570
571571
Super Stream supports [publish-filtering](#publish-filtering) and [consume-filtering](#consume-filtering) features.
572572
573+
Offset tracking is supported for the Super Stream consumer. </br>
574+
In the same way as the standard stream, you can use the `SetAutoCommit` or `SetManualCommit` option to enable/disable the automatic offset tracking. </br>
575+
576+
On the super stream consumer message handler is possible to identify the partition, the consumer and the offset: </br>
577+
```golang
578+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
579+
....
580+
consumerContext.Consumer.GetName() // consumer name
581+
consumerContext.Consumer.GetOffset() // current offset
582+
consumerContext.Consumer.GetStreamName() // stream name (partition name )
583+
....
584+
}
585+
```
586+
587+
Manual tracking API:
588+
- `consumerContext.Consumer.StoreOffset()`: stores the current offset.
589+
- `consumerContext.Consumer.StoreCustomOffset(xxx)` stores a custom offset.
590+
591+
Like the standard stream, you should avoid to store the offset for each single message: it will reduce the performances.
592+
573593
574594
### Performance test tool
575595

examples/offsetTracking/offsetTracking.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func main() {
7171
streamName,
7272
handleMessages,
7373
stream.NewConsumerOptions().
74+
SetManualCommit(). // disable auto commit
7475
SetConsumerName("my_consumer"). // set a consumer name
7576
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
7677
CheckErr(err)

examples/single_active_consumer/single_active_consumer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func main() {
5050
// This is only for the example, in a real application you should not store the offset
5151
// for each message, it is better to store the offset for a batch of messages
5252
err := consumerContext.Consumer.StoreOffset()
53+
5354
CheckErrConsumer(err)
5455
}
5556

pkg/stream/super_stream_consumer.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ type SuperStreamConsumerOptions struct {
1515
SingleActiveConsumer *SingleActiveConsumer
1616
ConsumerName string
1717
AutoCommitStrategy *AutoCommitStrategy
18+
Autocommit bool
1819
}
1920

2021
func NewSuperStreamConsumerOptions() *SuperStreamConsumerOptions {
2122
return &SuperStreamConsumerOptions{
22-
Offset: OffsetSpecification{}.Next(),
23+
Offset: OffsetSpecification{}.Next(),
24+
Autocommit: false,
2325
}
2426
}
2527

@@ -49,10 +51,16 @@ func (s *SuperStreamConsumerOptions) SetConsumerName(consumerName string) *Super
4951
}
5052

5153
func (s *SuperStreamConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *SuperStreamConsumerOptions {
54+
s.Autocommit = true
5255
s.AutoCommitStrategy = autoCommitStrategy
5356
return s
5457
}
5558

59+
func (s *SuperStreamConsumerOptions) SetManualCommit() *SuperStreamConsumerOptions {
60+
s.Autocommit = false
61+
return s
62+
}
63+
5664
// CPartitionClose is a struct that is used to notify the user when a partition from a consumer is closed
5765
// The user can use the NotifyPartitionClose to get the channel
5866
type CPartitionClose struct {
@@ -167,8 +175,11 @@ func (s *SuperStreamConsumer) ConnectPartition(partition string, offset OffsetSp
167175
}
168176

169177
options = options.SetFilter(s.SuperStreamConsumerOptions.Filter)
170-
if s.SuperStreamConsumerOptions.AutoCommitStrategy != nil {
178+
179+
if s.SuperStreamConsumerOptions.Autocommit {
171180
options = options.SetAutoCommit(s.SuperStreamConsumerOptions.AutoCommitStrategy)
181+
} else {
182+
options = options.SetManualCommit()
172183
}
173184

174185
if s.SuperStreamConsumerOptions.SingleActiveConsumer != nil {

0 commit comments

Comments
 (0)