diff --git a/zulip/integrations/zephyr/zephyr_mirror_backend.py b/zulip/integrations/zephyr/zephyr_mirror_backend.py
index 8b5197118..9d0b575a6 100755
--- a/zulip/integrations/zephyr/zephyr_mirror_backend.py
+++ b/zulip/integrations/zephyr/zephyr_mirror_backend.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
-from typing import Any, Dict, IO, List, Optional, Set, Text, Tuple, cast
+from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Text, Tuple, cast
 from types import FrameType
 
 import sys
@@ -17,6 +17,8 @@ import tempfile
 import select
 
+from zulip import RandomExponentialBackoff
+
 DEFAULT_SITE = "https://api.zulip.com"
 
 class States:
@@ -218,32 +220,41 @@ def maybe_restart_mirroring_script() -> None:
         except OSError:
             # We don't care whether we failed to cancel subs properly, but we should log it
             logger.exception("")
-        while True:
+        backoff = RandomExponentialBackoff(
+            maximum_retries=3,
+        )
+        while backoff.keep_going():
             try:
                 os.execvp(os.path.abspath(__file__), sys.argv)
+                # No need for backoff.succeed, since this can't be reached
             except Exception:
                 logger.exception("Error restarting mirroring script; trying again... Traceback:")
-                time.sleep(1)
+                backoff.fail()
+        raise Exception("Failed to reload too many times, aborting!")
 
-def process_loop(log: Optional[IO[Any]]) -> None:
+def process_loop(log: Optional[IO[Any]]) -> NoReturn:
     restart_check_count = 0
     last_check_time = time.time()
+    receive_backoff = RandomExponentialBackoff()
     while True:
         select.select([zephyr._z.getFD()], [], [], 15)
         try:
+            process_backoff = RandomExponentialBackoff()
             # Fetch notices from the queue until its empty
             while True:
                 notice = zephyr.receive(block=False)
+                receive_backoff.succeed()
                 if notice is None:
                     break
                 try:
                     process_notice(notice, log)
+                    process_backoff.succeed()
                 except Exception:
                     logger.exception("Error relaying zephyr:")
-                    time.sleep(2)
+                    process_backoff.fail()
         except Exception:
             logger.exception("Error checking for new zephyrs:")
-            time.sleep(1)
+            receive_backoff.fail()
             continue
 
         if time.time() - last_check_time > 15:
@@ -756,15 +767,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
         # whole process
         logger.exception("Error forwarding message:")
 
-def zulip_to_zephyr(options: int) -> None:
+def zulip_to_zephyr(options: int) -> NoReturn:
     # Sync messages from zulip to zephyr
     logger.info("Starting syncing messages.")
+    backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
     while True:
         try:
             zulip_client.call_on_each_message(maybe_forward_to_zephyr)
         except Exception:
             logger.exception("Error syncing messages:")
-            time.sleep(1)
+            backoff.fail()
 
 def subscribed_to_mail_messages() -> bool:
     # In case we have lost our AFS tokens and those won't be able to
@@ -1134,7 +1146,6 @@ def die_gracefully(signal: int, frame: FrameType) -> None:
         # Run the zulip => zephyr mirror in the child
         configure_logger(logger, "zulip=>zephyr")
         zulip_to_zephyr(options)
-        sys.exit(0)
 else:
     child_pid = None
 CURRENT_STATE = States.ZephyrToZulip
diff --git a/zulip/zulip/__init__.py b/zulip/zulip/__init__.py
index 94e4be07d..f61935be1 100644
--- a/zulip/zulip/__init__.py
+++ b/zulip/zulip/__init__.py
@@ -33,7 +33,27 @@ API_VERSTRING = "v1/"
 
 
 class CountingBackoff:
-    def __init__(self, maximum_retries: int = 10, timeout_success_equivalent: Optional[float] = None, delay_cap: float = 90.0) -> None:
+    def __init__(
+        self,
+        maximum_retries: int = 10,
+        timeout_success_equivalent: Optional[float] = None,
+        delay_cap: float = 90.0,
+    ) -> None:
+        """Sets up a retry-backoff object.
+
+        Example usage:
+            backoff = zulip.CountingBackoff()
+            while backoff.keep_going():
+                try:
+                    something()
+                    backoff.succeed()
+                except Exception:
+                    backoff.fail()
+
+        timeout_success_equivalent is used when success cannot be observed
+        directly: if at least that many seconds pass before the next
+        keep_going/fail call, the last attempt is counted as a success.
+        """
         self.number_of_retries = 0
         self.maximum_retries = maximum_retries
         self.timeout_success_equivalent = timeout_success_equivalent
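A usage sketch of the pattern the hunks above converge on. run_forever and poll_once are hypothetical names used only for illustration; the sketch assumes nothing beyond the RandomExponentialBackoff behaviour visible in the diff (the constructor arguments and the keep_going/succeed/fail methods).

    from zulip import RandomExponentialBackoff

    def poll_once() -> None:
        """Hypothetical unit of work; may raise on transient errors."""

    def run_forever() -> None:
        # timeout_success_equivalent=120: if 120 seconds elapse before the
        # next fail() call, the previous attempt is treated as a success,
        # so a long-lived connection resets the retry counter on its own.
        backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
        while True:
            try:
                poll_once()
            except Exception:
                # fail() records the failure and, as the replacement of the
                # old time.sleep() calls suggests, waits before returning.
                backoff.fail()

Compared with the fixed time.sleep() calls it replaces, the backoff object caps retry storms, and timeout_success_equivalent keeps a long-running connection that eventually drops from being counted as a string of consecutive failures.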