Skip to content

Commit 7b69b1f

Browse files
committed
ENH: Use joblib.Parallel for Backtest.optimize(method='grid')
Reduce memory use and improve parallel support at least on Windows.
1 parent 2b18a06 commit 7b69b1f

File tree

3 files changed

+16
-57
lines changed

3 files changed

+16
-57
lines changed

backtesting/backtesting.py

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,9 @@
88

99
from __future__ import annotations
1010

11-
import multiprocessing as mp
12-
import os
1311
import sys
1412
import warnings
1513
from abc import ABCMeta, abstractmethod
16-
from concurrent.futures import ProcessPoolExecutor, as_completed
1714
from copy import copy
1815
from functools import lru_cache, partial
1916
from itertools import chain, product, repeat
@@ -23,6 +20,7 @@
2320

2421
import numpy as np
2522
import pandas as pd
23+
from joblib import Parallel, delayed
2624
from numpy.random import default_rng
2725

2826
try:
@@ -1497,41 +1495,15 @@ def _optimize_grid() -> Union[pd.Series, Tuple[pd.Series, pd.Series]]:
14971495
[p.values() for p in param_combos],
14981496
names=next(iter(param_combos)).keys()))
14991497

1500-
def _batch(seq):
1501-
n = np.clip(int(len(seq) // (os.cpu_count() or 1)), 1, 300)
1502-
for i in range(0, len(seq), n):
1503-
yield seq[i:i + n]
1504-
1505-
# Save necessary objects into "global" state; pass into concurrent executor
1506-
# (and thus pickle) nothing but two numbers; receive nothing but numbers.
1507-
# With start method "fork", children processes will inherit parent address space
1508-
# in a copy-on-write manner, achieving better performance/RAM benefit.
1509-
backtest_uuid = np.random.random()
1510-
param_batches = list(_batch(param_combos))
1511-
Backtest._mp_backtests[backtest_uuid] = (self, param_batches, maximize)
1512-
try:
1513-
# If multiprocessing start method is 'fork' (i.e. on POSIX), use
1514-
# a pool of processes to compute results in parallel.
1515-
# Otherwise (i.e. on Windows), sequential computation will be "faster".
1516-
if mp.get_start_method(allow_none=False) == 'fork':
1517-
with ProcessPoolExecutor() as executor:
1518-
futures = [executor.submit(Backtest._mp_task, backtest_uuid, i)
1519-
for i in range(len(param_batches))]
1520-
for future in _tqdm(as_completed(futures), total=len(futures),
1521-
desc='Backtest.optimize'):
1522-
batch_index, values = future.result()
1523-
for value, params in zip(values, param_batches[batch_index]):
1524-
heatmap[tuple(params.values())] = value
1525-
else:
1526-
if os.name == 'posix':
1527-
warnings.warn("For multiprocessing support in `Backtest.optimize()` "
1528-
"set multiprocessing start method to 'fork'.")
1529-
for batch_index in _tqdm(range(len(param_batches))):
1530-
_, values = Backtest._mp_task(backtest_uuid, batch_index)
1531-
for value, params in zip(values, param_batches[batch_index]):
1532-
heatmap[tuple(params.values())] = value
1533-
finally:
1534-
del Backtest._mp_backtests[backtest_uuid]
1498+
with Parallel(prefer='threads', require='sharedmem', max_nbytes='50M',
1499+
n_jobs=-2, return_as='generator') as parallel:
1500+
results = _tqdm(
1501+
parallel(delayed(self._mp_task)(self, params, maximize=maximize)
1502+
for params in param_combos),
1503+
total=len(param_combos),
1504+
desc='Backtest.optimize')
1505+
for value, params in zip(results, param_combos):
1506+
heatmap[tuple(params.values())] = value
15351507

15361508
if pd.isnull(heatmap).all():
15371509
# No trade was made in any of the runs. Just make a random
@@ -1580,7 +1552,7 @@ def memoized_run(tup):
15801552
stats = self.run(**dict(tup))
15811553
return -maximize(stats)
15821554

1583-
progress = iter(_tqdm(repeat(None), total=max_tries, leave=False, desc='Backtest.optimize'))
1555+
progress = iter(_tqdm(repeat(None), total=max_tries, desc='Backtest.optimize'))
15841556
_names = tuple(kwargs.keys())
15851557

15861558
def objective_function(x):
@@ -1625,11 +1597,9 @@ def cons(x):
16251597
return output
16261598

16271599
@staticmethod
1628-
def _mp_task(backtest_uuid, batch_index):
1629-
bt, param_batches, maximize_func = Backtest._mp_backtests[backtest_uuid]
1630-
return batch_index, [maximize_func(stats) if stats['# Trades'] else np.nan
1631-
for stats in (bt.run(**params)
1632-
for params in param_batches[batch_index])]
1600+
def _mp_task(bt, params, *, maximize):
1601+
stats = bt.run(**params)
1602+
return maximize(stats) if stats['# Trades'] else np.nan
16331603

16341604
_mp_backtests: Dict[float, Tuple['Backtest', List, Callable]] = {}
16351605

backtesting/test/_test.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -621,18 +621,6 @@ def test_max_tries(self):
621621
**OPT_PARAMS)
622622
self.assertEqual(len(heatmap), 6)
623623

624-
def test_multiprocessing_windows_spawn(self):
625-
df = GOOG.iloc[:100]
626-
kw = {'fast': [10]}
627-
628-
stats1 = Backtest(df, SmaCross).optimize(**kw)
629-
with patch('multiprocessing.get_start_method', lambda **_: 'spawn'):
630-
with self.assertWarns(UserWarning) as cm:
631-
stats2 = Backtest(df, SmaCross).optimize(**kw)
632-
633-
self.assertIn('multiprocessing support', cm.warning.args[0])
634-
assert stats1.filter(chars := tuple('[^_]')).equals(stats2.filter(chars)), (stats1, stats2)
635-
636624
def test_optimize_invalid_param(self):
637625
bt = Backtest(GOOG.iloc[:100], SmaCross)
638626
self.assertRaises(AttributeError, bt.optimize, foo=range(3))
@@ -648,7 +636,7 @@ def test_optimize_speed(self):
648636
start = time.process_time()
649637
bt.optimize(fast=(2, 5, 7), slow=[10, 15, 20, 30])
650638
end = time.process_time()
651-
self.assertLess(end - start, .2)
639+
self.assertLess(end - start, 1)
652640

653641

654642
class TestPlot(TestCase):

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
'numpy >= 1.17.0',
3535
'pandas >= 0.25.0, != 0.25.0',
3636
'bokeh >= 1.4.0, != 3.0.*',
37+
'joblib',
3738
],
3839
extras_require={
3940
'doc': [

0 commit comments

Comments
 (0)