Skip to content

Adding handling of FAILING_OVER and FAILED_OVER events/push notifications #3716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: feat/hitless-upgrade-sync-standalone
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
579d032
Handling of topology update push notifications for Standalone Redis c…
petyaslavova Jun 27, 2025
8d27a86
Adding sequence id to the maintenance push notifications. Adding unit…
petyaslavova Jul 11, 2025
32a16f0
Adding integration-like tests for migrating/migrated events handling
petyaslavova Jul 11, 2025
8bfdf13
Removed unused imports
petyaslavova Jul 11, 2025
33d7295
Revert changing of the default retry object initialization for connec…
petyaslavova Jul 11, 2025
346097f
Complete migrating/migrated integration-like tests
petyaslavova Jul 14, 2025
f3a9a71
Adding moving integration-like tests
petyaslavova Jul 15, 2025
c0438c8
Fixed BlockingConnectionPool locking strategy. Removed debug logging.…
petyaslavova Jul 17, 2025
6ca514f
Fixing linters
petyaslavova Jul 17, 2025
778abdf
Applying Copilot's comments
petyaslavova Jul 17, 2025
667109b
Fixed type annotations not compatible with older python versions
petyaslavova Jul 17, 2025
ef1742a
Add a few more tests and fix pool mock for python 3.9
petyaslavova Jul 17, 2025
7b43890
Adding maintenance state to connections. Migrating and Migrated are n…
petyaslavova Jul 18, 2025
08f1585
Refactored the tmp host address and timeout storing and the way to ap…
petyaslavova Jul 22, 2025
9a31a71
Apply review comments
petyaslavova Jul 24, 2025
602bbe9
Applying moving/moved only on connections to the same proxy.
petyaslavova Jul 26, 2025
953b41a
Applying review comments.
petyaslavova Aug 8, 2025
2210fed
Refactor to have less methods in pool classes and made some of the ex…
petyaslavova Aug 11, 2025
1427d99
Fixing lint errors
petyaslavova Aug 11, 2025
a2744f3
Fixing tests
petyaslavova Aug 11, 2025
260b34e
Fixing the docs of some of the new methods in connection pools. Handl…
petyaslavova Aug 14, 2025
4c6eb44
Applying review comments
petyaslavova Aug 15, 2025
10ded34
Adding handling of FAILING_OVER and FAILED_OVER events/push notificat…
petyaslavova Jul 24, 2025
07402d0
Merge branch 'feat/hitless-upgrade-sync-standalone' into ps_add_fail_…
petyaslavova Aug 18, 2025
b9afaf0
Fixing tests after merging with feature branch
petyaslavova Aug 18, 2025
058be2c
Fixing lint errors.
petyaslavova Aug 18, 2025
e4a8646
Update tests/test_maintenance_events_handling.py
petyaslavova Aug 18, 2025
51d24ba
Applying review comments
petyaslavova Aug 18, 2025
66c1fe0
Applying review comments
petyaslavova Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 123 additions & 10 deletions redis/maintenance_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class MaintenanceState(enum.Enum):
    """
    Maintenance state tracked on a connection (``connection.maintenance_state``).
    """

    # No maintenance is in progress.
    NONE = "none"
    # While a connection is in this state, maintenance start/completed
    # handling leaves its state and timeouts untouched.
    MOVING = "moving"
    # NOTE(review): appears to be superseded by MAINTENANCE in this change;
    # kept so existing references keep resolving - confirm before removing.
    MIGRATING = "migrating"
    # Generic "maintenance in progress" state.
    MAINTENANCE = "maintenance"


if TYPE_CHECKING:
Expand Down Expand Up @@ -261,6 +261,105 @@ def __hash__(self) -> int:
return hash((self.__class__, self.id))


class NodeFailingOverEvent(MaintenanceEvent):
    """
    Event for when a Redis cluster node is in the process of failing over.

    This event is received when a node starts a failover process during
    cluster maintenance operations or when handling node failures.

    Args:
        id (int): Unique identifier for this event
        ttl (int): Time-to-live in seconds for this notification
    """

    def __init__(self, id: int, ttl: int):
        super().__init__(id, ttl)

    def __repr__(self) -> str:
        """
        Return a debug representation including the expiry information.

        The remaining time is computed against ``time.monotonic()`` and is
        clamped at 0 so that it never goes negative.
        """
        expiry_time = self.creation_time + self.ttl
        remaining = max(0, expiry_time - time.monotonic())
        return (
            f"{self.__class__.__name__}("
            f"id={self.id}, "
            f"ttl={self.ttl}, "
            f"creation_time={self.creation_time}, "
            f"expires_at={expiry_time}, "
            f"remaining={remaining:.1f}s, "
            f"expired={self.is_expired()}"
            f")"
        )

    def __eq__(self, other) -> bool:
        """
        Two NodeFailingOverEvent events are considered equal if they have the same
        id and are of the same type.
        """
        if not isinstance(other, NodeFailingOverEvent):
            return False
        # The exact type check keeps a subclass instance from comparing equal
        # to a base-class instance that happens to share the id.
        return self.id == other.id and type(self) is type(other)

    def __hash__(self) -> int:
        """
        Return a hash value for the event to allow
        instances to be used in sets and as dictionary keys.

        Returns:
            int: Hash value based on event type and id
        """
        return hash((self.__class__, self.id))


class NodeFailedOverEvent(MaintenanceEvent):
    """
    Event for when a Redis cluster node has completed a failover.

    This event is received when a node has finished the failover process
    during cluster maintenance operations or after handling node failures.

    The event takes no ttl argument; ``DEFAULT_TTL`` is always passed to the
    base class instead.

    Args:
        id (int): Unique identifier for this event
    """

    # Fixed ttl handed to the base class for every instance.
    DEFAULT_TTL = 5

    def __init__(self, id: int):
        super().__init__(id, NodeFailedOverEvent.DEFAULT_TTL)

    def __repr__(self) -> str:
        """
        Return a debug representation including the expiry information.

        The remaining time is computed against ``time.monotonic()`` and is
        clamped at 0 so that it never goes negative.
        """
        expiry_time = self.creation_time + self.ttl
        remaining = max(0, expiry_time - time.monotonic())
        return (
            f"{self.__class__.__name__}("
            f"id={self.id}, "
            f"ttl={self.ttl}, "
            f"creation_time={self.creation_time}, "
            f"expires_at={expiry_time}, "
            f"remaining={remaining:.1f}s, "
            f"expired={self.is_expired()}"
            f")"
        )

    def __eq__(self, other) -> bool:
        """
        Two NodeFailedOverEvent events are considered equal if they have the same
        id and are of the same type.
        """
        if not isinstance(other, NodeFailedOverEvent):
            return False
        # The exact type check keeps a subclass instance from comparing equal
        # to a base-class instance that happens to share the id.
        return self.id == other.id and type(self) is type(other)

    def __hash__(self) -> int:
        """
        Return a hash value for the event to allow
        instances to be used in sets and as dictionary keys.

        Returns:
            int: Hash value based on event type and id
        """
        return hash((self.__class__, self.id))


class MaintenanceEventsConfig:
"""
Configuration class for maintenance events handling behaviour. Events are received through
Expand Down Expand Up @@ -457,40 +556,54 @@ def handle_node_moved_event(self, event: NodeMovingEvent):


class MaintenanceEventConnectionHandler:
    """
    Applies maintenance start/completed events to a single connection.

    "Start" events relax the connection's timeouts and mark it as being in
    maintenance; "completed" events reset the timeouts and clear the state.
    Both are no-ops while the connection is in the MOVING state or when
    relaxed timeouts are disabled in the config.

    Args:
        connection: The connection whose state and timeouts are updated.
        config: Maintenance events configuration; provides ``relax_timeout``
            and ``is_relax_timeouts_enabled()``.
    """

    # 1 = "starting maintenance" events, 0 = "completed maintenance" events
    _EVENT_TYPES: dict[type["MaintenanceEvent"], int] = {
        NodeMigratingEvent: 1,
        NodeFailingOverEvent: 1,
        NodeMigratedEvent: 0,
        NodeFailedOverEvent: 0,
    }

    def __init__(
        self, connection: "ConnectionInterface", config: MaintenanceEventsConfig
    ) -> None:
        self.connection = connection
        self.config = config

    def handle_event(self, event: MaintenanceEvent):
        """
        Dispatch ``event`` to the start or completed handler.

        Events whose class is not registered in ``_EVENT_TYPES`` are logged
        as errors and otherwise ignored.
        """
        # The lookup is by exact class, so subclasses of the registered event
        # types are not matched.
        event_type = self._EVENT_TYPES.get(event.__class__)

        if event_type is None:
            logging.error(f"Unhandled event type: {event}")
            return

        if event_type:
            self.handle_maintenance_start_event(MaintenanceState.MAINTENANCE)
        else:
            self.handle_maintenance_completed_event()

    def handle_maintenance_start_event(self, maintenance_state: MaintenanceState):
        """
        Mark the connection with ``maintenance_state`` and relax its timeouts.

        Does nothing if the connection is in the MOVING state or relaxed
        timeouts are disabled.
        """
        if (
            self.connection.maintenance_state == MaintenanceState.MOVING
            or not self.config.is_relax_timeouts_enabled()
        ):
            return

        self.connection.maintenance_state = maintenance_state
        self.connection.set_tmp_settings(tmp_relax_timeout=self.config.relax_timeout)
        # extend the timeout for all created connections
        self.connection.update_current_socket_timeout(self.config.relax_timeout)

    def handle_maintenance_completed_event(self):
        """
        Reset the relaxed timeouts and set the maintenance state to NONE.

        Does nothing if the connection is in the MOVING state or relaxed
        timeouts are disabled.
        """
        # Only reset timeouts if state is not MOVING and relax timeouts are enabled
        if (
            self.connection.maintenance_state == MaintenanceState.MOVING
            or not self.config.is_relax_timeouts_enabled()
        ):
            return
        self.connection.reset_tmp_settings(reset_relax_timeout=True)
        # Maintenance completed - reset the connection
        # timeouts by providing -1 as the relax timeout
        self.connection.update_current_socket_timeout(-1)
        self.connection.maintenance_state = MaintenanceState.NONE
Loading