11import BigNumber from 'bignumber.js' ;
2- import { merge , Observable , of } from 'rxjs' ;
3- import { filter , map , mergeMap , repeat , take } from 'rxjs/operators' ;
2+ import { curry } from 'ramda' ;
3+ import { empty , merge , Observable , of } from 'rxjs' ;
4+ import { map , mergeMap , repeat , take } from 'rxjs/operators' ;
45import { Config } from '../config' ;
56import { OrderSide } from '../constants' ;
67import { Logger } from '../logger' ;
@@ -27,9 +28,7 @@ type GetOrderBuilderParams = {
2728 accumulateOrderFillsForQuoteAssetReceived : (
2829 config : Config
2930 ) => ( source : Observable < SwapSuccess > ) => Observable < BigNumber > ;
30- quantityAboveMinimum : (
31- asset : string
32- ) => ( filledQuantity : BigNumber ) => boolean ;
31+ minimumQuantity$ : Observable < BigNumber > ;
3332 store : ArbyStore ;
3433} ;
3534
@@ -38,13 +37,40 @@ type CEXorder = {
3837 side : OrderSide ;
3938} ;
4039
40+ const quantityAboveMinimum = curry (
41+ (
42+ store : ArbyStore ,
43+ logger : Logger ,
44+ assetToTradeOnCEX : string ,
45+ minimumQuantity$ : Observable < BigNumber > ,
46+ quantity : BigNumber
47+ ) => {
48+ logger . info (
49+ `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
50+ ) ;
51+ store . resetLastOrderUpdatePrice ( ) ;
52+ return minimumQuantity$ . pipe (
53+ take ( 1 ) ,
54+ mergeMap ( minimumQuantity => {
55+ if ( quantity . isGreaterThanOrEqualTo ( minimumQuantity ) ) {
56+ return of ( quantity ) ;
57+ }
58+ logger . info (
59+ `Will not execute CEX order because ${ quantity . toFixed ( ) } is below the minimum allowed CEX quantity ${ minimumQuantity . toFixed ( ) } `
60+ ) ;
61+ return empty ( ) ;
62+ } )
63+ ) ;
64+ }
65+ ) ;
66+
4167const getOrderBuilder$ = ( {
4268 config,
4369 logger,
4470 getOpenDEXswapSuccess$,
4571 accumulateOrderFillsForBaseAssetReceived,
4672 accumulateOrderFillsForQuoteAssetReceived,
47- quantityAboveMinimum ,
73+ minimumQuantity$ ,
4874 store,
4975} : GetOrderBuilderParams ) : Observable < CEXorder > => {
5076 const {
@@ -59,19 +85,17 @@ const getOrderBuilder$ = ({
5985 config . CEX_QUOTEASSET === 'BTC'
6086 ? config . CEX_BASEASSET
6187 : config . CEX_QUOTEASSET ;
88+ const filterMinimum = quantityAboveMinimum (
89+ store ,
90+ logger ,
91+ assetToTradeOnCEX ,
92+ minimumQuantity$
93+ ) ;
6294 const receivedQuoteAssetOrder$ = receivedQuoteAssetSwapSuccess$ . pipe (
6395 // accumulate OpenDEX order fills when receiving
6496 // quote asset
6597 accumulateOrderFillsForQuoteAssetReceived ( config ) ,
66- mergeMap ( ( quantity : BigNumber ) => {
67- logger . info (
68- `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
69- ) ;
70- store . resetLastOrderUpdatePrice ( ) ;
71- return of ( quantity ) ;
72- } ) ,
73- // filter based on minimum CEX order quantity
74- filter ( quantityAboveMinimum ( assetToTradeOnCEX ) ) ,
98+ mergeMap ( filterMinimum ) ,
7599 map ( quantity => {
76100 return { quantity, side : OrderSide . BUY } ;
77101 } ) ,
@@ -84,15 +108,7 @@ const getOrderBuilder$ = ({
84108 // accumulate OpenDEX order fills when receiving
85109 // quote asset
86110 accumulateOrderFillsForBaseAssetReceived ( config ) ,
87- mergeMap ( ( quantity : BigNumber ) => {
88- logger . info (
89- `Swap success. Accumulated ${ assetToTradeOnCEX } quantity: ${ quantity . toFixed ( ) } `
90- ) ;
91- store . resetLastOrderUpdatePrice ( ) ;
92- return of ( quantity ) ;
93- } ) ,
94- // filter based on minimum CEX order quantity
95- filter ( quantityAboveMinimum ( assetToTradeOnCEX ) ) ,
111+ mergeMap ( filterMinimum ) ,
96112 map ( quantity => {
97113 return { quantity, side : OrderSide . SELL } ;
98114 } ) ,
0 commit comments