#!/usr/bin/env python
import json
from abc import ABC, abstractmethod
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar
import yaml
from parsl.addresses import address_by_hostname
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiExecLauncher
from parsl.providers import LocalProvider, PBSProProvider
from pydantic import BaseModel
# Anything accepted where a filesystem path is expected: a plain string or a pathlib.Path.
PathLike = str | Path
# Generic type variable; lets BaseSettings.from_yaml be typed as returning the class it is called on.
_T = TypeVar('_T')
[docs]
class BaseSettings(BaseModel):
    """Pydantic model base class that can be written to and read from YAML files."""

    def dump_yaml(self, filename: PathLike) -> None:
        """Write this model's fields to ``filename`` as YAML.

        The model is serialized with ``model_dump_json`` and parsed back with
        ``json.loads`` before dumping, so ``yaml.dump`` only ever receives
        plain JSON-compatible values.

        Args:
            filename: Destination path. The file is opened in write mode, so
                any existing content is replaced.
        """
        with open(filename, mode='w', encoding='utf-8') as fp:
            yaml.dump(json.loads(self.model_dump_json()), fp, indent=4, sort_keys=False)

    @classmethod
    def from_yaml(cls: type[_T], filename: PathLike) -> _T:
        """Create an instance of ``cls`` from the YAML file at ``filename``.

        The top-level YAML mapping is unpacked into keyword arguments for the
        model constructor. An empty document is treated as an empty mapping,
        so a model whose fields all have defaults loads with those defaults
        and any other model fails with the usual validation error for its
        missing fields.

        Args:
            filename: Path of the YAML file to read.

        Returns:
            A new instance of the class this method was called on.
        """
        with open(filename, encoding='utf-8') as fp:
            raw_data = yaml.safe_load(fp)
        # yaml.safe_load returns None for an empty document; ``cls(**None)``
        # would raise an unhelpful TypeError, so fall back to "no fields given".
        if raw_data is None:
            raw_data = {}
        return cls(**raw_data)
[docs]
class BaseComputeSettings(ABC, BaseSettings):
    """Abstract settings model for a compute platform (HPC system, GPU count, etc.)."""

    @abstractmethod
    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a new Parsl configuration from these settings.

        Args:
            run_dir: Directory supplied by the caller for Parsl's run files.

        Returns:
            The Parsl ``Config``; every concrete subclass must implement this.
        """
        ...
[docs]
class LocalSettings(BaseComputeSettings):
    """Settings for one accelerator-aware executor backed by a ``LocalProvider``."""

    # Forwarded to HighThroughputExecutor(available_accelerators=...).
    available_accelerators: int | Sequence[str] = 4
    # Forwarded to LocalProvider(worker_init=...).
    worker_init: str = ''
    # Forwarded to LocalProvider(nodes_per_block=...).
    nodes: int = 1
    # Forwarded to Config(retries=...).
    retries: int = 1
    # Label given to the executor.
    label: str = 'gpu'
    # Forwarded to HighThroughputExecutor(worker_port_range=...).
    worker_port_range: tuple[int, int] = (10000, 20000)

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` with a single ``HighThroughputExecutor``.

        Args:
            run_dir: Parent directory; Parsl's run files go in its
                ``runinfo`` subdirectory.

        Returns:
            The assembled Parsl configuration.
        """
        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        mpi_launcher = MpiExecLauncher(
            bind_cmd='--cpu-bind',
            overrides='--depth=1 --ppn 1',
        )
        # Exactly one block is started and no further blocks are allowed.
        local_provider = LocalProvider(
            nodes_per_block=self.nodes,
            init_blocks=1,
            max_blocks=1,
            launcher=mpi_launcher,
            worker_init=self.worker_init,
        )
        executor = HighThroughputExecutor(
            provider=local_provider,
            label=self.label,
            cpu_affinity='block',
            available_accelerators=self.available_accelerators,
            worker_port_range=self.worker_port_range,
        )
        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[executor],
        )
[docs]
class LocalCPUSettings(BaseComputeSettings):
    """Settings for one CPU-only executor backed by a ``LocalProvider``."""

    # Forwarded to LocalProvider(worker_init=...).
    worker_init: str = ''
    # Forwarded to LocalProvider(nodes_per_block=...).
    nodes: int = 1
    # Forwarded to HighThroughputExecutor(max_workers_per_node=...).
    max_workers_per_node: int = 1
    # Forwarded to HighThroughputExecutor(cores_per_worker=...).
    cores_per_worker: float = 1.0
    # Forwarded to Config(retries=...).
    retries: int = 1
    # Label given to the executor.
    label: str = 'cpu'
    # Forwarded to HighThroughputExecutor(worker_port_range=...).
    worker_port_range: tuple[int, int] = (10000, 20000)
    # NOTE(review): declared but never passed to the executor by
    # config_factory -- confirm whether ignoring it is intentional.
    available_accelerators: int | Sequence[str] = []

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` with a single CPU ``HighThroughputExecutor``.

        Args:
            run_dir: Parent directory; Parsl's run files go in its
                ``runinfo`` subdirectory.

        Returns:
            The assembled Parsl configuration.
        """
        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        mpi_launcher = MpiExecLauncher(
            bind_cmd='--cpu-bind',
            overrides='--depth=1 --ppn 1',
        )
        # Exactly one block is started and no further blocks are allowed.
        local_provider = LocalProvider(
            nodes_per_block=self.nodes,
            init_blocks=1,
            max_blocks=1,
            launcher=mpi_launcher,
            worker_init=self.worker_init,
        )
        cpu_executor = HighThroughputExecutor(
            provider=local_provider,
            label=self.label,
            max_workers_per_node=self.max_workers_per_node,
            cores_per_worker=self.cores_per_worker,
            worker_port_range=self.worker_port_range,
        )
        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[cpu_executor],
        )
[docs]
class HeterogeneousSettings(BaseComputeSettings):
    """Settings for a two-executor setup: one labelled ``gpu`` and one labelled ``cpu``."""

    # Forwarded to both LocalProvider instances (worker_init=...).
    worker_init: str = ''
    # Forwarded to both LocalProvider instances (nodes_per_block=...).
    nodes: int = 1
    # Worker limit for the 'cpu' executor only.
    max_workers_per_node: int = 1
    # Forwarded to the 'cpu' executor (cores_per_worker=...).
    cores_per_worker: float = 1.0
    # Forwarded to Config(retries=...).
    retries: int = 1
    # Forwarded to both executors (worker_port_range=...).
    worker_port_range: tuple[int, int] = (10000, 20000)
    # Used for the 'gpu' executor as both its accelerator count and its
    # max_workers_per_node.
    available_accelerators: int = 4

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` with a ``gpu`` and a ``cpu`` executor.

        Both executors get their own, identically configured
        ``LocalProvider``.

        Args:
            run_dir: Parent directory; Parsl's run files go in its
                ``runinfo`` subdirectory.

        Returns:
            The assembled Parsl configuration, with the ``gpu`` executor
            listed first.
        """

        def _new_provider() -> LocalProvider:
            # A fresh provider/launcher pair per executor; instances are not shared.
            return LocalProvider(
                nodes_per_block=self.nodes,
                init_blocks=1,
                max_blocks=1,
                launcher=MpiExecLauncher(
                    bind_cmd='--cpu-bind',
                    overrides='--depth=1 --ppn 1',
                ),
                worker_init=self.worker_init,
            )

        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        gpu_executor = HighThroughputExecutor(
            provider=_new_provider(),
            label='gpu',
            cpu_affinity='block',
            max_workers_per_node=self.available_accelerators,
            available_accelerators=self.available_accelerators,
            worker_port_range=self.worker_port_range,
        )
        cpu_executor = HighThroughputExecutor(
            provider=_new_provider(),
            label='cpu',
            max_workers_per_node=self.max_workers_per_node,
            cores_per_worker=self.cores_per_worker,
            worker_port_range=self.worker_port_range,
        )
        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[gpu_executor, cpu_executor],
        )
[docs]
class PolarisSettings(BaseComputeSettings):
    """Settings for running on Polaris through a PBS Pro batch job."""

    # Label given to the executor.
    label: str = 'htex'
    # Forwarded to PBSProProvider(nodes_per_block=...).
    num_nodes: int = 1
    # Forwarded to PBSProProvider(worker_init=...).
    worker_init: str = ''
    # Forwarded to PBSProProvider(scheduler_options=...).
    scheduler_options: str = ''
    # Required: forwarded to PBSProProvider(account=...).
    account: str
    # Required: forwarded to PBSProProvider(queue=...).
    queue: str
    # Required: forwarded to PBSProProvider(walltime=...).
    walltime: str
    # Forwarded to PBSProProvider(cpus_per_node=...).
    cpus_per_node: int = 64
    # Forwarded to Config(strategy=...).
    strategy: str = 'simple'
    # Forwarded to HighThroughputExecutor(available_accelerators=...).
    available_accelerators: int | Sequence[str] = 4
    # Forwarded to Config(retries=...). The default of 1 allows a restart if
    # a task is killed when its batch job ends.
    retries: int = 1

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running tasks on Polaris nodes.

        A single ``HighThroughputExecutor`` is configured on top of a
        ``PBSProProvider``. The job requests ``ngpus=4`` per node, and the
        executor is given ``available_accelerators`` (4 by default).

        Args:
            run_dir: Directory in which to store Parsl run files. It is used
                as given; no subdirectory is appended.

        Returns:
            The assembled Parsl configuration.
        """
        return Config(
            retries=self.retries,  # Allows restarts if jobs are killed by the end of a job
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    heartbeat_period=15,
                    heartbeat_threshold=120,
                    worker_debug=True,
                    available_accelerators=self.available_accelerators,
                    address=address_by_hostname(),
                    cpu_affinity='alternating',
                    prefetch_capacity=0,  # Increase if you have many more tasks than workers
                    provider=PBSProProvider(
                        launcher=MpiExecLauncher(
                            bind_cmd='--cpu-bind',
                            overrides='--depth=64 --ppn 1',
                        ),  # Updates to the mpiexec command
                        account=self.account,
                        queue=self.queue,
                        select_options='ngpus=4',
                        # PBS directives (header lines): for array jobs pass '-J' option
                        scheduler_options=self.scheduler_options,
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        init_blocks=1,
                        min_blocks=0,
                        max_blocks=1,  # Can increase more to have more parallel jobs
                        cpus_per_node=self.cpus_per_node,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            strategy=self.strategy,
        )
[docs]
class AuroraSettings(BaseComputeSettings):
    """Settings for running on Aurora through a PBS Pro batch job."""

    # Label given to the executor.
    label: str = 'htex'
    # Forwarded to PBSProProvider(worker_init=...).
    worker_init: str = ''
    # Forwarded to PBSProProvider(nodes_per_block=...).
    num_nodes: int = 1
    # Forwarded to PBSProProvider(scheduler_options=...).
    scheduler_options: str = ''
    # Required: forwarded to PBSProProvider(account=...).
    account: str
    # Required: forwarded to PBSProProvider(queue=...).
    queue: str
    # Required: forwarded to PBSProProvider(walltime=...).
    walltime: str
    # Forwarded to Config(retries=...).
    retries: int = 0
    # NOTE(review): declared but not passed to PBSProProvider. Forwarding it
    # would change the default PBS resource request, so it is left unused
    # here -- confirm the intended behavior.
    cpus_per_node: int = 48  # only 4 cpus per OpenMM job
    # Forwarded to Config(strategy=...).
    strategy: str = 'simple'
    # Forwarded to HighThroughputExecutor(available_accelerators=...);
    # defaults to the twelve names '0' through '11'.
    available_accelerators: list[str] = [str(i) for i in range(12)]

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running on Aurora.

        A single ``HighThroughputExecutor`` with 12 workers per node is
        configured on top of a ``PBSProProvider``.

        Args:
            run_dir: Directory in which to store Parsl run files. It is used
                as given; no subdirectory is appended.

        Returns:
            The assembled Parsl configuration.
        """
        return Config(
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    available_accelerators=self.available_accelerators,
                    cpu_affinity='block',  # Assigns cpus in sequential order
                    prefetch_capacity=0,
                    max_workers_per_node=12,
                    cores_per_worker=16,
                    heartbeat_period=30,
                    heartbeat_threshold=300,
                    worker_debug=False,
                    provider=PBSProProvider(
                        launcher=MpiExecLauncher(
                            bind_cmd='--cpu-bind',
                            overrides='--depth=208 --ppn 1',
                        ),  # Ensures 1 manager per node and allows it to divide work among all 208 threads
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        account=self.account,
                        queue=self.queue,
                        walltime=self.walltime,
                        # Previously this field was declared but silently
                        # ignored; the '' default matches the provider's own.
                        scheduler_options=self.scheduler_options,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            retries=self.retries,
            # Previously declared but ignored; 'simple' is also Parsl's default.
            strategy=self.strategy,
        )