Skip to content

Commit dc29860

Browse files
authored
Add details on message confirmation (#111)
* Add details
1 parent 96d0060 commit dc29860

File tree

4 files changed

+26
-0
lines changed

4 files changed

+26
-0
lines changed

pkg/amqp/types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,14 @@ func (amqp *AMQP10) Message() *Message {
436436

437437
}
438438

439+
func (amqp *AMQP10) GetMessageProperties() *MessageProperties {
440+
return amqp.message.Properties
441+
}
442+
443+
func (amqp *AMQP10) GetMessageAnnotations() Annotations {
444+
return amqp.message.Annotations
445+
}
446+
439447
// NewMessage returns a *Message with data as the payload.
440448
//
441449
// This constructor is intended as a helper for basic Messages with a

pkg/message/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package message
22

3+
import "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
4+
35
type StreamMessage interface {
46
MarshalBinary() ([]byte, error)
57
UnmarshalBinary(data []byte) error
68
SetPublishingId(id int64)
79
GetPublishingId() int64
810
HasPublishingId() bool
911
GetData() [][]byte
12+
GetMessageProperties() *amqp.MessageProperties
13+
GetMessageAnnotations() amqp.Annotations
1014
}

pkg/stream/consumer_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,19 @@ var _ = Describe("Streaming Consumers", func() {
347347

348348
It("message Properties", func() {
349349
producer, err := env.NewProducer(streamName, nil)
350+
351+
chConfirm := producer.NotifyPublishConfirmation()
352+
go func(ch ChannelPublishConfirm, p *Producer) {
353+
for ids := range ch {
354+
for _, msg := range ids {
355+
Expect(msg.GetMessage().GetMessageProperties().To).To(Equal("ToTest"))
356+
Expect(msg.GetMessage().GetMessageProperties().Subject).To(Equal("SubjectTest"))
357+
Expect(msg.GetMessage().GetMessageProperties().ReplyTo).To(Equal("replyToTest"))
358+
Expect(msg.GetMessage().GetMessageProperties().ContentType).To(Equal("ContentTypeTest"))
359+
Expect(msg.GetMessage().GetMessageProperties().ContentEncoding).To(Equal("ContentEncodingTest"))
360+
}
361+
}
362+
}(chConfirm, producer)
350363
Expect(err).NotTo(HaveOccurred())
351364
msg := amqp.NewMessage([]byte("message"))
352365
msg.Properties = &amqp.MessageProperties{

pkg/stream/producer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func (cs *ConfirmationStatus) GetMessage() message.StreamMessage {
4545
return cs.message
4646
}
4747

48+
4849
func (cs *ConfirmationStatus) GetErrorCode() uint16 {
4950
return cs.errorCode
5051
}

0 commit comments

Comments
 (0)