From 73034203d0e50c3a4bea7dea3a663ba721ac528d Mon Sep 17 00:00:00 2001 From: William Arin Date: Wed, 18 Jun 2025 18:16:41 +0800 Subject: [PATCH 1/8] feat: data download now skips existing data --- src/Command/DataDownloadCommand.php | 14 +- src/Domain/Data/Dto/DownloadRequestDto.php | 11 +- .../Data/Exception/EmptyHistoryException.php | 7 + .../DownloadDataMessageHandler.php | 1 + .../Data/Service/Exchange/CcxtAdapter.php | 43 ++- src/Domain/Data/Service/OhlcvDownloader.php | 308 +++++++++++------- .../Data/Service/OhlcvDownloaderTest.php | 99 ++++-- 7 files changed, 321 insertions(+), 162 deletions(-) create mode 100644 src/Domain/Data/Exception/EmptyHistoryException.php diff --git a/src/Command/DataDownloadCommand.php b/src/Command/DataDownloadCommand.php index a3fd660..d97c0fb 100644 --- a/src/Command/DataDownloadCommand.php +++ b/src/Command/DataDownloadCommand.php @@ -62,6 +62,12 @@ protected function configure(): void 'E', InputOption::VALUE_OPTIONAL, 'The UTC end date/time (Format: Y-m-d[THH:MM:SS]). Defaults to "now".' + ) + ->addOption( + 'force', + 'f', + InputOption::VALUE_NONE, + 'Force re-download and overwrite of the entire date range, even if data exists.' ); } @@ -74,6 +80,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $exchange = $input->getArgument('exchange'); $symbol = $input->getArgument('symbol'); $timeframe = $input->getArgument('timeframe'); + $forceOverwrite = $input->getOption('force'); $startDateStr = $input->getOption('start-date'); $endDateStr = $input->getOption('end-date'); @@ -89,6 +96,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int return Command::INVALID; } + if ($forceOverwrite) { + $io->warning('Force mode enabled: Existing data in the specified range will be overwritten.'); + } + $io->title('🚀 Stochastix OHLCV Data Downloader 🚀'); $io->newLine(); $io->section('Download Progress'); @@ -116,7 +127,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int $symbol, $timeframe, $startDate, - $endDate + $endDate, + $forceOverwrite ); $this->progressBar->finish(); diff --git a/src/Domain/Data/Dto/DownloadRequestDto.php b/src/Domain/Data/Dto/DownloadRequestDto.php index 361fe47..de88be7 100644 --- a/src/Domain/Data/Dto/DownloadRequestDto.php +++ b/src/Domain/Data/Dto/DownloadRequestDto.php @@ -20,17 +20,16 @@ public function __construct( public string $startDate, #[Assert\Date(message: 'End date must be in Y-m-d format.')] public string $endDate, + public bool $forceOverwrite = false, ) { } public function validateDateRange(ExecutionContextInterface $context): void { - if ($this->startDate !== null && $this->endDate !== null) { - if ($this->endDate < $this->startDate) { - $context->buildViolation('End date must be after or the same as start date.') - ->atPath('endDate') - ->addViolation(); - } + if ($this->startDate !== null && $this->endDate !== null && $this->endDate < $this->startDate) { + $context->buildViolation('End date must be after or the same as start date.') + ->atPath('endDate') + ->addViolation(); } } } diff --git a/src/Domain/Data/Exception/EmptyHistoryException.php b/src/Domain/Data/Exception/EmptyHistoryException.php new file mode 100644 index 0000000..0f2c295 --- /dev/null +++ b/src/Domain/Data/Exception/EmptyHistoryException.php @@ -0,0 +1,7 @@ +timeframe, $startDate, $endDate, + $dto->forceOverwrite, $jobId ); diff --git a/src/Domain/Data/Service/Exchange/CcxtAdapter.php b/src/Domain/Data/Service/Exchange/CcxtAdapter.php index 4ee313f..1c46006 100644 --- a/src/Domain/Data/Service/Exchange/CcxtAdapter.php +++ b/src/Domain/Data/Service/Exchange/CcxtAdapter.php @@ -6,14 +6,15 @@ use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Stochastix\Domain\Data\Event\DownloadProgressEvent; +use Stochastix\Domain\Data\Exception\EmptyHistoryException; use Stochastix\Domain\Data\Exception\ExchangeException; -class CcxtAdapter implements ExchangeAdapterInterface +readonly class CcxtAdapter implements ExchangeAdapterInterface { public function __construct( - private readonly EventDispatcherInterface $eventDispatcher, - private readonly LoggerInterface $logger, - private readonly ExchangeFactory $exchangeFactory + private EventDispatcherInterface $eventDispatcher, + private LoggerInterface $logger, + private ExchangeFactory $exchangeFactory ) { } @@ -47,6 +48,7 @@ public function fetchOhlcv( $limit = $exchange->limits['OHLCV']['limit'] ?? 1000; $durationMs = Exchange::parse_timeframe($timeframe) * 1000; $totalDuration = max(1, $endTimestamp - $since); + $isFirstFetch = true; while ($since <= $endTimestamp) { try { @@ -56,10 +58,15 @@ public function fetchOhlcv( } if (empty($ohlcvs)) { + if ($isFirstFetch) { + throw new EmptyHistoryException("Exchange returned no data for {$symbol} starting from {$startTime->format('Y-m-d H:i:s')}. Data may not be available for this period."); + } $this->logger->info("No more OHLCV data returned for {$symbol} starting from " . ($since / 1000)); break; } + $isFirstFetch = false; + $lastTimestamp = 0; $batchRecordCount = 0; @@ -107,4 +114,32 @@ public function fetchOhlcv( usleep(200000); } } + + public function fetchFirstAvailableTimestamp(string $exchangeId, string $symbol, string $timeframe): ?\DateTimeImmutable + { + $exchange = $this->exchangeFactory->create($exchangeId); + if (!$exchange->has['fetchOHLCV']) { + return null; // The exchange can't fetch candles at all. + } + + try { + // Attempt to fetch just the very first record available from the exchange. + $ohlcvs = $exchange->fetch_ohlcv($symbol, $timeframe, null, 1); + + // If a record is returned, extract its timestamp. + if (!empty($ohlcvs) && isset($ohlcvs[0][0])) { + // CCXT timestamps are in milliseconds, convert to seconds. + return (new \DateTimeImmutable())->setTimestamp((int) ($ohlcvs[0][0] / 1000)); + } + } catch (\Throwable $e) { + $this->logger->warning( + 'Could not determine first available timestamp for {symbol} on {exchange}.', + ['symbol' => $symbol, 'exchange' => $exchangeId, 'reason' => $e->getMessage()] + ); + + return null; + } + + return null; + } } diff --git a/src/Domain/Data/Service/OhlcvDownloader.php b/src/Domain/Data/Service/OhlcvDownloader.php index ebc9651..45e537b 100644 --- a/src/Domain/Data/Service/OhlcvDownloader.php +++ b/src/Domain/Data/Service/OhlcvDownloader.php @@ -4,8 +4,7 @@ use Psr\Log\LoggerInterface; use Stochastix\Domain\Data\Exception\DownloaderException; -use Stochastix\Domain\Data\Exception\ExchangeException; -use Stochastix\Domain\Data\Exception\StorageException; +use Stochastix\Domain\Data\Exception\EmptyHistoryException; use Stochastix\Domain\Data\Service\Exchange\ExchangeAdapterInterface; use Symfony\Component\DependencyInjection\Attribute\Autowire; @@ -20,112 +19,183 @@ public function __construct( ) { } - /** - * Downloads OHLCV data, streams it to a temporary file, - * and then merges it with any existing data. - * - * @return string the final path to the data file - * - * @throws DownloaderException - */ public function download( string $exchangeId, string $symbol, string $timeframe, \DateTimeImmutable $startTime, \DateTimeImmutable $endTime, + bool $forceOverwrite = false, ?string $jobId = null, ): string { $finalPath = $this->generateFilePath($exchangeId, $symbol, $timeframe); - $tempPath = $this->binaryStorage->getTempFilePath($finalPath); - $mergedPath = $this->binaryStorage->getMergedTempFilePath($finalPath); - - $this->logger->info( - 'Starting download: {exchange}/{symbol} [{timeframe}] from {start} to {end}', - [ - 'exchange' => $exchangeId, - 'symbol' => $symbol, - 'timeframe' => $timeframe, - 'start' => $startTime->format('Y-m-d H:i:s'), - 'end' => $endTime->format('Y-m-d H:i:s'), - 'final_path' => $finalPath, - ] - ); - - // Ensure temp files are cleaned up if they exist from previous failed runs. - $this->cleanupFile($tempPath, 'previous .tmp file'); - $this->cleanupFile($mergedPath, 'previous .merged.tmp file'); try { - // 1. Download data to the .tmp file - $this->downloadToTemp($exchangeId, $symbol, $timeframe, $startTime, $endTime, $tempPath, $jobId); - - // Check if any data was actually downloaded before merging/renaming - if (!file_exists($tempPath) || filesize($tempPath) <= 64) { - $this->logger->warning('No new data downloaded to {temp}. Ensuring final file exists.', ['temp' => $tempPath]); - // If original doesn't exist, create an empty one. - if (!file_exists($finalPath)) { - $this->binaryStorage->createFile($finalPath, $symbol, $timeframe); - } - $this->cleanupFile($tempPath); // Clean up empty/failed temp. + $rangesToDownload = $this->calculateMissingRanges($finalPath, $startTime, $endTime, $timeframe, $forceOverwrite); - return $finalPath; // Return path to original or new empty file. + if (empty($rangesToDownload)) { + $this->logger->info('Local data is already complete for the requested range. No download needed.', ['path' => $finalPath]); + + return $finalPath; } - // 2. Check if original file exists and needs merging. - $originalExists = file_exists($finalPath) && filesize($finalPath) > 64; + $this->logger->info('Found {count} missing data range(s) to download.', ['count' => count($rangesToDownload)]); - if (!$originalExists) { - // No original file, just rename .tmp to .stchx - $this->logger->info('No existing data found. Renaming {temp} to {final}.', [ - 'temp' => $tempPath, - 'final' => $finalPath, - ]); - $this->binaryStorage->atomicRename($tempPath, $finalPath); - } else { - // Original exists, perform the K-way merge - $this->logger->info('Existing data found. Merging {final} and {temp} into {merged}.', [ - 'final' => $finalPath, - 'temp' => $tempPath, - 'merged' => $mergedPath, - ]); - $this->binaryStorage->mergeAndWrite($finalPath, $tempPath, $mergedPath); - - $this->logger->info('Merge complete. Renaming {merged} to {final}.', [ - 'merged' => $mergedPath, - 'final' => $finalPath, - ]); - $this->binaryStorage->atomicRename($mergedPath, $finalPath); - } + $downloadedTempFiles = []; + + foreach ($rangesToDownload as $i => $range) { + [$chunkStartTime, $chunkEndTime] = $range; + + $tempPath = $this->binaryStorage->getTempFilePath($finalPath) . ".chunk.{$i}"; + $this->cleanupFile($tempPath); + + try { + $this->logger->info('Downloading chunk #{num}: {start} to {end}', ['num' => $i + 1, 'start' => $chunkStartTime->format('Y-m-d H:i:s'), 'end' => $chunkEndTime->format('Y-m-d H:i:s')]); + $this->downloadToTemp($exchangeId, $symbol, $timeframe, $chunkStartTime, $chunkEndTime, $tempPath, $jobId); + } catch (EmptyHistoryException $e) { + $this->logger->warning('Initial date {start_date} is too early. Attempting to find the earliest available data from the exchange.', ['start_date' => $chunkStartTime->format('Y-m-d H:i:s')]); + $firstAvailableDate = $this->exchangeAdapter->fetchFirstAvailableTimestamp($exchangeId, $symbol, $timeframe); + + if ($firstAvailableDate !== null && $firstAvailableDate <= $chunkEndTime) { + $this->logger->info('Found earliest data at {real_start}. Resuming download for the adjusted range.', ['real_start' => $firstAvailableDate->format('Y-m-d H:i:s')]); + $this->downloadToTemp($exchangeId, $symbol, $timeframe, $firstAvailableDate, $chunkEndTime, $tempPath, $jobId); + } else { + $this->logger->warning('Could not determine a valid start date or the earliest data is outside the requested range for {symbol}. Skipping this chunk.', ['symbol' => $symbol]); + } + } - $this->logger->info('Download and integration successful: {file}', ['file' => $finalPath]); + if (file_exists($tempPath) && filesize($tempPath) > 64) { + $downloadedTempFiles[] = $tempPath; + } + } - // 3. Clean up on success - $this->cleanupFile($tempPath, '.tmp file after success'); - $this->cleanupFile($mergedPath, '.merged.tmp file after success'); + if (empty($downloadedTempFiles) && !file_exists($finalPath)) { + $this->binaryStorage->createFile($finalPath, $symbol, $timeframe); + } elseif (!empty($downloadedTempFiles)) { + $this->mergeFiles(file_exists($finalPath) ? $finalPath : null, $downloadedTempFiles, $finalPath); + } return $finalPath; } catch (\Throwable $e) { $this->logger->error( - 'Download failed: {message}. Temp files may remain for inspection: {temp}, {merged}', - [ - 'message' => $e->getMessage(), - 'temp' => $tempPath, - 'merged' => $mergedPath, - 'exception' => $e, - ] + 'Download failed: {message}.', + ['message' => $e->getMessage(), 'exception' => $e] ); - - // Re-throw as a DownloaderException throw new DownloaderException("Download failed for {$exchangeId}/{$symbol}: {$e->getMessage()}", $e->getCode(), $e); } } /** - * Handles the actual fetching and writing to the temporary file. - * - * @throws DownloaderException|StorageException|ExchangeException + * @return array */ + private function calculateMissingRanges(string $filePath, \DateTimeImmutable $requestStart, \DateTimeImmutable $requestEnd, string $timeframe, bool $forceOverwrite): array + { + $fileExists = file_exists($filePath) && filesize($filePath) > 64; + + if ($forceOverwrite || !$fileExists) { + return [[$requestStart, $requestEnd]]; + } + + $header = $this->binaryStorage->readHeader($filePath); + + if ($header['numRecords'] < 1) { + return [[$requestStart, $requestEnd]]; + } + + $firstRecord = $this->binaryStorage->readRecordByIndex($filePath, 0); + $lastRecord = $this->binaryStorage->readRecordByIndex($filePath, $header['numRecords'] - 1); + + $localStart = new \DateTimeImmutable()->setTimestamp($firstRecord['timestamp']); + $localEnd = new \DateTimeImmutable()->setTimestamp($lastRecord['timestamp']); + + $rangesToDownload = []; + + // 1. Calculate the "before" chunk, correctly clipped by the request's end date. + $beforeChunkStart = $requestStart; + $beforeChunkEnd = $localStart->modify('-1 second'); + if ($beforeChunkStart < $beforeChunkEnd) { + $actualEnd = min($requestEnd, $beforeChunkEnd); + if ($beforeChunkStart <= $actualEnd) { + $rangesToDownload[] = [$beforeChunkStart, $actualEnd]; + } + } + + // 2. Calculate internal gaps, correctly clipped by the request's date range. + $internalGaps = $this->findGapsInFile($filePath, $timeframe); + foreach ($internalGaps as $gap) { + $downloadStart = max($requestStart, $gap[0]); + $downloadEnd = min($requestEnd, $gap[1]); + if ($downloadStart <= $downloadEnd) { + $rangesToDownload[] = [$downloadStart, $downloadEnd]; + } + } + + // 3. Calculate the "after" chunk, correctly clipped by the request's start date. + $afterChunkStart = $localEnd->modify('+1 second'); + $afterChunkEnd = $requestEnd; + if ($afterChunkStart < $afterChunkEnd) { + $actualStart = max($requestStart, $afterChunkStart); + if ($actualStart <= $afterChunkEnd) { + $rangesToDownload[] = [$actualStart, $afterChunkEnd]; + } + } + + return $this->sortAndMergeRanges($rangesToDownload); + } + + private function findGapsInFile(string $filePath, string $timeframe): array + { + $expectedInterval = $this->timeframeToSeconds($timeframe); + if ($expectedInterval === null) { + return []; + } + + $gaps = []; + $records = $this->binaryStorage->readRecordsSequentially($filePath); + $previousTimestamp = null; + + foreach ($records as $record) { + $currentTimestamp = $record['timestamp']; + if ($previousTimestamp !== null) { + $diff = $currentTimestamp - $previousTimestamp; + if ($diff > $expectedInterval) { + $gapStart = new \DateTimeImmutable()->setTimestamp($previousTimestamp + $expectedInterval); + $gapEnd = new \DateTimeImmutable()->setTimestamp($currentTimestamp - 1); + if ($gapStart <= $gapEnd) { + $gaps[] = [$gapStart, $gapEnd]; + } + } + } + $previousTimestamp = $currentTimestamp; + } + + return $gaps; + } + + private function mergeFiles(?string $originalPath, array $tempFiles, string $finalPath): void + { + $currentFileToMerge = $originalPath; + + foreach ($tempFiles as $i => $tempFile) { + $mergedPath = $this->binaryStorage->getMergedTempFilePath($finalPath) . ".{$i}"; + if ($currentFileToMerge === null) { + $this->binaryStorage->atomicRename($tempFile, $mergedPath); + } else { + $this->binaryStorage->mergeAndWrite($currentFileToMerge, $tempFile, $mergedPath); + } + + if ($currentFileToMerge !== null && $currentFileToMerge !== $finalPath) { + $this->cleanupFile($currentFileToMerge); + } + $this->cleanupFile($tempFile); + $currentFileToMerge = $mergedPath; + } + + if ($currentFileToMerge !== null) { + $this->binaryStorage->atomicRename($currentFileToMerge, $finalPath); + } + } + private function downloadToTemp( string $exchangeId, string $symbol, @@ -139,60 +209,64 @@ private function downloadToTemp( throw new DownloaderException("Exchange '{$exchangeId}' is not supported."); } - // Create the temp file with its header $this->binaryStorage->createFile($tempPath, $symbol, $timeframe); - $this->logger->debug('Initialized temp file: {file}', ['file' => $tempPath]); - - // Get the generator from the exchange adapter - $recordsGenerator = $this->exchangeAdapter->fetchOhlcv( - $exchangeId, - $symbol, - $timeframe, - $startTime, - $endTime, - $jobId, - ); - $this->logger->debug('Starting data fetch to temp file...'); - - // Stream records directly to the temp file + $recordsGenerator = $this->exchangeAdapter->fetchOhlcv($exchangeId, $symbol, $timeframe, $startTime, $endTime, $jobId); $recordCount = $this->binaryStorage->appendRecords($tempPath, $recordsGenerator); - $this->logger->info('Streamed {count} records to temp file.', ['count' => $recordCount]); - // Update the record count in the temp file's header if ($recordCount > 0) { $this->binaryStorage->updateRecordCount($tempPath, $recordCount); - $this->logger->debug('Updated temp header record count to {count}.', ['count' => $recordCount]); - } else { - $this->logger->warning('No records were downloaded for {symbol} in the specified range.', ['symbol' => $symbol]); } } - /** - * Generates the final path for the .stchx file. - */ private function generateFilePath(string $exchangeId, string $symbol, string $timeframe): string { $sanitizedSymbol = str_replace('/', '_', $symbol); - return sprintf( - '%s/%s/%s/%s.stchx', - rtrim($this->baseDataPath, '/'), - strtolower($exchangeId), - strtoupper($sanitizedSymbol), - $timeframe, - ); + return sprintf('%s/%s/%s/%s.stchx', rtrim($this->baseDataPath, '/'), strtolower($exchangeId), strtoupper($sanitizedSymbol), $timeframe); } - /** - * Safely deletes a file if it exists. - */ private function cleanupFile(string $filePath, string $reason = ''): void { - if (file_exists($filePath)) { - $this->logger->debug('Cleaning up {reason}: {file}', ['reason' => $reason, 'file' => $filePath]); - if (!@unlink($filePath)) { - $this->logger->warning('Could not clean up temporary file: {file}', ['file' => $filePath]); + if (file_exists($filePath) && !@unlink($filePath)) { + $this->logger->warning('Could not clean up temporary file: {file}', ['file' => $filePath]); + } + } + + private function timeframeToSeconds(string $timeframe): ?int + { + $unit = substr($timeframe, -1); + $value = (int) substr($timeframe, 0, -1); + + if ($value <= 0) { + return null; + } + + return match ($unit) { + 'm' => $value * 60, 'h' => $value * 3600, 'd' => $value * 86400, 'w' => $value * 604800, default => null + }; + } + + private function sortAndMergeRanges(array $ranges): array + { + if (count($ranges) <= 1) { + return $ranges; + } + + usort($ranges, static fn ($a, $b) => $a[0] <=> $b[0]); + $merged = []; + $currentRange = array_shift($ranges); + + foreach ($ranges as $range) { + if ($range[0] <= $currentRange[1]->modify('+1 second')) { + $currentRange[1] = max($currentRange[1], $range[1]); + } else { + $merged[] = $currentRange; + $currentRange = $range; } } + + $merged[] = $currentRange; + + return $merged; } } diff --git a/tests/Domain/Data/Service/OhlcvDownloaderTest.php b/tests/Domain/Data/Service/OhlcvDownloaderTest.php index 3fe721b..94a8098 100644 --- a/tests/Domain/Data/Service/OhlcvDownloaderTest.php +++ b/tests/Domain/Data/Service/OhlcvDownloaderTest.php @@ -39,30 +39,22 @@ protected function setUp(): void public function testDownloadForNewFile(): void { $finalPath = $this->vfsRoot->url() . '/market/binance/BTC_USDT/1h.stchx'; - $tempPath = $finalPath . '.tmp'; - // **THE FIX**: Create the parent directory structure so the service can create files within it. vfsStream::create(['market' => ['binance' => ['BTC_USDT' => []]]], $this->vfsRoot); $this->exchangeAdapterMock->method('supportsExchange')->willReturn(true); $this->exchangeAdapterMock->method('fetchOhlcv')->willReturn((static fn (): \Generator => yield ['ts' => 1])()); - // Configure mocks to interact with the virtual filesystem - $this->binaryStorageMock->method('getTempFilePath')->willReturn($tempPath); - $this->binaryStorageMock->expects($this->once()) // Expect only ONE call - ->method('createFile') - ->willReturnCallback(function (string $path) { - // This callback ensures that when the service tries to create the temp file, - // it actually appears in the virtual filesystem with content, passing the filesize() check. - file_put_contents($path, str_repeat('a', 128)); - }); + $this->binaryStorageMock->method('getTempFilePath')->willReturn($finalPath . '.tmp'); + $this->binaryStorageMock->method('getMergedTempFilePath')->willReturn($finalPath . '.merged.tmp'); + + $this->binaryStorageMock->expects($this->once()) + ->method('createFile') + ->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('a', 128))); $this->binaryStorageMock->method('appendRecords')->willReturn(1); - // Assert the correct workflow for a new file - $this->binaryStorageMock->expects($this->never())->method('mergeAndWrite'); - $this->binaryStorageMock->expects($this->once())->method('atomicRename') - ->with($tempPath, $finalPath); + $this->binaryStorageMock->expects($this->exactly(2))->method('atomicRename'); $this->downloader->download( 'binance', @@ -73,44 +65,83 @@ public function testDownloadForNewFile(): void ); } - public function testDownloadMergesWithExistingFile(): void + public function testDownloadWithForceOverwriteMergesWithExistingFile(): void { $finalPath = $this->vfsRoot->url() . '/market/binance/BTC_USDT/1h.stchx'; $tempPath = $finalPath . '.tmp'; $mergedPath = $finalPath . '.merged.tmp'; - // **THE FIX**: Create the directory and the pre-existing file with content directly. - vfsStream::create([ - 'market' => [ - 'binance' => [ - 'BTC_USDT' => [ - '1h.stchx' => str_repeat('a', 128), // File size > 64 - ], - ], - ], - ], $this->vfsRoot); + vfsStream::create(['market' => ['binance' => ['BTC_USDT' => ['1h.stchx' => str_repeat('a', 128)]]]], $this->vfsRoot); $this->exchangeAdapterMock->method('supportsExchange')->willReturn(true); $this->exchangeAdapterMock->method('fetchOhlcv')->willReturn((static fn (): \Generator => yield ['ts' => 1])()); $this->binaryStorageMock->method('getTempFilePath')->willReturn($tempPath); $this->binaryStorageMock->method('getMergedTempFilePath')->willReturn($mergedPath); - - // The downloader will still create a temp file for the new data. - $this->binaryStorageMock->method('createFile') - ->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('b', 128))); + $this->binaryStorageMock->method('createFile')->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('b', 128))); $this->binaryStorageMock->method('appendRecords')->willReturn(1); - - // Assert the correct workflow for a merge $this->binaryStorageMock->expects($this->once())->method('mergeAndWrite'); + $this->binaryStorageMock->expects($this->once())->method('atomicRename') - ->with($mergedPath, $finalPath); + ->with($this->stringEndsWith('.merged.tmp.0'), $finalPath); $this->downloader->download( 'binance', 'BTC/USDT', '1h', new \DateTimeImmutable('2024-01-01'), - new \DateTimeImmutable('2024-01-02') + new \DateTimeImmutable('2024-01-02'), + true // Force overwrite + ); + } + + public function testDownloadFillsInternalGaps(): void + { + $finalPath = $this->vfsRoot->url() . '/market/binance/BTC_USDT/1h.stchx'; + vfsStream::create(['market' => ['binance' => ['BTC_USDT' => ['1h.stchx' => str_repeat('a', 128)]]]], $this->vfsRoot); + + // --- Mocks for Gap Detection --- + $this->binaryStorageMock->method('readHeader')->willReturn(['numRecords' => 2]); + $this->binaryStorageMock->method('readRecordByIndex') + ->willReturnOnConsecutiveCalls( + ['timestamp' => strtotime('2024-01-01 10:00:00')], // Local start + ['timestamp' => strtotime('2024-01-01 12:00:00')] // Local end (note the 1-hour gap) + ); + $this->binaryStorageMock->method('readRecordsSequentially') + ->willReturn((static fn () => yield from [ + ['timestamp' => strtotime('2024-01-01 10:00:00')], + ['timestamp' => strtotime('2024-01-01 12:00:00')], + ])()); + + // --- Mocks for Downloading the Gap --- + $this->exchangeAdapterMock->method('supportsExchange')->willReturn(true); + $this->exchangeAdapterMock->expects($this->once()) + ->method('fetchOhlcv') + ->with( + $this->anything(), + $this->anything(), + '1h', + $this->equalTo(new \DateTimeImmutable('2024-01-01 11:00:00')), + $this->equalTo(new \DateTimeImmutable('2024-01-01 11:59:59')) + ) + ->willReturn((static fn (): \Generator => yield ['timestamp' => strtotime('2024-01-01 11:00:00')])()); + + $this->binaryStorageMock->method('getTempFilePath')->willReturn($finalPath . '.tmp'); + $this->binaryStorageMock->method('getMergedTempFilePath')->willReturn($finalPath . '.merged.tmp'); + + $this->binaryStorageMock->method('createFile') + ->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('a', 128))); + + $this->binaryStorageMock->method('appendRecords')->willReturn(1); + $this->binaryStorageMock->expects($this->once())->method('mergeAndWrite'); + + // --- Execute --- + $this->downloader->download( + 'binance', + 'BTC/USDT', + '1h', + new \DateTimeImmutable('2024-01-01 10:00:00'), + new \DateTimeImmutable('2024-01-01 12:00:00'), + false ); } From 91c3f801f9eea64f1aa0cd8f445867a1883b31fb Mon Sep 17 00:00:00 2001 From: William Arin Date: Wed, 18 Jun 2025 20:58:00 +0800 Subject: [PATCH 2/8] feat: new endpoint to fetch all existing exchanges --- docs/api.md | 21 +++++++++++++++++ .../Data/Controller/GetExchangesAction.php | 23 +++++++++++++++++++ src/Domain/Data/Service/MarketDataService.php | 12 ++++++++++ 3 files changed, 56 insertions(+) create mode 100644 src/Domain/Data/Controller/GetExchangesAction.php diff --git a/docs/api.md b/docs/api.md index ed6d8fe..2ba771b 100644 --- a/docs/api.md +++ b/docs/api.md @@ -52,6 +52,27 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor --- ### Market Data +* **`GET /api/data/exchanges`** + * **Description:** Retrieves a sorted list of all exchange IDs supported by the backend (via the CCXT library). + * **Requires:** None + * **Request Body:** None + * **Success Response:** `200 OK` + * **Example Success Response Body:** + ```json + [ + "ace", + "alpaca", + "ascendex", + "bequant", + "bigone", + "binance", + "binancecoinm", + "binanceus", + "binanceusdm", + "bingx" + ] + ``` + * **`GET /api/data-availability`** * **Description:** Scans the server for available market data (`.stchx` files) and returns a manifest detailing available symbols, their timeframes, and the start/end dates for each dataset. * **Requires:** None diff --git a/src/Domain/Data/Controller/GetExchangesAction.php b/src/Domain/Data/Controller/GetExchangesAction.php new file mode 100644 index 0000000..4bd9805 --- /dev/null +++ b/src/Domain/Data/Controller/GetExchangesAction.php @@ -0,0 +1,23 @@ +json($this->marketDataService->getExchanges()); + } +} diff --git a/src/Domain/Data/Service/MarketDataService.php b/src/Domain/Data/Service/MarketDataService.php index a1055d3..da7c730 100644 --- a/src/Domain/Data/Service/MarketDataService.php +++ b/src/Domain/Data/Service/MarketDataService.php @@ -2,6 +2,7 @@ namespace Stochastix\Domain\Data\Service; +use ccxt\Exchange; use Psr\Cache\InvalidArgumentException; use Stochastix\Domain\Data\Exception\ExchangeException; use Stochastix\Domain\Data\Service\Exchange\ExchangeFactory; @@ -54,4 +55,15 @@ public function getFuturesSymbols(string $exchangeId): array } }); } + + /** + * @return string[] + */ + public function getExchanges(): array + { + $exchanges = Exchange::$exchanges; + sort($exchanges); + + return $exchanges; + } } From a082ada1a83a189f621306447106227ab36438b4 Mon Sep 17 00:00:00 2001 From: William Arin Date: Wed, 18 Jun 2025 23:00:03 +0800 Subject: [PATCH 3/8] feat: endpoint to cancel a market data download --- config/app.yaml | 3 ++ config/services.yaml | 9 ++++ docs/api.md | 15 +++++++ .../Data/Controller/CancelDownloadAction.php | 41 +++++++++++++++++++ .../Exception/DownloadCancelledException.php | 7 ++++ .../DownloadDataMessageHandler.php | 4 ++ .../Data/Service/Exchange/CcxtAdapter.php | 16 +++++++- src/Domain/Data/Service/OhlcvDownloader.php | 3 ++ 8 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 src/Domain/Data/Controller/CancelDownloadAction.php create mode 100644 src/Domain/Data/Exception/DownloadCancelledException.php diff --git a/config/app.yaml b/config/app.yaml index 01acef6..481fb62 100644 --- a/config/app.yaml +++ b/config/app.yaml @@ -1,4 +1,7 @@ framework: + cache: + prefix_seed: 'stochastix.download' + messenger: transports: stochastix: '%env(MESSENGER_TRANSPORT_DSN)%' diff --git a/config/services.yaml b/config/services.yaml index 9d78712..d19dc1b 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -16,3 +16,12 @@ services: - '../src/**/Exception' - '../src/**/Event' - '../src/**/Model' + + stochastix.download.cancel.cache: + class: Symfony\Component\Cache\Adapter\FilesystemAdapter + arguments: + - 'stochastix.download.cancel' + - 3600 + - '%kernel.project_dir%/data/.cache' + tags: + - { name: 'cache.pool', namespace: 'stochastix.download' } diff --git a/docs/api.md b/docs/api.md index 2ba771b..2259848 100644 --- a/docs/api.md +++ b/docs/api.md @@ -135,6 +135,21 @@ This document outlines the RESTful API endpoints provided by the `stochastix-cor * `400 Bad Request`: Invalid input (e.g., validation errors in the request body, end date before start date). * `500 Internal Server Error`: If the download message could not be dispatched to the queue. +* **`DELETE /api/data/download/{jobId}`** + * **Description:** Requests the cancellation of a running download job. The cancellation may take a few moments to take effect, as it is checked between data chunk fetches. + * **Requires:** None + * **URL Parameters:** + * `jobId` (string, required): The unique ID of the download job to cancel. + * **Request Body:** None + * **Success Response:** `202 Accepted` + * **Example Success Response Body:** + ```json + { + "status": "cancellation_requested", + "jobId": "download_666c1e5a7b8d9" + } + ``` + * **Error Responses:** None. * **`GET /api/data/inspect/{exchangeId}/{symbol}/{timeframe}`** * **Description:** Inspects a specific market data file (`.stchx`). Returns the file's header metadata, a sample of the first and last records, and a full data consistency validation report (checking for gaps, duplicates, and out-of-order records). diff --git a/src/Domain/Data/Controller/CancelDownloadAction.php b/src/Domain/Data/Controller/CancelDownloadAction.php new file mode 100644 index 0000000..4296c42 --- /dev/null +++ b/src/Domain/Data/Controller/CancelDownloadAction.php @@ -0,0 +1,41 @@ +cache->getItem($cacheKey); + + $item->set(true); + $item->expiresAfter(3600); + $this->cache->save($item); + + return $this->json( + ['status' => 'cancellation_requested', 'jobId' => $jobId], + Response::HTTP_ACCEPTED + ); + } +} diff --git a/src/Domain/Data/Exception/DownloadCancelledException.php b/src/Domain/Data/Exception/DownloadCancelledException.php new file mode 100644 index 0000000..3beacb7 --- /dev/null +++ b/src/Domain/Data/Exception/DownloadCancelledException.php @@ -0,0 +1,7 @@ + 100, 'message' => 'Download completed successfully.', ]); + } catch (DownloadCancelledException $e) { + $this->logger->info('Data download job {jobId} was cancelled.', ['jobId' => $jobId, 'reason' => $e->getMessage()]); + $this->publishUpdate($topic, ['status' => 'cancelled', 'message' => 'Download was cancelled by the user.']); } catch (\Throwable $e) { $this->logger->error('Data download job {jobId} failed: {message}', [ 'jobId' => $jobId, diff --git a/src/Domain/Data/Service/Exchange/CcxtAdapter.php b/src/Domain/Data/Service/Exchange/CcxtAdapter.php index 1c46006..f10caf7 100644 --- a/src/Domain/Data/Service/Exchange/CcxtAdapter.php +++ b/src/Domain/Data/Service/Exchange/CcxtAdapter.php @@ -3,18 +3,23 @@ namespace Stochastix\Domain\Data\Service\Exchange; use ccxt\Exchange; +use Psr\Cache\CacheItemPoolInterface; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Stochastix\Domain\Data\Event\DownloadProgressEvent; +use Stochastix\Domain\Data\Exception\DownloadCancelledException; use Stochastix\Domain\Data\Exception\EmptyHistoryException; use Stochastix\Domain\Data\Exception\ExchangeException; +use Symfony\Component\DependencyInjection\Attribute\Autowire; readonly class CcxtAdapter implements ExchangeAdapterInterface { public function __construct( private EventDispatcherInterface $eventDispatcher, private LoggerInterface $logger, - private ExchangeFactory $exchangeFactory + private ExchangeFactory $exchangeFactory, + #[Autowire(service: 'stochastix.download.cancel.cache')] + private CacheItemPoolInterface $cache, ) { } @@ -49,8 +54,17 @@ public function fetchOhlcv( $durationMs = Exchange::parse_timeframe($timeframe) * 1000; $totalDuration = max(1, $endTimestamp - $since); $isFirstFetch = true; + $cancellationCacheKey = 'download.cancel.' . $jobId; while ($since <= $endTimestamp) { + if ($jobId) { + $cancellationItem = $this->cache->getItem($cancellationCacheKey); + if ($cancellationItem->isHit()) { + $this->cache->deleteItem($cancellationCacheKey); + throw new DownloadCancelledException("Download job {$jobId} was cancelled by user request."); + } + } + try { $ohlcvs = $exchange->fetch_ohlcv($symbol, $timeframe, $since, $limit); } catch (\Throwable $e) { diff --git a/src/Domain/Data/Service/OhlcvDownloader.php b/src/Domain/Data/Service/OhlcvDownloader.php index 45e537b..d1cf9b6 100644 --- a/src/Domain/Data/Service/OhlcvDownloader.php +++ b/src/Domain/Data/Service/OhlcvDownloader.php @@ -3,6 +3,7 @@ namespace Stochastix\Domain\Data\Service; use Psr\Log\LoggerInterface; +use Stochastix\Domain\Data\Exception\DownloadCancelledException; use Stochastix\Domain\Data\Exception\DownloaderException; use Stochastix\Domain\Data\Exception\EmptyHistoryException; use Stochastix\Domain\Data\Service\Exchange\ExchangeAdapterInterface; @@ -76,6 +77,8 @@ public function download( } return $finalPath; + } catch (DownloadCancelledException $e) { + throw $e; } catch (\Throwable $e) { $this->logger->error( 'Download failed: {message}.', From 93b0fcb944bbc5ae2cb8870b17207e1037bbfb51 Mon Sep 17 00:00:00 2001 From: William Arin Date: Thu, 19 Jun 2025 17:37:42 +0800 Subject: [PATCH 4/8] fix: no more job failing because of a failed mercure update --- .../RunBacktestMessageHandler.php | 11 +++++++++-- .../DownloadProgressSubscriber.php | 18 ++++++++++++++---- .../DownloadDataMessageHandler.php | 11 +++++++++-- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Domain/Backtesting/MessageHandler/RunBacktestMessageHandler.php b/src/Domain/Backtesting/MessageHandler/RunBacktestMessageHandler.php index 08dd456..4a22748 100644 --- a/src/Domain/Backtesting/MessageHandler/RunBacktestMessageHandler.php +++ b/src/Domain/Backtesting/MessageHandler/RunBacktestMessageHandler.php @@ -76,7 +76,14 @@ public function __invoke(RunBacktestMessage $message): void private function publishUpdate(string $topic, array $data): void { - $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); - $this->mercureHub->publish($update); + try { + $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); + $this->mercureHub->publish($update); + } catch (\Throwable $e) { + $this->logger->warning('Failed to publish update to Mercure. The backtest will continue.', [ + 'topic' => $topic, + 'error' => $e->getMessage(), + ]); + } } } diff --git a/src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php b/src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php index c4cb102..1674ce3 100644 --- a/src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php +++ b/src/Domain/Data/EventSubscriber/DownloadProgressSubscriber.php @@ -2,6 +2,7 @@ namespace Stochastix\Domain\Data\EventSubscriber; +use Psr\Log\LoggerInterface; use Stochastix\Domain\Data\Event\DownloadProgressEvent; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Mercure\HubInterface; @@ -9,8 +10,10 @@ final readonly class DownloadProgressSubscriber implements EventSubscriberInterface { - public function __construct(private HubInterface $mercureHub) - { + public function __construct( + private HubInterface $mercureHub, + private readonly LoggerInterface $logger, + ) { } public static function getSubscribedEvents(): array @@ -40,7 +43,14 @@ public function onDownloadProgress(DownloadProgressEvent $event): void 'message' => "Fetched {$event->recordsFetchedInBatch} records up to " . gmdate('Y-m-d H:i:s', $event->lastTimestamp), ]; - $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); - $this->mercureHub->publish($update); + try { + $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); + $this->mercureHub->publish($update); + } catch (\Throwable $e) { + $this->logger->warning('Failed to publish progress update to Mercure.', [ + 'jobId' => $event->jobId, + 'error' => $e->getMessage(), + ]); + } } } diff --git a/src/Domain/Data/MessageHandler/DownloadDataMessageHandler.php b/src/Domain/Data/MessageHandler/DownloadDataMessageHandler.php index eebd547..8fbcbc0 100644 --- a/src/Domain/Data/MessageHandler/DownloadDataMessageHandler.php +++ b/src/Domain/Data/MessageHandler/DownloadDataMessageHandler.php @@ -75,7 +75,14 @@ public function __invoke(DownloadDataMessage $message): void private function publishUpdate(string $topic, array $data): void { - $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); - $this->mercureHub->publish($update); + try { + $update = new Update($topic, json_encode($data, JSON_THROW_ON_ERROR)); + $this->mercureHub->publish($update); + } catch (\Throwable $e) { + $this->logger->warning('Failed to publish update to Mercure. The process will continue.', [ + 'topic' => $topic, + 'error' => $e->getMessage(), + ]); + } } } From 1870ee8160e3e2bbce9b4468dcee8c46c02c1040 Mon Sep 17 00:00:00 2001 From: William Arin Date: Thu, 19 Jun 2025 19:36:35 +0800 Subject: [PATCH 5/8] feat: save partially downloaded data when a download is cancelled --- src/Domain/Data/Service/BinaryStorage.php | 78 +++++++++++++++++++ .../Data/Service/BinaryStorageInterface.php | 14 +++- src/Domain/Data/Service/OhlcvDownloader.php | 56 +++++++------ 3 files changed, 122 insertions(+), 26 deletions(-) diff --git a/src/Domain/Data/Service/BinaryStorage.php b/src/Domain/Data/Service/BinaryStorage.php index 3571fb5..86b898e 100644 --- a/src/Domain/Data/Service/BinaryStorage.php +++ b/src/Domain/Data/Service/BinaryStorage.php @@ -562,4 +562,82 @@ private function ensureDirectoryExists(string $directory): void throw new StorageException(sprintf('Directory "%s" was not created', $directory)); } } + + public function streamAndCommitRecords(string $filePath, iterable $records, int $commitInterval = 5000): int + { + $handle = @fopen($filePath, 'r+b'); + if ($handle === false) { + throw new StorageException("Could not open file '{$filePath}' for streaming write."); + } + + if (!flock($handle, LOCK_EX)) { + fclose($handle); + throw new StorageException("Could not acquire exclusive lock on '{$filePath}'."); + } + + $writtenCount = 0; + try { + // Seek to the end of the file to start appending records. + fseek($handle, 0, SEEK_END); + + foreach ($records as $record) { + $packedRecord = pack( + self::RECORD_PACK_FORMAT, + $record['timestamp'], + $record['open'], + $record['high'], + $record['low'], + $record['close'], + $record['volume'] + ); + + if (fwrite($handle, $packedRecord) !== self::RECORD_LENGTH_V1) { + throw new StorageException("Failed to write complete record to '{$filePath}'."); + } + ++$writtenCount; + + // Periodically update the record count in the header + if ($writtenCount > 0 && $writtenCount % $commitInterval === 0) { + $this->updateHeaderCountInPlace($handle, $writtenCount); + } + } + + // Final update for any remaining records + if ($writtenCount > 0) { + $this->updateHeaderCountInPlace($handle, $writtenCount); + } + } finally { + flock($handle, LOCK_UN); + fclose($handle); + } + + return $writtenCount; + } + + /** + * Updates the record count in the header of an already open and locked file handle. + */ + private function updateHeaderCountInPlace($fileHandle, int $recordCount): void + { + $currentPosition = ftell($fileHandle); + if ($currentPosition === false) { + throw new StorageException('Could not get current file position.'); + } + + if (fseek($fileHandle, 16) !== 0) { // Offset 16 is where numRecords starts + throw new StorageException('Could not seek to record count position.'); + } + + $packedCount = pack(self::UINT64_PACK_FORMAT, $recordCount); + + if (fwrite($fileHandle, $packedCount) !== 8) { // 8 bytes for uint64_t + throw new StorageException('Failed to write record count.'); + } + + // Important: flush the write to disk immediately. + fflush($fileHandle); + + // Return the file pointer to its original position to continue appending. + fseek($fileHandle, $currentPosition); + } } diff --git a/src/Domain/Data/Service/BinaryStorageInterface.php b/src/Domain/Data/Service/BinaryStorageInterface.php index 0a7b441..bfc758d 100644 --- a/src/Domain/Data/Service/BinaryStorageInterface.php +++ b/src/Domain/Data/Service/BinaryStorageInterface.php @@ -12,10 +12,6 @@ public function atomicRename(string $sourcePath, string $destinationPath): void; public function createFile(string $filePath, string $symbol, string $timeframe): void; - public function appendRecords(string $filePath, iterable $records): int; - - public function updateRecordCount(string $filePath, int $recordCount): void; - public function readHeader(string $filePath): array; public function readRecordByIndex(string $filePath, int $index): ?array; @@ -25,4 +21,14 @@ public function readRecordsSequentially(string $filePath): \Generator; public function mergeAndWrite(string $originalPath, string $newDataPath, string $outputPath): int; public function readRecordsByTimestampRange(string $filePath, int $startTimestamp, int $endTimestamp): \Generator; + + /** + * Streams records from a generator to a file, periodically updating the header's record count. + * + * @param string $filePath The path to the binary file. + * @param iterable $records The records to stream. + * @param int $commitInterval The number of records to write before updating the header. + * @return int The total number of records written. + */ + public function streamAndCommitRecords(string $filePath, iterable $records, int $commitInterval = 5000): int; } diff --git a/src/Domain/Data/Service/OhlcvDownloader.php b/src/Domain/Data/Service/OhlcvDownloader.php index d1cf9b6..64a4ba3 100644 --- a/src/Domain/Data/Service/OhlcvDownloader.php +++ b/src/Domain/Data/Service/OhlcvDownloader.php @@ -30,62 +30,73 @@ public function download( ?string $jobId = null, ): string { $finalPath = $this->generateFilePath($exchangeId, $symbol, $timeframe); + $attemptedTempFiles = []; + $exception = null; try { $rangesToDownload = $this->calculateMissingRanges($finalPath, $startTime, $endTime, $timeframe, $forceOverwrite); if (empty($rangesToDownload)) { $this->logger->info('Local data is already complete for the requested range. No download needed.', ['path' => $finalPath]); - return $finalPath; } $this->logger->info('Found {count} missing data range(s) to download.', ['count' => count($rangesToDownload)]); - $downloadedTempFiles = []; - foreach ($rangesToDownload as $i => $range) { - [$chunkStartTime, $chunkEndTime] = $range; - + $chunkStartTime = $range[0]; + $chunkEndTime = $range[1]; $tempPath = $this->binaryStorage->getTempFilePath($finalPath) . ".chunk.{$i}"; + $attemptedTempFiles[] = $tempPath; // Track that we are going to attempt to create this file. $this->cleanupFile($tempPath); try { $this->logger->info('Downloading chunk #{num}: {start} to {end}', ['num' => $i + 1, 'start' => $chunkStartTime->format('Y-m-d H:i:s'), 'end' => $chunkEndTime->format('Y-m-d H:i:s')]); $this->downloadToTemp($exchangeId, $symbol, $timeframe, $chunkStartTime, $chunkEndTime, $tempPath, $jobId); + } catch (EmptyHistoryException $e) { $this->logger->warning('Initial date {start_date} is too early. Attempting to find the earliest available data from the exchange.', ['start_date' => $chunkStartTime->format('Y-m-d H:i:s')]); $firstAvailableDate = $this->exchangeAdapter->fetchFirstAvailableTimestamp($exchangeId, $symbol, $timeframe); if ($firstAvailableDate !== null && $firstAvailableDate <= $chunkEndTime) { $this->logger->info('Found earliest data at {real_start}. Resuming download for the adjusted range.', ['real_start' => $firstAvailableDate->format('Y-m-d H:i:s')]); + // Retry the download with the adjusted start date $this->downloadToTemp($exchangeId, $symbol, $timeframe, $firstAvailableDate, $chunkEndTime, $tempPath, $jobId); } else { $this->logger->warning('Could not determine a valid start date or the earliest data is outside the requested range for {symbol}. Skipping this chunk.', ['symbol' => $symbol]); } } - - if (file_exists($tempPath) && filesize($tempPath) > 64) { - $downloadedTempFiles[] = $tempPath; + } + } catch (\Throwable $e) { + $exception = $e; // Store exception to re-throw after finally block + } finally { + // Discover any valid temp files that were created, even if the process was interrupted. + $validTempFiles = []; + foreach ($attemptedTempFiles as $path) { + if (file_exists($path) && filesize($path) > 64) { + $validTempFiles[] = $path; } } - if (empty($downloadedTempFiles) && !file_exists($finalPath)) { + if (!empty($validTempFiles)) { + $this->logger->info('Merging {count} successfully downloaded chunk(s).', ['count' => count($validTempFiles)]); + $this->mergeFiles(file_exists($finalPath) ? $finalPath : null, $validTempFiles, $finalPath); + } elseif ($exception === null && !file_exists($finalPath)) { + // Only create an empty file if nothing was downloaded AND no error occurred. $this->binaryStorage->createFile($finalPath, $symbol, $timeframe); - } elseif (!empty($downloadedTempFiles)) { - $this->mergeFiles(file_exists($finalPath) ? $finalPath : null, $downloadedTempFiles, $finalPath); } - return $finalPath; - } catch (DownloadCancelledException $e) { - throw $e; - } catch (\Throwable $e) { - $this->logger->error( - 'Download failed: {message}.', - ['message' => $e->getMessage(), 'exception' => $e] - ); - throw new DownloaderException("Download failed for {$exchangeId}/{$symbol}: {$e->getMessage()}", $e->getCode(), $e); + // If an exception was caught, re-throw it now after cleanup/merging is done. + if ($exception instanceof DownloadCancelledException) { + $this->logger->info('Download was cancelled by user. Progress has been saved.'); + throw $exception; + } elseif ($exception !== null) { + $this->logger->error('Download failed: {message}.', ['message' => $exception->getMessage(), 'exception' => $exception]); + throw new DownloaderException("Download failed for {$exchangeId}/{$symbol}: {$exception->getMessage()}", $exception->getCode(), $exception); + } } + + return $finalPath; } /** @@ -214,10 +225,11 @@ private function downloadToTemp( $this->binaryStorage->createFile($tempPath, $symbol, $timeframe); $recordsGenerator = $this->exchangeAdapter->fetchOhlcv($exchangeId, $symbol, $timeframe, $startTime, $endTime, $jobId); - $recordCount = $this->binaryStorage->appendRecords($tempPath, $recordsGenerator); + + $recordCount = $this->binaryStorage->streamAndCommitRecords($tempPath, $recordsGenerator); if ($recordCount > 0) { - $this->binaryStorage->updateRecordCount($tempPath, $recordCount); + $this->logger->info('Streamed and committed {count} records to temp file.', ['count' => $recordCount]); } } From 2719ee2f5cf6d4febca941e8c84c8ab97e25891a Mon Sep 17 00:00:00 2001 From: William Arin Date: Thu, 19 Jun 2025 19:38:23 +0800 Subject: [PATCH 6/8] fix: case sensitive binary file saving --- .../Backtesting/Service/MultiTimeframeDataService.php | 2 +- src/Domain/Data/Service/OhlcvDownloader.php | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Domain/Backtesting/Service/MultiTimeframeDataService.php b/src/Domain/Backtesting/Service/MultiTimeframeDataService.php index 3da6956..7a182bc 100644 --- a/src/Domain/Backtesting/Service/MultiTimeframeDataService.php +++ b/src/Domain/Backtesting/Service/MultiTimeframeDataService.php @@ -70,7 +70,7 @@ private function generateFilePath(string $exchangeId, string $symbol, string $ti rtrim($this->baseDataPath, '/'), strtolower($exchangeId), strtoupper($sanitizedSymbol), - strtolower($timeframe) + $timeframe ); } } diff --git a/src/Domain/Data/Service/OhlcvDownloader.php b/src/Domain/Data/Service/OhlcvDownloader.php index 64a4ba3..9f28783 100644 --- a/src/Domain/Data/Service/OhlcvDownloader.php +++ b/src/Domain/Data/Service/OhlcvDownloader.php @@ -237,7 +237,13 @@ private function generateFilePath(string $exchangeId, string $symbol, string $ti { $sanitizedSymbol = str_replace('/', '_', $symbol); - return sprintf('%s/%s/%s/%s.stchx', rtrim($this->baseDataPath, '/'), strtolower($exchangeId), strtoupper($sanitizedSymbol), $timeframe); + return sprintf( + '%s/%s/%s/%s.stchx', + rtrim($this->baseDataPath, '/'), + strtolower($exchangeId), + strtoupper($sanitizedSymbol), + $timeframe, + ); } private function cleanupFile(string $filePath, string $reason = ''): void From b3bd0a0c89c2e65c00aeb7bba40f2deb813fb111 Mon Sep 17 00:00:00 2001 From: William Arin Date: Thu, 19 Jun 2025 20:40:48 +0800 Subject: [PATCH 7/8] fix: data saving if cancelled --- src/Domain/Data/Service/BinaryStorage.php | 45 ++++++++++++++--------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/src/Domain/Data/Service/BinaryStorage.php b/src/Domain/Data/Service/BinaryStorage.php index 86b898e..86adee1 100644 --- a/src/Domain/Data/Service/BinaryStorage.php +++ b/src/Domain/Data/Service/BinaryStorage.php @@ -580,29 +580,40 @@ public function streamAndCommitRecords(string $filePath, iterable $records, int // Seek to the end of the file to start appending records. fseek($handle, 0, SEEK_END); - foreach ($records as $record) { - $packedRecord = pack( - self::RECORD_PACK_FORMAT, - $record['timestamp'], - $record['open'], - $record['high'], - $record['low'], - $record['close'], - $record['volume'] - ); + // This inner try/catch ensures progress is saved even if the generator is interrupted. + try { + foreach ($records as $record) { + $packedRecord = pack( + self::RECORD_PACK_FORMAT, + $record['timestamp'], + $record['open'], + $record['high'], + $record['low'], + $record['close'], + $record['volume'] + ); + + if (fwrite($handle, $packedRecord) !== self::RECORD_LENGTH_V1) { + throw new StorageException("Failed to write complete record to '{$filePath}'."); + } + ++$writtenCount; - if (fwrite($handle, $packedRecord) !== self::RECORD_LENGTH_V1) { - throw new StorageException("Failed to write complete record to '{$filePath}'."); + // Periodically update the record count in the header + if ($writtenCount > 0 && $writtenCount % $commitInterval === 0) { + $this->updateHeaderCountInPlace($handle, $writtenCount); + } } - ++$writtenCount; - - // Periodically update the record count in the header - if ($writtenCount > 0 && $writtenCount % $commitInterval === 0) { + } catch (\Throwable $e) { + // The generator was interrupted (e.g., by cancellation). + // Before we stop, we must commit the progress we've made. + if ($writtenCount > 0) { $this->updateHeaderCountInPlace($handle, $writtenCount); } + // Re-throw the original exception to signal that the process was aborted. + throw $e; } - // Final update for any remaining records + // Final update if the loop completed successfully without exception. if ($writtenCount > 0) { $this->updateHeaderCountInPlace($handle, $writtenCount); } From 8fa34976d4699c68de2f2e77b316e3338243fedd Mon Sep 17 00:00:00 2001 From: William Arin Date: Thu, 19 Jun 2025 20:45:20 +0800 Subject: [PATCH 8/8] test: fix tests for OhlcvDownloader --- .../Data/Service/OhlcvDownloaderTest.php | 49 ++++++++----------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/tests/Domain/Data/Service/OhlcvDownloaderTest.php b/tests/Domain/Data/Service/OhlcvDownloaderTest.php index 94a8098..c2e69d6 100644 --- a/tests/Domain/Data/Service/OhlcvDownloaderTest.php +++ b/tests/Domain/Data/Service/OhlcvDownloaderTest.php @@ -39,6 +39,8 @@ protected function setUp(): void public function testDownloadForNewFile(): void { $finalPath = $this->vfsRoot->url() . '/market/binance/BTC_USDT/1h.stchx'; + $tempChunkPath = $finalPath . '.tmp.chunk.0'; + $mergedPath = $finalPath . '.merged.tmp.0'; vfsStream::create(['market' => ['binance' => ['BTC_USDT' => []]]], $this->vfsRoot); @@ -50,18 +52,15 @@ public function testDownloadForNewFile(): void $this->binaryStorageMock->expects($this->once()) ->method('createFile') - ->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('a', 128))); + ->willReturnCallback(fn(string $path) => file_put_contents($path, str_repeat('a', 128))); - $this->binaryStorageMock->method('appendRecords')->willReturn(1); + $this->binaryStorageMock->method('streamAndCommitRecords')->willReturn(1); $this->binaryStorageMock->expects($this->exactly(2))->method('atomicRename'); $this->downloader->download( - 'binance', - 'BTC/USDT', - '1h', - new \DateTimeImmutable('2024-01-01'), - new \DateTimeImmutable('2024-01-02') + 'binance', 'BTC/USDT', '1h', + new \DateTimeImmutable('2024-01-01'), new \DateTimeImmutable('2024-01-02') ); } @@ -78,18 +77,17 @@ public function testDownloadWithForceOverwriteMergesWithExistingFile(): void $this->binaryStorageMock->method('getTempFilePath')->willReturn($tempPath); $this->binaryStorageMock->method('getMergedTempFilePath')->willReturn($mergedPath); $this->binaryStorageMock->method('createFile')->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('b', 128))); - $this->binaryStorageMock->method('appendRecords')->willReturn(1); - $this->binaryStorageMock->expects($this->once())->method('mergeAndWrite'); + $this->binaryStorageMock->method('streamAndCommitRecords')->willReturn(1); + + $this->binaryStorageMock->expects($this->once())->method('mergeAndWrite'); $this->binaryStorageMock->expects($this->once())->method('atomicRename') ->with($this->stringEndsWith('.merged.tmp.0'), $finalPath); + $this->downloader->download( - 'binance', - 'BTC/USDT', - '1h', - new \DateTimeImmutable('2024-01-01'), - new \DateTimeImmutable('2024-01-02'), + 'binance', 'BTC/USDT', '1h', + new \DateTimeImmutable('2024-01-01'), new \DateTimeImmutable('2024-01-02'), true // Force overwrite ); } @@ -99,27 +97,23 @@ public function testDownloadFillsInternalGaps(): void $finalPath = $this->vfsRoot->url() . '/market/binance/BTC_USDT/1h.stchx'; vfsStream::create(['market' => ['binance' => ['BTC_USDT' => ['1h.stchx' => str_repeat('a', 128)]]]], $this->vfsRoot); - // --- Mocks for Gap Detection --- $this->binaryStorageMock->method('readHeader')->willReturn(['numRecords' => 2]); $this->binaryStorageMock->method('readRecordByIndex') ->willReturnOnConsecutiveCalls( - ['timestamp' => strtotime('2024-01-01 10:00:00')], // Local start - ['timestamp' => strtotime('2024-01-01 12:00:00')] // Local end (note the 1-hour gap) + ['timestamp' => strtotime('2024-01-01 10:00:00')], + ['timestamp' => strtotime('2024-01-01 12:00:00')] ); $this->binaryStorageMock->method('readRecordsSequentially') - ->willReturn((static fn () => yield from [ + ->willReturn((static fn() => yield from [ ['timestamp' => strtotime('2024-01-01 10:00:00')], ['timestamp' => strtotime('2024-01-01 12:00:00')], ])()); - // --- Mocks for Downloading the Gap --- $this->exchangeAdapterMock->method('supportsExchange')->willReturn(true); $this->exchangeAdapterMock->expects($this->once()) ->method('fetchOhlcv') ->with( - $this->anything(), - $this->anything(), - '1h', + $this->anything(), $this->anything(), '1h', $this->equalTo(new \DateTimeImmutable('2024-01-01 11:00:00')), $this->equalTo(new \DateTimeImmutable('2024-01-01 11:59:59')) ) @@ -127,18 +121,15 @@ public function testDownloadFillsInternalGaps(): void $this->binaryStorageMock->method('getTempFilePath')->willReturn($finalPath . '.tmp'); $this->binaryStorageMock->method('getMergedTempFilePath')->willReturn($finalPath . '.merged.tmp'); - $this->binaryStorageMock->method('createFile') - ->willReturnCallback(fn (string $path) => file_put_contents($path, str_repeat('a', 128))); + ->willReturnCallback(fn(string $path) => file_put_contents($path, str_repeat('a', 128))); + + $this->binaryStorageMock->method('streamAndCommitRecords')->willReturn(1); - $this->binaryStorageMock->method('appendRecords')->willReturn(1); $this->binaryStorageMock->expects($this->once())->method('mergeAndWrite'); - // --- Execute --- $this->downloader->download( - 'binance', - 'BTC/USDT', - '1h', + 'binance', 'BTC/USDT', '1h', new \DateTimeImmutable('2024-01-01 10:00:00'), new \DateTimeImmutable('2024-01-01 12:00:00'), false