Skip to content

Commit e1287b0

Browse files
authored
new repo changes (#31)
* new repo changes
1 parent 5d2fb3c commit e1287b0

File tree

8 files changed

+30
-17
lines changed

8 files changed

+30
-17
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ build: vet build-perfTest
2525
test: vet
2626
go test -v ./pkg/streaming -race -coverprofile=coverage.txt -covermode=atomic
2727
docker-build: build
28-
docker build -t gsantomaggio/go-stream-client:$(VERSION) .
28+
docker build -t pivotalrabbitmq/go-stream-perf-test:$(VERSION) .
2929

3030
docker-push: docker-build
31-
docker push gsantomaggio/go-stream-client:$(VERSION)
31+
docker push pivotalrabbitmq/go-stream-perf-test:$(VERSION)
3232

3333
run-perTest: build-perfTest
3434
go run perfTest/perftest.go $(PERFTEST_FLAGS)

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Experimental client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rab
88
### Download
99
---
1010
```
11-
go get -u github.com/gsantomaggio/go-stream-client@v0.2-alpha
11+
go get -u github.com/gsantomaggio/rabbitmq-stream-go-client@v0.3-alpha
1212
```
1313

1414
### How to test
@@ -28,7 +28,7 @@ go get -u github.com/gsantomaggio/go-stream-client@v0.2-alpha
2828
---
2929
The performance tool is work in progress, you can use it with docker
3030
```
31-
docker run --network host -it gsantomaggio/go-stream-client silent
31+
docker run --network host -it pivotalrabbitmq/go-stream-perf-test silent
3232
```
3333

3434
or directly

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.2-alpha
1+
0.3-alpha

perfTest/cmd/silent.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func printStats() {
5151
}
5252

5353
func startSimulation() error {
54-
streaming.INFO("Silent Simulation, url: %s producers: %d consumers: %d streams: %s\n", rabbitmqBrokerUrl, producers, consumers, streams)
54+
streaming.INFO("Silent (%s) Simulation, url: %s producers: %d consumers: %d streams: %s ", streaming.Version, rabbitmqBrokerUrl, producers, consumers, streams)
5555

5656
err := initStreams()
5757
err = startConsumers()
@@ -149,7 +149,10 @@ func startConsumers() error {
149149
Name(uuid.New().String()).
150150
MessagesHandler(func(Context streaming.ConsumerContext, message *amqp.Message) {
151151
if atomic.AddInt32(&consumerMessageCount, 1)%500 == 0 {
152-
_ = Context.Consumer.Commit()
152+
err := Context.Consumer.Commit()
153+
if err != nil {
154+
streaming.ERROR("Error Commit: %s", err)
155+
}
153156
}
154157
}).Build()
155158
if err != nil {

perfTest/perftest.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package main
22

33
import (
44
"github.com/gsantomaggio/go-stream-client/perfTest/cmd"
5-
"time"
5+
"sync"
66
)
77

8+
var wg sync.WaitGroup
89
func main() {
9-
cmd.Execute()
10-
for true {
11-
time.Sleep(1 * time.Second)
12-
}
10+
11+
wg.Add(1)
12+
go cmd.Execute()
13+
wg.Wait()
1314
}

pkg/streaming/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const (
6464
DefaultReadSocketBuffer = 4096 * 2
6565

6666
//
67-
Version = "0.2-alpha"
67+
Version = "0.3-alpha"
6868
)
6969

7070
func LookErrorCode(errorCode uint16) string {

pkg/streaming/consumer_creator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type Consumer struct {
1414
parameters *ConsumerCreator
1515
mutex *sync.RWMutex
1616
}
17+
1718
func (consumer *Consumer) GetStream() string {
1819
return consumer.parameters.streamName
1920
}
@@ -133,6 +134,9 @@ func (c *ConsumerCreator) Build() (*Consumer, error) {
133134
return
134135
}
135136

137+
case data := <-consumer.response.data:
138+
consumer.setOffset(data.(int64))
139+
136140
case messages := <-consumer.response.messages:
137141
for _, message := range messages {
138142
c.messagesHandler(ConsumerContext{Consumer: consumer}, message)

pkg/streaming/response.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,11 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
191191
consumer, _ := c.coordinator.GetConsumerById(subscriptionId)
192192

193193
_ = ReadByte(r)
194-
_ = ReadByte(r)
194+
chunkType := ReadByte(r)
195+
if chunkType != 0 {
196+
WARN("Invalid chunkType: %d ", chunkType)
197+
}
198+
195199
_ = ReadUShort(r)
196200
numRecords, _ := ReadUInt(r)
197201
_ = ReadInt64(r) // timestamp
@@ -210,7 +214,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
210214
var offsetLimit int64 = -1
211215

212216
if consumer.parameters.offsetSpecification.isOffset() {
213-
offsetLimit = consumer.parameters.offsetSpecification.offset
217+
offsetLimit = consumer.getOffset()
214218
}
215219
//if
216220

@@ -235,14 +239,15 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
235239
batchConsumingMessages = append(batchConsumingMessages, msg)
236240
}
237241

242+
} else {
243+
WARN("entryType Not Handled %d", entryType)
238244
}
239245
numRecords--
240246
offset++
241-
consumer.setOffset(offset)
242247
}
243248

244249

245-
//consumer.response.code <- Code{id: ResponseCodeOk}
250+
consumer.response.data <- offset
246251
consumer.response.messages <- batchConsumingMessages
247252

248253
}

0 commit comments

Comments
 (0)