Skip to content

Commit 19828e9

Browse files
authored
optional print stats (#30)
* optional print stats
1 parent ed3a986 commit 19828e9

File tree

6 files changed

+28
-18
lines changed

6 files changed

+28
-18
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ A POC client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-s
88
### Download
99
---
1010
```
11-
go get -u github.com/gsantomaggio/go-stream-client@v0.1-alpha
11+
go get -u github.com/gsantomaggio/go-stream-client@v0.2-alpha
1212
```
1313

1414
### How to test

examples/getting_started.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@ func main() {
3131
streamName := uuid.New().String()
3232
err = client.StreamCreator().Stream(streamName).
3333
Create() // Create the streaming queue
34+
CheckErr(err)
3435

36+
err = client.StreamCreator().Stream(streamName).
37+
MaxLengthBytes(streaming.ByteCapacity{}.MB(5)).
38+
Create() // Create the streaming queue
3539
CheckErr(err)
40+
41+
3642
var count int32
3743
consumer, err := client.ConsumerCreator().
3844
Stream(streamName).

perfTest/cmd/commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ var (
2222
preDeclared bool
2323
streams []string
2424
maxLengthBytes string
25+
printStatsV bool
2526
)
2627

2728
func init() {
@@ -33,6 +34,7 @@ func setupCli(baseCmd *cobra.Command) {
3334
baseCmd.PersistentFlags().IntVarP(&producers, "producers", "p", 1, "Number of Producers")
3435
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "c", 1, "Number of Consumers")
3536
baseCmd.PersistentFlags().BoolVarP(&preDeclared, "pre-declared", "d", false, "Pre created stream")
37+
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "n", true, "Print stats")
3638
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "s", []string{uuid.New().String()}, "Stream names, create an UUID if not specified")
3739
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max_length_bytes", "m", "", "Stream max length bytes, default is unlimited, ex: 10MB,50GB, etc..")
3840
baseCmd.AddCommand(versionCmd)

perfTest/cmd/silent.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,25 @@ var (
2929
)
3030

3131
func printStats() {
32-
start := time.Now()
33-
ticker := time.NewTicker(2 * time.Second)
34-
go func() {
35-
for {
36-
select {
37-
case _ = <-ticker.C:
38-
v := time.Now().Sub(start).Seconds()
39-
PMessagesPerSecond := float64(atomic.LoadInt32(&producerMessageCount)) / v
40-
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / v
41-
streaming.INFO("Published %.2f msg/s, Consumed %.2f msg/s, Active Connections: %d", PMessagesPerSecond, CMessagesPerSecond, len(connections))
42-
atomic.SwapInt32(&producerMessageCount, 0)
43-
atomic.SwapInt32(&consumerMessageCount, 0)
44-
start = time.Now()
32+
if printStatsV {
33+
start := time.Now()
34+
ticker := time.NewTicker(2 * time.Second)
35+
go func() {
36+
for {
37+
select {
38+
case _ = <-ticker.C:
39+
v := time.Now().Sub(start).Seconds()
40+
PMessagesPerSecond := float64(atomic.LoadInt32(&producerMessageCount)) / v
41+
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / v
42+
streaming.INFO("Published %.2f msg/s, Consumed %.2f msg/s, Active Connections: %d", PMessagesPerSecond, CMessagesPerSecond, len(connections))
43+
atomic.SwapInt32(&producerMessageCount, 0)
44+
atomic.SwapInt32(&consumerMessageCount, 0)
45+
start = time.Now()
46+
}
4547
}
46-
}
4748

48-
}()
49+
}()
50+
}
4951
}
5052

5153
func startSimulation() error {

pkg/streaming/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const (
6464
DefaultReadSocketBuffer = 4096 * 2
6565

6666
//
67-
Version = "0.2-alpha-dev"
67+
Version = "0.2-alpha"
6868
)
6969

7070
func LookErrorCode(errorCode uint16) string {

pkg/streaming/response.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (c *Client) handleResponse() {
7777
}
7878
case CommandHeartbeat:
7979
{
80-
DEBUG("RECEIVED Heartbeat %d buff:%d \n", readerProtocol.CommandID, buffer.Buffered())
80+
//DEBUG("RECEIVED Heartbeat %d buff:%d \n", readerProtocol.CommandID, buffer.Buffered())
8181

8282
}
8383
case CommandQueryOffset:

0 commit comments

Comments
 (0)