Commit 5cadb98

Implement stream stats (#196)
* Implement stream stats
  Closes: #190
  Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent ec6e973 commit 5cadb98

File tree

README.md
pkg/stream/client.go
pkg/stream/client_test.go
pkg/stream/constants.go
pkg/stream/environment.go
pkg/stream/server_frame.go
pkg/stream/stream_stats.go

7 files changed: +211 -17 lines

README.md

Lines changed: 47 additions & 17 deletions
@@ -22,6 +22,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
 * [Load Balancer](#load-balancer)
 * [TLS](#tls)
 * [Streams](#streams)
+* [Statistics](#streams-statistics)
 * [Publish messages](#publish-messages)
 * [`Send` vs `BatchSend`](#send-vs-batchsend)
 * [Publish Confirmation](#publish-confirmation)
@@ -58,7 +59,7 @@ imports:
 ### Run server with Docker
 ---
 You may need a server to test locally. Let's start the broker:
-```shell
+```shell
 docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
 rabbitmq:3.9-management
@@ -182,6 +183,35 @@ The function `DeclareStream` doesn't return errors if a stream is already define
 Note that it returns the precondition failed when it doesn't have the same parameters
 Use `StreamExists` to check if a stream exists.
 
+### Streams Statistics
+
+To get stream statistics you need to use the `environment.StreamStats` method.
+
+```golang
+stats, err := environment.StreamStats(testStreamName)
+
+// FirstOffset - The first offset in the stream.
+// Returns the first offset in the stream, or
+// an error if there is no first offset yet.
+
+firstOffset, err := stats.FirstOffset() // first offset of the stream
+
+// LastOffset - The last offset in the stream.
+// Returns the last offset in the stream, or
+// an error if there is no last offset yet.
+lastOffset, err := stats.LastOffset() // last offset of the stream
+
+// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
+//
+// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
+// cluster members (leader and replicas).
+//
+// The committed chunk ID is a good indication of what the last offset of a stream can be at a
+// given time. The value can be stale as soon as the application reads it though, as the committed
+// chunk ID for a stream that is published to changes all the time.
+
+committedChunkId, err := stats.CommittedChunkId()
+```
 
 ### Publish messages
 
@@ -241,14 +271,14 @@ The `Send` interface works in most of the cases, In some condition is about 15/2
 
 ### Publish Confirmation
 
-For each publish the server sends back to the client the confirmation or an error.
+For each publish the server sends back to the client the confirmation or an error.
 The client provides an interface to receive the confirmation:
 
 ```golang
 //optional publish confirmation channel
 chPublishConfirm := producer.NotifyPublishConfirmation()
 handlePublishConfirm(chPublishConfirm)
-
+
 
 func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
 	go func() {
 		for confirmed := range confirms {
@@ -264,7 +294,7 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
 }
 ```
 
-In the MessageStatus struct you can find two `publishingId`:
+In the MessageStatus struct you can find two `publishingId`:
 ```golang
 //first one
 messageStatus.GetMessage().GetPublishingId()
@@ -277,12 +307,12 @@ The second one is assigned automatically by the client.
 In case the user specifies the `publishingId` with:
 ```golang
 msg = amqp.NewMessage([]byte("mymessage"))
-msg.SetPublishingId(18) // <---
+msg.SetPublishingId(18) // <---
 ```
 
 
 The filed: `messageStatus.GetMessage().HasPublishingId()` is true and </br>
-the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same.
+the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same.
 
 
 See also "Getting started" example in the [examples](./examples/) directory
@@ -303,8 +333,8 @@ publishingId, err := producer.GetLastPublishingId()
 
 ### Sub Entries Batching
 
-The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
-meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
+The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
+meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
 Use this feature to increase throughput at the cost of increased latency. </br>
 You can find a "Sub Entries Batching" example in the [examples](./examples/) directory. </br>
 
@@ -319,7 +349,7 @@ producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
 
 ### Ha Producer Experimental
 The ha producer is built up the standard producer. </br>
-Features:
+Features:
 - auto-reconnect in case of disconnection
 - handle the unconfirmed messages automatically in case of fail.
 
@@ -329,7 +359,7 @@ You can find a "HA producer" example in the [examples](./examples/) directory. <
 haproducer := NewHAProducer(
 env *stream.Environment, // mandatory
 streamName string, // mandatory
-producerOptions *stream.ProducerOptions, //optional
+producerOptions *stream.ProducerOptions, //optional
 confirmMessageHandler ConfirmMessageHandler // mandatory
 )
 ```
@@ -352,7 +382,7 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
 ```golang
 stream.NewConsumerOptions().
 SetConsumerName("my_consumer"). // set a consumer name
-SetCRCCheck(false). // Enable/Disable the CRC control.
+SetCRCCheck(false). // Enable/Disable the CRC control.
 SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
 ```
 Disabling the CRC control can increase the performances.
@@ -374,7 +404,7 @@ handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Mes
 consumer, err := env.NewConsumer(
 ..
 stream.NewConsumerOptions().
-SetConsumerName("my_consumer"). <------
+SetConsumerName("my_consumer"). <------
 ```
 A consumer must have a name to be able to store offsets. <br>
 Note: *AVOID to store the offset for each single message, it will reduce the performances*
@@ -388,9 +418,9 @@ processMessageAsync := func(consumer stream.Consumer, message *amqp.Message, off
 err := consumer.StoreCustomOffset(offset) // commit all messages up to this offset
 ....
 ```
-This is useful in situations where we have to process messages asynchronously and we cannot block the original message
+This is useful in situations where we have to process messages asynchronously and we cannot block the original message
 handler. Which means we cannot store the current or latest delivered offset as we saw in the `handleMessages` function
-above.
+above.
 
 ### Automatic Track Offset
 
@@ -422,9 +452,9 @@ stream.NewConsumerOptions().
 // set a consumerOffsetNumber name
 SetConsumerName("my_consumer").
 SetAutoCommit(stream.NewAutoCommitStrategy().
-SetCountBeforeStorage(50). // store each 50 messages stores
+SetCountBeforeStorage(50). // store each 50 messages stores
 SetFlushInterval(10*time.Second)). // store each 10 seconds
-SetOffset(stream.OffsetSpecification{}.First()))
+SetOffset(stream.OffsetSpecification{}.First()))
 ```
 
 See also "Automatic Offset Tracking" example in the [examples](./examples/) directory
@@ -453,7 +483,7 @@ In this way it is possible to handle fail-over
 
 ### Performance test tool
 
-Performance test tool it is useful to execute tests.
+Performance test tool it is useful to execute tests.
 See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool
 
pkg/stream/client.go

Lines changed: 25 additions & 0 deletions
@@ -799,3 +799,28 @@ func (c *Client) DeclareSubscriber(streamName string,
 	}()
 	return consumer, err.Err
 }
+
+func (c *Client) StreamStats(streamName string) (*StreamStats, error) {
+
+	resp := c.coordinator.NewResponse(commandStreamStatus)
+	correlationId := resp.correlationid
+
+	length := 2 + 2 + 4 + 2 + len(streamName)
+
+	var b = bytes.NewBuffer(make([]byte, 0, length+4))
+	writeProtocolHeader(b, length, commandStreamStatus,
+		correlationId)
+	writeString(b, streamName)
+
+	err := c.handleWriteWithResponse(b.Bytes(), resp, false)
+	offset := <-resp.data
+	_ = c.coordinator.RemoveResponseById(resp.correlationid)
+	if err.Err != nil {
+		return nil, err.Err
+	}
+	m, ok := offset.(map[string]int64)
+	if !ok {
+		return nil, fmt.Errorf("invalid response, expected map[string]int64 but got %T", offset)
+	}
+	return newStreamStats(m, streamName), nil
+}

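The request length computed above (`2 + 2 + 4 + 2 + len(streamName)`) follows the framing the client's write helpers use for the other commands: command key, protocol version, correlation id, and a length-prefixed stream name, with the 4-byte frame-size prefix written by `writeProtocolHeader` accounted for only in the buffer capacity. A minimal sketch of that arithmetic (not part of the commit; names are illustrative):

```golang
package main

import "fmt"

// streamStatsRequestLength mirrors the length calculation in Client.StreamStats:
// command key (2) + version (2) + correlation id (4) + string length prefix (2)
// + the stream name bytes. The 4-byte frame-size prefix is not included here,
// which is why the request buffer is allocated with capacity length+4.
func streamStatsRequestLength(streamName string) int {
	return 2 + 2 + 4 + 2 + len(streamName)
}

func main() {
	name := "my-stream"
	length := streamStatsRequestLength(name)
	fmt.Printf("declared length: %d, bytes on the wire: %d\n", length, length+4)
}
```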
pkg/stream/client_test.go

Lines changed: 46 additions & 0 deletions
@@ -115,6 +115,52 @@ var _ = Describe("Streaming testEnvironment", func() {
 		Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred())
 	})
 
+	It("Stream Status", func() {
+		Expect(testEnvironment.DeclareStream(testStreamName, nil)).
+			NotTo(HaveOccurred())
+		stats, err := testEnvironment.StreamStats(testStreamName)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(stats).NotTo(BeNil())
+
+		DeferCleanup(func() {
+			Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred())
+		})
+
+		_, err = stats.FirstOffset()
+		Expect(fmt.Sprintf("%s", err)).
+			To(ContainSubstring("FirstOffset not found for"))
+
+		_, err = stats.LastOffset()
+		Expect(fmt.Sprintf("%s", err)).
+			To(ContainSubstring("LastOffset not found for"))
+
+		_, err = stats.CommittedChunkId()
+		Expect(fmt.Sprintf("%s", err)).
+			To(ContainSubstring("CommittedChunkId not found for"))
+
+		producer, err := testEnvironment.NewProducer(testStreamName, nil)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(producer.BatchSend(CreateArrayMessagesForTesting(1_000))).NotTo(HaveOccurred())
+		time.Sleep(time.Millisecond * 800)
+		Expect(producer.Close()).NotTo(HaveOccurred())
+
+		statsAfter, err := testEnvironment.StreamStats(testStreamName)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(statsAfter).NotTo(BeNil())
+
+		offset, err := statsAfter.FirstOffset()
+		Expect(err).NotTo(HaveOccurred())
+		Expect(offset == 0).To(BeTrue())
+
+		offset, err = statsAfter.LastOffset()
+		Expect(err).NotTo(HaveOccurred())
+		Expect(offset > 0).To(BeTrue())
+
+		offset, err = statsAfter.CommittedChunkId()
+		Expect(err).NotTo(HaveOccurred())
+		Expect(offset > 0).To(BeTrue())
+	})
+
 	It("Create two times Stream precondition fail", func() {
 		Expect(testEnvironment.DeclareStream(testStreamName, nil)).NotTo(HaveOccurred())
 		err := testEnvironment.DeclareStream(testStreamName,

pkg/stream/constants.go

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ const (
 	commandOpen = 21
 	CommandClose = 22
 	commandHeartbeat = 23
+	commandStreamStatus = 28
 
 	/// used only for tests
 	commandUnitTest = 99

pkg/stream/environment.go

Lines changed: 14 additions & 0 deletions
@@ -186,6 +186,20 @@ func (env *Environment) QuerySequence(publisherReference string, streamName stri
 	return client.queryPublisherSequence(publisherReference, streamName)
 }
 
+func (env *Environment) StreamStats(streamName string) (*StreamStats, error) {
+	client, err := env.newReconnectClient()
+	defer func(client *Client) {
+		err := client.Close()
+		if err != nil {
+			return
+		}
+	}(client)
+	if err != nil {
+		return nil, err
+	}
+	return client.StreamStats(streamName)
+}
+
 func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) {
 	client, err := env.newReconnectClient()
 	defer func(client *Client) {

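Taken together with the README section above, `Environment.StreamStats` is the public entry point for this feature. A minimal end-to-end sketch of how an application might call it (assuming a broker reachable with the default environment options; the stream name is hypothetical and not part of the commit):

```golang
package main

import (
	"fmt"
	"log"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	// Default options assume a local broker on the standard stream port.
	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
	if err != nil {
		log.Fatal(err)
	}
	defer env.Close()

	streamName := "stats-demo" // hypothetical stream name, used only for this sketch
	if err := env.DeclareStream(streamName, nil); err != nil {
		log.Fatal(err)
	}

	stats, err := env.StreamStats(streamName)
	if err != nil {
		log.Fatal(err)
	}

	// On an empty stream the accessors return an error instead of a zero value.
	if first, err := stats.FirstOffset(); err != nil {
		fmt.Println("no first offset yet:", err)
	} else {
		fmt.Println("first offset:", first)
	}
}
```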
pkg/stream/server_frame.go

Lines changed: 27 additions & 0 deletions
@@ -99,6 +99,10 @@ func (c *Client) handleResponse() {
 				c.queryOffsetFrameHandler(readerProtocol, buffer)
 
 			}
+		case commandStreamStatus:
+			{
+				c.streamStatusFrameHandler(readerProtocol, buffer)
+			}
 		case commandMetadata:
 			{
 				c.metadataFrameHandler(readerProtocol, buffer)
@@ -478,6 +482,29 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) {
 	}
 }
 
+func (c *Client) streamStatusFrameHandler(readProtocol *ReaderProtocol,
+	r *bufio.Reader) {
+
+	c.handleGenericResponse(readProtocol, r)
+
+	count, _ := readUInt(r)
+	streamStatus := make(map[string]int64)
+
+	for i := 0; i < int(count); i++ {
+		key := readString(r)
+		value := readInt64(r)
+		streamStatus[key] = value
+	}
+	res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
+	if err != nil {
+		logs.LogWarn("stream status response not found")
+		return
+	}
+	res.code <- Code{id: readProtocol.ResponseCode}
+	res.data <- streamStatus
+
+}
+
 func (c *Client) metadataFrameHandler(readProtocol *ReaderProtocol,
 	r *bufio.Reader) {
 	readProtocol.CorrelationId, _ = readUInt(r)

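The handler above reads the reply body as a `uint32` entry count followed by `count` pairs of a length-prefixed string key and an `int64` value, and pushes the resulting map onto the pending response's data channel. A self-contained sketch of that layout (assuming big-endian encoding and 2-byte string length prefixes, as the client's read helpers appear to use; not part of the commit):

```golang
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// decodeStats reads: uint32 count, then count x (uint16 key length, key bytes, int64 value).
func decodeStats(r io.Reader) (map[string]int64, error) {
	var count uint32
	if err := binary.Read(r, binary.BigEndian, &count); err != nil {
		return nil, err
	}
	stats := make(map[string]int64, count)
	for i := uint32(0); i < count; i++ {
		var keyLen uint16
		if err := binary.Read(r, binary.BigEndian, &keyLen); err != nil {
			return nil, err
		}
		key := make([]byte, keyLen)
		if _, err := io.ReadFull(r, key); err != nil {
			return nil, err
		}
		var value int64
		if err := binary.Read(r, binary.BigEndian, &value); err != nil {
			return nil, err
		}
		stats[string(key)] = value
	}
	return stats, nil
}

func main() {
	// Build a tiny fake body with a single entry: "first_chunk_id" -> 0.
	var b bytes.Buffer
	binary.Write(&b, binary.BigEndian, uint32(1))
	binary.Write(&b, binary.BigEndian, uint16(len("first_chunk_id")))
	b.WriteString("first_chunk_id")
	binary.Write(&b, binary.BigEndian, int64(0))

	stats, err := decodeStats(bytes.NewReader(b.Bytes()))
	fmt.Println(stats, err) // map[first_chunk_id:0] <nil>
}
```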
pkg/stream/stream_stats.go

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+package stream
+
+import "fmt"
+
+type StreamStats struct {
+	stats      map[string]int64
+	streamName string
+}
+
+func newStreamStats(stats map[string]int64, streamName string) *StreamStats {
+	return &StreamStats{stats: stats, streamName: streamName}
+}
+
+// FirstOffset - The first offset in the stream.
+// Returns the first offset in the stream, or
+// an error if there is no first offset yet.
+func (s *StreamStats) FirstOffset() (int64, error) {
+	if s.stats["first_chunk_id"] == -1 {
+		return -1, fmt.Errorf("FirstOffset not found for %s", s.streamName)
+	}
+	return s.stats["first_chunk_id"], nil
+}
+
+// LastOffset - The last offset in the stream.
+// Returns the last offset in the stream, or
+// an error if there is no last offset yet.
+func (s *StreamStats) LastOffset() (int64, error) {
+	if s.stats["last_chunk_id"] == -1 {
+		return -1, fmt.Errorf("LastOffset not found for %s", s.streamName)
+	}
+	return s.stats["last_chunk_id"], nil
+}
+
+// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
+//
+// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
+// cluster members (leader and replicas).
+//
+// The committed chunk ID is a good indication of what the last offset of a stream can be at a
+// given time. The value can be stale as soon as the application reads it though, as the committed
+// chunk ID for a stream that is published to changes all the time.
+//
+// Returns the committed offset in this stream, or
+// an error if there is no committed chunk yet.
+func (s *StreamStats) CommittedChunkId() (int64, error) {
+	if s.stats["committed_chunk_id"] == -1 {
+		return -1, fmt.Errorf("CommittedChunkId not found for %s", s.streamName)
+	}
+	return s.stats["committed_chunk_id"], nil
+}
