Skip to content

Commit 345b205

Browse files
authored
Fixes #158 (#159)
* Fixes #158 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> * Add a test case for #158 Signed-off-by: Aitor Perez Cedres <acedres@vmware.com> * Refactor integration test
1 parent 1ac7034 commit 345b205

File tree

8 files changed

+195
-36
lines changed

8 files changed

+195
-36
lines changed

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnX
99
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
1010
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
1111
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
12+
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
1213
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
1314
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
1415
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
@@ -29,6 +30,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
2930
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
3031
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
3132
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
33+
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
3234
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
3335
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
3436
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -127,6 +129,7 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
127129
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
128130
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
129131
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
132+
golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20=
130133
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
131134
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
132135
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package integration_test
2+
3+
import (
4+
"testing"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
)
9+
10+
func TestIntegrationTest(t *testing.T) {
11+
RegisterFailHandler(Fail)
12+
RunSpecs(t, "IntegrationTest Suite")
13+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package integration_test
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
11+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
12+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
13+
stream "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
14+
)
15+
16+
var _ = Describe("StreamIntegration", func() {
17+
Context("Issue 158", func() {
18+
var (
19+
addresses []string = []string{
20+
"rabbitmq-stream://guest:guest@localhost:5552/"}
21+
streamName string = "test-next"
22+
streamEnv *stream.Environment
23+
producer *stream.Producer
24+
totalInitialMessages int
25+
)
26+
27+
BeforeEach(func() {
28+
var err error
29+
streamEnv, err = stream.NewEnvironment(
30+
stream.NewEnvironmentOptions().SetUris(addresses))
31+
Expect(err).ToNot(HaveOccurred())
32+
33+
err = streamEnv.DeclareStream(streamName,
34+
stream.NewStreamOptions().SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
35+
Expect(err).ToNot(HaveOccurred())
36+
37+
producer, err = streamEnv.NewProducer(streamName, nil)
38+
Expect(err).ToNot(HaveOccurred())
39+
confirmationCh := producer.NotifyPublishConfirmation()
40+
readyCh := make(chan bool)
41+
42+
totalInitialMessages = 100
43+
// Routine to receive message confirmations
44+
// Required to ensure there are existing messages before we
45+
// attach to the stream
46+
go func(c chan bool) {
47+
totalExpected := totalInitialMessages
48+
count := 0
49+
loop:
50+
for {
51+
select {
52+
case confirmations := <-confirmationCh:
53+
for range confirmations {
54+
count += 1
55+
if count == totalExpected {
56+
break loop
57+
}
58+
}
59+
}
60+
}
61+
c <- true
62+
}(readyCh)
63+
64+
for i := 0; i < totalInitialMessages; i++ {
65+
var message message.StreamMessage
66+
body := fmt.Sprintf(`{"name": "item-%d", "age": %d}`, i, i)
67+
message = amqp.NewMessage([]byte(body))
68+
err = producer.Send(message)
69+
Expect(err).ToNot(HaveOccurred())
70+
}
71+
72+
// Wait for all confirmations
73+
<-readyCh
74+
})
75+
76+
AfterEach(func() {
77+
Expect(streamEnv.DeleteStream(streamName)).
78+
To(SatisfyAny(
79+
Succeed(),
80+
MatchError(stream.StreamDoesNotExist),
81+
))
82+
})
83+
84+
It("consumes from an existing stream", func() {
85+
By("attaching using Next strategy")
86+
options := stream.NewConsumerOptions().
87+
SetConsumerName("golang-client-issue-158-test").
88+
SetOffset(stream.OffsetSpecification{}.Next()).
89+
SetManualCommit()
90+
91+
receivedOffsets := make([]int64, 0)
92+
m := sync.Mutex{} // To avoid races in the handler and test assertions
93+
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
94+
defer GinkgoRecover()
95+
m.Lock()
96+
receivedOffsets = append(
97+
receivedOffsets,
98+
consumerContext.Consumer.GetOffset(),
99+
)
100+
m.Unlock()
101+
Expect(consumerContext.Consumer.StoreOffset()).To(Succeed())
102+
}
103+
104+
consumer, err := streamEnv.NewConsumer(streamName, handleMessages, options)
105+
Expect(err).ToNot(HaveOccurred())
106+
107+
newMessagesExpected := 100
108+
for i := totalInitialMessages; i < totalInitialMessages+newMessagesExpected; i++ {
109+
var message message.StreamMessage
110+
body := fmt.Sprintf(`{"name": "item-%d", "age": %d}`, i, i)
111+
message = amqp.NewMessage([]byte(body))
112+
err = producer.Send(message)
113+
Expect(err).ToNot(HaveOccurred())
114+
}
115+
116+
// We should receive only 100 messages because Next sends the next chunk
117+
// in the stream. The previously 100 messages should be in a different chunk
118+
By("receiving only new messages")
119+
Eventually(func() int {
120+
m.Lock()
121+
defer m.Unlock()
122+
return len(receivedOffsets)
123+
}).
124+
WithTimeout(time.Second * 3).
125+
WithPolling(time.Millisecond * 500).
126+
Should(BeNumerically("==", 100))
127+
128+
firstExpectedOffset := 100
129+
for i := 0; i < len(receivedOffsets); i++ {
130+
m.Lock()
131+
Expect(receivedOffsets[i]).To(BeNumerically("==", firstExpectedOffset+i),
132+
"Offset in [%d] is %d, expected %d",
133+
i, receivedOffsets[i], firstExpectedOffset+i)
134+
m.Unlock()
135+
}
136+
// Current offset is initial (first) + total received msg - 1
137+
// -1 because the first offset is 100 (it's not 101)
138+
// e.g. 100, 101 ... 199. NOT 200
139+
// Similar when 0, 1...99 (not 100)
140+
expectedCurrentOffset := firstExpectedOffset + newMessagesExpected - 1
141+
Expect(consumer.GetOffset()).To(BeNumerically("==", expectedCurrentOffset))
142+
})
143+
})
144+
})

pkg/stream/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -775,9 +775,9 @@ func (c *Client) DeclareSubscriber(streamName string,
775775
}
776776

777777
case offsetMessages := <-consumer.response.offsetMessages:
778-
for _, message := range offsetMessages.messages {
779-
consumer.incCurrentOffset()
780-
consumer.MessagesHandler(ConsumerContext{Consumer: consumer}, message)
778+
for _, offMessage := range offsetMessages {
779+
consumer.setCurrentOffset(offMessage.offset)
780+
consumer.MessagesHandler(ConsumerContext{Consumer: consumer}, offMessage.message)
781781
if consumer.options.autocommit {
782782
consumer.messageCountBeforeStorage += 1
783783
if consumer.messageCountBeforeStorage >= consumer.options.autoCommitStrategy.messageCountBeforeStorage {

pkg/stream/consumer.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,6 @@ func (consumer *Consumer) setCurrentOffset(offset int64) {
6464
consumer.currentOffset = offset
6565
}
6666

67-
func (consumer *Consumer) incCurrentOffset() {
68-
consumer.mutex.Lock()
69-
defer consumer.mutex.Unlock()
70-
consumer.currentOffset += 1
71-
}
72-
7367
func (consumer *Consumer) GetOffset() int64 {
7468
consumer.mutex.Lock()
7569
res := consumer.currentOffset

pkg/stream/consumer_test.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -222,15 +222,16 @@ var _ = Describe("Streaming Consumers", func() {
222222
Expect(err).NotTo(HaveOccurred())
223223
Eventually(func() int64 {
224224
return consumer.GetLastStoredOffset()
225-
}, 5*time.Second).Should(Equal(int64(100)),
226-
"Offset should be 100")
225+
// 99 is the offset since it starts from 0
226+
}, 5*time.Second).Should(Equal(int64(99)),
227+
"Offset should be 99")
227228
Expect(consumer.Close()).NotTo(HaveOccurred())
228229
/// When the consumer is closed, it has to save the offset
229-
// so the last offset has to be 105
230+
// so the last offset has to be 104
230231
Eventually(func() int64 {
231232
return consumer.GetLastStoredOffset()
232-
}, 5*time.Second).Should(Equal(int64(105)),
233-
"Offset should be 105")
233+
}, 5*time.Second).Should(Equal(int64(104)),
234+
"Offset should be 104")
234235

235236
consumerTimer, errTimer := env.NewConsumer(streamName,
236237
func(consumerContext ConsumerContext, message *amqp.Message) {
@@ -245,15 +246,15 @@ var _ = Describe("Streaming Consumers", func() {
245246
time.Sleep(2 * time.Second)
246247
Eventually(func() int64 {
247248
return consumerTimer.GetLastStoredOffset()
248-
}, 5*time.Second).Should(Equal(int64(105)),
249-
"Offset should be 105")
249+
}, 5*time.Second).Should(Equal(int64(104)),
250+
"Offset should be 104")
250251
Expect(consumerTimer.Close()).NotTo(HaveOccurred())
251252
/// When the consumer is closed, it has to save the offset
252-
// so the last offest has to be 105
253+
// so the last offest has to be 104
253254
Eventually(func() int64 {
254255
return consumerTimer.GetLastStoredOffset()
255-
}, 5*time.Second).Should(Equal(int64(105)),
256-
"Offset should be 105")
256+
}, 5*time.Second).Should(Equal(int64(104)),
257+
"Offset should be 104")
257258

258259
})
259260

@@ -356,22 +357,25 @@ var _ = Describe("Streaming Consumers", func() {
356357

357358
Eventually(func() int64 {
358359
return consumer.GetLastStoredOffset()
359-
}, 5*time.Second).Should(Equal(int64(107)),
360-
"Offset should be 107")
360+
// 106 is the offset since it starts from 0
361+
}, 5*time.Second).Should(Equal(int64(106)),
362+
"Offset should be 106")
361363
time.Sleep(500 * time.Millisecond)
362364
offset, err := env.QueryOffset("consumer_test", streamName)
363365
Expect(err).NotTo(HaveOccurred())
364366
Eventually(func() int64 {
365367
return offset
366-
}, 5*time.Second).Should(Equal(int64(107)),
367-
"Offset should be 107")
368+
// 106 is the offset since it starts from 0
369+
}, 5*time.Second).Should(Equal(int64(106)),
370+
"Offset should be 106")
368371

369372
offsetConsumer, err := consumer.QueryOffset()
370373
Expect(err).NotTo(HaveOccurred())
371374
Eventually(func() int64 {
372375
return offsetConsumer
373-
}, 5*time.Second).Should(Equal(int64(107)),
374-
"Consumer Offset should be 107")
376+
// 106 is the offset since it starts from 0
377+
}, 5*time.Second).Should(Equal(int64(106)),
378+
"Consumer Offset should be 106")
375379

376380
err = consumer.Close()
377381
Expect(err).NotTo(HaveOccurred())
@@ -385,7 +389,7 @@ var _ = Describe("Streaming Consumers", func() {
385389
SetConsumerName("consumer_test"))
386390
Expect(err).NotTo(HaveOccurred())
387391
time.Sleep(500 * time.Millisecond)
388-
Expect(atomic.LoadInt32(&messagesReceived)).To(Equal(int32(0)))
392+
Expect(atomic.LoadInt32(&messagesReceived)).To(Equal(int32(1)))
389393
Expect(consumer.Close()).NotTo(HaveOccurred())
390394
})
391395

pkg/stream/coordinator.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ type Code struct {
2323
id uint16
2424
}
2525

26-
type offsetMessages struct {
27-
messages []*amqp.Message
28-
offset int64
26+
type offsetMessage struct {
27+
message *amqp.Message
28+
offset int64
2929
}
3030

31+
type offsetMessages = []*offsetMessage
32+
3133
type Response struct {
3234
code chan Code
3335
data chan interface{}

pkg/stream/server_frame.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
319319
filter := offsetLimit != -1
320320

321321
//messages
322-
var batchConsumingMessages []*amqp.Message
322+
var batchConsumingMessages offsetMessages
323323
var bytesBuffer = make([]byte, int(dataLength))
324324
_, err = io.ReadFull(r, bytesBuffer)
325325
if err != nil {
@@ -384,15 +384,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
384384
}
385385

386386
if consumer.getStatus() == open {
387-
consumer.response.offsetMessages <- offsetMessages{
388-
messages: batchConsumingMessages,
389-
offset: offset,
390-
}
387+
consumer.response.offsetMessages <- batchConsumingMessages
388+
391389
}
392390

393391
}
394392

395-
func (c *Client) decodeMessage(r *bufio.Reader, filter bool, offset int64, offsetLimit int64, batchConsumingMessages []*amqp.Message) []*amqp.Message {
393+
func (c *Client) decodeMessage(r *bufio.Reader, filter bool, offset int64, offsetLimit int64, batchConsumingMessages offsetMessages) offsetMessages {
396394
sizeMessage, _ := readUInt(r)
397395
arrayMessage := readUint8Array(r, sizeMessage)
398396
if filter && (offset < offsetLimit) {
@@ -403,7 +401,8 @@ func (c *Client) decodeMessage(r *bufio.Reader, filter bool, offset int64, offse
403401
if err != nil {
404402
logs.LogError("error unmarshal messages: %s", err)
405403
}
406-
batchConsumingMessages = append(batchConsumingMessages, msg)
404+
batchConsumingMessages = append(batchConsumingMessages,
405+
&offsetMessage{offset: offset, message: msg})
407406
}
408407
return batchConsumingMessages
409408
}

0 commit comments

Comments
 (0)