Skip to content

Commit 017cffd

Browse files
committed
Support batch of app messages
1 parent 2ed31c3 commit 017cffd

File tree

12 files changed

+97
-1547
lines changed

12 files changed

+97
-1547
lines changed

memory_store.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg
9797
return store.IncrNextSenderMsgSeqNum()
9898
}
9999

100+
func (store *memoryStore) SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error {
101+
for offset, m := range msg {
102+
if err := store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum+offset, m); err != nil {
103+
return err
104+
}
105+
}
106+
return nil
107+
}
108+
100109
func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
101110
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
102111
if m, ok := store.messageMap[seqNum]; ok {

registry.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,21 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
6464
return session.queueForSend(msg)
6565
}
6666

67+
// SendBatchToTarget sends a batch of messages on the session. The entire batch
68+
// will fail if the application callback returns an error for any of the
69+
// messages.
70+
//
71+
// This function is more efficient as it calls the store only once for the whole
72+
// batch.
73+
func SendBatchToTarget(msgs []Messagable, sessionID SessionID) error {
74+
session, ok := lookupSession(sessionID)
75+
if !ok {
76+
return errUnknownSession
77+
}
78+
79+
return session.queueBatchForSend(msgs)
80+
}
81+
6782
// ResetSession resets session's sequence numbers.
6883
func ResetSession(sessionID SessionID) error {
6984
session, ok := lookupSession(sessionID)

session.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,59 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
388388
return
389389
}
390390

391+
func (s *session) queueBatchForSend(msg []Messagable) error {
392+
s.sendMutex.Lock()
393+
defer s.sendMutex.Unlock()
394+
395+
msgBytes, err := s.prepBatchAppMessagesForSend(msg)
396+
if err != nil {
397+
return err
398+
}
399+
400+
s.toSend = append(s.toSend, msgBytes...)
401+
s.notifyMessageOut()
402+
return nil
403+
}
404+
405+
func (s *session) prepBatchAppMessagesForSend(msg []Messagable) (msgBytes [][]byte, err error) {
406+
msgBytes = make([][]byte, len(msg))
407+
408+
seqNum := s.store.NextSenderMsgSeqNum()
409+
for i, m := range msg {
410+
m := m.ToMessage()
411+
s.fillDefaultHeader(m, nil)
412+
m.Header.SetField(tagMsgSeqNum, FIXInt(seqNum+i))
413+
414+
msgType, err := m.Header.GetBytes(tagMsgType)
415+
if err != nil {
416+
return nil, err
417+
}
418+
if isAdminMessageType(msgType) {
419+
return nil, fmt.Errorf("cannot send admin messages in batch")
420+
}
421+
422+
if err := s.application.ToApp(m, s.sessionID); err != nil {
423+
return nil, err
424+
}
425+
426+
msgBytes[i] = m.build()
427+
}
428+
429+
if err := s.persistBatch(seqNum, msgBytes); err != nil {
430+
return nil, err
431+
}
432+
433+
return msgBytes, nil
434+
}
435+
436+
func (s *session) persistBatch(firstSeqNum int, msgBytes [][]byte) error {
437+
if !s.DisableMessagePersist {
438+
return s.store.SaveBatchAndIncrNextSenderMsgSeqNum(firstSeqNum, msgBytes)
439+
}
440+
441+
return s.store.SetNextSenderMsgSeqNum(firstSeqNum + len(msgBytes))
442+
}
443+
391444
func (s *session) persist(seqNum int, msgBytes []byte) error {
392445
if !s.DisableMessagePersist {
393446
return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes)

session_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,25 @@ func (suite *SessionSendTestSuite) TestQueueForSendAdminMessage() {
853853
suite.NextSenderMsgSeqNum(2)
854854
}
855855

856+
func (suite *SessionSendTestSuite) TestQueueBatchForSend() {
857+
suite.MockApp.On("ToApp").Return(nil)
858+
suite.Require().NoError(suite.queueBatchForSend([]Messagable{suite.NewOrderSingle()}))
859+
860+
suite.MockApp.AssertExpectations(suite.T())
861+
suite.NoMessageSent()
862+
suite.MessagePersisted(suite.MockApp.lastToApp)
863+
suite.FieldEquals(tagMsgSeqNum, 1, suite.MockApp.lastToApp.Header)
864+
suite.NextSenderMsgSeqNum(2)
865+
}
866+
867+
func (suite *SessionSendTestSuite) TestQueueBatchForSendDoNotSendAdmin() {
868+
suite.Require().Error(suite.queueBatchForSend([]Messagable{suite.Heartbeat()}))
869+
870+
suite.MockApp.AssertExpectations(suite.T())
871+
suite.NoMessageSent()
872+
suite.NextSenderMsgSeqNum(1)
873+
}
874+
856875
func (suite *SessionSendTestSuite) TestSendAppMessage() {
857876
suite.MockApp.On("ToApp").Return(nil)
858877
require.Nil(suite.T(), suite.send(suite.NewOrderSingle()))

store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type MessageStore interface {
3535

3636
SaveMessage(seqNum int, msg []byte) error
3737
SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error
38+
SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error
3839
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
3940
IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error
4041

0 commit comments

Comments
 (0)