Skip to content

Commit 1af5ee1

Browse files
Lastconsumed when no offset (#122)
* Return offset 0 when no offset Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 0b47773 commit 1af5ee1

File tree

4 files changed

+55
-38
lines changed

4 files changed

+55
-38
lines changed

pkg/stream/client.go

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,27 @@ func (c *Client) DeclareStream(streamName string, options *StreamOptions) error
672672

673673
}
674674

675+
func (c *Client) queryOffset(consumerName string, streamName string) (int64, error) {
676+
length := 2 + 2 + 4 + 2 + len(consumerName) + 2 + len(streamName)
677+
678+
resp := c.coordinator.NewResponse(CommandQueryOffset)
679+
correlationId := resp.correlationid
680+
var b = bytes.NewBuffer(make([]byte, 0, length+4))
681+
writeProtocolHeader(b, length, CommandQueryOffset,
682+
correlationId)
683+
684+
writeString(b, consumerName)
685+
writeString(b, streamName)
686+
err := c.handleWriteWithResponse(b.Bytes(), resp, false)
687+
offset := <-resp.data
688+
_ = c.coordinator.RemoveResponseById(resp.correlationid)
689+
if err.Err != nil {
690+
return 0, err.Err
691+
}
692+
693+
return offset.(int64), nil
694+
}
695+
675696
func (c *Client) DeclareSubscriber(streamName string,
676697
messagesHandler MessagesHandler,
677698
options *ConsumerOptions) (*Consumer, error) {
@@ -691,32 +712,34 @@ func (c *Client) DeclareSubscriber(streamName string,
691712
return nil, fmt.Errorf("message count before storage must be bigger than one")
692713
}
693714

715+
if options.Offset.isLastConsumed() {
716+
lastOffset, err := c.queryOffset(options.ConsumerName, streamName)
717+
switch err {
718+
case nil, OffsetNotFoundError:
719+
if err == OffsetNotFoundError {
720+
options.Offset.typeOfs = typeFirst
721+
options.Offset.offset = 0
722+
break
723+
} else {
724+
options.Offset.offset = lastOffset
725+
options.Offset.typeOfs = typeOffset
726+
break
727+
}
728+
default:
729+
return nil, err
730+
}
731+
}
732+
694733
options.client = c
695734
options.streamName = streamName
696735
consumer := c.coordinator.NewConsumer(messagesHandler, options)
736+
697737
length := 2 + 2 + 4 + 1 + 2 + len(streamName) + 2 + 2
698738
if options.Offset.isOffset() ||
699739
options.Offset.isTimestamp() {
700740
length += 8
701741
}
702742

703-
if options.Offset.isLastConsumed() {
704-
lastOffset, err := consumer.QueryOffset()
705-
if err != nil {
706-
_ = c.coordinator.RemoveConsumerById(consumer.ID, Event{
707-
Command: CommandQueryOffset,
708-
StreamName: streamName,
709-
Name: consumer.GetName(),
710-
Reason: "error QueryOffset",
711-
Err: err,
712-
})
713-
return nil, err
714-
}
715-
options.Offset.offset = lastOffset
716-
// here we change the type since typeLastConsumed is not part of the protocol
717-
options.Offset.typeOfs = typeOffset
718-
}
719-
720743
// copy the option offset to the consumer offset
721744
// the option.offset won't change ( in case we need to retrive the original configuration)
722745
// consumer.current offset will be moved when reading

pkg/stream/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ const (
6969
responseCodeAccessRefused = uint16(16)
7070
responseCodePreconditionFailed = uint16(17)
7171
responseCodePublisherDoesNotExist = uint16(18)
72+
responseCodeNoOffset = uint16(19)
7273

7374
/// responses out of protocol
7475
closeChannel = uint16(60)
@@ -111,6 +112,7 @@ var StreamAlreadyExists = errors.New("Stream Already Exists")
111112
var VirtualHostAccessFailure = errors.New("Virtual Host Access Failure")
112113
var SubscriptionIdDoesNotExist = errors.New("Subscription Id Does Not Exist")
113114
var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
115+
var OffsetNotFoundError = errors.New("Offset not found")
114116
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
115117
var CodeAccessRefused = errors.New("Resources Access Refused")
116118
var ConnectionClosed = errors.New("Can't send the message, connection closed")
@@ -137,6 +139,8 @@ func lookErrorCode(errorCode uint16) error {
137139
return SubscriptionIdDoesNotExist
138140
case responseCodePublisherDoesNotExist:
139141
return PublisherDoesNotExist
142+
case responseCodeNoOffset:
143+
return OffsetNotFoundError
140144
case responseCodePreconditionFailed:
141145
return PreconditionFailed
142146
case responseCodeFrameTooLarge:

pkg/stream/consumer.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -275,27 +275,7 @@ func (consumer *Consumer) internalStoreOffset() error {
275275
}
276276

277277
func (consumer *Consumer) QueryOffset() (int64, error) {
278-
length := 2 + 2 + 4 + 2 + len(consumer.options.ConsumerName) + 2 + len(consumer.options.streamName)
279-
280-
resp := consumer.options.client.coordinator.NewResponse(CommandQueryOffset)
281-
correlationId := resp.correlationid
282-
var b = bytes.NewBuffer(make([]byte, 0, length+4))
283-
writeProtocolHeader(b, length, CommandQueryOffset,
284-
correlationId)
285-
286-
writeString(b, consumer.options.ConsumerName)
287-
writeString(b, consumer.options.streamName)
288-
err := consumer.options.client.handleWriteWithResponse(b.Bytes(), resp, false)
289-
if err.Err != nil {
290-
return 0, err.Err
291-
292-
}
293-
294-
offset := <-resp.data
295-
_ = consumer.options.client.coordinator.RemoveResponseById(resp.correlationid)
296-
297-
return offset.(int64), nil
298-
278+
return consumer.options.client.queryOffset(consumer.options.ConsumerName, consumer.options.streamName)
299279
}
300280

301281
/*

pkg/stream/consumer_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,16 @@ var _ = Describe("Streaming Consumers", func() {
276276
Expect(consumer.Close()).NotTo(HaveOccurred())
277277
})
278278

279+
It("last consumed message not raise an error fist time", func() {
280+
281+
_, err := env.NewConsumer(streamName,
282+
func(consumerContext ConsumerContext, message *amqp.Message) {
283+
}, NewConsumerOptions().
284+
SetOffset(OffsetSpecification{}.LastConsumed()).
285+
SetConsumerName("consumer_test"))
286+
Expect(err).NotTo(HaveOccurred())
287+
})
288+
279289
It("Subscribe/Unsubscribe count messages manual store", func() {
280290
producer, err := env.NewProducer(streamName, nil)
281291
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)