Skip to content

Commit 827dfdc

Browse files
authored
expose application properites (#119)
1 parent 4585ab4 commit 827dfdc

File tree

4 files changed

+67
-5
lines changed

4 files changed

+67
-5
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/rabbitmq/rabbitmq-stream-go-client
33
go 1.16
44

55
require (
6-
github.com/frankban/quicktest v1.14.0 // indirect
76
github.com/golang/snappy v0.0.4
87
github.com/google/uuid v1.3.0
98
github.com/klauspost/compress v1.14.2

pkg/amqp/types.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,12 @@ type Message struct {
414414
}
415415

416416
type AMQP10 struct {
417-
publishingId int64
418-
hasPublishingId bool
419-
message *Message
420-
Properties *MessageProperties
417+
publishingId int64
418+
hasPublishingId bool
419+
message *Message
420+
Properties *MessageProperties
421+
Annotations Annotations
422+
ApplicationProperties map[string]interface{}
421423
}
422424

423425
func NewMessage(data []byte) *AMQP10 {
@@ -443,6 +445,8 @@ func (amqp *AMQP10) GetPublishingId() int64 {
443445

444446
func (amqp *AMQP10) MarshalBinary() ([]byte, error) {
445447
amqp.message.Properties = amqp.Properties
448+
amqp.message.ApplicationProperties = amqp.ApplicationProperties
449+
amqp.message.Annotations = amqp.Annotations
446450
return amqp.message.MarshalBinary()
447451
}
448452

@@ -467,6 +471,10 @@ func (amqp *AMQP10) GetMessageAnnotations() Annotations {
467471
return amqp.message.Annotations
468472
}
469473

474+
func (amqp *AMQP10) GetApplicationProperties() map[string]interface{} {
475+
return amqp.message.ApplicationProperties
476+
}
477+
470478
// NewMessage returns a *Message with data as the payload.
471479
//
472480
// This constructor is intended as a helper for basic Messages with a

pkg/message/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ type StreamMessage interface {
1111
GetData() [][]byte
1212
GetMessageProperties() *amqp.MessageProperties
1313
GetMessageAnnotations() amqp.Annotations
14+
GetApplicationProperties() map[string]interface{}
1415
}

pkg/stream/consumer_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,60 @@ var _ = Describe("Streaming Consumers", func() {
398398

399399
})
400400

401+
It("Application Message Properties", func() {
402+
producer, err := env.NewProducer(streamName, nil)
403+
404+
chConfirm := producer.NotifyPublishConfirmation()
405+
go func(ch ChannelPublishConfirm, p *Producer) {
406+
for ids := range ch {
407+
for _, msg := range ids {
408+
Expect(msg.GetMessage().GetApplicationProperties()["key1"]).To(Equal("value1"))
409+
Expect(msg.GetMessage().GetApplicationProperties()["key2"]).To(Equal("value2"))
410+
Expect(msg.GetMessage().GetApplicationProperties()["key3"]).To(Equal("value3"))
411+
Expect(msg.GetMessage().GetApplicationProperties()["key4"]).To(Equal("value4"))
412+
Expect(msg.GetMessage().GetMessageAnnotations()["annotation_key_1"]).To(Equal("annotation_vale_1"))
413+
Expect(msg.GetMessage().GetMessageAnnotations()["annotation_key_2"]).To(Equal("annotation_vale_2"))
414+
}
415+
}
416+
}(chConfirm, producer)
417+
418+
appMap := map[string]interface{}{
419+
"key1": "value1",
420+
"key2": "value2",
421+
"key3": "value3",
422+
"key4": "value4",
423+
"key5": "value5",
424+
}
425+
Expect(err).NotTo(HaveOccurred())
426+
msg := amqp.NewMessage([]byte("message"))
427+
msg.ApplicationProperties = appMap
428+
msg.Annotations = map[interface{}]interface{}{
429+
"annotation_key_1": "annotation_vale_1",
430+
"annotation_key_2": "annotation_vale_2",
431+
}
432+
433+
Expect(producer.Send(msg)).NotTo(HaveOccurred())
434+
defer func(producer *Producer) {
435+
Expect(producer.Close()).NotTo(HaveOccurred())
436+
}(producer)
437+
438+
consumer, err := env.NewConsumer(streamName,
439+
func(consumerContext ConsumerContext, message *amqp.Message) {
440+
Expect(message.ApplicationProperties["key1"]).To(Equal("value1"))
441+
Expect(message.ApplicationProperties["key2"]).To(Equal("value2"))
442+
Expect(message.ApplicationProperties["key3"]).To(Equal("value3"))
443+
Expect(message.ApplicationProperties["key4"]).To(Equal("value4"))
444+
Expect(message.Annotations["annotation_key_1"]).To(Equal("annotation_vale_1"))
445+
Expect(message.Annotations["annotation_key_2"]).To(Equal("annotation_vale_2"))
446+
447+
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.First()).
448+
SetConsumerName("consumer_test"))
449+
Expect(err).NotTo(HaveOccurred())
450+
time.Sleep(200 * time.Millisecond)
451+
Expect(consumer.Close()).NotTo(HaveOccurred())
452+
453+
})
454+
401455
It("Consistent Messages", func() {
402456
producer, err := env.NewProducer(streamName, nil)
403457
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)