Skip to content

Commit 8452c70

Browse files
authored
feat: download data by chunks without redownloading existing data (#2)
1 parent a8bf402 commit 8452c70

19 files changed

+623
-184
lines changed

config/app.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
framework:
2+
cache:
3+
prefix_seed: 'stochastix.download'
4+
25
messenger:
36
transports:
47
stochastix: '%env(MESSENGER_TRANSPORT_DSN)%'

config/services.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,12 @@ services:
1616
- '../src/**/Exception'
1717
- '../src/**/Event'
1818
- '../src/**/Model'
19+
20+
stochastix.download.cancel.cache:
21+
class: Symfony\Component\Cache\Adapter\FilesystemAdapter
22+
arguments:
23+
- 'stochastix.download.cancel'
24+
- 3600
25+
- '%kernel.project_dir%/data/.cache'
26+
tags:
27+
- { name: 'cache.pool', namespace: 'stochastix.download' }

docs/api.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,27 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor
5252
---
5353
### Market Data
5454

55+
* **`GET /api/data/exchanges`**
56+
* **Description:** Retrieves a sorted list of all exchange IDs supported by the backend (via the CCXT library).
57+
* **Requires:** None
58+
* **Request Body:** None
59+
* **Success Response:** `200 OK`
60+
* **Example Success Response Body:**
61+
```json
62+
[
63+
"ace",
64+
"alpaca",
65+
"ascendex",
66+
"bequant",
67+
"bigone",
68+
"binance",
69+
"binancecoinm",
70+
"binanceus",
71+
"binanceusdm",
72+
"bingx"
73+
]
74+
```
75+
5576
* **`GET /api/data-availability`**
5677
* **Description:** Scans the server for available market data (`.stchx` files) and returns a manifest detailing available symbols, their timeframes, and the start/end dates for each dataset.
5778
* **Requires:** None
@@ -114,6 +135,21 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor
114135
* `400 Bad Request`: Invalid input (e.g., validation errors in the request body, end date before start date).
115136
* `500 Internal Server Error`: If the download message could not be dispatched to the queue.
116137

138+
* **`DELETE /api/data/download/{jobId}`**
139+
* **Description:** Requests the cancellation of a running download job. The cancellation may take a few moments to take effect, because the cancellation request is only checked between data chunk fetches.
140+
* **Requires:** None
141+
* **URL Parameters:**
142+
* `jobId` (string, required): The unique ID of the download job to cancel.
143+
* **Request Body:** None
144+
* **Success Response:** `202 Accepted`
145+
* **Example Success Response Body:**
146+
```json
147+
{
148+
"status": "cancellation_requested",
149+
"jobId": "download_666c1e5a7b8d9"
150+
}
151+
```
152+
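* **Error Responses:** None defined. The endpoint does not check whether `jobId` refers to an existing or still-running job; the cancellation request is recorded regardless and expires after one hour.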
117153

118154
* **`GET /api/data/inspect/{exchangeId}/{symbol}/{timeframe}`**
119155
* **Description:** Inspects a specific market data file (`.stchx`). Returns the file's header metadata, a sample of the first and last records, and a full data consistency validation report (checking for gaps, duplicates, and out-of-order records).

src/Command/DataDownloadCommand.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ protected function configure(): void
6262
'E',
6363
InputOption::VALUE_OPTIONAL,
6464
'The UTC end date/time (Format: Y-m-d[THH:MM:SS]). Defaults to "now".'
65+
)
66+
->addOption(
67+
'force',
68+
'f',
69+
InputOption::VALUE_NONE,
70+
'Force re-download and overwrite of the entire date range, even if data exists.'
6571
);
6672
}
6773

@@ -74,6 +80,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
7480
$exchange = $input->getArgument('exchange');
7581
$symbol = $input->getArgument('symbol');
7682
$timeframe = $input->getArgument('timeframe');
83+
$forceOverwrite = $input->getOption('force');
7784

7885
$startDateStr = $input->getOption('start-date');
7986
$endDateStr = $input->getOption('end-date');
@@ -89,6 +96,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
8996
return Command::INVALID;
9097
}
9198

99+
if ($forceOverwrite) {
100+
$io->warning('Force mode enabled: Existing data in the specified range will be overwritten.');
101+
}
102+
92103
$io->title('🚀 Stochastix OHLCV Data Downloader 🚀');
93104
$io->newLine();
94105
$io->section('Download Progress');
@@ -116,7 +127,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
116127
$symbol,
117128
$timeframe,
118129
$startDate,
119-
$endDate
130+
$endDate,
131+
$forceOverwrite
120132
);
121133

122134
$this->progressBar->finish();

src/Domain/Backtesting/MessageHandler/RunBacktestMessageHandler.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,14 @@ public function __invoke(RunBacktestMessage $message): void
7676

7777
private function publishUpdate(string $topic, array $data): void
7878
{
79-
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
80-
$this->mercureHub->publish($update);
79+
try {
80+
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
81+
$this->mercureHub->publish($update);
82+
} catch (\Throwable $e) {
83+
$this->logger->warning('Failed to publish update to Mercure. The backtest will continue.', [
84+
'topic' => $topic,
85+
'error' => $e->getMessage(),
86+
]);
87+
}
8188
}
8289
}

src/Domain/Backtesting/Service/MultiTimeframeDataService.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private function generateFilePath(string $exchangeId, string $symbol, string $ti
7070
rtrim($this->baseDataPath, '/'),
7171
strtolower($exchangeId),
7272
strtoupper($sanitizedSymbol),
73-
strtolower($timeframe)
73+
$timeframe
7474
);
7575
}
7676
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace Stochastix\Domain\Data\Controller;
4+
5+
use Psr\Cache\CacheItemPoolInterface;
6+
use Psr\Cache\InvalidArgumentException;
7+
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
8+
use Symfony\Component\DependencyInjection\Attribute\Autowire;
9+
use Symfony\Component\HttpFoundation\JsonResponse;
10+
use Symfony\Component\HttpFoundation\Response;
11+
use Symfony\Component\HttpKernel\Attribute\AsController;
12+
use Symfony\Component\Routing\Attribute\Route;
13+
14+
#[AsController]
15+
#[Route('/api/data/download/{jobId}', name: 'stochastix_api_data_cancel_download', methods: ['DELETE'])]
16+
class CancelDownloadAction extends AbstractController
17+
{
18+
public function __construct(
19+
#[Autowire(service: 'stochastix.download.cancel.cache')]
20+
private readonly CacheItemPoolInterface $cache,
21+
) {
22+
}
23+
24+
/**
25+
* @throws InvalidArgumentException
26+
*/
27+
public function __invoke(string $jobId): JsonResponse
28+
{
29+
$cacheKey = 'download.cancel.' . $jobId;
30+
$item = $this->cache->getItem($cacheKey);
31+
32+
$item->set(true);
33+
$item->expiresAfter(3600);
34+
$this->cache->save($item);
35+
36+
return $this->json(
37+
['status' => 'cancellation_requested', 'jobId' => $jobId],
38+
Response::HTTP_ACCEPTED
39+
);
40+
}
41+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace Stochastix\Domain\Data\Controller;
4+
5+
use Stochastix\Domain\Data\Service\MarketDataService;
6+
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
7+
use Symfony\Component\HttpFoundation\JsonResponse;
8+
use Symfony\Component\HttpKernel\Attribute\AsController;
9+
use Symfony\Component\Routing\Attribute\Route;
10+
11+
#[AsController]
12+
#[Route('/api/data/exchanges', name: 'stochastix_api_data_exchanges', methods: ['GET'])]
13+
class GetExchangesAction extends AbstractController
14+
{
15+
public function __construct(private readonly MarketDataService $marketDataService)
16+
{
17+
}
18+
19+
public function __invoke(): JsonResponse
20+
{
21+
return $this->json($this->marketDataService->getExchanges());
22+
}
23+
}

src/Domain/Data/Dto/DownloadRequestDto.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,16 @@ public function __construct(
2020
public string $startDate,
2121
#[Assert\Date(message: 'End date must be in Y-m-d format.')]
2222
public string $endDate,
23+
public bool $forceOverwrite = false,
2324
) {
2425
}
2526

2627
public function validateDateRange(ExecutionContextInterface $context): void
2728
{
28-
if ($this->startDate !== null && $this->endDate !== null) {
29-
if ($this->endDate < $this->startDate) {
30-
$context->buildViolation('End date must be after or the same as start date.')
31-
->atPath('endDate')
32-
->addViolation();
33-
}
29+
if ($this->startDate !== null && $this->endDate !== null && $this->endDate < $this->startDate) {
30+
$context->buildViolation('End date must be after or the same as start date.')
31+
->atPath('endDate')
32+
->addViolation();
3433
}
3534
}
3635
}

src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
namespace Stochastix\Domain\Data\EventSubscriber;
44

5+
use Psr\Log\LoggerInterface;
56
use Stochastix\Domain\Data\Event\DownloadProgressEvent;
67
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
78
use Symfony\Component\Mercure\HubInterface;
89
use Symfony\Component\Mercure\Update;
910

1011
final readonly class DownloadProgressSubscriber implements EventSubscriberInterface
1112
{
12-
public function __construct(private HubInterface $mercureHub)
13-
{
13+
/**
 * The enclosing class is declared `readonly`, so every promoted property is already
 * read-only; no per-property `readonly` modifier is needed.
 *
 * @param HubInterface    $mercureHub hub used to publish download progress updates
 * @param LoggerInterface $logger     receives a warning when publishing an update fails
 */
public function __construct(
    private HubInterface $mercureHub,
    private LoggerInterface $logger,
) {
}
1518

1619
public static function getSubscribedEvents(): array
@@ -40,7 +43,14 @@ public function onDownloadProgress(DownloadProgressEvent $event): void
4043
'message' => "Fetched {$event->recordsFetchedInBatch} records up to " . gmdate('Y-m-d H:i:s', $event->lastTimestamp),
4144
];
4245

43-
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
44-
$this->mercureHub->publish($update);
46+
try {
47+
$update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR));
48+
$this->mercureHub->publish($update);
49+
} catch (\Throwable $e) {
50+
$this->logger->warning('Failed to publish progress update to Mercure.', [
51+
'jobId' => $event->jobId,
52+
'error' => $e->getMessage(),
53+
]);
54+
}
4555
}
4656
}

0 commit comments

Comments
 (0)