@@ -66,16 +66,16 @@ $command = [
66
66
The queue system uses ` wait_for_id ` to ensure proper command ordering:
67
67
68
68
``` php
69
- // Command A
70
- $idA = $queue->add($node, "COMMAND A");
69
+ // Command A with mandatory rollback
70
+ $idA = $queue->add($node, "COMMAND A", "ROLLBACK A", $operationGroup );
71
71
72
72
// Command B waits for A to complete
73
73
$queue->setWaitForId($idA);
74
- $idB = $queue->add($node, "COMMAND B");
74
+ $idB = $queue->add($node, "COMMAND B", "ROLLBACK B", $operationGroup );
75
75
76
76
// Command C waits for B to complete
77
77
$queue->setWaitForId($idB);
78
- $idC = $queue->add($node, "COMMAND C");
78
+ $idC = $queue->add($node, "COMMAND C", "ROLLBACK C", $operationGroup );
79
79
80
80
// Reset dependencies for independent commands
81
81
$queue->resetWaitForId();
@@ -85,19 +85,19 @@ $queue->resetWaitForId();
85
85
86
86
### Adding Commands with Rollback
87
87
88
- The queue system now supports adding commands with automatic rollback:
88
+ All queue commands now require explicit rollback commands :
89
89
90
90
``` php
91
- // Add command with automatic rollback generation
92
- $queue->addWithRollback (
91
+ // Add command with explicit rollback (rollback is mandatory)
92
+ $queue->add (
93
93
$nodeId,
94
94
"CREATE TABLE users_s0 (id bigint)",
95
- null , // Auto-generate rollback
95
+ "DROP TABLE IF EXISTS users_s0" , // Explicit rollback required
96
96
$operationGroup
97
97
);
98
98
99
- // Add command with explicit rollback
100
- $queue->addWithRollback (
99
+ // Add cluster command with rollback
100
+ $queue->add (
101
101
$nodeId,
102
102
"ALTER CLUSTER c1 ADD users_s0",
103
103
"ALTER CLUSTER c1 DROP users_s0", // Explicit rollback
@@ -112,10 +112,10 @@ Related commands are grouped for atomic execution:
112
112
``` php
113
113
$operationGroup = "shard_create_users_" . uniqid();
114
114
115
- // All commands in the same group
116
- $queue->addWithRollback ($node1, $cmd1, $rollback1, $operationGroup);
117
- $queue->addWithRollback ($node2, $cmd2, $rollback2, $operationGroup);
118
- $queue->addWithRollback ($node3, $cmd3, $rollback3, $operationGroup);
115
+ // All commands in the same group (rollback required for each)
116
+ $queue->add ($node1, $cmd1, $rollback1, $operationGroup);
117
+ $queue->add ($node2, $cmd2, $rollback2, $operationGroup);
118
+ $queue->add ($node3, $cmd3, $rollback3, $operationGroup);
119
119
120
120
// On failure, rollback entire group
121
121
if ($error) {
@@ -177,39 +177,41 @@ protected function moveShardWithIntermediateCluster(
177
177
$tempClusterName = "temp_move_{$shardId}_" . uniqid();
178
178
179
179
// Step 1: Create shard table on target node
180
- $createQueueId = $queue->add($targetNode, $this->getCreateTableShardSQL($shardId));
180
+ $createQueueId = $queue->add($targetNode, $this->getCreateTableShardSQL($shardId), "DROP TABLE IF EXISTS {$shardName}" );
181
181
182
182
// Step 2: Create temporary cluster on SOURCE node (where the data IS)
183
183
// CRITICAL: Use cluster name as path to ensure uniqueness
184
184
$clusterQueueId = $queue->add(
185
185
$sourceNode,
186
- "CREATE CLUSTER {$tempClusterName} '{$tempClusterName}' as path"
186
+ "CREATE CLUSTER {$tempClusterName} '{$tempClusterName}' as path",
187
+ "DELETE CLUSTER {$tempClusterName}"
187
188
);
188
189
189
190
// Step 3: Add shard to cluster on SOURCE node FIRST (before JOIN)
190
191
$queue->setWaitForId($clusterQueueId);
191
- $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} ADD {$shardName}");
192
+ $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} ADD {$shardName}", "ALTER CLUSTER {$tempClusterName} DROP {$shardName}" );
192
193
193
194
// Step 4: TARGET node joins the cluster that SOURCE created
194
195
// Wait for table creation on target node to complete first
195
196
$queue->setWaitForId($createQueueId);
196
197
$joinQueueId = $queue->add(
197
198
$targetNode,
198
- "JOIN CLUSTER {$tempClusterName} AT '{$sourceNode}' '{$tempClusterName}' as path"
199
+ "JOIN CLUSTER {$tempClusterName} AT '{$sourceNode}' '{$tempClusterName}' as path",
200
+ "DELETE CLUSTER {$tempClusterName}"
199
201
);
200
202
201
203
// Step 5: CRITICAL - Wait for JOIN to complete (data is now synced)
202
204
// JOIN CLUSTER is synchronous, so once it's processed, data is fully copied
203
205
$queue->setWaitForId($joinQueueId);
204
- $dropQueueId = $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} DROP {$shardName}");
206
+ $dropQueueId = $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} DROP {$shardName}", "ALTER CLUSTER {$tempClusterName} ADD {$shardName}" );
205
207
206
208
// Step 6: Only after DROP from cluster, remove the table from source
207
209
$queue->setWaitForId($dropQueueId);
208
- $deleteQueueId = $queue->add($sourceNode, "DROP TABLE {$shardName}");
210
+ $deleteQueueId = $queue->add($sourceNode, "DROP TABLE {$shardName}", "" );
209
211
210
212
// Step 7: Clean up temporary cluster ONLY on SOURCE node after all operations complete
211
213
$queue->setWaitForId($deleteQueueId);
212
- return $queue->add($sourceNode, "DELETE CLUSTER {$tempClusterName}");
214
+ return $queue->add($sourceNode, "DELETE CLUSTER {$tempClusterName}", "" );
213
215
}
214
216
```
215
217
@@ -253,15 +255,15 @@ protected function handleRFNNewNodes(Queue $queue, Vector $schema, Vector $newSc
253
255
254
256
foreach ($shardsForNewNode as $shard) {
255
257
// Create shard table on new node
256
- $queue->add($newNode, $this->getCreateTableShardSQL($shard));
258
+ $queue->add($newNode, $this->getCreateTableShardSQL($shard), "DROP TABLE IF EXISTS {$shardName}" );
257
259
258
260
// Set up replication (no wait needed - parallel creation)
259
261
$existingNodes = $this->getExistingNodesForShard($schema, $shard);
260
262
$connectedNodes = $existingNodes->merge(new Set([$newNode]));
261
263
$primaryNode = $existingNodes->first();
262
264
263
265
// Use existing cluster replication mechanism
264
- $this->handleReplication($primaryNode, $queue, $connectedNodes, $clusterMap, $shard);
266
+ $this->handleReplication($primaryNode, $queue, $connectedNodes, $clusterMap, $shard, $operationGroup );
265
267
}
266
268
}
267
269
}
@@ -279,7 +281,7 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
279
281
// Phase 1: Drop all distributed tables with proper force option
280
282
foreach ($newSchema as $row) {
281
283
$sql = "DROP TABLE IF EXISTS {$this->name} OPTION force=1";
282
- $queueId = $queue->add($row['node'], $sql);
284
+ $queueId = $queue->add($row['node'], $sql, "" );
283
285
$dropQueueIds->add($queueId);
284
286
}
285
287
@@ -297,7 +299,7 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
297
299
}
298
300
299
301
$sql = $this->getCreateShardedTableSQLWithSchema($row['shards'], $newSchema);
300
- $queue->add($row['node'], $sql);
302
+ $queue->add($row['node'], $sql, "DROP TABLE IF EXISTS {$this->name}" );
301
303
}
302
304
303
305
// Reset wait dependencies
@@ -310,14 +312,16 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
310
312
### Queue Command Addition
311
313
312
314
``` php
313
- public function add(string $nodeId, string $query): int {
315
+ public function add(string $nodeId, string $query, string $rollbackQuery, ?string $operationGroup = null ): int {
314
316
$queueId = $this->generateNextId();
315
317
316
318
$command = [
317
319
'id' => $queueId,
318
320
'node' => $nodeId,
319
321
'query' => $query,
322
+ 'rollback_query' => $rollbackQuery,
320
323
'wait_for_id' => $this->currentWaitForId,
324
+ 'operation_group' => $operationGroup ?? '',
321
325
'created_at' => time(),
322
326
'status' => 'pending',
323
327
];
@@ -373,14 +377,14 @@ public function process(Node $node): void {
373
377
When operations must be strictly ordered:
374
378
375
379
``` php
376
- // Pattern: Each operation waits for the previous to complete
377
- $step1Id = $queue->add($node, "STEP 1 COMMAND");
380
+ // Pattern: Each operation waits for the previous to complete (rollback required)
381
+ $step1Id = $queue->add($node, "STEP 1 COMMAND", "STEP 1 ROLLBACK" );
378
382
379
383
$queue->setWaitForId($step1Id);
380
- $step2Id = $queue->add($node, "STEP 2 COMMAND");
384
+ $step2Id = $queue->add($node, "STEP 2 COMMAND", "STEP 2 ROLLBACK" );
381
385
382
386
$queue->setWaitForId($step2Id);
383
- $step3Id = $queue->add($node, "STEP 3 COMMAND");
387
+ $step3Id = $queue->add($node, "STEP 3 COMMAND", "STEP 3 ROLLBACK" );
384
388
385
389
$queue->resetWaitForId();
386
390
```
@@ -390,35 +394,35 @@ $queue->resetWaitForId();
390
394
When some operations can run in parallel but must converge:
391
395
392
396
``` php
393
- // Phase 1: Parallel operations
397
+ // Phase 1: Parallel operations (rollback required for each)
394
398
$queue->resetWaitForId(); // Ensure no dependencies
395
- $parallel1Id = $queue->add($node1, "PARALLEL COMMAND 1");
396
- $parallel2Id = $queue->add($node2, "PARALLEL COMMAND 2");
397
- $parallel3Id = $queue->add($node3, "PARALLEL COMMAND 3");
399
+ $parallel1Id = $queue->add($node1, "PARALLEL COMMAND 1", "PARALLEL ROLLBACK 1" );
400
+ $parallel2Id = $queue->add($node2, "PARALLEL COMMAND 2", "PARALLEL ROLLBACK 2" );
401
+ $parallel3Id = $queue->add($node3, "PARALLEL COMMAND 3", "PARALLEL ROLLBACK 3" );
398
402
399
403
// Phase 2: Wait for all parallel operations to complete
400
404
$maxParallelId = max($parallel1Id, $parallel2Id, $parallel3Id);
401
405
$queue->setWaitForId($maxParallelId);
402
406
403
407
// Phase 3: Sequential operations that depend on all parallel operations
404
- $finalStepId = $queue->add($node1, "FINAL COMMAND");
408
+ $finalStepId = $queue->add($node1, "FINAL COMMAND", "FINAL ROLLBACK" );
405
409
```
406
410
407
411
### Cross-Node Synchronization
408
412
409
413
When operations on different nodes must be coordinated:
410
414
411
415
``` php
412
- // Node A prepares
413
- $prepareId = $queue->add($nodeA, "PREPARE OPERATION");
416
+ // Node A prepares (rollback required)
417
+ $prepareId = $queue->add($nodeA, "PREPARE OPERATION", "PREPARE ROLLBACK" );
414
418
415
419
// Node B waits for Node A to prepare, then joins
416
420
$queue->setWaitForId($prepareId);
417
- $joinId = $queue->add($nodeB, "JOIN OPERATION");
421
+ $joinId = $queue->add($nodeB, "JOIN OPERATION", "JOIN ROLLBACK" );
418
422
419
423
// Node A waits for Node B to join, then finalizes
420
424
$queue->setWaitForId($joinId);
421
- $finalizeId = $queue->add($nodeA, "FINALIZE OPERATION");
425
+ $finalizeId = $queue->add($nodeA, "FINALIZE OPERATION", "FINALIZE ROLLBACK" );
422
426
```
423
427
424
428
## Error Handling in Queue Operations
0 commit comments