Skip to content

Commit 8423b6e

Browse files
committed
remove correlation id con commit offest
rabbitmq/rabbitmq-server#3097
1 parent 427efdd commit 8423b6e

File tree

5 files changed

+24
-91
lines changed

5 files changed

+24
-91
lines changed

examples/getting_started.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,14 @@ func CreateArrayMessagesForTesting(bacthMessages int) []*amqp.Message {
2929

3030
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
3131
go func() {
32-
for messagesIds := range confirms {
33-
for _, m := range messagesIds {
34-
fmt.Printf("Confirmed %s message \n ", m.Message.Data)
32+
for confirmed := range confirms {
33+
for _, msg := range confirmed {
34+
if msg.Confirmed {
35+
fmt.Printf("message %s stored \n ", msg.Message.Data)
36+
} else {
37+
fmt.Printf("message %s failed \n ", msg.Message.Data)
38+
}
39+
3540
}
3641
}
3742
}()

pkg/stream/consumer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,10 @@ func (consumer *Consumer) Commit() error {
154154
if consumer.options.streamName == "" {
155155
return fmt.Errorf("stream Name can't be empty")
156156
}
157-
length := 2 + 2 + 4 + 2 + len(consumer.options.ConsumerName) + 2 +
157+
length := 2 + 2 + 2 + len(consumer.options.ConsumerName) + 2 +
158158
len(consumer.options.streamName) + 8
159159
var b = bytes.NewBuffer(make([]byte, 0, length+4))
160-
writeProtocolHeader(b, length, commandCommitOffset,
161-
0) // correlation ID not used yet, may be used if commit offset has a confirm
160+
writeProtocolHeader(b, length, commandCommitOffset) // correlation ID not used yet, may be used if commit offset has a confirm
162161

163162
writeString(b, consumer.options.ConsumerName)
164163
writeString(b, consumer.options.streamName)

pkg/stream/coordinator.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,15 @@ func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error
8181
time.Sleep(200 * time.Millisecond)
8282
tentatives++
8383
}
84-
//producer.mutex.Lock()
85-
//if producer.publishConfirm != nil {
86-
// for _, message := range producer.unConfirmedMessages {
87-
// message.Confirmed = false
88-
// producer.publishConfirm <- []*UnConfirmedMessage{message}
89-
// }
90-
//}
91-
//producer.mutex.Unlock()
92-
//producer.resetUnConfirmed()
84+
producer.mutex.Lock()
85+
if producer.publishConfirm != nil {
86+
for _, message := range producer.unConfirmedMessages {
87+
message.Confirmed = false
88+
producer.publishConfirm <- []*UnConfirmedMessage{message}
89+
}
90+
}
91+
producer.mutex.Unlock()
92+
producer.resetUnConfirmed()
9393

9494
if producer.closeHandler != nil {
9595
producer.closeHandler <- reason

pkg/stream/producer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ func (producer *Producer) removeUnConfirmed(messageid int64) {
6969
delete(producer.unConfirmedMessages, messageid)
7070
}
7171

72-
//func (producer *Producer) resetUnConfirmed() {
73-
// producer.mutex.Lock()
74-
// defer producer.mutex.Unlock()
75-
// producer.unConfirmedMessages = map[int64]*UnConfirmedMessage{}
76-
//}
72+
func (producer *Producer) resetUnConfirmed() {
73+
producer.mutex.Lock()
74+
defer producer.mutex.Unlock()
75+
producer.unConfirmedMessages = map[int64]*UnConfirmedMessage{}
76+
}
7777

7878
func (producer *Producer) lenUnConfirmed() int {
7979
producer.mutex.Lock()

pkg/system_integration/Vagrantfile

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)