@@ -51,7 +51,7 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 {
51
51
}
52
52
53
53
type pendingMessagesSequence struct {
54
- messages []messageSequence
54
+ messages []* messageSequence
55
55
size int
56
56
}
57
57
@@ -60,6 +60,7 @@ type messageSequence struct {
60
60
unCompressedSize int
61
61
publishingId int64
62
62
filterValue string
63
+ refMessage * message.StreamMessage
63
64
}
64
65
65
66
type Producer struct {
@@ -68,7 +69,7 @@ type Producer struct {
68
69
onClose onInternalClose
69
70
unConfirmedMessages map [int64 ]* ConfirmationStatus
70
71
sequence int64
71
- mutex * sync.Mutex
72
+ mutex * sync.RWMutex
72
73
mutexPending * sync.Mutex
73
74
publishConfirm chan []* ConfirmationStatus
74
75
closeHandler chan Event
@@ -170,11 +171,27 @@ func NewProducerOptions() *ProducerOptions {
170
171
}
171
172
172
173
func (producer * Producer ) GetUnConfirmed () map [int64 ]* ConfirmationStatus {
173
- producer .mutex .Lock ()
174
- defer producer .mutex .Unlock ()
174
+ producer .mutex .RLock ()
175
+ defer producer .mutex .RUnlock ()
175
176
return producer .unConfirmedMessages
176
177
}
177
178
179
+ func (producer * Producer ) addUnConfirmedSequences (message []* messageSequence , producerID uint8 ) {
180
+ producer .mutex .Lock ()
181
+ defer producer .mutex .Unlock ()
182
+
183
+ for _ , msg := range message {
184
+ producer .unConfirmedMessages [msg .publishingId ] =
185
+ & ConfirmationStatus {
186
+ inserted : time .Now (),
187
+ message : * msg .refMessage ,
188
+ producerID : producerID ,
189
+ publishingId : msg .publishingId ,
190
+ confirmed : false ,
191
+ }
192
+ }
193
+
194
+ }
178
195
func (producer * Producer ) addUnConfirmed (sequence int64 , message message.StreamMessage , producerID uint8 ) {
179
196
producer .mutex .Lock ()
180
197
defer producer .mutex .Unlock ()
@@ -191,6 +208,18 @@ func (po *ProducerOptions) isSubEntriesBatching() bool {
191
208
return po .SubEntrySize > 1
192
209
}
193
210
211
+ func (producer * Producer ) removeFromConfirmationStatus (status []* ConfirmationStatus ) {
212
+ producer .mutex .Lock ()
213
+ defer producer .mutex .Unlock ()
214
+
215
+ for _ , msg := range status {
216
+ delete (producer .unConfirmedMessages , msg .publishingId )
217
+ for _ , linked := range msg .linkedTo {
218
+ delete (producer .unConfirmedMessages , linked .publishingId )
219
+ }
220
+ }
221
+ }
222
+
194
223
func (producer * Producer ) removeUnConfirmed (sequence int64 ) {
195
224
producer .mutex .Lock ()
196
225
defer producer .mutex .Unlock ()
@@ -210,13 +239,13 @@ func (producer *Producer) lenPendingMessages() int {
210
239
}
211
240
212
241
func (producer * Producer ) getUnConfirmed (sequence int64 ) * ConfirmationStatus {
213
- producer .mutex .Lock ()
214
- defer producer .mutex .Unlock ()
242
+ producer .mutex .RLock ()
243
+ defer producer .mutex .RUnlock ()
215
244
return producer .unConfirmedMessages [sequence ]
216
245
}
217
246
218
247
func (producer * Producer ) NotifyPublishConfirmation () ChannelPublishConfirm {
219
- ch := make (chan []* ConfirmationStatus )
248
+ ch := make (chan []* ConfirmationStatus , 1 )
220
249
producer .publishConfirm = ch
221
250
return ch
222
251
}
@@ -263,19 +292,26 @@ func (producer *Producer) startUnconfirmedMessagesTimeOutTask() {
263
292
go func () {
264
293
for producer .getStatus () == open {
265
294
time .Sleep (2 * time .Second )
266
- producer .mutex .Lock ()
295
+ toRemove := make ([]* ConfirmationStatus , 0 )
296
+ // check the unconfirmed messages and remove the one that are expired
297
+ // use the RLock to avoid blocking the producer
298
+ producer .mutex .RLock ()
267
299
for _ , msg := range producer .unConfirmedMessages {
268
300
if time .Since (msg .inserted ) > producer .options .ConfirmationTimeOut {
269
301
msg .err = ConfirmationTimoutError
270
302
msg .errorCode = timeoutError
271
303
msg .confirmed = false
272
- if producer .publishConfirm != nil {
273
- producer .publishConfirm <- []* ConfirmationStatus {msg }
274
- }
275
- delete (producer .unConfirmedMessages , msg .publishingId )
304
+ toRemove = append (toRemove , msg )
305
+ }
306
+ }
307
+ producer .mutex .RUnlock ()
308
+
309
+ if len (toRemove ) > 0 {
310
+ producer .removeFromConfirmationStatus (toRemove )
311
+ if producer .publishConfirm != nil {
312
+ producer .publishConfirm <- toRemove
276
313
}
277
314
}
278
- producer .mutex .Unlock ()
279
315
}
280
316
time .Sleep (5 * time .Second )
281
317
producer .flushUnConfirmedMessages (timeoutError , ConfirmationTimoutError )
@@ -312,7 +348,7 @@ func (producer *Producer) startPublishTask() {
312
348
}
313
349
314
350
producer .pendingMessages .size += msg .unCompressedSize
315
- producer .pendingMessages .messages = append (producer .pendingMessages .messages , msg )
351
+ producer .pendingMessages .messages = append (producer .pendingMessages .messages , & msg )
316
352
if len (producer .pendingMessages .messages ) >= (producer .options .BatchSize ) {
317
353
producer .sendBufferedMessages ()
318
354
}
@@ -384,7 +420,7 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6
384
420
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
385
421
// calls BatchSend internally.
386
422
func (producer * Producer ) BatchSend (batchMessages []message.StreamMessage ) error {
387
- var messagesSequence = make ([]messageSequence , len (batchMessages ))
423
+ var messagesSequence = make ([]* messageSequence , len (batchMessages ))
388
424
totalBufferToSend := 0
389
425
for i , batchMessage := range batchMessages {
390
426
messageBytes , err := batchMessage .MarshalBinary ()
@@ -398,16 +434,17 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
398
434
399
435
sequence := producer .assignPublishingID (batchMessage )
400
436
totalBufferToSend += len (messageBytes )
401
- messagesSequence [i ] = messageSequence {
437
+ messagesSequence [i ] = & messageSequence {
402
438
messageBytes : messageBytes ,
403
439
unCompressedSize : len (messageBytes ),
404
440
publishingId : sequence ,
405
441
filterValue : filterValue ,
442
+ refMessage : & batchMessage ,
406
443
}
407
-
408
- producer .addUnConfirmed (sequence , batchMessage , producer .id )
409
444
}
410
445
446
+ producer .addUnConfirmedSequences (messagesSequence , producer .GetID ())
447
+
411
448
if totalBufferToSend + initBufferPublishSize > producer .options .client .tuneState .requestedMaxFrameSize {
412
449
for _ , msg := range messagesSequence {
413
450
@@ -432,11 +469,11 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error
432
469
func (producer * Producer ) GetID () uint8 {
433
470
return producer .id
434
471
}
435
- func (producer * Producer ) internalBatchSend (messagesSequence []messageSequence ) error {
472
+ func (producer * Producer ) internalBatchSend (messagesSequence []* messageSequence ) error {
436
473
return producer .internalBatchSendProdId (messagesSequence , producer .GetID ())
437
474
}
438
475
439
- func (producer * Producer ) simpleAggregation (messagesSequence []messageSequence , b * bufio.Writer ) {
476
+ func (producer * Producer ) simpleAggregation (messagesSequence []* messageSequence , b * bufio.Writer ) {
440
477
for _ , msg := range messagesSequence {
441
478
r := msg .messageBytes
442
479
writeBLong (b , msg .publishingId ) // publishingId
@@ -459,13 +496,15 @@ func (producer *Producer) subEntryAggregation(aggregation subEntries, b *bufio.W
459
496
}
460
497
}
461
498
462
- func (producer * Producer ) aggregateEntities (msgs []messageSequence , size int , compression Compression ) (subEntries , error ) {
499
+ func (producer * Producer ) aggregateEntities (msgs []* messageSequence , size int , compression Compression ) (subEntries , error ) {
463
500
subEntries := subEntries {}
464
501
465
502
var entry * subEntry
466
503
for _ , msg := range msgs {
467
504
if len (subEntries .items ) == 0 || len (entry .messages ) >= size {
468
- entry = & subEntry {}
505
+ entry = & subEntry {
506
+ messages : make ([]* messageSequence , 0 ),
507
+ }
469
508
entry .publishingId = - 1
470
509
subEntries .items = append (subEntries .items , entry )
471
510
}
@@ -506,7 +545,7 @@ func (producer *Producer) aggregateEntities(msgs []messageSequence, size int, co
506
545
/// the producer id is always the producer.GetID(). This function is needed only for testing
507
546
// some condition, like simulate publish error, see
508
547
509
- func (producer * Producer ) internalBatchSendProdId (messagesSequence []messageSequence , producerID uint8 ) error {
548
+ func (producer * Producer ) internalBatchSendProdId (messagesSequence []* messageSequence , producerID uint8 ) error {
510
549
producer .options .client .socket .mutex .Lock ()
511
550
defer producer .options .client .socket .mutex .Unlock ()
512
551
if producer .getStatus () == closed {
@@ -656,7 +695,7 @@ func (producer *Producer) GetName() string {
656
695
return producer .options .Name
657
696
}
658
697
659
- func (producer * Producer ) sendWithFilter (messagesSequence []messageSequence , producerID uint8 ) error {
698
+ func (producer * Producer ) sendWithFilter (messagesSequence []* messageSequence , producerID uint8 ) error {
660
699
frameHeaderLength := initBufferPublishSize
661
700
var msgLen int
662
701
for _ , msg := range messagesSequence {
0 commit comments