From 017cffdee3e4c858406d7c009a048700b24fb2df Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Tue, 28 Jan 2025 10:19:51 +0100 Subject: [PATCH] Support batch of app messages --- memory_store.go | 9 + registry.go | 15 ++ session.go | 53 ++++ session_test.go | 19 ++ store.go | 1 + store/file/file_store.go | 425 -------------------------------- store/file/file_store_test.go | 79 ------ store/file/util.go | 84 ------- store/mongo/mongo_store.go | 393 ----------------------------- store/mongo/mongo_store_test.go | 75 ------ store/sql/sql_store.go | 400 ------------------------------ store/sql/sql_store_test.go | 91 ------- 12 files changed, 97 insertions(+), 1547 deletions(-) delete mode 100644 store/file/file_store.go delete mode 100644 store/file/file_store_test.go delete mode 100644 store/file/util.go delete mode 100644 store/mongo/mongo_store.go delete mode 100644 store/mongo/mongo_store_test.go delete mode 100644 store/sql/sql_store.go delete mode 100644 store/sql/sql_store_test.go diff --git a/memory_store.go b/memory_store.go index 7c57ca783..7e0f3542b 100644 --- a/memory_store.go +++ b/memory_store.go @@ -97,6 +97,15 @@ func (store *memoryStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg return store.IncrNextSenderMsgSeqNum() } +func (store *memoryStore) SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error { + for offset, m := range msg { + if err := store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum+offset, m); err != nil { + return err + } + } + return nil +} + func (store *memoryStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { for seqNum := beginSeqNum; seqNum <= endSeqNum; seqNum++ { if m, ok := store.messageMap[seqNum]; ok { diff --git a/registry.go b/registry.go index 5f8e69fc2..04ba7496e 100644 --- a/registry.go +++ b/registry.go @@ -64,6 +64,21 @@ func SendToTarget(m Messagable, sessionID SessionID) error { return session.queueForSend(msg) } +// SendBatchToTarget sends a batch of messages on the session. The entire batch +// will fail if the application callback returns an error for any of the +// messages. +// +// This function is more efficient as it calls the store only once for the whole +// batch. +func SendBatchToTarget(msgs []Messagable, sessionID SessionID) error { + session, ok := lookupSession(sessionID) + if !ok { + return errUnknownSession + } + + return session.queueBatchForSend(msgs) +} + // ResetSession resets session's sequence numbers. func ResetSession(sessionID SessionID) error { session, ok := lookupSession(sessionID) diff --git a/session.go b/session.go index 49b1b467b..d7dcffe0f 100644 --- a/session.go +++ b/session.go @@ -388,6 +388,59 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes return } +func (s *session) queueBatchForSend(msg []Messagable) error { + s.sendMutex.Lock() + defer s.sendMutex.Unlock() + + msgBytes, err := s.prepBatchAppMessagesForSend(msg) + if err != nil { + return err + } + + s.toSend = append(s.toSend, msgBytes...) + s.notifyMessageOut() + return nil +} + +func (s *session) prepBatchAppMessagesForSend(msg []Messagable) (msgBytes [][]byte, err error) { + msgBytes = make([][]byte, len(msg)) + + seqNum := s.store.NextSenderMsgSeqNum() + for i, m := range msg { + m := m.ToMessage() + s.fillDefaultHeader(m, nil) + m.Header.SetField(tagMsgSeqNum, FIXInt(seqNum+i)) + + msgType, err := m.Header.GetBytes(tagMsgType) + if err != nil { + return nil, err + } + if isAdminMessageType(msgType) { + return nil, fmt.Errorf("cannot send admin messages in batch") + } + + if err := s.application.ToApp(m, s.sessionID); err != nil { + return nil, err + } + + msgBytes[i] = m.build() + } + + if err := s.persistBatch(seqNum, msgBytes); err != nil { + return nil, err + } + + return msgBytes, nil +} + +func (s *session) persistBatch(firstSeqNum int, msgBytes [][]byte) error { + if !s.DisableMessagePersist { + return s.store.SaveBatchAndIncrNextSenderMsgSeqNum(firstSeqNum, msgBytes) + } + + return s.store.SetNextSenderMsgSeqNum(firstSeqNum + len(msgBytes)) +} + func (s *session) persist(seqNum int, msgBytes []byte) error { if !s.DisableMessagePersist { return s.store.SaveMessageAndIncrNextSenderMsgSeqNum(seqNum, msgBytes) diff --git a/session_test.go b/session_test.go index 9308ec4f0..6899ce9c6 100644 --- a/session_test.go +++ b/session_test.go @@ -853,6 +853,25 @@ func (suite *SessionSendTestSuite) TestQueueForSendAdminMessage() { suite.NextSenderMsgSeqNum(2) } +func (suite *SessionSendTestSuite) TestQueueBatchForSend() { + suite.MockApp.On("ToApp").Return(nil) + suite.Require().NoError(suite.queueBatchForSend([]Messagable{suite.NewOrderSingle()})) + + suite.MockApp.AssertExpectations(suite.T()) + suite.NoMessageSent() + suite.MessagePersisted(suite.MockApp.lastToApp) + suite.FieldEquals(tagMsgSeqNum, 1, suite.MockApp.lastToApp.Header) + suite.NextSenderMsgSeqNum(2) +} + +func (suite *SessionSendTestSuite) TestQueueBatchForSendDoNotSendAdmin() { + suite.Require().Error(suite.queueBatchForSend([]Messagable{suite.Heartbeat()})) + + suite.MockApp.AssertExpectations(suite.T()) + suite.NoMessageSent() + suite.NextSenderMsgSeqNum(1) +} + func (suite *SessionSendTestSuite) TestSendAppMessage() { suite.MockApp.On("ToApp").Return(nil) require.Nil(suite.T(), suite.send(suite.NewOrderSingle())) diff --git a/store.go b/store.go index 6689297ba..f4c66dc23 100644 --- a/store.go +++ b/store.go @@ -35,6 +35,7 @@ type MessageStore interface { SaveMessage(seqNum int, msg []byte) error SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error + SaveBatchAndIncrNextSenderMsgSeqNum(seqNum int, msg [][]byte) error GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error diff --git a/store/file/file_store.go b/store/file/file_store.go deleted file mode 100644 index c0a28531e..000000000 --- a/store/file/file_store.go +++ /dev/null @@ -1,425 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package file - -import ( - "fmt" - "io" - "os" - "path" - "strconv" - "strings" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/config" -) - -type fileStoreFactory struct { - settings *quickfix.Settings -} - -type fileStore struct { - sessionID quickfix.SessionID - cache quickfix.MessageStore - bodyFname string - headerFname string - sessionFname string - senderSeqNumsFname string - targetSeqNumsFname string - - fileMu sync.Mutex - bodyFile *os.File - headerFile *os.File - sessionFile *os.File - senderSeqNumsFile *os.File - targetSeqNumsFile *os.File - fileSync bool -} - -// NewStoreFactory returns a file-based implementation of MessageStoreFactory. -func NewStoreFactory(settings *quickfix.Settings) quickfix.MessageStoreFactory { - return fileStoreFactory{settings: settings} -} - -// Create creates a new FileStore implementation of the MessageStore interface. -func (f fileStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix.MessageStore, err error) { - globalSettings := f.settings.GlobalSettings() - dynamicSessions, _ := globalSettings.BoolSetting(config.DynamicSessions) - - sessionSettings, ok := f.settings.SessionSettings()[sessionID] - if !ok { - if dynamicSessions { - sessionSettings = globalSettings - } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) - } - } - - dirname, err := sessionSettings.Setting(config.FileStorePath) - if err != nil { - return nil, err - } - var fsync bool - if sessionSettings.HasSetting(config.FileStoreSync) { - fsync, err = sessionSettings.BoolSetting(config.FileStoreSync) - if err != nil { - return nil, err - } - } else { - fsync = true //existing behavior is to fsync writes - } - return newFileStore(sessionID, dirname, fsync) -} - -func newFileStore(sessionID quickfix.SessionID, dirname string, fileSync bool) (*fileStore, error) { - if err := os.MkdirAll(dirname, os.ModePerm); err != nil { - return nil, err - } - - sessionPrefix := createFilenamePrefix(sessionID) - - memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID) - if memErr != nil { - return nil, errors.Wrap(memErr, "cache creation") - } - - store := &fileStore{ - sessionID: sessionID, - cache: memStore, - bodyFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "body")), - headerFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "header")), - sessionFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "session")), - senderSeqNumsFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "senderseqnums")), - targetSeqNumsFname: path.Join(dirname, fmt.Sprintf("%s.%s", sessionPrefix, "targetseqnums")), - fileSync: fileSync, - } - - if err := store.Refresh(); err != nil { - return nil, err - } - - return store, nil -} - -// Reset deletes the store files and sets the seqnums back to 1. -func (store *fileStore) Reset() error { - if err := store.cache.Reset(); err != nil { - return errors.Wrap(err, "cache reset") - } - - if err := store.Close(); err != nil { - return errors.Wrap(err, "close") - } - if err := removeFile(store.bodyFname); err != nil { - return err - } - if err := removeFile(store.headerFname); err != nil { - return err - } - if err := removeFile(store.sessionFname); err != nil { - return err - } - if err := removeFile(store.senderSeqNumsFname); err != nil { - return err - } - if err := removeFile(store.targetSeqNumsFname); err != nil { - return err - } - return store.Refresh() -} - -// Refresh closes the store files and then reloads from them. -func (store *fileStore) Refresh() (err error) { - if err = store.cache.Reset(); err != nil { - err = errors.Wrap(err, "cache reset") - return - } - - if err = store.Close(); err != nil { - return err - } - - creationTimePopulated, err := store.populateCache() - if err != nil { - return err - } - - if store.bodyFile, err = openOrCreateFile(store.bodyFname, 0660); err != nil { - return err - } - if store.headerFile, err = openOrCreateFile(store.headerFname, 0660); err != nil { - return err - } - if store.sessionFile, err = openOrCreateFile(store.sessionFname, 0660); err != nil { - return err - } - if store.senderSeqNumsFile, err = openOrCreateFile(store.senderSeqNumsFname, 0660); err != nil { - return err - } - if store.targetSeqNumsFile, err = openOrCreateFile(store.targetSeqNumsFname, 0660); err != nil { - return err - } - - if !creationTimePopulated { - if err := store.setSession(); err != nil { - return err - } - } - - if err := store.SetNextSenderMsgSeqNum(store.NextSenderMsgSeqNum()); err != nil { - return errors.Wrap(err, "set next sender") - } - - if err := store.SetNextTargetMsgSeqNum(store.NextTargetMsgSeqNum()); err != nil { - return errors.Wrap(err, "set next target") - } - return nil -} - -func (store *fileStore) populateCache() (creationTimePopulated bool, err error) { - if timeBytes, err := os.ReadFile(store.sessionFname); err == nil { - var ctime time.Time - if err := ctime.UnmarshalText(timeBytes); err == nil { - store.cache.SetCreationTime(ctime) - creationTimePopulated = true - } - } - - if senderSeqNumBytes, err := os.ReadFile(store.senderSeqNumsFname); err == nil { - if senderSeqNum, err := strconv.Atoi(strings.Trim(string(senderSeqNumBytes), "\r\n")); err == nil { - if err = store.cache.SetNextSenderMsgSeqNum(senderSeqNum); err != nil { - return creationTimePopulated, errors.Wrap(err, "cache set next sender") - } - } - } - - if targetSeqNumBytes, err := os.ReadFile(store.targetSeqNumsFname); err == nil { - if targetSeqNum, err := strconv.Atoi(strings.Trim(string(targetSeqNumBytes), "\r\n")); err == nil { - if err = store.cache.SetNextTargetMsgSeqNum(targetSeqNum); err != nil { - return creationTimePopulated, errors.Wrap(err, "cache set next target") - } - } - } - - return creationTimePopulated, nil -} - -func (store *fileStore) setSession() error { - store.fileMu.Lock() - defer store.fileMu.Unlock() - - if _, err := store.sessionFile.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("unable to rewind file: %s: %s", store.sessionFname, err.Error()) - } - - data, err := store.cache.CreationTime().MarshalText() - if err != nil { - return fmt.Errorf("unable to marshal session time to file: %s: %s", store.sessionFname, err.Error()) - } - if _, err := store.sessionFile.Write(data); err != nil { - return fmt.Errorf("unable to write to file: %s: %s", store.sessionFname, err.Error()) - } - if store.fileSync { - if err := store.sessionFile.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", store.sessionFname, err.Error()) - } - } - return nil -} - -func (store *fileStore) setSeqNum(f *os.File, seqNum int) error { - store.fileMu.Lock() - defer store.fileMu.Unlock() - if _, err := f.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("unable to rewind file: %s: %s", f.Name(), err.Error()) - } - if _, err := fmt.Fprintf(f, "%019d", seqNum); err != nil { - return fmt.Errorf("unable to write to file: %s: %s", f.Name(), err.Error()) - } - if store.fileSync { - if err := f.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", f.Name(), err.Error()) - } - } - return nil -} - -// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *fileStore) NextSenderMsgSeqNum() int { - return store.cache.NextSenderMsgSeqNum() -} - -// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *fileStore) NextTargetMsgSeqNum() int { - return store.cache.NextTargetMsgSeqNum() -} - -// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *fileStore) SetNextSenderMsgSeqNum(next int) error { - if err := store.setSeqNum(store.senderSeqNumsFile, next); err != nil { - return errors.Wrap(err, "file") - } - return store.cache.SetNextSenderMsgSeqNum(next) -} - -// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *fileStore) SetNextTargetMsgSeqNum(next int) error { - if err := store.setSeqNum(store.targetSeqNumsFile, next); err != nil { - return errors.Wrap(err, "file") - } - return store.cache.SetNextTargetMsgSeqNum(next) -} - -// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent. -func (store *fileStore) IncrNextSenderMsgSeqNum() error { - if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "file") - } - return nil -} - -// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received. -func (store *fileStore) IncrNextTargetMsgSeqNum() error { - if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "file") - } - return nil -} - -// CreationTime returns the creation time of the store. -func (store *fileStore) CreationTime() time.Time { - return store.cache.CreationTime() -} - -// SetCreationTime is a no-op for FileStore. -func (store *fileStore) SetCreationTime(_ time.Time) { -} - -func (store *fileStore) SaveMessage(seqNum int, msg []byte) error { - store.fileMu.Lock() - defer store.fileMu.Unlock() - offset, err := store.bodyFile.Seek(0, io.SeekEnd) - if err != nil { - return fmt.Errorf("unable to seek to end of file: %s: %s", store.bodyFname, err.Error()) - } - if _, err := store.headerFile.Seek(0, io.SeekEnd); err != nil { - return fmt.Errorf("unable to seek to end of file: %s: %s", store.headerFname, err.Error()) - } - if _, err := fmt.Fprintf(store.headerFile, "%d,%d,%d\n", seqNum, offset, len(msg)); err != nil { - return fmt.Errorf("unable to write to file: %s: %s", store.headerFname, err.Error()) - } - - if _, err := store.bodyFile.Write(msg); err != nil { - return fmt.Errorf("unable to write to file: %s: %s", store.bodyFname, err.Error()) - } - if store.fileSync { - if err := store.bodyFile.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error()) - } - if err := store.headerFile.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error()) - } - } - - return nil -} - -func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { - err := store.SaveMessage(seqNum, msg) - if err != nil { - return err - } - return store.IncrNextSenderMsgSeqNum() -} - -func (store *fileStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { - store.fileMu.Lock() - defer store.fileMu.Unlock() - - // Sync files and seek to start of header file - if err := store.bodyFile.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", store.bodyFname, err.Error()) - } else if err = store.headerFile.Sync(); err != nil { - return fmt.Errorf("unable to flush file: %s: %s", store.headerFname, err.Error()) - } else if _, err = store.headerFile.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("unable to seek to start of file: %s: %s", store.headerFname, err.Error()) - } - - // Iterate over the header file - for { - var seqNum, size int - var offset int64 - if cnt, err := fmt.Fscanf(store.headerFile, "%d,%d,%d\n", &seqNum, &offset, &size); err != nil { - if errors.Is(err, io.EOF) { - break - } - return fmt.Errorf("unable to read from file: %s: %s", store.headerFname, err.Error()) - } else if cnt < 3 || seqNum > endSeqNum { - // If we have reached the end of possible iteration then break - break - } else if seqNum < beginSeqNum { - // If we have not yet reached the starting sequence number then continue - continue - } - // Otherwise process the file - msg := make([]byte, size) - if _, err := store.bodyFile.ReadAt(msg, offset); err != nil { - return fmt.Errorf("unable to read from file: %s: %s", store.bodyFname, err.Error()) - } else if err = cb(msg); err != nil { - return err - } - } - return nil -} - -func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { - var msgs [][]byte - err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { - msgs = append(msgs, msg) - return nil - }) - return msgs, err -} - -// Close closes the store's files. -func (store *fileStore) Close() error { - if err := closeSyncFile(store.bodyFile); err != nil { - return err - } - if err := closeSyncFile(store.headerFile); err != nil { - return err - } - if err := closeSyncFile(store.sessionFile); err != nil { - return err - } - if err := closeSyncFile(store.senderSeqNumsFile); err != nil { - return err - } - if err := closeSyncFile(store.targetSeqNumsFile); err != nil { - return err - } - - store.bodyFile = nil - store.headerFile = nil - store.sessionFile = nil - store.senderSeqNumsFile = nil - store.targetSeqNumsFile = nil - - return nil -} diff --git a/store/file/file_store_test.go b/store/file/file_store_test.go deleted file mode 100644 index ee0497828..000000000 --- a/store/file/file_store_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package file - -import ( - "fmt" - "os" - "path" - "strconv" - "strings" - "testing" - "time" - - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/internal/testsuite" - assert2 "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -// FileStoreTestSuite runs all tests in the MessageStoreTestSuite against the FileStore implementation. -type FileStoreTestSuite struct { - testsuite.StoreTestSuite - fileStoreRootPath string -} - -func (suite *FileStoreTestSuite) SetupTest() { - suite.fileStoreRootPath = path.Join(os.TempDir(), fmt.Sprintf("FileStoreTestSuite-%d", os.Getpid())) - fileStorePath := path.Join(suite.fileStoreRootPath, fmt.Sprintf("%d", time.Now().UnixNano())) - sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} - - // create settings - settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` -[DEFAULT] -FileStorePath=%s - -[SESSION] -BeginString=%s -SenderCompID=%s -TargetCompID=%s`, fileStorePath, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) - require.Nil(suite.T(), err) - - // create store - suite.MsgStore, err = NewStoreFactory(settings).Create(sessionID) - require.Nil(suite.T(), err) -} - -func (suite *FileStoreTestSuite) TearDownTest() { - suite.MsgStore.Close() - os.RemoveAll(suite.fileStoreRootPath) -} - -func TestFileStoreTestSuite(t *testing.T) { - suite.Run(t, new(FileStoreTestSuite)) -} - -func TestStringParse(t *testing.T) { - assert := assert2.New(t) - i, err := strconv.Atoi(strings.Trim("00005\n", "\r\n")) - assert.Nil(err) - assert.Equal(5, i) - - i, err = strconv.Atoi(strings.Trim("00006\r", "\r\n")) - assert.Nil(err) - assert.Equal(6, i) -} diff --git a/store/file/util.go b/store/file/util.go deleted file mode 100644 index 9baf380f6..000000000 --- a/store/file/util.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package file - -import ( - "fmt" - "os" - "strings" - - "github.com/pkg/errors" - "github.com/quickfixgo/quickfix" -) - -func createFilenamePrefix(s quickfix.SessionID) string { - sender := []string{s.SenderCompID} - if s.SenderSubID != "" { - sender = append(sender, s.SenderSubID) - } - if s.SenderLocationID != "" { - sender = append(sender, s.SenderLocationID) - } - - target := []string{s.TargetCompID} - if s.TargetSubID != "" { - target = append(target, s.TargetSubID) - } - if s.TargetLocationID != "" { - target = append(target, s.TargetLocationID) - } - - fname := []string{s.BeginString, strings.Join(sender, "_"), strings.Join(target, "_")} - if s.Qualifier != "" { - fname = append(fname, s.Qualifier) - } - return strings.Join(fname, "-") -} - -// closeSyncFile behaves like Sync and Close, except that no error is returned if the file does not exist. -func closeSyncFile(f *os.File) error { - if f != nil { - if err := f.Sync(); err != nil { - if !os.IsNotExist(err) { - return err - } - } - if err := f.Close(); err != nil { - if !os.IsNotExist(err) { - return err - } - } - } - return nil -} - -// removeFile behaves like os.Remove, except that no error is returned if the file does not exist. -func removeFile(fname string) error { - if err := os.Remove(fname); (err != nil) && !os.IsNotExist(err) { - return errors.Wrapf(err, "remove %v", fname) - } - return nil -} - -// openOrCreateFile opens a file for reading and writing, creating it if necessary. -func openOrCreateFile(fname string, perm os.FileMode) (f *os.File, err error) { - if f, err = os.OpenFile(fname, os.O_RDWR, perm); err != nil { - if f, err = os.OpenFile(fname, os.O_RDWR|os.O_CREATE, perm); err != nil { - return nil, fmt.Errorf("error opening or creating file: %s: %s", fname, err.Error()) - } - } - return f, nil -} diff --git a/store/mongo/mongo_store.go b/store/mongo/mongo_store.go deleted file mode 100644 index 42696388e..000000000 --- a/store/mongo/mongo_store.go +++ /dev/null @@ -1,393 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package mongo - -import ( - "context" - "fmt" - "time" - - "github.com/pkg/errors" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/config" -) - -type mongoStoreFactory struct { - settings *quickfix.Settings - messagesCollection string - sessionsCollection string -} - -type mongoStore struct { - sessionID quickfix.SessionID - cache quickfix.MessageStore - mongoURL string - mongoDatabase string - db *mongo.Client - messagesCollection string - sessionsCollection string - allowTransactions bool -} - -// NewStoreFactory returns a mongo-based implementation of MessageStoreFactory. -func NewStoreFactory(settings *quickfix.Settings) quickfix.MessageStoreFactory { - return NewStoreFactoryPrefixed(settings, "") -} - -// NewStoreFactoryPrefixed returns a mongo-based implementation of MessageStoreFactory, with prefix on collections. -func NewStoreFactoryPrefixed(settings *quickfix.Settings, collectionsPrefix string) quickfix.MessageStoreFactory { - return mongoStoreFactory{ - settings: settings, - messagesCollection: collectionsPrefix + "messages", - sessionsCollection: collectionsPrefix + "sessions", - } -} - -// Create creates a new MongoStore implementation of the MessageStore interface. -func (f mongoStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix.MessageStore, err error) { - globalSettings := f.settings.GlobalSettings() - dynamicSessions, _ := globalSettings.BoolSetting(config.DynamicSessions) - - sessionSettings, ok := f.settings.SessionSettings()[sessionID] - if !ok { - if dynamicSessions { - sessionSettings = globalSettings - } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) - } - } - mongoConnectionURL, err := sessionSettings.Setting(config.MongoStoreConnection) - if err != nil { - return nil, err - } - mongoDatabase, err := sessionSettings.Setting(config.MongoStoreDatabase) - if err != nil { - return nil, err - } - - // Optional. - mongoReplicaSet, _ := sessionSettings.Setting(config.MongoStoreReplicaSet) - - return newMongoStore(sessionID, mongoConnectionURL, mongoDatabase, mongoReplicaSet, f.messagesCollection, f.sessionsCollection) -} - -func newMongoStore(sessionID quickfix.SessionID, mongoURL, mongoDatabase, mongoReplicaSet, messagesCollection, sessionsCollection string) (store *mongoStore, err error) { - - memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID) - if memErr != nil { - err = errors.Wrap(memErr, "cache creation") - return - } - - allowTransactions := len(mongoReplicaSet) > 0 - store = &mongoStore{ - sessionID: sessionID, - cache: memStore, - mongoURL: mongoURL, - mongoDatabase: mongoDatabase, - messagesCollection: messagesCollection, - sessionsCollection: sessionsCollection, - allowTransactions: allowTransactions, - } - - if err = store.cache.Reset(); err != nil { - err = errors.Wrap(err, "cache reset") - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - store.db, err = mongo.Connect(ctx, options.Client().ApplyURI(mongoURL).SetDirect(len(mongoReplicaSet) == 0).SetReplicaSet(mongoReplicaSet)) - if err != nil { - return - } - err = store.populateCache() - - return -} - -func generateMessageFilter(s *quickfix.SessionID) (messageFilter *mongoQuickFixEntryData) { - messageFilter = &mongoQuickFixEntryData{ - BeginString: s.BeginString, - SessionQualifier: s.Qualifier, - SenderCompID: s.SenderCompID, - SenderSubID: s.SenderSubID, - SenderLocID: s.SenderLocationID, - TargetCompID: s.TargetCompID, - TargetSubID: s.TargetSubID, - TargetLocID: s.TargetLocationID, - } - return -} - -type mongoQuickFixEntryData struct { - // Message specific data. - Msgseq int `bson:"msgseq,omitempty"` - Message []byte `bson:"message,omitempty"` - // Session specific data. - CreationTime time.Time `bson:"creation_time,omitempty"` - IncomingSeqNum int `bson:"incoming_seq_num,omitempty"` - OutgoingSeqNum int `bson:"outgoing_seq_num,omitempty"` - // Indexed data. - BeginString string `bson:"begin_string"` - SessionQualifier string `bson:"session_qualifier"` - SenderCompID string `bson:"sender_comp_id"` - SenderSubID string `bson:"sender_sub_id"` - SenderLocID string `bson:"sender_loc_id"` - TargetCompID string `bson:"target_comp_id"` - TargetSubID string `bson:"target_sub_id"` - TargetLocID string `bson:"target_loc_id"` -} - -// Reset deletes the store records and sets the seqnums back to 1. -func (store *mongoStore) Reset() error { - msgFilter := generateMessageFilter(&store.sessionID) - _, err := store.db.Database(store.mongoDatabase).Collection(store.messagesCollection).DeleteMany(context.Background(), msgFilter) - - if err != nil { - return err - } - - if err = store.cache.Reset(); err != nil { - return err - } - - sessionUpdate := generateMessageFilter(&store.sessionID) - sessionUpdate.CreationTime = store.cache.CreationTime() - sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() - sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() - _, err = store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).UpdateOne(context.Background(), msgFilter, bson.M{"$set": sessionUpdate}) - - return err -} - -// Refresh reloads the store from the database. -func (store *mongoStore) Refresh() error { - if err := store.cache.Reset(); err != nil { - return err - } - return store.populateCache() -} - -func (store *mongoStore) populateCache() error { - msgFilter := generateMessageFilter(&store.sessionID) - res := store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).FindOne(context.Background(), msgFilter) - if res.Err() != nil && res.Err() != mongo.ErrNoDocuments { - return errors.Wrap(res.Err(), "query") - } - - if res.Err() != mongo.ErrNoDocuments { - // session record found, load it - sessionData := &mongoQuickFixEntryData{} - if err := res.Decode(&sessionData); err != nil { - return errors.Wrap(err, "decode") - } - - store.cache.SetCreationTime(sessionData.CreationTime) - if err := store.cache.SetNextTargetMsgSeqNum(sessionData.IncomingSeqNum); err != nil { - return errors.Wrap(err, "cache set next target") - } - - if err := store.cache.SetNextSenderMsgSeqNum(sessionData.OutgoingSeqNum); err != nil { - return errors.Wrap(err, "cache set next sender") - } - - return nil - } - - // session record not found, create it - msgFilter.CreationTime = store.cache.CreationTime() - msgFilter.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() - msgFilter.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() - - if _, err := store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).InsertOne(context.Background(), msgFilter); err != nil { - return errors.Wrap(err, "insert") - } - return nil -} - -// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *mongoStore) NextSenderMsgSeqNum() int { - return store.cache.NextSenderMsgSeqNum() -} - -// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *mongoStore) NextTargetMsgSeqNum() int { - return store.cache.NextTargetMsgSeqNum() -} - -// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *mongoStore) SetNextSenderMsgSeqNum(next int) error { - msgFilter := generateMessageFilter(&store.sessionID) - sessionUpdate := generateMessageFilter(&store.sessionID) - sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() - sessionUpdate.OutgoingSeqNum = next - sessionUpdate.CreationTime = store.cache.CreationTime() - if _, err := store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).UpdateOne(context.Background(), msgFilter, bson.M{"$set": sessionUpdate}); err != nil { - return err - } - return store.cache.SetNextSenderMsgSeqNum(next) -} - -// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *mongoStore) SetNextTargetMsgSeqNum(next int) error { - msgFilter := generateMessageFilter(&store.sessionID) - sessionUpdate := generateMessageFilter(&store.sessionID) - sessionUpdate.IncomingSeqNum = next - sessionUpdate.OutgoingSeqNum = store.cache.NextSenderMsgSeqNum() - sessionUpdate.CreationTime = store.cache.CreationTime() - if _, err := store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).UpdateOne(context.Background(), msgFilter, bson.M{"$set": sessionUpdate}); err != nil { - return err - } - return store.cache.SetNextTargetMsgSeqNum(next) -} - -// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent. -func (store *mongoStore) IncrNextSenderMsgSeqNum() error { - if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "save sequence number") - } - return nil -} - -// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received. -func (store *mongoStore) IncrNextTargetMsgSeqNum() error { - if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "save sequence number") - } - return nil -} - -// CreationTime returns the creation time of the store. -func (store *mongoStore) CreationTime() time.Time { - return store.cache.CreationTime() -} - -// SetCreationTime is a no-op for MongoStore. -func (store *mongoStore) SetCreationTime(_ time.Time) { -} - -func (store *mongoStore) SaveMessage(seqNum int, msg []byte) (err error) { - msgFilter := generateMessageFilter(&store.sessionID) - msgFilter.Msgseq = seqNum - msgFilter.Message = msg - _, err = store.db.Database(store.mongoDatabase).Collection(store.messagesCollection).InsertOne(context.Background(), msgFilter) - return -} - -func (store *mongoStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { - - if !store.allowTransactions { - err := store.SaveMessage(seqNum, msg) - if err != nil { - return err - } - return store.IncrNextSenderMsgSeqNum() - } - - // If the mongodb supports replicasets, perform this operation as a transaction instead- - var next int - err := store.db.UseSession(context.Background(), func(sessionCtx mongo.SessionContext) error { - if err := sessionCtx.StartTransaction(); err != nil { - return err - } - - msgFilter := generateMessageFilter(&store.sessionID) - msgFilter.Msgseq = seqNum - msgFilter.Message = msg - _, err := store.db.Database(store.mongoDatabase).Collection(store.messagesCollection).InsertOne(sessionCtx, msgFilter) - if err != nil { - return err - } - - next = store.cache.NextSenderMsgSeqNum() + 1 - - msgFilter = generateMessageFilter(&store.sessionID) - sessionUpdate := generateMessageFilter(&store.sessionID) - sessionUpdate.IncomingSeqNum = store.cache.NextTargetMsgSeqNum() - sessionUpdate.OutgoingSeqNum = next - sessionUpdate.CreationTime = store.cache.CreationTime() - _, err = store.db.Database(store.mongoDatabase).Collection(store.sessionsCollection).UpdateOne(sessionCtx, msgFilter, bson.M{"$set": sessionUpdate}) - if err != nil { - return err - } - - return sessionCtx.CommitTransaction(context.Background()) - }) - if err != nil { - return err - } - - return store.cache.SetNextSenderMsgSeqNum(next) -} - -func (store *mongoStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { - msgFilter := generateMessageFilter(&store.sessionID) - // Marshal into database form. - msgFilterBytes, err := bson.Marshal(msgFilter) - if err != nil { - return err - } - seqFilter := bson.M{} - err = bson.Unmarshal(msgFilterBytes, &seqFilter) - if err != nil { - return err - } - // Modify the query to use a range for the sequence filter. - seqFilter["msgseq"] = bson.M{ - "$gte": beginSeqNum, - "$lte": endSeqNum, - } - sortOpt := options.Find().SetSort(bson.D{{Key: "msgseq", Value: 1}}) - cursor, err := store.db.Database(store.mongoDatabase).Collection(store.messagesCollection).Find(context.Background(), seqFilter, sortOpt) - if err != nil { - return err - } - defer func() { _ = cursor.Close(context.Background()) }() - for cursor.Next(context.Background()) { - if err = cursor.Decode(&msgFilter); err != nil { - return err - } else if err = cb(msgFilter.Message); err != nil { - return err - } - } - return nil -} - -func (store *mongoStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { - var msgs [][]byte - err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { - msgs = append(msgs, msg) - return nil - }) - return msgs, err -} - -// Close closes the store's database connection. -func (store *mongoStore) Close() error { - if store.db != nil { - err := store.db.Disconnect(context.Background()) - if err != nil { - return errors.Wrap(err, "error disconnecting from database") - } - store.db = nil - } - return nil -} diff --git a/store/mongo/mongo_store_test.go b/store/mongo/mongo_store_test.go deleted file mode 100644 index 62c487ba7..000000000 --- a/store/mongo/mongo_store_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package mongo - -import ( - "fmt" - "log" - "os" - "strings" - "testing" - - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/internal/testsuite" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -// MongoStoreTestSuite runs all tests in the message.StoreTestSuite against the MongoStore implementation. -type MongoStoreTestSuite struct { - testsuite.StoreTestSuite -} - -func (suite *MongoStoreTestSuite) SetupTest() { - mongoDbCxn := os.Getenv("MONGODB_TEST_CXN") - if len(mongoDbCxn) <= 0 { - log.Println("MONGODB_TEST_CXN environment arg is not provided, skipping...") - suite.T().SkipNow() - } - mongoDatabase := "automated_testing_database" - mongoReplicaSet := "replicaset" - - // create settings - sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} - settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` -[DEFAULT] -MongoStoreConnection=%s -MongoStoreDatabase=%s -MongoStoreReplicaSet=%s - -[SESSION] -BeginString=%s -SenderCompID=%s -TargetCompID=%s`, mongoDbCxn, mongoDatabase, mongoReplicaSet, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) - require.Nil(suite.T(), err) - - // create store - suite.MsgStore, err = NewStoreFactory(settings).Create(sessionID) - require.Nil(suite.T(), err) - err = suite.MsgStore.Reset() - require.Nil(suite.T(), err) -} - -func (suite *MongoStoreTestSuite) TearDownTest() { - if suite.MsgStore != nil { - err := suite.MsgStore.Close() - require.Nil(suite.T(), err) - } -} - -func TestMongoStoreTestSuite(t *testing.T) { - suite.Run(t, new(MongoStoreTestSuite)) -} diff --git a/store/sql/sql_store.go b/store/sql/sql_store.go deleted file mode 100644 index aeaeb6eb3..000000000 --- a/store/sql/sql_store.go +++ /dev/null @@ -1,400 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package sql - -import ( - "database/sql" - "fmt" - "regexp" - "time" - - "github.com/pkg/errors" - - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/config" -) - -type sqlStoreFactory struct { - settings *quickfix.Settings -} - -type sqlStore struct { - sessionID quickfix.SessionID - cache quickfix.MessageStore - sqlDriver string - sqlDataSourceName string - sqlConnMaxLifetime time.Duration - db *sql.DB - placeholder placeholderFunc -} - -type placeholderFunc func(int) string - -var rePlaceholder = regexp.MustCompile(`\?`) - -func sqlString(raw string, placeholder placeholderFunc) string { - if placeholder == nil { - return raw - } - idx := 0 - return rePlaceholder.ReplaceAllStringFunc(raw, func(_ string) string { - p := placeholder(idx) - idx++ - return p - }) -} - -func postgresPlaceholder(i int) string { - return fmt.Sprintf("$%d", i+1) -} - -// NewStoreFactory returns a sql-based implementation of MessageStoreFactory. -func NewStoreFactory(settings *quickfix.Settings) quickfix.MessageStoreFactory { - return sqlStoreFactory{settings: settings} -} - -// Create creates a new SQLStore implementation of the MessageStore interface. -func (f sqlStoreFactory) Create(sessionID quickfix.SessionID) (msgStore quickfix.MessageStore, err error) { - globalSettings := f.settings.GlobalSettings() - dynamicSessions, _ := globalSettings.BoolSetting(config.DynamicSessions) - - sessionSettings, ok := f.settings.SessionSettings()[sessionID] - if !ok { - if dynamicSessions { - sessionSettings = globalSettings - } else { - return nil, fmt.Errorf("unknown session: %v", sessionID) - } - } - - sqlDriver, err := sessionSettings.Setting(config.SQLStoreDriver) - if err != nil { - return nil, err - } - sqlDataSourceName, err := sessionSettings.Setting(config.SQLStoreDataSourceName) - if err != nil { - return nil, err - } - sqlConnMaxLifetime := 0 * time.Second - if sessionSettings.HasSetting(config.SQLStoreConnMaxLifetime) { - sqlConnMaxLifetime, err = sessionSettings.DurationSetting(config.SQLStoreConnMaxLifetime) - if err != nil { - return nil, err - } - } - return newSQLStore(sessionID, sqlDriver, sqlDataSourceName, sqlConnMaxLifetime) -} - -func newSQLStore(sessionID quickfix.SessionID, driver string, dataSourceName string, connMaxLifetime time.Duration) (store *sqlStore, err error) { - - memStore, memErr := quickfix.NewMemoryStoreFactory().Create(sessionID) - if memErr != nil { - err = errors.Wrap(memErr, "cache creation") - return - } - - store = &sqlStore{ - sessionID: sessionID, - cache: memStore, - sqlDriver: driver, - sqlDataSourceName: dataSourceName, - sqlConnMaxLifetime: connMaxLifetime, - } - if err = store.cache.Reset(); err != nil { - err = errors.Wrap(err, "cache reset") - return - } - - if store.sqlDriver == "postgres" || store.sqlDriver == "pgx" { - store.placeholder = postgresPlaceholder - } - - if store.db, err = sql.Open(store.sqlDriver, store.sqlDataSourceName); err != nil { - return nil, err - } - store.db.SetConnMaxLifetime(store.sqlConnMaxLifetime) - - if err = store.db.Ping(); err != nil { // ensure immediate connection - return nil, err - } - if err = store.populateCache(); err != nil { - return nil, err - } - - return store, nil -} - -// Reset deletes the store records and sets the seqnums back to 1. -func (store *sqlStore) Reset() error { - s := store.sessionID - _, err := store.db.Exec(sqlString(`DELETE FROM messages - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - if err != nil { - return err - } - - if err = store.cache.Reset(); err != nil { - return err - } - - _, err = store.db.Exec(sqlString(`UPDATE sessions - SET creation_time=?, incoming_seqnum=?, outgoing_seqnum=? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - store.cache.CreationTime(), store.cache.NextTargetMsgSeqNum(), store.cache.NextSenderMsgSeqNum(), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - - return err -} - -// Refresh reloads the store from the database. -func (store *sqlStore) Refresh() error { - if err := store.cache.Reset(); err != nil { - return err - } - return store.populateCache() -} - -func (store *sqlStore) populateCache() error { - s := store.sessionID - var creationTime time.Time - var incomingSeqNum, outgoingSeqNum int - row := store.db.QueryRow(sqlString(`SELECT creation_time, incoming_seqnum, outgoing_seqnum - FROM sessions - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - - err := row.Scan(&creationTime, &incomingSeqNum, &outgoingSeqNum) - - // session record found, load it - if err == nil { - store.cache.SetCreationTime(creationTime) - if err = store.cache.SetNextTargetMsgSeqNum(incomingSeqNum); err != nil { - return errors.Wrap(err, "cache set next target") - } - if err = store.cache.SetNextSenderMsgSeqNum(outgoingSeqNum); err != nil { - return errors.Wrap(err, "cache set next sender") - } - return nil - } - - // fatal error, give up - if err != sql.ErrNoRows { - return err - } - - // session record not found, create it - _, err = store.db.Exec(sqlString(`INSERT INTO sessions ( - creation_time, incoming_seqnum, outgoing_seqnum, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), - store.cache.CreationTime(), - store.cache.NextTargetMsgSeqNum(), - store.cache.NextSenderMsgSeqNum(), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - - return err -} - -// NextSenderMsgSeqNum returns the next MsgSeqNum that will be sent. -func (store *sqlStore) NextSenderMsgSeqNum() int { - return store.cache.NextSenderMsgSeqNum() -} - -// NextTargetMsgSeqNum returns the next MsgSeqNum that should be received. -func (store *sqlStore) NextTargetMsgSeqNum() int { - return store.cache.NextTargetMsgSeqNum() -} - -// SetNextSenderMsgSeqNum sets the next MsgSeqNum that will be sent. -func (store *sqlStore) SetNextSenderMsgSeqNum(next int) error { - s := store.sessionID - _, err := store.db.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - next, s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - if err != nil { - return err - } - return store.cache.SetNextSenderMsgSeqNum(next) -} - -// SetNextTargetMsgSeqNum sets the next MsgSeqNum that should be received. -func (store *sqlStore) SetNextTargetMsgSeqNum(next int) error { - s := store.sessionID - _, err := store.db.Exec(sqlString(`UPDATE sessions SET incoming_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - next, s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - if err != nil { - return err - } - return store.cache.SetNextTargetMsgSeqNum(next) -} - -// IncrNextSenderMsgSeqNum increments the next MsgSeqNum that will be sent. -func (store *sqlStore) IncrNextSenderMsgSeqNum() error { - if err := store.SetNextSenderMsgSeqNum(store.cache.NextSenderMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "store next") - } - return nil -} - -// IncrNextTargetMsgSeqNum increments the next MsgSeqNum that should be received. -func (store *sqlStore) IncrNextTargetMsgSeqNum() error { - if err := store.SetNextTargetMsgSeqNum(store.cache.NextTargetMsgSeqNum() + 1); err != nil { - return errors.Wrap(err, "store next") - } - return nil -} - -// CreationTime returns the creation time of the store. -func (store *sqlStore) CreationTime() time.Time { - return store.cache.CreationTime() -} - -// SetCreationTime is a no-op for SQLStore. -func (store *sqlStore) SetCreationTime(_ time.Time) { -} - -func (store *sqlStore) SaveMessage(seqNum int, msg []byte) error { - s := store.sessionID - - _, err := store.db.Exec(sqlString(`INSERT INTO messages ( - msgseqnum, message, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), - seqNum, string(msg), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - - return err -} - -func (store *sqlStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []byte) error { - s := store.sessionID - - tx, err := store.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - _, err = tx.Exec(sqlString(`INSERT INTO messages ( - msgseqnum, message, - beginstring, session_qualifier, - sendercompid, sendersubid, senderlocid, - targetcompid, targetsubid, targetlocid) - VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, store.placeholder), - seqNum, string(msg), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - if err != nil { - return err - } - - next := store.cache.NextSenderMsgSeqNum() + 1 - _, err = tx.Exec(sqlString(`UPDATE sessions SET outgoing_seqnum = ? - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=?`, store.placeholder), - next, s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID) - if err != nil { - return err - } - - err = tx.Commit() - if err != nil { - return err - } - - return store.cache.SetNextSenderMsgSeqNum(next) -} - -func (store *sqlStore) IterateMessages(beginSeqNum, endSeqNum int, cb func([]byte) error) error { - s := store.sessionID - rows, err := store.db.Query(sqlString(`SELECT message FROM messages - WHERE beginstring=? AND session_qualifier=? - AND sendercompid=? AND sendersubid=? AND senderlocid=? - AND targetcompid=? AND targetsubid=? AND targetlocid=? - AND msgseqnum>=? AND msgseqnum<=? - ORDER BY msgseqnum`, store.placeholder), - s.BeginString, s.Qualifier, - s.SenderCompID, s.SenderSubID, s.SenderLocationID, - s.TargetCompID, s.TargetSubID, s.TargetLocationID, - beginSeqNum, endSeqNum) - if err != nil { - return err - } - defer func() { _ = rows.Close() }() - - for rows.Next() { - var message string - if err = rows.Scan(&message); err != nil { - return err - } else if err = cb([]byte(message)); err != nil { - return err - } - } - - return rows.Err() -} - -func (store *sqlStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error) { - var msgs [][]byte - err := store.IterateMessages(beginSeqNum, endSeqNum, func(msg []byte) error { - msgs = append(msgs, msg) - return nil - }) - return msgs, err -} - -// Close closes the store's database connection. -func (store *sqlStore) Close() error { - if store.db != nil { - store.db.Close() - store.db = nil - } - return nil -} diff --git a/store/sql/sql_store_test.go b/store/sql/sql_store_test.go deleted file mode 100644 index 73c1d747a..000000000 --- a/store/sql/sql_store_test.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) quickfixengine.org All rights reserved. -// -// This file may be distributed under the terms of the quickfixengine.org -// license as defined by quickfixengine.org and appearing in the file -// LICENSE included in the packaging of this file. -// -// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING -// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A -// PARTICULAR PURPOSE. -// -// See http://www.quickfixengine.org/LICENSE for licensing information. -// -// Contact ask@quickfixengine.org if any conditions of this licensing -// are not clear to you. - -package sql - -import ( - "database/sql" - "fmt" - "os" - "path" - "path/filepath" - "strings" - "testing" - "time" - - _ "github.com/mattn/go-sqlite3" - "github.com/quickfixgo/quickfix" - "github.com/quickfixgo/quickfix/internal/testsuite" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -// SqlStoreTestSuite runs all tests in the MessageStoreTestSuite against the SqlStore implementation. -type SQLStoreTestSuite struct { - testsuite.StoreTestSuite - sqlStoreRootPath string -} - -func (suite *SQLStoreTestSuite) SetupTest() { - suite.sqlStoreRootPath = path.Join(os.TempDir(), fmt.Sprintf("SqlStoreTestSuite-%d", os.Getpid())) - err := os.MkdirAll(suite.sqlStoreRootPath, os.ModePerm) - require.Nil(suite.T(), err) - sqlDriver := "sqlite3" - sqlDsn := path.Join(suite.sqlStoreRootPath, fmt.Sprintf("%d.db", time.Now().UnixNano())) - - // create tables - db, err := sql.Open(sqlDriver, sqlDsn) - require.Nil(suite.T(), err) - ddlFnames, err := filepath.Glob(fmt.Sprintf("../../_sql/%s/*.sql", sqlDriver)) - require.Nil(suite.T(), err) - for _, fname := range ddlFnames { - sqlBytes, err := os.ReadFile(fname) - require.Nil(suite.T(), err) - _, err = db.Exec(string(sqlBytes)) - require.Nil(suite.T(), err) - } - - // create settings - sessionID := quickfix.SessionID{BeginString: "FIX.4.4", SenderCompID: "SENDER", TargetCompID: "TARGET"} - settings, err := quickfix.ParseSettings(strings.NewReader(fmt.Sprintf(` -[DEFAULT] -SQLStoreDriver=%s -SQLStoreDataSourceName=%s -SQLStoreConnMaxLifetime=14400s - -[SESSION] -BeginString=%s -SenderCompID=%s -TargetCompID=%s`, sqlDriver, sqlDsn, sessionID.BeginString, sessionID.SenderCompID, sessionID.TargetCompID))) - require.Nil(suite.T(), err) - - // create store - suite.MsgStore, err = NewStoreFactory(settings).Create(sessionID) - require.Nil(suite.T(), err) -} - -func (suite *SQLStoreTestSuite) TestSqlPlaceholderReplacement() { - got := sqlString("A ? B ? C ?", postgresPlaceholder) - suite.Equal("A $1 B $2 C $3", got) -} - -func (suite *SQLStoreTestSuite) TearDownTest() { - suite.MsgStore.Close() - os.RemoveAll(suite.sqlStoreRootPath) -} - -func TestSqlStoreTestSuite(t *testing.T) { - suite.Run(t, new(SQLStoreTestSuite)) -}