Skip to content

Commit 15f648f

Browse files
authored
Test with SLURM (#726)
* Test with SLURM * if CI * fixes * tests * update tests * check for h5py * remove openmpi tests * add time out * remove blockallocation * remove threads test * reduce to a single test * remove unused import * remove unused functions
1 parent 00d06a9 commit 15f648f

File tree

3 files changed

+170
-1
lines changed

3 files changed

+170
-1
lines changed

.github/workflows/pipeline.yml

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,39 @@ jobs:
264264
with:
265265
token: ${{ secrets.CODECOV_TOKEN }}
266266

267+
# CI job that runs the SLURM-specific unit tests on a runner with SLURM installed.
unittest_slurm_mpich:
  needs:
    - black
  runs-on: ubuntu-latest
  services:
    # MySQL service container exposed on host port 8888.
    # NOTE(review): presumably required by the SLURM setup action (accounting
    # database) - confirm against the action's README.
    mysql:
      image: mysql:8.0
      env:
        MYSQL_ROOT_PASSWORD: root
      ports:
        - "8888:3306"
      # Folded scalar: the lines below are joined with single spaces.
      options: >-
        --health-cmd="mysqladmin ping"
        --health-interval=10s
        --health-timeout=5s
        --health-retries=3
  steps:
    - uses: actions/checkout@v4
    # Installs SLURM on the runner; capped at 5 minutes so a stuck setup fails fast.
    - uses: koesterlab/setup-slurm-action@v1
      timeout-minutes: 5
    # Write a minimal .condarc that restricts conda to the conda-forge channel.
    - name: Conda config
      shell: bash -l {0}
      run: echo -e "channels:\n - conda-forge\n" > .condarc
    - uses: conda-incubator/setup-miniconda@v3
      with:
        python-version: "3.13"
        miniforge-version: latest
        condarc-file: .condarc
        environment-file: .ci_support/environment-mpich.yml
    # Install the package into the prepared environment and run only the SLURM tests.
    - name: Test
      shell: bash -l {0}
      timeout-minutes: 5
      run: |
        pip install . --no-deps --no-build-isolation
        cd tests
        python -m unittest test_slurmclusterexecutor.py
        python -m unittest test_slurmjobexecutor.py
299+
267300
unittest_mpich:
268301
needs: [black]
269302
runs-on: ${{ matrix.operating-system }}
@@ -400,7 +433,7 @@ jobs:
400433
GH_TOKEN: ${{secrets.GITHUB_TOKEN}}
401434

402435
uml:
403-
needs: [unittest_old, unittest_win, unittest_openmpi, unittest_mpich, unittest_flux_openmpi, unittest_flux_mpich, notebooks, benchmark, minimal, pip_check, mypy]
436+
needs: [unittest_slurm_mpich, unittest_old, unittest_win, unittest_openmpi, unittest_mpich, unittest_flux_openmpi, unittest_flux_mpich, notebooks, benchmark, minimal, pip_check, mypy]
404437
runs-on: ubuntu-latest
405438
steps:
406439
- uses: actions/checkout@v4

tests/test_slurmclusterexecutor.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import os
2+
import importlib
3+
import unittest
4+
import shutil
5+
6+
from executorlib import SlurmClusterExecutor
7+
from executorlib.standalone.serialize import cloudpickle_register
8+
9+
# `import importlib` alone does not guarantee that the `importlib.util`
# submodule is loaded, so import it explicitly before calling `find_spec`.
import importlib.util

# The SLURM tests need the `srun` launcher on the PATH.
skip_slurm_test = shutil.which("srun") is None

# The MPI tests need mpi4py to be importable.
skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None

# `dump` is used by the tests below; if it cannot be imported the tests that
# rely on the HDF5 cache are skipped.
try:
    from executorlib.task_scheduler.file.hdf import dump
except ImportError:
    skip_h5py_test = True
else:
    skip_h5py_test = False
22+
23+
# Batch script template passed to the executor via `resource_dict`; the
# `{{...}}` placeholders are left verbatim for whatever renders the template.
submission_template = (
    "#!/bin/bash\n"
    "#SBATCH --output=time.out\n"
    "#SBATCH --job-name={{job_name}}\n"
    "#SBATCH --chdir={{working_directory}}\n"
    "#SBATCH --get-user-env=L\n"
    "#SBATCH --cpus-per-task={{cores}}\n"
    "\n"
    "{{command}}\n"
)
33+
34+
35+
def mpi_funct(i):
    """Return ``(i, size, rank)`` for the calling process in ``MPI.COMM_WORLD``.

    mpi4py is imported inside the function body rather than at module level.
    """
    from mpi4py import MPI

    world = MPI.COMM_WORLD
    return i, world.Get_size(), world.Get_rank()
41+
42+
43+
@unittest.skipIf(
    skip_slurm_test or skip_mpi4py_test or skip_h5py_test,
    "h5py or mpi4py or SLURM are not installed, so the h5py, slurm and mpi4py tests are skipped.",
)
class TestCacheExecutorPysqa(unittest.TestCase):
    """Tests that submit a two-core MPI task through ``SlurmClusterExecutor``.

    Every test uses ``executorlib_cache`` as the cache directory and checks the
    number of entries left in it; ``tearDown`` removes the directory again.
    """

    def _submit_and_check(self, exe):
        """Submit ``mpi_funct(1)`` on ``exe`` and verify the two-rank result.

        Returns the future so that callers can make further assertions on it.
        """
        cloudpickle_register(ind=1)
        fs1 = exe.submit(mpi_funct, 1)
        self.assertFalse(fs1.done())
        # One (i, size, rank) tuple is expected per rank.
        self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
        return fs1

    def test_executor(self):
        """Run with ``cwd`` set to the cache directory; expect three entries in it."""
        with SlurmClusterExecutor(
            resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
            block_allocation=False,
            cache_directory="executorlib_cache",
            terminate_tasks_on_shutdown=False,
        ) as exe:
            fs1 = self._submit_and_check(exe)
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)
            self.assertTrue(fs1.done())

    def test_executor_no_cwd(self):
        """Run without ``cwd``; expect two entries in the cache directory."""
        with SlurmClusterExecutor(
            resource_dict={"cores": 2, "submission_template": submission_template},
            block_allocation=False,
            cache_directory="executorlib_cache",
            terminate_tasks_on_shutdown=True,
        ) as exe:
            fs1 = self._submit_and_check(exe)
            self.assertEqual(len(os.listdir("executorlib_cache")), 2)
            self.assertTrue(fs1.done())

    def test_executor_existing_files(self):
        """Run the task, overwrite the cached HDF5 files, then run it again."""
        with SlurmClusterExecutor(
            resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
            block_allocation=False,
            cache_directory="executorlib_cache",
        ) as exe:
            fs1 = self._submit_and_check(exe)
            self.assertTrue(fs1.done())
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)
            # Delete every file from the first run; for each HDF5 file write an
            # `_i.h5` file with placeholder content in its place.
            # NOTE(review): presumably simulates stale input files left by an
            # earlier run - confirm intent.
            for file_name in os.listdir("executorlib_cache"):
                file_path = os.path.join("executorlib_cache", file_name)
                os.remove(file_path)
                # Match on the suffix: the slice below strips a five-character
                # tail (e.g. "_o.h5"), which only makes sense for *.h5 files.
                if file_name.endswith(".h5"):
                    task_key = file_path[:-5] + "_i.h5"
                    dump(file_name=task_key, data_dict={"a": 1})

        with SlurmClusterExecutor(
            resource_dict={"cores": 2, "cwd": "executorlib_cache", "submission_template": submission_template},
            block_allocation=False,
            cache_directory="executorlib_cache",
        ) as exe:
            fs1 = self._submit_and_check(exe)
            self.assertTrue(fs1.done())
            self.assertEqual(len(os.listdir("executorlib_cache")), 3)

    def tearDown(self):
        """Remove the cache directory after each test, ignoring a missing directory."""
        shutil.rmtree("executorlib_cache", ignore_errors=True)

tests/test_slurmjobexecutor.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import shutil
2+
import unittest
3+
4+
from executorlib import SlurmJobExecutor
5+
6+
7+
# Skip the SLURM tests when the `srun` launcher cannot be found on the PATH.
skip_slurm_test = shutil.which("srun") is None
11+
12+
13+
def calc(i):
    """Return the argument unchanged; used as a trivial task for the executor."""
    result = i
    return result
15+
16+
17+
@unittest.skipIf(skip_slurm_test, "Slurm is not installed, so the Slurm tests are skipped.")
class TestSlurmBackend(unittest.TestCase):
    """Tests for ``SlurmJobExecutor``; skipped when ``srun`` is unavailable."""

    def test_slurm_executor_serial(self):
        """Submit two serial tasks and check their results and completion state."""
        with SlurmJobExecutor() as exe:
            # Submit both tasks before waiting on either result.
            futures = {value: exe.submit(calc, value) for value in (1, 2)}
            for expected, future in futures.items():
                self.assertEqual(future.result(), expected)
            for future in futures.values():
                self.assertTrue(future.done())

0 commit comments

Comments
 (0)