@@ -107,23 +107,38 @@ async def _cancel_order(
107
107
108
108
logging .info (f"Order { order_id } for symbol { symbol .name } canceled." )
109
109
110
+ async def _fetch_orders (self , datasource ):
111
+ service = self ._datasource_factory .create (datasource , ProtocolType .REST )
112
+
113
+ return await asyncio .to_thread (service .fetch_all_open_orders )
114
+
110
115
async def _fetch_open_orders (self ):
111
116
services = [
112
117
DataSourceType .BYBIT ,
113
118
]
114
119
monitor_interval = self .order_config .get ("monitor_interval" , 10 )
115
120
116
- for datasource in services :
117
- service = self ._datasource_factory .create (datasource , ProtocolType .REST )
118
- orders = await asyncio .to_thread (service .fetch_all_open_orders )
121
+ open_orders = []
122
+
123
+ tasks = [
124
+ asyncio .create_task (self ._fetch_orders (datasource ))
125
+ for datasource in services
126
+ ]
127
+
128
+ results = await asyncio .gather (* tasks , return_exceptions = True )
129
+
130
+ for datasource , result in zip (services , results ):
131
+ open_orders .extend (
132
+ [(order_id , symbol , datasource ) for order_id , symbol in result ]
133
+ )
119
134
120
- for order_id , symbol in orders :
121
- pq_order = PQOrder (order_id , symbol , datasource )
135
+ for order_id , symbol , datasource in open_orders :
136
+ pq_order = PQOrder (order_id , symbol , datasource )
122
137
123
- if not any (
124
- existing_order .order_id == pq_order .order_id
125
- for existing_order in self ._order_queue ._queue
126
- ):
127
- await self ._order_queue .put (pq_order )
138
+ if not any (
139
+ existing_order .order_id == pq_order .order_id
140
+ for existing_order in self ._order_queue ._queue
141
+ ):
142
+ await self ._order_queue .put (pq_order )
128
143
129
- await asyncio .sleep (monitor_interval )
144
+ await asyncio .sleep (monitor_interval )
0 commit comments