@@ -388,6 +388,59 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
388
388
return
389
389
}
390
390
391
+ func (s * session ) queueBatchForSend (msg []Messagable ) error {
392
+ s .sendMutex .Lock ()
393
+ defer s .sendMutex .Unlock ()
394
+
395
+ msgBytes , err := s .prepBatchAppMessagesForSend (msg )
396
+ if err != nil {
397
+ return err
398
+ }
399
+
400
+ s .toSend = append (s .toSend , msgBytes ... )
401
+ s .notifyMessageOut ()
402
+ return nil
403
+ }
404
+
405
+ func (s * session ) prepBatchAppMessagesForSend (msg []Messagable ) (msgBytes [][]byte , err error ) {
406
+ msgBytes = make ([][]byte , len (msg ))
407
+
408
+ seqNum := s .store .NextSenderMsgSeqNum ()
409
+ for i , m := range msg {
410
+ m := m .ToMessage ()
411
+ s .fillDefaultHeader (m , nil )
412
+ m .Header .SetField (tagMsgSeqNum , FIXInt (seqNum + i ))
413
+
414
+ msgType , err := m .Header .GetBytes (tagMsgType )
415
+ if err != nil {
416
+ return nil , err
417
+ }
418
+ if isAdminMessageType (msgType ) {
419
+ return nil , fmt .Errorf ("cannot send admin messages in batch" )
420
+ }
421
+
422
+ if err := s .application .ToApp (m , s .sessionID ); err != nil {
423
+ return nil , err
424
+ }
425
+
426
+ msgBytes [i ] = m .build ()
427
+ }
428
+
429
+ if err := s .persistBatch (seqNum , msgBytes ); err != nil {
430
+ return nil , err
431
+ }
432
+
433
+ return msgBytes , nil
434
+ }
435
+
436
+ func (s * session ) persistBatch (firstSeqNum int , msgBytes [][]byte ) error {
437
+ if ! s .DisableMessagePersist {
438
+ return s .store .SaveBatchAndIncrNextSenderMsgSeqNum (firstSeqNum , msgBytes )
439
+ }
440
+
441
+ return s .store .SetNextSenderMsgSeqNum (firstSeqNum + len (msgBytes ))
442
+ }
443
+
391
444
func (s * session ) persist (seqNum int , msgBytes []byte ) error {
392
445
if ! s .DisableMessagePersist {
393
446
return s .store .SaveMessageAndIncrNextSenderMsgSeqNum (seqNum , msgBytes )
0 commit comments