Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

Commit e2a454d

Browse files
authored
Orca (#55)
* Orca
1 parent 86b0a8a commit e2a454d

File tree

17 files changed

+366
-332
lines changed

17 files changed

+366
-332
lines changed

core/actors/collector/_data.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ async def start(self, msg: Event):
3232

3333
async def stop(self):
3434
self._stop_event.set()
35-
self._queue.put_nowait(STOP)
35+
await self._queue.put(STOP)
3636

3737
for task in self._tasks:
3838
task.cancel()
3939

40+
tasks_to_cancel = [task for task in self._tasks if not task.done()]
41+
4042
try:
4143
await asyncio.wait_for(
42-
asyncio.gather(*self._tasks, return_exceptions=True), timeout=5
44+
asyncio.gather(*tasks_to_cancel, return_exceptions=True), timeout=5
4345
)
4446
except asyncio.TimeoutError:
4547
logger.warning("Timeout while waiting for tasks to finish.")
@@ -74,6 +76,7 @@ async def _run_consumer(self, consumer):
7476
while not self._stop_event.is_set():
7577
data = await self._queue.get()
7678
if data is STOP:
79+
self._queue.task_done()
7780
break
7881
try:
7982
await consumer(data)

core/actors/state/_signal.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import Any, Callable, List
23

34

@@ -9,5 +10,17 @@ def connect(self, subscriber: Callable[..., Any]) -> None:
910
self._subscribers.append(subscriber)
1011

1112
def emit(self, *args, **kwargs) -> None:
13+
tasks = []
14+
1215
for subscriber in self._subscribers:
13-
subscriber(*args, **kwargs)
16+
if asyncio.iscoroutinefunction(subscriber):
17+
tasks.append(subscriber(*args, **kwargs))
18+
else:
19+
subscriber(*args, **kwargs)
20+
21+
if tasks:
22+
asyncio.create_task(self._run_async_subscribers(tasks))
23+
24+
@staticmethod
25+
async def _run_async_subscribers(tasks: List[asyncio.Future]) -> None:
26+
await asyncio.gather(*tasks)

core/event_decorators.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ async def run(self, task, *args, **kwargs):
4242
return await self._dispatcher.run(task, *args, **kwargs)
4343

4444
async def wait(self):
45-
await asyncio.sleep(1)
46-
await self._dispatcher.wait()
45+
try:
46+
await asyncio.wait_for(self._dispatcher.wait(), timeout=10)
47+
except asyncio.TimeoutError:
48+
pass
4749

4850
def _unregister(self):
4951
for event_type, handler in self._registered_handlers:

core/models/entity/ohlcv.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -67,44 +67,6 @@ def from_dict(cls, data: Dict) -> "OHLCV":
6767

6868
return cls.from_list([data[key] for key in ohlcv_keys])
6969

70-
@property
71-
def real_body(self) -> float:
72-
return abs(self.open - self.close)
73-
74-
@property
75-
def upper_shadow(self) -> float:
76-
return self.high - max(self.open, self.close)
77-
78-
@property
79-
def lower_shadow(self) -> float:
80-
return min(self.open, self.close) - self.low
81-
82-
@property
83-
def price_range(self) -> float:
84-
return self.high - self.low
85-
86-
@property
87-
def price_movement(self) -> float:
88-
return self.close - self.open
89-
90-
@property
91-
def body_range_ratio(self) -> float:
92-
return self.real_body / self.price_range if self.price_range != 0 else 0
93-
94-
@property
95-
def body_shadow_ratio(self) -> float:
96-
total_shadow = self.upper_shadow + self.lower_shadow
97-
return self.real_body / total_shadow if total_shadow != 0 else 0
98-
99-
@property
100-
def shadow_range_ratio(self) -> float:
101-
total_shadow = self.upper_shadow + self.lower_shadow
102-
return total_shadow / self.price_range if self.price_range != 0 else 0
103-
104-
@property
105-
def real_body_normalized(self) -> float:
106-
return self.real_body / self.price_range if self.price_range != 0 else 0
107-
10870
@property
10971
def type(self) -> CandleType:
11072
if self.price_movement > 0:

core/tasks/_base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ def set_task(self, task: asyncio.Task):
1717

1818
async def wait_for_finishing(self) -> asyncio.Task:
1919
await self._task_event.wait()
20-
result = await self._task
20+
21+
if not self._task.done():
22+
result = await self._task
23+
else:
24+
result = None
2125

2226
if not self._task.done():
2327
self._task.cancel()

factor/generator/_population.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ def __next__(self) -> Individual:
247247
symbol, timeframe, strategy = next(self.iter)
248248
return Individual(symbol, timeframe, strategy)
249249
except StopIteration:
250-
raise StopIteration
250+
raise StopIteration from None
251251

252252
def _init(self) -> Iterator:
253253
sampled_symbols = self._generate_symbols()

feed/_historical.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(
3030
super().__init__(symbol, timeframe, datasource)
3131
self.datasource_factory = datasource_factory
3232
self.config = config_service.get("feed")
33+
self.bp = asyncio.Semaphore(10)
3334

3435
async def on_receive(self, msg: StartHistoricalFeed):
3536
await self.collector.start(msg)
@@ -64,18 +65,18 @@ async def _handle_market(self, batch: List[Bar]) -> None:
6465
await self.tell(
6566
NewMarketDataReceived(self.symbol, self.timeframe, self.datasource, bar)
6667
)
67-
await asyncio.sleep(0.0001)
6868

6969
async def _outbox(self, batch: List[Bar]) -> None:
70-
tasks = [
71-
self.ask(
72-
IngestMarketData(self.symbol, self.timeframe, self.datasource, bar)
73-
)
74-
for bar in batch
75-
if bar.closed
76-
]
70+
async with self.bp:
71+
tasks = [
72+
self.ask(
73+
IngestMarketData(self.symbol, self.timeframe, self.datasource, bar)
74+
)
75+
for bar in batch
76+
if bar.closed
77+
]
7778

78-
await asyncio.gather(*tasks)
79+
await asyncio.gather(*tasks)
7980

8081
@staticmethod
8182
async def batched(stream: AsyncIterator[Bar], batch_size: int):

feed/_realtime.py

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,58 +47,53 @@ async def on_receive(self, msg: StartRealtimeFeed):
4747

4848
@Producer
4949
async def _kline_producer(self, msg: StartRealtimeFeed):
50-
async with AsyncRealTimeData(
51-
self.datasource_factory.create(
52-
msg.datasource, ProtocolType.WS, WSType.PUBLIC
53-
),
50+
async for bars in self._stream_producer(
5451
KlineStreamStrategy(self.timeframe, self.symbol),
55-
) as stream:
56-
async for bars in stream:
57-
yield bars
52+
ProtocolType.WS,
53+
WSType.PUBLIC,
54+
msg,
55+
):
56+
yield bars
5857

5958
@Producer
6059
async def _ob_producer(self, msg: StartRealtimeFeed):
61-
async with AsyncRealTimeData(
62-
self.datasource_factory.create(
63-
msg.datasource, ProtocolType.WS, WSType.PUBLIC
64-
),
60+
async for orders in self._stream_producer(
6561
OrderBookStreamStrategy(self.symbol, self.config.get("dom", 15)),
66-
) as stream:
67-
async for orders in stream:
68-
yield orders
62+
ProtocolType.WS,
63+
WSType.PUBLIC,
64+
msg,
65+
):
66+
yield orders
6967

7068
@Producer
7169
async def _liquidation_producer(self, msg: StartRealtimeFeed):
72-
async with AsyncRealTimeData(
73-
self.datasource_factory.create(
74-
msg.datasource, ProtocolType.WS, WSType.PUBLIC
75-
),
70+
async for liquidations in self._stream_producer(
7671
LiquidationStreamStrategy(self.symbol),
77-
) as stream:
78-
async for liquidations in stream:
79-
yield liquidations
72+
ProtocolType.WS,
73+
WSType.PUBLIC,
74+
msg,
75+
):
76+
yield liquidations
8077

8178
@Producer
8279
async def _order_producer(self, msg: StartRealtimeFeed):
83-
async with AsyncRealTimeData(
84-
self.datasource_factory.create(
85-
msg.datasource, ProtocolType.WS, WSType.PRIVATE
86-
),
80+
async for order in self._stream_producer(
8781
OrderStreamStrategy(self.symbol),
88-
) as stream:
89-
async for order in stream:
90-
yield order
82+
ProtocolType.WS,
83+
WSType.PRIVATE,
84+
msg,
85+
):
86+
yield order
9187

9288
@Producer
9389
async def _position_producer(self, msg: StartRealtimeFeed):
94-
async with AsyncRealTimeData(
95-
self.datasource_factory.create(
96-
msg.datasource, ProtocolType.WS, WSType.PRIVATE
97-
),
90+
async for position in self._stream_producer(
9891
PositionStreamStrategy(self.symbol),
99-
) as stream:
100-
async for position in stream:
101-
yield position
92+
ProtocolType.WS,
93+
WSType.PRIVATE,
94+
msg,
95+
):
96+
yield position
10297

10398
@Consumer
10499
async def _consumer(self, data: List[Bar]):
@@ -129,3 +124,11 @@ async def _process_orders(self, orders: List[Order]):
129124
)
130125
)
131126
logger.info(f"{self.symbol}_{self.timeframe}:{order}")
127+
128+
async def _stream_producer(self, strategy, protocol_type, ws_type, msg):
129+
async with AsyncRealTimeData(
130+
self.datasource_factory.create(msg.datasource, protocol_type, ws_type),
131+
strategy,
132+
) as stream:
133+
async for data in stream:
134+
yield data

infrastructure/event_dispatcher/event_worker.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import asyncio
2+
from collections import deque
23
from typing import Any, AsyncIterable, Dict, Tuple
34

5+
import numpy as np
6+
47
from core.events._base import Event
58

69
from .event_handler import EventHandler
@@ -11,19 +14,28 @@ def __init__(
1114
self,
1215
event_handler: EventHandler,
1316
cancel_event: asyncio.Event,
17+
task_duration_limit: int = 100,
1418
):
1519
self._event_handler = event_handler
1620
self._cancel_event = cancel_event
1721
self._queue = asyncio.Queue()
22+
self._task_durations = deque(maxlen=task_duration_limit)
1823

1924
@property
20-
def queue_size(self):
21-
return self._queue.qsize()
25+
def score(self):
26+
return self._queue.qsize(), (
27+
np.mean(self._task_durations) if len(self._task_durations) > 2 else 0.0
28+
)
2229

2330
async def run(self):
2431
async for event, args, kwargs in self._get_event_stream():
32+
start_time = asyncio.get_event_loop().time()
33+
2534
await self._event_handler.handle_event(event, *args, **kwargs)
2635

36+
end_time = asyncio.get_event_loop().time()
37+
self._task_durations.append(end_time - start_time)
38+
2739
async def _get_event_stream(
2840
self,
2941
) -> AsyncIterable[Tuple[Event, Tuple[Any], Dict[str, Any]]]:

infrastructure/event_dispatcher/worker_pool.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ def __init__(
1818
num_piority_groups: int,
1919
event_handler: EventHandler,
2020
cancel_event: asyncio.Event,
21+
alpha: float = 0.7,
22+
beta: float = 0.3,
2123
):
2224
self.workers = []
2325
self.load_balancer = LoadBalancer(num_piority_groups)
@@ -26,6 +28,8 @@ def __init__(
2628
self.cancel_event = cancel_event
2729
self._num_priority_groups = num_piority_groups
2830
self._initialize_workers(num_workers)
31+
self.alpha = alpha
32+
self.beta = beta
2933

3034
async def dispatch_to_worker(self, event: Event, *args, **kwargs) -> None:
3135
priority_group = self.load_balancer.determine_priority_group(
@@ -60,14 +64,19 @@ def _distribute_workers(self, priority_group: int) -> List[EventWorker]:
6064
return self.workers[group_start:group_end]
6165

6266
def _choose_worker(self, group_workers: List[EventWorker]) -> EventWorker:
63-
not_busy_workers = [
64-
worker for worker in group_workers if worker.queue_size == 0
65-
]
67+
scores = np.array([worker.score for worker in group_workers])
68+
queue_sizes, median_times = scores[:, 0], scores[:, 1]
69+
70+
norm_queue_sizes = (queue_sizes - queue_sizes.min() + 1) / (
71+
queue_sizes.max() - queue_sizes.min() + 1
72+
)
73+
norm_median_times = (median_times - median_times.min() + 1) / (
74+
median_times.max() - median_times.min() + 1
75+
)
6676

67-
if not_busy_workers:
68-
return np.random.choice(not_busy_workers)
77+
combined_scores = self.alpha * norm_queue_sizes + self.beta * norm_median_times
6978

70-
weights = np.array([1 / (worker.queue_size + 1) for worker in group_workers])
79+
weights = 1 / (combined_scores + np.finfo(float).eps)
7180
total_weight = sum(weights)
7281

7382
choice_point = np.random.uniform(0, total_weight)

0 commit comments

Comments
 (0)