#!/usr/bin/env python
from abc import ABC, abstractmethod
import json
from parsl.config import Config
from parsl.providers import LocalProvider, PBSProProvider
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiExecLauncher, GnuParallelLauncher
from parsl.addresses import address_by_interface, address_by_hostname
from parsl.utils import get_all_checkpoints
from pathlib import Path
from pydantic import BaseModel
from typing import List, Sequence, Tuple, Type, TypeVar, Union
import yaml
PathLike = Union[str, Path]
_T = TypeVar("_T")
[docs]
class BaseSettings(BaseModel):
[docs]
def dump_yaml(self, filename: PathLike) -> None:
with open(filename, mode="w") as fp:
yaml.dump(json.loads(self.model_dump_json()), fp, indent=4, sort_keys=False)
[docs]
@classmethod
def from_yaml(cls: Type[_T], filename: PathLike) -> _T:
with open(filename) as fp:
raw_data = yaml.safe_load(fp)
return cls(**raw_data) # type: ignore
[docs]
class BaseComputeSettings(ABC, BaseSettings):
"""Compute settings (HPC platform, number of GPUs, etc)."""
[docs]
@abstractmethod
def config_factory(self, run_dir: PathLike) -> Config:
"""
Create new Parsl configuration.
"""
[docs]
class LocalSettings(BaseComputeSettings):
available_accelerators: Union[int, Sequence[str]] = 4
worker_init: str = ''
nodes: int = 1
retries: int = 1
label: str = 'htex'
worker_port_range: Tuple[int, int] = (10000, 20000)
[docs]
def config_factory(self,
run_dir: PathLike) -> Config:
return Config(
run_dir=str(run_dir / 'runinfo'),
retries=self.retries,
executors=[
HighThroughputExecutor(
provider=LocalProvider(
nodes_per_block=self.nodes,
init_blocks=1,
max_blocks=1,
launcher=MpiExecLauncher(
bind_cmd='--cpu-bind', overrides='--depth=1 --ppn 1'
),
worker_init=self.worker_init,
),
label=self.label,
cpu_affinity="block",
available_accelerators=self.available_accelerators,
worker_port_range=self.worker_port_range,
),
],
)
[docs]
class LocalCPUSettings(BaseComputeSettings):
worker_init: str = ''
nodes: int = 1
max_workers_per_node: int = 1
cores_per_worker: float = 1.0
retries: int = 1
label: str = 'htex'
worker_port_range: Tuple[int, int] = (10000, 20000)
available_accelerators: Union[int, Sequence[str]] = []
[docs]
def config_factory(self,
run_dir: PathLike) -> Config:
return Config(
run_dir=str(run_dir / 'runinfo'),
retries=self.retries,
executors=[
HighThroughputExecutor(
provider=LocalProvider(
nodes_per_block=self.nodes,
init_blocks=1,
max_blocks=1,
launcher=MpiExecLauncher(
bind_cmd='--cpu-bind depth', overrides='--depth=1 --ppn 1'
),
worker_init=self.worker_init,
),
label=self.label,
max_workers_per_node=self.max_workers_per_node,
cores_per_worker=self.cores_per_worker,
worker_port_range=self.worker_port_range,
),
],
)
[docs]
class PolarisSettings(BaseComputeSettings):
label: str = 'htex'
num_nodes: int = 1
worker_init: str = ''
scheduler_options: str = ''
account: str
queue: str
walltime: str
cpus_per_node: int = 64
strategy: str = 'simple'
available_accelerators: Union[int, Sequence[str]] = 4
[docs]
def config_factory(self, run_dir: PathLike) -> Config:
"""Create a configuration suitable for running all tasks on single nodes of Polaris
We will launch 4 workers per node, each pinned to a different GPU
Args:
num_nodes: Number of nodes to use for the MPI parallel tasks
user_options: Options for which account to use, location of environment files, etc
run_dir: Directory in which to store Parsl run files. Default: `runinfo`
"""
return Config(
retries=1, # Allows restarts if jobs are killed by the end of a job
executors=[
HighThroughputExecutor(
label=self.label,
heartbeat_period=15,
heartbeat_threshold=120,
worker_debug=True,
available_accelerators=self.available_accelerators,
address=address_by_hostname(),
cpu_affinity="alternating",
prefetch_capacity=0, # Increase if you have many more tasks than workers
provider=PBSProProvider( # type: ignore[no-untyped-call]
launcher=MpiExecLauncher(
bind_cmd="--cpu-bind", overrides="--depth=64 --ppn 1"
), # Updates to the mpiexec command
account=self.account,
queue=self.queue,
select_options="ngpus=4",
# PBS directives (header lines): for array jobs pass '-J' option
scheduler_options=self.scheduler_options,
worker_init=self.worker_init,
nodes_per_block=self.num_nodes,
init_blocks=1,
min_blocks=0,
max_blocks=1, # Can increase more to have more parallel jobs
cpus_per_node=self.cpus_per_node,
walltime=self.walltime,
),
),
],
run_dir=str(run_dir),
strategy=self.strategy,
app_cache=True,
)
[docs]
class AuroraSettings(BaseComputeSettings):
label: str = 'htex'
worker_init: str = ""
num_nodes: int = 1
scheduler_options: str = ""
account: str
queue: str
walltime: str
retries: int = 0
cpus_per_node: int = 48 # only 4 cpus per OpenMM job
strategy: str = "simple"
available_accelerators: List[str] = [str(i) for i in range(12)]
[docs]
def config_factory(self, run_dir: PathLike) -> Config:
"""Create a Parsl configuration for running on Aurora."""
return Config(
executors=[
HighThroughputExecutor(
label=self.label,
available_accelerators=self.available_accelerators,
cpu_affinity="block", # Assigns cpus in sequential order
prefetch_capacity=0,
max_workers=12,
cores_per_worker=16,
heartbeat_period=30,
heartbeat_threshold=300,
worker_debug=False,
provider=PBSProProvider(
launcher=MpiExecLauncher(
bind_cmd="--cpu-bind",
overrides=f"--depth=208 --ppn 1"
), # Ensures 1 manger per node and allows it to divide work among all 208 threads
worker_init=self.worker_init,
nodes_per_block=self.num_nodes,
account=self.account,
queue=self.queue,
walltime=self.walltime,
),
),
],
run_dir=str(run_dir),
checkpoint_mode='task_exit',
retries=self.retries,
app_cache=True,
)