Skip to content

Commit cf8491c

Browse files
authored
add rate (#33)
* add rate
1 parent e1287b0 commit cf8491c

File tree

4 files changed

+26
-2
lines changed

4 files changed

+26
-2
lines changed

perfTest/cmd/commands.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,20 @@ var (
2323
streams []string
2424
maxLengthBytes string
2525
printStatsV bool
26+
rate int
27+
batchSize int
2628
)
2729

2830
func init() {
2931
setupCli(rootCmd)
3032
}
3133

3234
func setupCli(baseCmd *cobra.Command) {
35+
batchSize = 100
3336
baseCmd.PersistentFlags().StringVarP(&rabbitmqBrokerUrl, "uris", "u", streaming.LocalhostUriConnection, "Broker URL")
3437
baseCmd.PersistentFlags().IntVarP(&producers, "producers", "p", 1, "Number of Producers")
3538
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "c", 1, "Number of Consumers")
39+
baseCmd.PersistentFlags().IntVarP(&rate, "rate", "r", 0, "Limit publish rate")
3640
baseCmd.PersistentFlags().BoolVarP(&preDeclared, "pre-declared", "d", false, "Pre created stream")
3741
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "n", true, "Print stats")
3842
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "s", []string{uuid.New().String()}, "Stream names, create an UUID if not specified")

perfTest/cmd/silent.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,18 @@ func startProducers() error {
111111
return err
112112
}
113113
var arr []*amqp.Message
114-
for z := 0; z < 100; z++ {
114+
for z := 0; z < batchSize; z++ {
115115

116116
arr = append(arr, amqp.NewMessage([]byte(fmt.Sprintf("simul_%s", stream) )))
117117
}
118118

119119
go func(prod *streaming.Producer, messages []*amqp.Message) {
120120
for {
121-
//time.Sleep(1 * time.Millisecond)
121+
if rate > 0 {
122+
sleep := float64(batchSize) / float64(rate)
123+
sleep = sleep * 1000
124+
time.Sleep(time.Duration(sleep) * time.Millisecond)
125+
}
122126
atomic.AddInt32(&producerMessageCount, 100)
123127
_, err = prod.BatchPublish(nil, arr)
124128
if err != nil {

pkg/streaming/client_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,19 @@ var _ = Describe("Streaming testClient", func() {
105105
Expect(err).NotTo(HaveOccurred())
106106
})
107107

108+
It("Create two times Stream precondition fail", func() {
109+
err := testClient.StreamCreator().Stream(testStreamName).Create()
110+
Expect(err).NotTo(HaveOccurred())
111+
err = testClient.StreamCreator().Stream(testStreamName).
112+
MaxLengthBytes(ByteCapacity{}.MB(100)).
113+
Create()
114+
Expect(err).To(HaveOccurred())
115+
Expect(fmt.Sprintf("%s", err)).
116+
To(ContainSubstring("Precondition Failed"))
117+
err = testClient.DeleteStream(testStreamName)
118+
Expect(err).NotTo(HaveOccurred())
119+
})
120+
121+
108122
})
109123
})

pkg/streaming/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ func LookErrorCode(errorCode uint16) string {
8383
return "Code subscription id does not exist"
8484
case ResponseCodePublisherDoesNotExist:
8585
return "Code publisher does not exist"
86+
case ResponseCodePreconditionFailed:
87+
return "Code Precondition Failed"
8688
default:
8789
{
8890
WARN("Error not handled %d", errorCode)

0 commit comments

Comments
 (0)