# Note: go to the end of the original example page to download the full
# example code.
#
# Run an OpenFOAM parameter study.
from pathlib import Path
import time
from remote_run.run import (
ExecutionContext,
SlurmSchedulingEngine,
GitProject,
GuixRunner,
remote,
is_finished,
SshExecution,
)
from pyopenfoam.type_conversion import OFMultiValue
from pyopenfoam.tutorials import generate_pitz_daily_case
# Define execution variables.
# Number of CPUs requested per case; reused for case generation and the remote run.
num_cpus = 5
# Create an OpenFOAM case study locally; we are going to do a parameter
# study of the inlet velocity.
# Sweep the inlet x-velocity over 8..11 m/s, one Slurm array task per value.
velocities_x = range(8, 12)
array_task_ids = range(len(velocities_x))
# All generated cases live under this local study folder.
study_path = Path("study_pitz_daily")
# Define a case path for each array task id.
# Map every array task id to its own case directory inside the study folder.
case_paths_lookup = {
    task_id: study_path / str(task_id)
    for task_id in array_task_ids
}
# Define an execution context with a scheduling engine. For Slurm you can
# pass Slurm parameters directly here.
# Execution context: connect over SSH to the cluster, run inside a Guix
# environment built from the listed dependencies and pinned channels, and
# schedule the work as a Slurm array job (one task per case).
ssh_execution = SshExecution(
    machine="shpc0003.ost.ch",
    working_directory=Path("/cluster/raid/home/reza.housseini"),
)
guix_runner = GuixRunner(
    dependencies=[
        "python-pyvista",
        "python-pint",
        "python-pyopenfoam",
        "openfoam-org",
        "openmpi",
    ],
    # Pin the Guix channels so the remote environment is reproducible.
    channels=Path("channels.scm").read_text(),
)
slurm_engine = SlurmSchedulingEngine(
    job_name="openfoam_sim",
    mail_type="ALL",
    mail_user="reza.housseini@ost.ch",
    array=array_task_ids,
)
execution_context = ExecutionContext(
    execution=ssh_execution,
    num_cpus=num_cpus,
    project=GitProject(),
    runner=guix_runner,
    scheduling_engine=slurm_engine,
)
# Define simulation variables.
# Physical end time of each simulation run.
duration = 0.05  # seconds
# Then adjust the inlet velocity and generate the cases.
def _uniform_vector(vec):
    """Return a fresh OpenFOAM 'uniform' field value for *vec*."""
    return OFMultiValue(("uniform", vec))


# Generate one pitzDaily case per inlet velocity and write it to the case
# directory that matches its array task id.
for array_task_id, velocity_x in enumerate(velocities_x):
    pitz_daily_case = generate_pitz_daily_case(duration=duration, num_cpus=num_cpus)
    boundary_field = {
        "inlet": {
            "type": "mappedInternalValue",
            "interpolationScheme": "cell",
            "average": [velocity_x, 0, 0],
            "value": _uniform_vector([velocity_x, 0, 0]),
        },
        "outlet": {
            "type": "inletOutlet",
            "inletValue": _uniform_vector([0, 0, 0]),
            "value": _uniform_vector([0, 0, 0]),
        },
        "upperWall": {
            "type": "fixedValue",
            "value": _uniform_vector([0, 0, 0]),
        },
        "lowerWall": {
            "type": "fixedValue",
            "value": _uniform_vector([0, 0, 0]),
        },
        "frontAndBack": {"type": "empty"},
    }
    pitz_daily_case.files["U"].contents.update({"boundaryField": boundary_field})
    pitz_daily_case.write(folder=case_paths_lookup[array_task_id])
# Decorate the functions you want to run in the specified execution context;
# here we execute our OpenFOAM case in parallel and reconstruct the "U" and
# "p" fields.
@remote(execution_context)
def sim_job(run_path=None):
    """Run one OpenFOAM case of the parameter study on the remote machine.

    The Slurm array task id of the current job selects which case directory
    is executed, so each array task runs exactly one velocity variant.

    Args:
        run_path: Remote path the project was deployed to. Presumably
            injected by the ``remote`` decorator at execution time — confirm.

    Raises:
        RuntimeError: If ``SLURM_ARRAY_TASK_ID`` is not set, i.e. the
            function was not started as a Slurm array task.
    """
    # Import remotely-needed modules inside the function body so they are
    # resolved in the remote environment, not on the submitting host.
    from pyopenfoam.cli.commands import run_parallel
    import os

    task_id_env = os.getenv("SLURM_ARRAY_TASK_ID")
    if task_id_env is None:
        # Fail with a clear message instead of the opaque TypeError that
        # int(None) would raise.
        raise RuntimeError(
            "SLURM_ARRAY_TASK_ID is not set; sim_job must run as a Slurm array task"
        )
    array_task_id = int(task_id_env)
    remote_case_path = run_path / case_paths_lookup[array_task_id]
    # Run the solver in parallel and reconstruct the requested fields.
    run_parallel(
        remote_case_path,
        num_cpus=num_cpus,
        reconstruct_fields=["U", "p"],
    )
# This call runs on the remote machine specified in execution_context, but
# due to the asynchronous nature of scheduling engines it does not return
# the result; instead you get the job id and a function to retrieve the
# result later.
# NOTE(review): `remote=None` does not match sim_job's `run_path` parameter;
# presumably the @remote decorator consumes this kwarg — confirm.
job_id, result_func = sim_job(remote=None)
# Now we wait for the remote execution to finish before retrieving the
# result; normally this step is decoupled when using a scheduler.
# Give the scheduler time to finish the short job before polling its status.
time.sleep(10)
# We should check whether the job has finished before retrieving the result.
# Only fetch the result once the scheduler reports the job as finished.
if is_finished(execution_context, job_id):
    result = result_func()