@@ -21,6 +21,7 @@ import { getNewTrade$, GetTradeParams } from './trade/trade';
2121import { getStartShutdown$ } from './utils' ;
2222import { Dictionary , Market } from 'ccxt' ;
2323import { verifyMarkets } from './centralized/verify-markets' ;
24+ import { initDB$ , InitDBparams , InitDBResponse } from './db/db' ;
2425
2526type StartArbyParams = {
2627 config$ : Observable < Config > ;
@@ -43,6 +44,10 @@ type StartArbyParams = {
4344 loadMarkets$,
4445 } : InitCEXparams ) => Observable < InitCEXResponse > ;
4546 verifyMarkets : ( config : Config , CEXmarkets : Dictionary < Market > ) => boolean ;
47+ initDB$ : ( {
48+ dataDir : string ,
49+ logger : Logger ,
50+ } : InitDBparams ) => Observable < InitDBResponse > ;
4651} ;
4752
4853const logConfig = ( config : Config , logger : Logger ) => {
@@ -86,54 +91,63 @@ export const startArby = ({
8691 trade$,
8792 cleanup$,
8893 initCEX$,
94+ initDB$,
8995 verifyMarkets,
9096} : StartArbyParams ) : Observable < any > => {
9197 const store = getArbyStore ( ) ;
9298 return config$ . pipe (
9399 mergeMap ( config => {
94- const CEX$ = initCEX$ ( {
95- config ,
96- loadMarkets$ ,
97- getExchange ,
100+ const loggers = getLoggers ( config ) ;
101+ const db$ = initDB$ ( {
102+ logger : loggers . db ,
103+ dataDir : config . DATA_DIR ,
98104 } ) ;
99- return CEX$ . pipe (
100- mergeMap ( ( { markets : CEXmarkets , exchange : CEX } ) => {
101- const loggers = getLoggers ( config ) ;
102- loggers . global . info ( 'Starting. Hello, Arby.' ) ;
103- logConfig ( config , loggers . global ) ;
104- verifyMarkets ( config , CEXmarkets ) ;
105- const tradeComplete$ = trade$ ( {
105+ return db$ . pipe (
106+ mergeMap ( ( ) => {
107+ const CEX$ = initCEX$ ( {
106108 config,
107- loggers,
108- getOpenDEXcomplete$,
109- shutdown$,
110- getCentralizedExchangeOrder$,
111- catchOpenDEXerror,
112- getCentralizedExchangePrice$,
113- CEX ,
114- store,
115- } ) . pipe ( takeUntil ( shutdown$ ) ) ;
116- return concat (
117- tradeComplete$ ,
118- cleanup$ ( {
119- config,
120- loggers,
121- removeOpenDEXorders$,
122- removeCEXorders$,
123- CEX ,
124- } )
125- ) . pipe (
126- catchError ( e => {
127- loggers . global . info (
128- `Unrecoverable error: ${ JSON . stringify ( e ) } - cleaning up.`
129- ) ;
130- return cleanup$ ( {
109+ loadMarkets$,
110+ getExchange,
111+ } ) ;
112+ return CEX$ . pipe (
113+ mergeMap ( ( { markets : CEXmarkets , exchange : CEX } ) => {
114+ loggers . global . info ( 'Starting. Hello, Arby.' ) ;
115+ logConfig ( config , loggers . global ) ;
116+ verifyMarkets ( config , CEXmarkets ) ;
117+ const tradeComplete$ = trade$ ( {
131118 config,
132119 loggers,
133- removeOpenDEXorders$,
134- removeCEXorders$,
120+ getOpenDEXcomplete$,
121+ shutdown$,
122+ getCentralizedExchangeOrder$,
123+ catchOpenDEXerror,
124+ getCentralizedExchangePrice$,
135125 CEX ,
136- } ) ;
126+ store,
127+ } ) . pipe ( takeUntil ( shutdown$ ) ) ;
128+ return concat (
129+ tradeComplete$ ,
130+ cleanup$ ( {
131+ config,
132+ loggers,
133+ removeOpenDEXorders$,
134+ removeCEXorders$,
135+ CEX ,
136+ } )
137+ ) . pipe (
138+ catchError ( e => {
139+ loggers . global . info (
140+ `Unrecoverable error: ${ JSON . stringify ( e ) } - cleaning up.`
141+ ) ;
142+ return cleanup$ ( {
143+ config,
144+ loggers,
145+ removeOpenDEXorders$,
146+ removeCEXorders$,
147+ CEX ,
148+ } ) ;
149+ } )
150+ ) ;
137151 } )
138152 ) ;
139153 } )
@@ -155,6 +169,7 @@ if (!module.parent) {
155169 cleanup$ : getCleanup$ ,
156170 initCEX$,
157171 verifyMarkets,
172+ initDB$,
158173 } ) . subscribe ( {
159174 error : error => {
160175 if ( error . message ) {
0 commit comments