Skip to content

Commit ccf528f

Browse files
authored
Query sequence (#145)
* Add a way to query the last sequence id #144 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * Add producer.GetLastPublishingId() function To easly get the last id for the producer Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent fefb3ad commit ccf528f

File tree

11 files changed

+73
-9
lines changed

11 files changed

+73
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ tls-gen/
2727
dist/
2828

2929
perfTest/perfTest
30+
go.dev/

Docker/Dockerfile

Whitespace-only changes.

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
4545
### Installing
4646

4747
```shell
48-
go get -u github.com/rabbitmq/rabbitmq-stream-go-client@v1.0.0-rc.13
48+
go get -u github.com/rabbitmq/rabbitmq-stream-go-client
4949
```
5050

5151
imports:
@@ -296,6 +296,11 @@ https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
296296
You can find a "Deduplication" example in the [examples](./examples/) directory. </br>
297297
Run it more than time, the messages count will be always 10.
298298

299+
To retrieve the last sequence id for producer you can use:
300+
```
301+
publishingId, err := producer.GetLastPublishingId()
302+
```
303+
299304
### Sub Entries Batching
300305

301306
The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.0-rc.13
1+
1.0.1-rc.1

examples/deduplication/deduplication.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ func main() {
5353
}
5454
}(chConfirm, producer)
5555

56+
// In case you need to know which is the last ID for the producer: GetLastPublishingId
57+
lastPublishingId, err := producer.GetLastPublishingId()
58+
CheckErr(err)
59+
fmt.Printf("lastPublishingId: %d\n",
60+
lastPublishingId,
61+
)
62+
5663
data := make(map[int]string)
5764
data[0] = "Piaggio"
5865
data[1] = "Ferrari"

pkg/stream/client.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,8 @@ func (c *Client) internalDeclarePublisher(streamName string, producer *Producer)
525525
res := c.handleWrite(b.Bytes(), resp)
526526

527527
if publisherReferenceSize > 0 {
528-
producer.sequence = c.queryPublisherSequence(producer.options.Name, streamName)
528+
v, _ := c.queryPublisherSequence(producer.options.Name, streamName)
529+
producer.sequence = v
529530
}
530531

531532
return res
@@ -560,7 +561,7 @@ func (c *Client) metaData(streams ...string) *StreamsMetadata {
560561
return data.(*StreamsMetadata)
561562
}
562563

563-
func (c *Client) queryPublisherSequence(publisherReference string, stream string) int64 {
564+
func (c *Client) queryPublisherSequence(publisherReference string, stream string) (int64, error) {
564565

565566
length := 2 + 2 + 4 + 2 + len(publisherReference) + 2 + len(stream)
566567
resp := c.coordinator.NewResponse(commandQueryPublisherSequence)
@@ -570,10 +571,13 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
570571

571572
writeString(b, publisherReference)
572573
writeString(b, stream)
573-
c.handleWriteWithResponse(b.Bytes(), resp, false)
574+
err := c.handleWriteWithResponse(b.Bytes(), resp, false)
574575
sequence := <-resp.data
575576
_ = c.coordinator.RemoveResponseById(resp.correlationid)
576-
return sequence.(int64)
577+
if err.Err != nil {
578+
return 0, err.Err
579+
}
580+
return sequence.(int64), nil
577581

578582
}
579583

pkg/stream/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
const initBufferPublishSize = 2 + 2 + 1 + 4
2020

2121
const (
22-
ClientVersion = "1.0.0-rc.13"
22+
ClientVersion = "1.0.1-rc.1"
2323

2424
commandDeclarePublisher = 1
2525
commandPublish = 2

pkg/stream/consumer_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,13 @@ var _ = Describe("Streaming Consumers", func() {
259259
})
260260

261261
It("Deduplication", func() {
262-
producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName("producer-ded"))
262+
producerName := "producer-ded"
263+
producer, err := env.NewProducer(streamName, NewProducerOptions().SetProducerName(producerName))
263264
Expect(err).NotTo(HaveOccurred())
264265
var arr []message.StreamMessage
265266
for z := 0; z < 10; z++ {
266267
m := amqp.NewMessage([]byte("test_" + strconv.Itoa(z)))
267-
m.SetPublishingId(int64(z * 10))
268+
m.SetPublishingId(int64(z * 10)) // id stored: the last one should be the same on QuerySequence
268269
arr = append(arr, m)
269270
}
270271

@@ -300,6 +301,19 @@ var _ = Describe("Streaming Consumers", func() {
300301
return atomic.LoadInt32(&messagesReceived)
301302
}, 5*time.Second).Should(Equal(int32(10)),
302303
"consumer should receive only 10 messages")
304+
305+
Eventually(func() int64 {
306+
v, _ := env.QuerySequence(producerName, streamName)
307+
return v
308+
}, 5*time.Second).Should(Equal(int64(90)),
309+
"QuerySequence should give the last id: 90")
310+
311+
Eventually(func() int64 {
312+
v, _ := producer.GetLastPublishingId()
313+
return v
314+
}, 5*time.Second).Should(Equal(int64(90)),
315+
"GetLastPublishingId should give the last id: 90")
316+
303317
Expect(producer.Close()).NotTo(HaveOccurred())
304318
Expect(consumer.Close()).NotTo(HaveOccurred())
305319
})

pkg/stream/enviroment.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,22 @@ func (env *Environment) QueryOffset(consumerName string, streamName string) (int
168168
return client.queryOffset(consumerName, streamName)
169169
}
170170

171+
// QuerySequence gets the last id stored for a producer
172+
// you can also see producer.GetLastPublishingId() that is the easier way to get the last-id
173+
func (env *Environment) QuerySequence(publisherReference string, streamName string) (int64, error) {
174+
client, err := env.newReconnectClient()
175+
defer func(client *Client) {
176+
err := client.Close()
177+
if err != nil {
178+
return
179+
}
180+
}(client)
181+
if err != nil {
182+
return 0, err
183+
}
184+
return client.queryPublisherSequence(publisherReference, streamName)
185+
}
186+
171187
func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) {
172188
client, err := env.newReconnectClient()
173189
defer func(client *Client) {

pkg/stream/enviroment_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,20 @@ var _ = Describe("Environment test", func() {
261261

262262
})
263263

264+
Describe("Validation Query Offset/Sequence", func() {
265+
266+
env, err := NewEnvironment(NewEnvironmentOptions())
267+
Expect(err).NotTo(HaveOccurred())
268+
_, err = env.QuerySequence("my_prod",
269+
"Stream_Doesnt_exist")
270+
Expect(err).To(HaveOccurred())
271+
272+
_, err = env.QueryOffset("my_cons",
273+
"Stream_Doesnt_exist")
274+
Expect(err).To(HaveOccurred())
275+
Expect(env.Close()).NotTo(HaveOccurred())
276+
})
277+
264278
Describe("Stream Existing/Meta data", func() {
265279

266280
env, err := NewEnvironment(NewEnvironmentOptions().SetPort(5552).

0 commit comments

Comments
 (0)