Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg
return store.IncrNextSenderMsgSeqNum()
}

func (store *memoryStore) SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error {
for offset, m := range msg {
if err := store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum+offset, m); err != nil {
return err
}
}
return nil
}

func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error {
for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ {
if m, ok := store.messageMap[seqNum]; ok {
Expand Down
15 changes: 15 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ func SendToTarget(m Messagable, sessionID SessionID) error {
return session.queueForSend(msg)
}

// SendBatchToTarget sends a batch of messages on the session. The entire batch
// will fail if the application callback returns an error for any of the
// messages.
//
// This function is more efficient as it calls the store only once for the whole
// batch.
func SendBatchToTarget(msgs []Messagable, sessionID SessionID) error {
session, ok := lookupSession(sessionID)
if !ok {
return errUnknownSession
}

return session.queueBatchForSend(msgs)
}

// ResetSession resets session's sequence numbers.
func ResetSession(sessionID SessionID) error {
session, ok := lookupSession(sessionID)
Expand Down
53 changes: 53 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,59 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
return
}

func (s *session) queueBatchForSend(msg []Messagable) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

msgBytes, err := s.prepBatchAppMessagesForSend(msg)
if err != nil {
return err
}

s.toSend = append(s.toSend, msgBytes...)
s.notifyMessageOut()
return nil
}

func (s *session) prepBatchAppMessagesForSend(msg []Messagable) (msgBytes [][]byte, err error) {
msgBytes = make([][]byte, len(msg))

seqNum := s.store.NextSenderMsgSeqNum()
for i, m := range msg {
m := m.ToMessage()
s.fillDefaultHeader(m, nil)
m.Header.SetField(tagMsgSeqNum, FIXInt(seqNum+i))

msgType, err := m.Header.GetBytes(tagMsgType)
if err != nil {
return nil, err
}
if isAdminMessageType(msgType) {
return nil, fmt.Errorf("cannot send admin messages in batch")
}

if err := s.application.ToApp(m, s.sessionID); err != nil {
return nil, err
}

msgBytes[i] = m.build()
}

if err := s.persistBatch(seqNum, msgBytes); err != nil {
return nil, err
}

return msgBytes, nil
}

func (s *session) persistBatch(firstSeqNum int, msgBytes [][]byte) error {
if !s.DisableMessagePersist {
return s.store.SaveBatchAndIncrNextSenderMsgSeqNum(firstSeqNum, msgBytes)
}

return s.store.SetNextSenderMsgSeqNum(firstSeqNum + len(msgBytes))
}

func (s *session) persist(seqNum int, msgBytes []byte) error {
if !s.DisableMessagePersist {
return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes)
Expand Down
19 changes: 19 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,25 @@ func (suite *SessionSendTestSuite) TestQueueForSendAdminMessage() {
suite.NextSenderMsgSeqNum(2)
}

func (suite *SessionSendTestSuite) TestQueueBatchForSend() {
suite.MockApp.On("ToApp").Return(nil)
suite.Require().NoError(suite.queueBatchForSend([]Messagable{suite.NewOrderSingle()}))

suite.MockApp.AssertExpectations(suite.T())
suite.NoMessageSent()
suite.MessagePersisted(suite.MockApp.lastToApp)
suite.FieldEquals(tagMsgSeqNum, 1, suite.MockApp.lastToApp.Header)
suite.NextSenderMsgSeqNum(2)
}

func (suite *SessionSendTestSuite) TestQueueBatchForSendDoNotSendAdmin() {
suite.Require().Error(suite.queueBatchForSend([]Messagable{suite.Heartbeat()}))

suite.MockApp.AssertExpectations(suite.T())
suite.NoMessageSent()
suite.NextSenderMsgSeqNum(1)
}

func (suite *SessionSendTestSuite) TestSendAppMessage() {
suite.MockApp.On("ToApp").Return(nil)
require.Nil(suite.T(), suite.send(suite.NewOrderSingle()))
Expand Down
1 change: 1 addition & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MessageStore interface {

SaveMessage(seqNum int, msg []byte) error
SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error
SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error
GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error)
IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error

Expand Down
Loading