Skip to content

Commit 03e4255

Browse files
committed
Auto-restart ThetaTerminal when MDDS disconnects
1 parent b032925 commit 03e4255

File tree

2 files changed

+135
-51
lines changed

2 files changed

+135
-51
lines changed

lumibot/tools/thetadata_helper.py

Lines changed: 68 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
CONNECTION_MAX_RETRIES = 60
2727
BOOT_GRACE_PERIOD = 5.0
2828
MAX_RESTART_ATTEMPTS = 3
29+
MAX_TERMINAL_RESTART_CYCLES = 3
2930

3031

3132
def _resolve_asset_folder(asset_obj: Asset) -> str:
@@ -43,13 +44,20 @@ def _normalize_folder_component(value: str, fallback: str) -> str:
4344
THETA_DATA_PID = None
4445
THETA_DATA_LOG_HANDLE = None
4546

47+
48+
class ThetaDataConnectionError(RuntimeError):
49+
"""Raised when ThetaTerminal cannot reconnect to Theta Data after multiple restarts."""
50+
51+
pass
52+
4653
def reset_connection_diagnostics():
4754
"""Reset ThetaData connection counters (useful for tests)."""
4855
CONNECTION_DIAGNOSTICS.update({
4956
"check_connection_calls": 0,
5057
"start_terminal_calls": 0,
5158
"network_requests": 0,
5259
"placeholder_writes": 0,
60+
"terminal_restarts": 0,
5361
})
5462

5563

@@ -198,6 +206,7 @@ def reset_theta_terminal_tracking():
198206
"start_terminal_calls": 0,
199207
"network_requests": 0,
200208
"placeholder_writes": 0,
209+
"terminal_restarts": 0,
201210
}
202211

203212

@@ -1395,8 +1404,6 @@ def check_connection(username: str, password: str, wait_for_connection: bool = F
13951404

13961405
max_retries = CONNECTION_MAX_RETRIES
13971406
sleep_interval = CONNECTION_RETRY_SLEEP
1398-
restart_attempts = 0
1399-
proactive_restart_attempts = 0
14001407
client = None
14011408

14021409
def probe_status() -> Optional[str]:
@@ -1436,58 +1443,64 @@ def probe_status() -> Optional[str]:
14361443
logger.debug("ThetaTerminal running but not yet CONNECTED; waiting for status.")
14371444
return check_connection(username=username, password=password, wait_for_connection=True)
14381445

1439-
counter = 0
1440-
connected = False
1441-
1442-
while counter < max_retries:
1443-
status_text = probe_status()
1444-
if status_text == "CONNECTED":
1445-
if counter:
1446-
logger.info("ThetaTerminal connected after %s attempt(s).", counter + 1)
1447-
connected = True
1448-
break
1449-
elif status_text == "DISCONNECTED":
1450-
logger.debug("ThetaTerminal reports DISCONNECTED; will retry.")
1451-
elif status_text is not None:
1452-
logger.debug(f"ThetaTerminal returned unexpected status: {status_text}")
1446+
total_restart_cycles = 0
14531447

1454-
if not is_process_alive():
1455-
if restart_attempts >= MAX_RESTART_ATTEMPTS:
1456-
logger.error("ThetaTerminal not running after %s restart attempts.", restart_attempts)
1457-
break
1458-
restart_attempts += 1
1459-
logger.warning("ThetaTerminal process is not running (restart #%s).", restart_attempts)
1460-
client = start_theta_data_client(username=username, password=password)
1461-
time.sleep(max(BOOT_GRACE_PERIOD, sleep_interval))
1462-
counter = 0
1463-
continue
1448+
while True:
1449+
counter = 0
1450+
restart_attempts = 0
1451+
1452+
while counter < max_retries:
1453+
status_text = probe_status()
1454+
if status_text == "CONNECTED":
1455+
if counter or total_restart_cycles:
1456+
logger.info(
1457+
"ThetaTerminal connected after %s attempt(s) (restart cycles=%s).",
1458+
counter + 1,
1459+
total_restart_cycles,
1460+
)
1461+
return client, True
1462+
elif status_text == "DISCONNECTED":
1463+
logger.debug("ThetaTerminal reports DISCONNECTED; will retry.")
1464+
elif status_text is not None:
1465+
logger.debug(f"ThetaTerminal returned unexpected status: {status_text}")
1466+
1467+
if not is_process_alive():
1468+
if restart_attempts >= MAX_RESTART_ATTEMPTS:
1469+
logger.error("ThetaTerminal not running after %s restart attempts.", restart_attempts)
1470+
break
1471+
restart_attempts += 1
1472+
logger.warning("ThetaTerminal process is not running (restart #%s).", restart_attempts)
1473+
client = start_theta_data_client(username=username, password=password)
1474+
CONNECTION_DIAGNOSTICS["terminal_restarts"] = CONNECTION_DIAGNOSTICS.get("terminal_restarts", 0) + 1
1475+
time.sleep(max(BOOT_GRACE_PERIOD, sleep_interval))
1476+
counter = 0
1477+
continue
14641478

1465-
counter += 1
1466-
if counter % 10 == 0:
1467-
logger.info("Waiting for ThetaTerminal connection (attempt %s/%s).", counter, max_retries)
1468-
if counter and counter % 15 == 0:
1469-
if proactive_restart_attempts >= MAX_RESTART_ATTEMPTS:
1470-
logger.error(
1471-
"ThetaTerminal still disconnected after %s attempts; restart limit reached.",
1472-
counter,
1473-
)
1474-
break
1475-
proactive_restart_attempts += 1
1476-
logger.warning(
1477-
"ThetaTerminal disconnected for %s consecutive probes; restarting (proactive #%s).",
1478-
counter,
1479-
proactive_restart_attempts,
1479+
counter += 1
1480+
if counter % 10 == 0:
1481+
logger.info("Waiting for ThetaTerminal connection (attempt %s/%s).", counter, max_retries)
1482+
time.sleep(sleep_interval)
1483+
1484+
total_restart_cycles += 1
1485+
if total_restart_cycles > MAX_TERMINAL_RESTART_CYCLES:
1486+
logger.error(
1487+
"Unable to connect to Theta Data after %s restart cycle(s) (%s attempts each).",
1488+
MAX_TERMINAL_RESTART_CYCLES,
1489+
max_retries,
1490+
)
1491+
raise ThetaDataConnectionError(
1492+
f"Unable to connect to Theta Data after {MAX_TERMINAL_RESTART_CYCLES} restart cycle(s)."
14801493
)
1481-
client = start_theta_data_client(username=username, password=password)
1482-
time.sleep(max(BOOT_GRACE_PERIOD, sleep_interval))
1483-
counter = 0
1484-
continue
1485-
time.sleep(sleep_interval)
1486-
1487-
if not connected and counter >= max_retries:
1488-
logger.error("Cannot connect to Theta Data after %s attempts.", counter)
14891494

1490-
return client, connected
1495+
logger.warning(
1496+
"ThetaTerminal still disconnected after %s attempts; restarting (cycle %s/%s).",
1497+
max_retries,
1498+
total_restart_cycles,
1499+
MAX_TERMINAL_RESTART_CYCLES,
1500+
)
1501+
client = start_theta_data_client(username=username, password=password)
1502+
CONNECTION_DIAGNOSTICS["terminal_restarts"] = CONNECTION_DIAGNOSTICS.get("terminal_restarts", 0) + 1
1503+
time.sleep(max(BOOT_GRACE_PERIOD, sleep_interval))
14911504

14921505

14931506
def get_request(url: str, headers: dict, querystring: dict, username: str, password: str):
@@ -1542,6 +1555,7 @@ def get_request(url: str, headers: dict, querystring: dict, username: str, passw
15421555
)
15431556
restart_budget -= 1
15441557
start_theta_data_client(username=username, password=password)
1558+
CONNECTION_DIAGNOSTICS["terminal_restarts"] = CONNECTION_DIAGNOSTICS.get("terminal_restarts", 0) + 1
15451559
check_connection(username=username, password=password, wait_for_connection=True)
15461560
time.sleep(max(BOOT_GRACE_PERIOD, CONNECTION_RETRY_SLEEP))
15471561
consecutive_disconnects = 0
@@ -1588,6 +1602,9 @@ def get_request(url: str, headers: dict, querystring: dict, username: str, passw
15881602
else:
15891603
break
15901604

1605+
except ThetaDataConnectionError as exc:
1606+
logger.error("Theta Data connection failed after supervised restarts: %s", exc)
1607+
raise
15911608
except Exception as e:
15921609
logger.warning(f"Exception during request (attempt {counter + 1}): {e}")
15931610
check_connection(username=username, password=password, wait_for_connection=True)
@@ -1597,7 +1614,7 @@ def get_request(url: str, headers: dict, querystring: dict, username: str, passw
15971614

15981615
counter += 1
15991616
if counter > 1:
1600-
raise ValueError("Cannot connect to Theta Data!")
1617+
raise ThetaDataConnectionError("Unable to connect to Theta Data after repeated retries.")
16011618

16021619
# Store this page's response data
16031620
page_count += 1

tests/test_thetadata_helper.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,6 +1539,73 @@ def fake_get_request(url, headers, querystring, username, password):
15391539
assert "returned no expirations" in caplog.text
15401540

15411541

1542+
class TestThetaDataConnectionSupervision:
1543+
1544+
def setup_method(self):
1545+
thetadata_helper.reset_connection_diagnostics()
1546+
1547+
def test_check_connection_recovers_after_restart(self, monkeypatch):
1548+
statuses = iter(["DISCONNECTED", "DISCONNECTED", "CONNECTED"])
1549+
1550+
class FakeResponse:
1551+
def __init__(self, text):
1552+
self.text = text
1553+
1554+
def fake_get(url, timeout):
1555+
try:
1556+
text = next(statuses)
1557+
except StopIteration:
1558+
text = "CONNECTED"
1559+
return FakeResponse(text)
1560+
1561+
start_calls = []
1562+
1563+
def fake_start(username, password):
1564+
start_calls.append((username, password))
1565+
return object()
1566+
1567+
monkeypatch.setattr(thetadata_helper.requests, "get", fake_get)
1568+
monkeypatch.setattr(thetadata_helper, "start_theta_data_client", fake_start)
1569+
monkeypatch.setattr(thetadata_helper, "is_process_alive", lambda: True)
1570+
monkeypatch.setattr(thetadata_helper, "CONNECTION_MAX_RETRIES", 2, raising=False)
1571+
monkeypatch.setattr(thetadata_helper, "MAX_TERMINAL_RESTART_CYCLES", 2, raising=False)
1572+
monkeypatch.setattr(thetadata_helper, "BOOT_GRACE_PERIOD", 0, raising=False)
1573+
monkeypatch.setattr(thetadata_helper, "CONNECTION_RETRY_SLEEP", 0, raising=False)
1574+
monkeypatch.setattr(thetadata_helper.time, "sleep", lambda *args, **kwargs: None)
1575+
1576+
client, connected = thetadata_helper.check_connection("user", "pass", wait_for_connection=True)
1577+
1578+
assert connected is True
1579+
assert len(start_calls) == 1
1580+
assert thetadata_helper.CONNECTION_DIAGNOSTICS["terminal_restarts"] >= 1
1581+
1582+
def test_check_connection_raises_after_restart_cycles(self, monkeypatch):
1583+
statuses = iter(["DISCONNECTED"] * 10)
1584+
1585+
class FakeResponse:
1586+
def __init__(self, text):
1587+
self.text = text
1588+
1589+
def fake_get(url, timeout):
1590+
try:
1591+
text = next(statuses)
1592+
except StopIteration:
1593+
text = "DISCONNECTED"
1594+
return FakeResponse(text)
1595+
1596+
monkeypatch.setattr(thetadata_helper.requests, "get", fake_get)
1597+
monkeypatch.setattr(thetadata_helper, "start_theta_data_client", lambda *args, **kwargs: object())
1598+
monkeypatch.setattr(thetadata_helper, "is_process_alive", lambda: True)
1599+
monkeypatch.setattr(thetadata_helper, "CONNECTION_MAX_RETRIES", 1, raising=False)
1600+
monkeypatch.setattr(thetadata_helper, "MAX_TERMINAL_RESTART_CYCLES", 1, raising=False)
1601+
monkeypatch.setattr(thetadata_helper, "BOOT_GRACE_PERIOD", 0, raising=False)
1602+
monkeypatch.setattr(thetadata_helper, "CONNECTION_RETRY_SLEEP", 0, raising=False)
1603+
monkeypatch.setattr(thetadata_helper.time, "sleep", lambda *args, **kwargs: None)
1604+
1605+
with pytest.raises(thetadata_helper.ThetaDataConnectionError):
1606+
thetadata_helper.check_connection("user", "pass", wait_for_connection=True)
1607+
1608+
15421609
def test_finalize_day_frame_handles_dst_fallback():
15431610
tz = pytz.timezone("America/New_York")
15441611
utc = pytz.UTC

0 commit comments

Comments
 (0)