Skip to content

Commit e36b5e1

Browse files
authored
Expose query Offset to the Environment level (#136)
* Expose query Offset to the Environment level closes #132 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * Change version
1 parent e4ae359 commit e36b5e1

File tree

7 files changed

+41
-9
lines changed

7 files changed

+41
-9
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
3131
* [Consume messages](#consume-messages)
3232
* [Manual Track Offset](#manual-track-offset)
3333
* [Automatic Track Offset](#automatic-track-offset)
34+
* [Get consumer Offset](#get-consumer-offset)
3435
* [Handle Close](#handle-close)
3536
- [Performance test tool](#performance-test-tool)
3637
* [Performance test tool Docker](#performance-test-tool-docker)
@@ -44,7 +45,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
4445
### Installing
4546

4647
```shell
47-
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v1.0.0-rc12
48+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
4849
```
4950

5051
imports:
@@ -423,6 +424,15 @@ stream.NewConsumerOptions().
423424
424425
See also "Automatic Offset Tracking" example in the [examples](./examples/) directory
425426
427+
### Get consumer offset
428+
429+
It is possible to query the consumer offset using:
430+
```golang
431+
offset, err := env.QueryOffset("consumer_name", "streamName")
432+
```
433+
An error is returned if the offset doesn't exist.
434+
435+
426436
### Handle Close
427437
Client provides an interface to handle the producer/consumer close.
428438

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.0-rc12
1+
1.0.0-rc.13

change_version

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ go fmt ./...
55
rm pkg/stream/constants.go-e
66

77

8-
sed -i -e "s/.*github.com\/rabbitmq\/rabbitmq-stream-go-client@v*.*/go get -u github.com\/rabbitmq\/rabbitmq-stream-go-client@v$1/" README.md
8+
# sed -i -e "s/.*github.com\/rabbitmq\/rabbitmq-stream-go-client@v*.*/go get -u github.com\/rabbitmq\/rabbitmq-stream-go-client@v$1/" README.md
99
go fmt ./...
10-
rm README.md-e
10+
# rm README.md-e
1111

1212
git add VERSION pkg/stream/constants.go README.md

examples/getting_started.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"github.com/google/uuid"
77
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
8-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
98
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
109
"os"
1110
"strconv"
@@ -43,7 +42,7 @@ func main() {
4342
reader := bufio.NewReader(os.Stdin)
4443
// Set log level, not mandatory by default is INFO
4544
// you cn set DEBUG for more information
46-
// stream.SetLevelInfo(logs.DEBUG)
45+
// stream.SetLevelInfo(logs.DEBUG)
4746

4847
fmt.Println("Getting started with Streaming client for RabbitMQ")
4948
fmt.Println("Connecting to RabbitMQ streaming ...")

pkg/stream/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
const initBufferPublishSize = 2 + 2 + 1 + 4
2020

2121
const (
22-
ClientVersion = "1.0.0-rc12"
22+
ClientVersion = "1.0.0-rc.13"
2323

2424
commandDeclarePublisher = 1
2525
commandPublish = 2

pkg/stream/consumer_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ var _ = Describe("Streaming Consumers", func() {
205205
Eventually(func() int64 { return consumer.GetLastStoredOffset() }, 5*time.Second).Should(Equal(int64(99)),
206206
"Offset should be 99")
207207
Expect(consumer.Close()).NotTo(HaveOccurred())
208-
209208
})
210209

211210
It("automatically commit by number/time", func() {
@@ -226,7 +225,7 @@ var _ = Describe("Streaming Consumers", func() {
226225
"Offset should be 100")
227226
Expect(consumer.Close()).NotTo(HaveOccurred())
228227
/// When the consumer is closed, it has to save the offset
229-
// so the last offest has to be 105
228+
// so the last offset has to be 105
230229
Eventually(func() int64 {
231230
return consumer.GetLastStoredOffset()
232231
}, 5*time.Second).Should(Equal(int64(105)),
@@ -318,6 +317,9 @@ var _ = Describe("Streaming Consumers", func() {
318317
It("Subscribe/Unsubscribe count messages manual store", func() {
319318
producer, err := env.NewProducer(streamName, nil)
320319
Expect(err).NotTo(HaveOccurred())
320+
// the offset doesn't exist (yet) here for the consumer test
321+
_, err = env.QueryOffset("consumer_test", streamName)
322+
Expect(err).To(HaveOccurred())
321323

322324
Expect(producer.BatchSend(CreateArrayMessagesForTesting(107))).
323325
NotTo(HaveOccurred())
@@ -341,6 +343,13 @@ var _ = Describe("Streaming Consumers", func() {
341343
}, 5*time.Second).Should(Equal(int64(107)),
342344
"Offset should be 107")
343345

346+
offset, err := env.QueryOffset("consumer_test", streamName)
347+
Expect(err).NotTo(HaveOccurred())
348+
Eventually(func() int64 {
349+
return offset
350+
}, 5*time.Second).Should(Equal(int64(107)),
351+
"Offset should be 107")
352+
344353
err = consumer.Close()
345354
Expect(err).NotTo(HaveOccurred())
346355

pkg/stream/enviroment.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,20 @@ func (env *Environment) StreamExists(streamName string) (bool, error) {
154154
return client.StreamExists(streamName), nil
155155
}
156156

157+
func (env *Environment) QueryOffset(consumerName string, streamName string) (int64, error) {
158+
client, err := env.newReconnectClient()
159+
defer func(client *Client) {
160+
err := client.Close()
161+
if err != nil {
162+
return
163+
}
164+
}(client)
165+
if err != nil {
166+
return 0, err
167+
}
168+
return client.queryOffset(consumerName, streamName)
169+
}
170+
157171
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) {
158172
client, err := env.newReconnectClient()
159173
defer func(client *Client) {

0 commit comments

Comments
 (0)