Skip to content

Commit 995dcc0

Browse files
1: run DMF in parallel
1 parent c4aeaaa commit 995dcc0

File tree

5 files changed

+255
-21
lines changed

5 files changed

+255
-21
lines changed

solution/deps.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ sorbet:
113113
policy: sorbet/sorbet-policies
114114
dashboard: sorbet/sorbet-dashboards
115115
image: sorbet
116-
tag: v1.2.0-preview.1
116+
tag: 931169543e23666c99a20e1c679e7661ec1c50eb
117117
envsubst: SORBET_TAG
118118
stern: # tail any pod logs with pattern matching
119119
tag: 1.30.0

tests/ctst/HOW_TO_WRITE_TESTS.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,23 @@ possible. Solutions exist:
5959
relative checks.
6060
- As a last resort, we might have a dedicated test suite.
6161

62+
### Cold Storage Tests and Parallel Execution
63+
64+
Previously, `@ColdStorage` tests were forced to run sequentially due to shared
65+
DMF volume access. This has been resolved with the following improvements:
66+
67+
- **Bucket-specific file isolation**: The sorbet mock backend now uses S3 alias
68+
naming (`/cold-data/data/s3-aliases/{bucket}-{key}-{versionId}/`) which provides
69+
perfect isolation between parallel test runs.
70+
- **Intelligent file counting**: DMF volume checks now scan only for files
71+
belonging to the specific test's bucket name.
72+
- **Per-test cleanup**: Each test cleans up only its own files, preventing
73+
interference with parallel tests.
74+
75+
This means `@ColdStorage` tests can now run with full parallelization,
76+
significantly reducing test execution time. Please follow the rules for parallel
77+
execution if you are using `@ColdStorage` tests.
78+
6279
## 5. Focus on validating features.
6380

6481
We only want to assert against externally visible state, as given in the

tests/ctst/common/hooks.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ export const replicationLockTags = [
2727
const noParallelRun = atMostOnePicklePerTag([
2828
'@AfterAll',
2929
'@PRA',
30-
'@ColdStorage',
3130
...replicationLockTags
3231
]);
3332

tests/ctst/steps/dmf.ts

Lines changed: 111 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,110 @@
11
import { Then, Given, After } from '@cucumber/cucumber';
22
import assert from 'assert';
3-
import { execShellCommand } from 'common/utils';
43
import Zenko from 'world/Zenko';
4+
import { execInCluster } from './utils/kubernetes';
5+
import { Utils } from 'cli-testing';
56

6-
async function cleanDmfVolume() {
7-
await execShellCommand('rm -rf /cold-data/*');
7+
/**
8+
* Clean up S3 alias files for this specific bucket
9+
* @param world - The Zenko world object
10+
* @param bucketName - The name of the bucket to clean up
11+
* @returns void
12+
*/
13+
async function cleanDmfVolumeForBucket(world: Zenko, bucketName: string) {
14+
if (!bucketName) {
15+
return;
16+
}
17+
18+
const commands = [
19+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type f -delete 2>/dev/null || true`,
20+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type d -empty -delete 2>/dev/null || true`
21+
];
22+
23+
for (const command of commands) {
24+
await execInCluster(world, command);
25+
}
826
}
927

28+
/**
29+
* Check if the DMF volume contains the expected number of objects.
30+
* This requires sorbet mock backend with UseS3Naming=true.
31+
* Files are stored as: /cold-data/data/s3-aliases/{bucket}-{key}-{versionId}/content
32+
* This enables parallel test execution by providing bucket-level isolation
33+
* @param this - The Zenko world object
34+
* @param objectCount - The expected number of objects
35+
* @returns void
36+
*/
1037
Then('dmf volume should contain {int} objects',
11-
{ timeout: 2 * 60 * 1000 }, async (objectCount: number) => {
38+
{ timeout: 2 * 60 * 1000 }, async function (this: Zenko, objectCount: number) {
39+
const bucketName = this.getSaved<string>('bucketName');
40+
if (!bucketName) {
41+
throw new Error('bucketName not found in test context. Ensure bucket is created before this step.');
42+
}
43+
1244
let conditionOk = false;
13-
while (!conditionOk) {
14-
// Getting the number of objects inside the volume used
15-
// by the mock dmf to store transitioned objects
16-
const outStr = await execShellCommand('find /cold-data -type f | wc -l');
17-
// we store two files per object (content and manifest.json)
18-
conditionOk = Number(outStr) === objectCount * 2;
45+
let attempts = 0;
46+
const maxAttempts = 60;
47+
48+
while (!conditionOk && attempts < maxAttempts) {
49+
try {
50+
const outStr = await execInCluster(
51+
this,
52+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type f | wc -l`
53+
);
54+
const fileCount = Number(outStr.trim());
55+
56+
// We expect 2 files per object (content + manifest.json)
57+
const expectedFileCount = objectCount * 2;
58+
conditionOk = fileCount === expectedFileCount;
59+
60+
if (!conditionOk) {
61+
this.logger.debug(`DMF volume check for bucket ${bucketName}`, {
62+
expected: expectedFileCount,
63+
found: fileCount,
64+
attempt: attempts + 1,
65+
maxAttempts
66+
});
67+
68+
if (attempts % 10 === 0) {
69+
const filesFound = await execInCluster(
70+
this,
71+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type f 2>/dev/null`
72+
);
73+
this.logger.debug(`Files found for bucket ${bucketName}:`, { files: filesFound });
74+
}
75+
76+
await Utils.sleep(2000);
77+
attempts++;
78+
}
79+
} catch (error) {
80+
this.logger.error('Error checking DMF volume', { error, bucket: bucketName });
81+
throw error;
82+
}
83+
}
84+
85+
if (!conditionOk) {
86+
const finalCount = await execInCluster(
87+
this,
88+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type f | wc -l`
89+
);
90+
const actualFiles = await execInCluster(
91+
this,
92+
`find /cold-data/data/s3-aliases -name "${bucketName}-*" -type f 2>/dev/null`
93+
);
94+
95+
assert.fail(
96+
`DMF volume should contain ${objectCount * 2} files for bucket ${bucketName}, ` +
97+
`but found ${finalCount.trim()} after ${attempts} attempts. ` +
98+
`Files found: ${actualFiles}`
99+
);
19100
}
20-
assert(conditionOk);
101+
102+
this.logger.debug(`DMF volume check passed for bucket ${bucketName}`, {
103+
expectedObjects: objectCount,
104+
foundFiles: objectCount * 2,
105+
attempts,
106+
maxAttempts,
107+
});
21108
});
22109

23110
Given('a flaky backend that will require {int} retries for {string}',
@@ -29,6 +116,17 @@ Given('a flaky backend that will require {int} retries for {string}',
29116
this.addToSaved('backendFlakiness', op);
30117
});
31118

32-
After({ tags: '@Dmf' }, async () => {
33-
await cleanDmfVolume();
119+
After({ tags: '@Dmf' }, async function (this: Zenko, results) {
120+
const bucketName = this.getSaved<string>('bucketName');
121+
122+
if (results.result?.status === 'FAILED') {
123+
this.logger.warn('DMF volume was not cleaned for failed test', {
124+
bucket: bucketName,
125+
reason: 'test failed - keeping files for debugging'
126+
});
127+
return;
128+
}
129+
130+
await cleanDmfVolumeForBucket(this, bucketName);
131+
this.logger.debug(`Cleaned DMF volume for bucket: ${bucketName}`);
34132
});

tests/ctst/steps/utils/kubernetes.ts

Lines changed: 126 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ export async function createJobAndWaitForCompletion(
174174
);
175175
});
176176
} catch (err: unknown) {
177-
world.logger.error('Error creating or waiting for job completion', {
177+
world.logger.debug('Error creating or waiting for job completion', {
178178
jobName,
179179
err,
180180
});
@@ -219,7 +219,7 @@ export async function createAndRunPod(
219219
resolve();
220220
} else if (phase === 'Failed') {
221221
clearTimeout(timeoutId);
222-
world.logger.error('Pod failed', {
222+
world.logger.debug('Pod failed', {
223223
podName,
224224
status: watchObj.object?.status
225225
});
@@ -248,7 +248,7 @@ export async function createAndRunPod(
248248

249249
return response.body;
250250
} catch (err: unknown) {
251-
world.logger.error('Failed to create and run pod:', { err });
251+
world.logger.debug('Failed to create and run pod:', { err });
252252
throw new Error(`Failed to create and run pod: ${err}`);
253253
}
254254
}
@@ -295,7 +295,7 @@ export async function waitForZenkoToStabilize(
295295
'zenkos',
296296
'end2end',
297297
).catch(err => {
298-
world.logger.error('Error getting Zenko CR', {
298+
world.logger.debug('Error getting Zenko CR', {
299299
err: err as unknown,
300300
});
301301
return null;
@@ -433,7 +433,7 @@ export async function displayCRStatus(world: Zenko, namespace = 'default') {
433433
'zenkos',
434434
'end2end',
435435
).catch(err => {
436-
world.logger.error('Error getting Zenko CR', {
436+
world.logger.debug('Error getting Zenko CR', {
437437
err: err as unknown,
438438
});
439439
return null;
@@ -524,7 +524,7 @@ export async function createSecret(
524524
const response = await coreClient.createNamespacedSecret(namespace, secret);
525525
return response;
526526
} catch (err) {
527-
world.logger.error('Error creating secret', {
527+
world.logger.debug('Error creating secret', {
528528
namespace,
529529
secret,
530530
err,
@@ -615,3 +615,123 @@ export async function getZenkoVersion(
615615
}
616616
}
617617

618+
/**
619+
* Execute a shell command in a pod with host volume access
620+
* Simplified to only support host path mounting for system volumes
621+
* @param world - The Zenko world object
622+
* @param command - The command to execute
623+
* @param options - The options for the command execution
624+
* @returns The output of the command
625+
*/
626+
export async function execCommandWithVolumeAccess(
627+
world: Zenko,
628+
command: string,
629+
options: {
630+
volumeMountPath?: string;
631+
hostPath?: string;
632+
image?: string;
633+
namespace?: string;
634+
timeout?: number;
635+
cleanup?: boolean;
636+
} = {}
637+
): Promise<string> {
638+
const {
639+
volumeMountPath = '/cold-data',
640+
hostPath = '/cold-data',
641+
image = 'alpine:3.22',
642+
namespace = 'default',
643+
timeout = 30000,
644+
cleanup = true,
645+
} = options;
646+
647+
// Generate unique pod name to prevent conflicts between concurrent tests
648+
const timestamp = Date.now();
649+
const randomId = Math.random().toString(36).substring(2, 8);
650+
const testContext = world.getSaved?.('bucketName') || 'test';
651+
const podName = `ctst-exec-${testContext}-${timestamp}-${randomId}`.toLowerCase();
652+
653+
const podManifest: V1Pod = {
654+
apiVersion: 'v1',
655+
kind: 'Pod',
656+
metadata: {
657+
name: podName,
658+
namespace,
659+
labels: {
660+
'app.kubernetes.io/name': 'ctst-command-executor',
661+
'app.kubernetes.io/component': 'test-utility',
662+
'ctst.test/execution-id': `${timestamp}-${randomId}`
663+
}
664+
},
665+
spec: {
666+
restartPolicy: 'Never',
667+
securityContext: {
668+
runAsNonRoot: false,
669+
fsGroup: 0
670+
},
671+
containers: [{
672+
name: 'executor',
673+
image,
674+
command: ['/bin/sh', '-c', command],
675+
securityContext: {
676+
runAsUser: 0,
677+
allowPrivilegeEscalation: false,
678+
readOnlyRootFilesystem: false,
679+
capabilities: {
680+
drop: ['ALL']
681+
}
682+
},
683+
volumeMounts: [{
684+
name: 'host-volume',
685+
mountPath: volumeMountPath
686+
}]
687+
}],
688+
volumes: [{
689+
name: 'host-volume',
690+
hostPath: {
691+
path: hostPath,
692+
type: 'DirectoryOrCreate'
693+
}
694+
}]
695+
}
696+
};
697+
698+
try {
699+
await createAndRunPod(world, podManifest, true, cleanup, timeout);
700+
701+
const coreClient = createKubeCoreClient(world);
702+
const logs = await coreClient.readNamespacedPodLog(podName, namespace);
703+
704+
return logs.body.trim();
705+
} catch (error) {
706+
world.logger.debug('Command execution failed', {
707+
command,
708+
podName,
709+
error: error instanceof Error ? error.message : String(error)
710+
});
711+
throw error;
712+
}
713+
}
714+
715+
/**
716+
* Execute command in Kubernetes cluster with host volume access
717+
* Designed for concurrent test execution without conflicts
718+
* Uses unique pod names and labels for isolation
719+
*/
720+
export async function execInCluster(
721+
world: Zenko,
722+
command: string,
723+
volumeOptions?: Parameters<typeof execCommandWithVolumeAccess>[2]
724+
): Promise<string> {
725+
world.logger.debug('Executing command in cluster', { command });
726+
727+
try {
728+
return await execCommandWithVolumeAccess(world, command, volumeOptions);
729+
} catch (error) {
730+
world.logger.debug('Kubernetes command execution failed', {
731+
command,
732+
error,
733+
});
734+
throw error;
735+
}
736+
}
737+

0 commit comments

Comments
 (0)