155 changes: 140 additions & 15 deletions doc/sharding/01-components.md
@@ -59,32 +59,44 @@ public static function rebalanceWithNewNodes(Vector $schema, Set $newNodes, int

### 3. Queue Class (`src/Plugin/Sharding/Queue.php`)

Manages asynchronous command execution across cluster nodes.
Manages asynchronous command execution across cluster nodes with rollback support.

**Key Features:**
- Command queuing with dependencies
- `wait_for_id` synchronization mechanism
- Node-specific command targeting
- Parallel execution support
- **Rollback command storage** (NEW)
- **Operation group management** (NEW)
- **Automatic rollback execution** (NEW)

**Core Responsibilities:**
- Command ordering and dependencies
- Asynchronous execution management
- Inter-node communication
- Operation synchronization
- **Rollback orchestration** (NEW)
- **Group-based atomic operations** (NEW)

**Core Methods:**
- `add(string $nodeId, string $query, string $rollbackQuery, ?string $operationGroup = null)`: Add command with mandatory rollback support
- `rollbackOperationGroup(string $operationGroup)`: Execute rollback for entire operation group
- `getRollbackCommands(string $operationGroup)`: Retrieve rollback commands for group
- `executeRollbackSequence(array $commands)`: Execute rollback commands in reverse order

**Synchronization Pattern:**
```php
// Command A
$idA = $queue->add($node, "COMMAND A");
// Command A with rollback (rollback is now mandatory)
$idA = $queue->add($node, "CREATE TABLE t1", "DROP TABLE IF EXISTS t1", $group);

// Command B waits for A to complete
// Command B waits for A and has rollback
$queue->setWaitForId($idA);
$idB = $queue->add($node, "COMMAND B");
$idB = $queue->add($node, "ALTER CLUSTER c1 ADD t1", "ALTER CLUSTER c1 DROP t1", $group);

// Command C waits for B to complete
$queue->setWaitForId($idB);
$idC = $queue->add($node, "COMMAND C");
// On failure, rollback entire group
if ($error) {
    $queue->rollbackOperationGroup($group);
}
```
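
To make the reverse-order rollback concrete, here is a minimal in-memory sketch. `InMemoryRollbackQueue` is a hypothetical stand-in, not the real `Queue`: it only records rollback queries per operation group and returns them in the order they would be executed, whereas the real class dispatches commands to cluster nodes.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for the Queue's rollback bookkeeping.
 * It never talks to a node; it only tracks what would be rolled back.
 */
final class InMemoryRollbackQueue
{
    /** @var array<string, list<array{node: string, rollback: string}>> */
    private array $rollbacks = [];
    private int $nextId = 1;

    public function add(string $nodeId, string $query, string $rollbackQuery, ?string $operationGroup = null): int
    {
        // Only grouped commands are remembered for a later group rollback.
        if ($operationGroup !== null) {
            $this->rollbacks[$operationGroup][] = ['node' => $nodeId, 'rollback' => $rollbackQuery];
        }
        return $this->nextId++;
    }

    /** @return list<string> rollback queries in the order they would run */
    public function rollbackOperationGroup(string $operationGroup): array
    {
        $commands = $this->rollbacks[$operationGroup] ?? [];
        unset($this->rollbacks[$operationGroup]);
        // Undo the most recently added command first.
        return array_column(array_reverse($commands), 'rollback');
    }
}

$queue = new InMemoryRollbackQueue();
$queue->add('node1', 'CREATE TABLE t1', 'DROP TABLE IF EXISTS t1', 'shard_t1');
$queue->add('node1', 'ALTER CLUSTER c1 ADD t1', 'ALTER CLUSTER c1 DROP t1', 'shard_t1');

$executed = $queue->rollbackOperationGroup('shard_t1');
print_r($executed);
```

The table is detached from the cluster before it is dropped, which is why the rollback sequence runs in reverse of the order in which the commands were added.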

### 4. State Class (`src/Plugin/Sharding/State.php`)
@@ -119,17 +131,19 @@ $rebalanceKey = "rebalance:{$tableName}";

Manages cluster topology and node communication.

**Key Features:**
- Node discovery and health monitoring
- Cluster configuration management
- Inter-node communication setup
- Active/inactive node tracking
**Key Methods:**
- `create(?Queue $queue = null, ?string $operationGroup = null)`: Create cluster with rollback support
- `addNodeIds(Queue $queue, ?string $operationGroup = null, string ...$nodeIds)`: Add nodes to cluster
- `addTables(Queue $queue, ?string $operationGroup = null, string ...$tables)`: Add tables to cluster
- `removeTables(Queue $queue, ?string $operationGroup = null, string ...$tables)`: Remove tables from cluster
- `processPendingTables(Queue $queue, ?string $operationGroup = null)`: Process pending table operations

**Core Responsibilities:**
- Cluster topology management
- Node health monitoring
- Communication channel setup
- Cluster state synchronization
- Rollback-aware cluster operations
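
One consequence of these signatures is worth illustrating: the optional `$operationGroup` sits before the variadic list, so with positional arguments a caller has to pass `null` explicitly when no group is wanted. The sketch below uses a hypothetical free function, `addTablesSketch()`, that mirrors only the parameter order of `addTables()` (minus the `Queue` argument); it is not the real method.

```php
<?php
declare(strict_types=1);

// Hypothetical function mirroring the parameter order of Cluster::addTables().
function addTablesSketch(?string $operationGroup = null, string ...$tables): array
{
    return ['group' => $operationGroup, 'tables' => $tables];
}

// With a group: the first argument is the group, the rest are tables.
$withGroup = addTablesSketch('shard_t1', 't1', 't2');

// Without a group: null must be passed explicitly.
$withoutGroup = addTablesSketch(null, 't1', 't2');

// Pitfall: omitting null makes the first table name the operation group.
$pitfall = addTablesSketch('t1', 't2');

var_dump($withGroup, $withoutGroup, $pitfall);
```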

### 6. Operator Class (`src/Plugin/Sharding/Operator.php`)

@@ -235,8 +249,8 @@ class TestableQueue {
        // Allow null for pure mocking scenarios
    }

    public function add(string $nodeId, string $query): int {
        return $this->queue?->add($nodeId, $query) ?? 0;
    public function add(string $nodeId, string $query, string $rollbackQuery, ?string $operationGroup = null): int {
        return $this->queue?->add($nodeId, $query, $rollbackQuery, $operationGroup) ?? 0;
    }

    // Other methods delegate similarly...
@@ -251,3 +265,114 @@ class TestableQueue {
- **Integration Testing**: Test with real dependencies where possible

This component architecture provides a robust, scalable foundation for distributed table sharding with comprehensive testing coverage and production-ready reliability.

## New Components (Recovery System)

### 7. CleanupManager Class (`src/Plugin/Sharding/CleanupManager.php`)

Manages cleanup of orphaned resources and failed operations.

**Key Features:**
- Orphaned cluster detection and removal
- Failed operation group cleanup
- Expired queue item removal
- Stale state entry cleanup

**Core Methods:**
- `performFullCleanup()`: Comprehensive cleanup of all resources
- `cleanupOrphanedTemporaryClusters()`: Remove temp clusters >1 hour old
- `cleanupFailedOperationGroups()`: Clean failed groups >24 hours old
- `cleanupExpiredQueueItems()`: Remove queue items >7 days old
- `cleanupStaleStateEntries()`: Clean state entries >30 days old

**Usage Pattern:**
```php
$cleanup = new CleanupManager($client, $cluster);
$results = $cleanup->performFullCleanup();
// Returns: ['resources_cleaned' => 42, 'actions_taken' => [...]]
```
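
The age thresholds listed under Core Methods can be sketched as a simple lookup plus a comparison. This is only an illustration under stated assumptions: the `CLEANUP_MAX_AGE` constant, the `isExpired()` helper, and the resource array shape are hypothetical, and the real `CleanupManager` may track resource age differently.

```php
<?php
declare(strict_types=1);

// Age thresholds in seconds, taken from the method descriptions above.
const CLEANUP_MAX_AGE = [
    'temporary_cluster' => 3600,             // 1 hour
    'failed_operation_group' => 86400,       // 24 hours
    'queue_item' => 7 * 86400,               // 7 days
    'state_entry' => 30 * 86400,             // 30 days
];

// Hypothetical helper: a resource is expired once it is older than its threshold.
function isExpired(string $kind, int $createdAt, int $now): bool
{
    return ($now - $createdAt) > CLEANUP_MAX_AGE[$kind];
}

$now = 1_700_000_000; // fixed timestamp so the example is deterministic
$resources = [
    ['kind' => 'temporary_cluster', 'name' => 'temp_a', 'created_at' => $now - 7200],
    ['kind' => 'temporary_cluster', 'name' => 'temp_b', 'created_at' => $now - 600],
    ['kind' => 'queue_item', 'name' => 'q_17', 'created_at' => $now - 8 * 86400],
];

$expired = array_values(array_filter(
    $resources,
    fn(array $r): bool => isExpired($r['kind'], $r['created_at'], $now)
));
$names = array_column($expired, 'name');
print_r($names);
```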

### 8. HealthMonitor Class (`src/Plugin/Sharding/HealthMonitor.php`)

Monitors system health and performs auto-recovery.

**Key Features:**
- Comprehensive health checking
- Automatic issue detection
- Auto-recovery mechanisms
- Recommendation generation

**Health Checks:**
- Stuck rebalancing operations (>30 minutes)
- Failed operations detection
- Orphaned resource identification
- Queue depth monitoring

**Core Methods:**
- `performHealthCheck()`: Complete system health assessment
- `performAutoRecovery()`: Automatic recovery from issues
- `checkStuckOperations()`: Detect stuck rebalancing
- `checkFailedOperations()`: Find failed operations
- `checkOrphanedResources()`: Identify orphaned resources
- `checkQueueHealth()`: Monitor queue depth

**Usage Pattern:**
```php
$monitor = new HealthMonitor($client, $cluster);
$health = $monitor->performHealthCheck();

if ($health['overall_status'] !== 'healthy') {
    // Automatic recovery
    $recovery = $monitor->performAutoRecovery();

    // Or manual intervention based on recommendations
    foreach ($health['recommendations'] as $recommendation) {
        echo $recommendation;
    }
}
```
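
As a rough illustration of how a stuck-operation check could feed the `overall_status` and `recommendations` keys used above, here is a minimal sketch. The helper names, the input shape (table name mapped to a start timestamp), and the `'unhealthy'` status value are assumptions; only the 30-minute threshold and the two result keys come from this document.

```php
<?php
declare(strict_types=1);

// A rebalancing operation counts as stuck after 30 minutes.
const STUCK_AFTER_SECONDS = 30 * 60;

/**
 * Hypothetical check: return the tables whose rebalancing started too long ago.
 * @param array<string, int> $rebalanceStartedAt table name => unix start time
 * @return list<string>
 */
function findStuckOperations(array $rebalanceStartedAt, int $now): array
{
    $stuck = [];
    foreach ($rebalanceStartedAt as $table => $startedAt) {
        if ($now - $startedAt > STUCK_AFTER_SECONDS) {
            $stuck[] = $table;
        }
    }
    return $stuck;
}

/** Hypothetical summary step: fold check results into a status report. */
function summarizeHealth(array $stuckTables): array
{
    $recommendations = [];
    foreach ($stuckTables as $table) {
        $recommendations[] = "Rebalancing of {$table} has run for over 30 minutes; consider recovery.";
    }
    return [
        'overall_status' => $stuckTables === [] ? 'healthy' : 'unhealthy',
        'recommendations' => $recommendations,
    ];
}

$now = 1_700_000_000;
$health = summarizeHealth(findStuckOperations(
    ['orders' => $now - 45 * 60, 'users' => $now - 5 * 60],
    $now
));
print_r($health);
```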

## Component Interaction Flows

### Rollback Flow

```
Table.shard()
├── Creates operation_group
├── Queue.add() with rollback [multiple times]
├── On failure:
│   └── Queue.rollbackOperationGroup()
│       ├── getRollbackCommands()
│       └── executeRollbackSequence()
└── On success: Complete
```

### Health Monitoring Flow

```
HealthMonitor.performHealthCheck()
├── checkStuckOperations()
├── checkFailedOperations()
├── checkOrphanedResources()
├── checkQueueHealth()
└── generateRecommendations()
    └── performAutoRecovery() [if needed]
        ├── recoverStuckOperation()
        ├── recoverFailedOperation()
        └── CleanupManager.performFullCleanup()
```

### Rebalancing Control Flow

```
Table.rebalance()
├── Create operation_group
├── Check stop signal
├── Execute operations
│   └── Check stop signal between steps
├── On stop signal:
│   └── rollbackOperationGroup()
└── On completion: Clear operation_group
```
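
The control flow above can be sketched as a loop that checks the stop signal before every step and rolls back completed steps when the signal is set. Everything here is hypothetical scaffolding (`runWithStopSignal()`, the step names, the callables); in the real code the rollback would go through `Queue::rollbackOperationGroup()`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical runner: executes named steps in order, checking the stop
 * signal before each one. On stop, completed steps are handed to the
 * rollback callable in reverse order.
 *
 * @param array<string, callable> $steps
 */
function runWithStopSignal(array $steps, callable $shouldStop, callable $rollback): string
{
    $done = [];
    foreach ($steps as $name => $step) {
        if ($shouldStop()) {
            $rollback(array_reverse($done));
            return 'rolled_back';
        }
        $step();
        $done[] = $name;
    }
    return 'completed';
}

$log = [];
$steps = [
    'create_temp_cluster' => function () use (&$log): void { $log[] = 'create_temp_cluster'; },
    'move_shards' => function () use (&$log): void { $log[] = 'move_shards'; },
    'drop_temp_cluster' => function () use (&$log): void { $log[] = 'drop_temp_cluster'; },
];

// Simulated stop signal that fires on the third check.
$checks = 0;
$shouldStop = function () use (&$checks): bool { return ++$checks > 2; };

$undone = [];
$rollback = function (array $names) use (&$undone): void { $undone = $names; };

$result = runWithStopSignal($steps, $shouldStop, $rollback);
print_r([$result, $log, $undone]);
```

Checking before each step, rather than only once at the start, bounds how much work can happen after a stop is requested to a single step.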

The enhanced component architecture now provides automatic rollback, health monitoring, and resource cleanup capabilities, making the sharding system more robust and production-ready.