Source code for molecular_simulations.utils.parsl_settings

#!/usr/bin/env python
import json
from abc import ABC, abstractmethod
from collections.abc import Sequence
from pathlib import Path
from typing import TypeVar

import yaml
from parsl.addresses import address_by_hostname
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiExecLauncher
from parsl.providers import LocalProvider, PBSProProvider
from pydantic import BaseModel

# Anything this module accepts as a filesystem path: a plain string or a Path.
PathLike = str | Path
# Generic placeholder used by BaseSettings.from_yaml so the return type is
# tied to the class the classmethod is called on.
_T = TypeVar('_T')


class BaseSettings(BaseModel):
    """Pydantic model base class with YAML dump/load helpers."""

    def dump_yaml(self, filename: PathLike) -> None:
        """Write this model's fields to ``filename`` as YAML.

        The model is round-tripped through its JSON representation first so
        that only plain JSON-compatible values are handed to the YAML dumper.

        Args:
            filename: Destination path; the file is created or overwritten.
        """
        # Serialise before opening the file so that a serialisation failure
        # does not leave a truncated file behind.
        payload = json.loads(self.model_dump_json())
        with open(filename, mode='w', encoding='utf-8') as fp:
            yaml.dump(payload, fp, indent=4, sort_keys=False)

    @classmethod
    def from_yaml(cls: type[_T], filename: PathLike) -> _T:
        """Build an instance of ``cls`` from a YAML file.

        Args:
            filename: Path of the YAML file to read. Its top level must be a
                mapping of field names to values; an empty file is treated
                as an empty mapping.

        Returns:
            A new instance of ``cls`` constructed from the file's contents.

        Raises:
            TypeError: If the top level of the YAML document is not a mapping.
        """
        with open(filename, encoding='utf-8') as fp:
            raw_data = yaml.safe_load(fp)

        # ``safe_load`` returns None for an empty document; fall back to an
        # empty mapping so the model's own validation reports what is missing.
        if raw_data is None:
            raw_data = {}
        if not isinstance(raw_data, dict):
            raise TypeError(
                f'Expected a mapping at the top level of {filename}, '
                f'got {type(raw_data).__name__}'
            )
        return cls(**raw_data)
class BaseComputeSettings(ABC, BaseSettings):
    """Abstract base for compute settings (HPC platform, number of GPUs, etc)."""

    @abstractmethod
    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a new Parsl configuration from these settings.

        Args:
            run_dir: Directory path that the concrete implementation uses
                when setting the Parsl run directory.

        Returns:
            A Parsl ``Config`` object.
        """
class LocalSettings(BaseComputeSettings):
    """Settings for one accelerator-aware executor backed by a ``LocalProvider``."""

    available_accelerators: int | Sequence[str] = 4
    worker_init: str = ''
    nodes: int = 1
    retries: int = 1
    label: str = 'gpu'
    worker_port_range: tuple[int, int] = (10000, 20000)

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` with a single ``HighThroughputExecutor``.

        Args:
            run_dir: Base directory; Parsl run files go in its ``runinfo``
                subdirectory.

        Returns:
            The assembled Parsl ``Config``.
        """
        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        mpi_launcher = MpiExecLauncher(
            bind_cmd='--cpu-bind',
            overrides='--depth=1 --ppn 1',
        )
        # Exactly one block is started and no more are ever requested.
        local_provider = LocalProvider(
            nodes_per_block=self.nodes,
            init_blocks=1,
            max_blocks=1,
            launcher=mpi_launcher,
            worker_init=self.worker_init,
        )
        gpu_executor = HighThroughputExecutor(
            provider=local_provider,
            label=self.label,
            cpu_affinity='block',
            available_accelerators=self.available_accelerators,
            worker_port_range=self.worker_port_range,
        )

        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[gpu_executor],
        )
class LocalCPUSettings(BaseComputeSettings):
    """Settings for one CPU-only executor backed by a ``LocalProvider``."""

    worker_init: str = ''
    nodes: int = 1
    max_workers_per_node: int = 1
    cores_per_worker: float = 1.0
    retries: int = 1
    label: str = 'cpu'
    worker_port_range: tuple[int, int] = (10000, 20000)
    # NOTE(review): this field is declared but config_factory never passes it
    # to the executor -- confirm whether that is intentional.
    available_accelerators: int | Sequence[str] = []

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` with a single CPU ``HighThroughputExecutor``.

        Args:
            run_dir: Base directory; Parsl run files go in its ``runinfo``
                subdirectory.

        Returns:
            The assembled Parsl ``Config``.
        """
        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        mpi_launcher = MpiExecLauncher(
            bind_cmd='--cpu-bind',
            overrides='--depth=1 --ppn 1',
        )
        # Exactly one block is started and no more are ever requested.
        local_provider = LocalProvider(
            nodes_per_block=self.nodes,
            init_blocks=1,
            max_blocks=1,
            launcher=mpi_launcher,
            worker_init=self.worker_init,
        )
        cpu_executor = HighThroughputExecutor(
            provider=local_provider,
            label=self.label,
            max_workers_per_node=self.max_workers_per_node,
            cores_per_worker=self.cores_per_worker,
            worker_port_range=self.worker_port_range,
        )

        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[cpu_executor],
        )
class HeterogeneousSettings(BaseComputeSettings):
    """Settings for a local run with one 'gpu' and one 'cpu' executor."""

    worker_init: str = ''
    nodes: int = 1
    max_workers_per_node: int = 1
    cores_per_worker: float = 1.0
    retries: int = 1
    worker_port_range: tuple[int, int] = (10000, 20000)
    available_accelerators: int = 4

    def config_factory(self, run_dir: PathLike) -> Config:
        """Build a Parsl ``Config`` holding a 'gpu' and a 'cpu' executor.

        Both executors get their own, identically configured ``LocalProvider``
        and share the same worker port range.

        Args:
            run_dir: Base directory; Parsl run files go in its ``runinfo``
                subdirectory.

        Returns:
            The assembled Parsl ``Config``.
        """

        def new_provider() -> LocalProvider:
            # Each executor needs a separate provider instance; the two are
            # configured identically.
            return LocalProvider(
                nodes_per_block=self.nodes,
                init_blocks=1,
                max_blocks=1,
                launcher=MpiExecLauncher(
                    bind_cmd='--cpu-bind',
                    overrides='--depth=1 --ppn 1',
                ),
                worker_init=self.worker_init,
            )

        parsl_run_dir = str(Path(run_dir) / 'runinfo')

        # The GPU executor's worker count is set to the accelerator count.
        gpu_executor = HighThroughputExecutor(
            provider=new_provider(),
            label='gpu',
            cpu_affinity='block',
            max_workers_per_node=self.available_accelerators,
            available_accelerators=self.available_accelerators,
            worker_port_range=self.worker_port_range,
        )
        cpu_executor = HighThroughputExecutor(
            provider=new_provider(),
            label='cpu',
            max_workers_per_node=self.max_workers_per_node,
            cores_per_worker=self.cores_per_worker,
            worker_port_range=self.worker_port_range,
        )

        return Config(
            run_dir=parsl_run_dir,
            retries=self.retries,
            executors=[gpu_executor, cpu_executor],
        )
class PolarisSettings(BaseComputeSettings):
    """Settings for running on Polaris through a PBS Pro provider."""

    label: str = 'htex'
    num_nodes: int = 1
    worker_init: str = ''
    scheduler_options: str = ''
    account: str
    queue: str
    walltime: str
    cpus_per_node: int = 64
    strategy: str = 'simple'
    available_accelerators: int | Sequence[str] = 4
    # Previously hard-coded to 1 inside config_factory; exposed as a field so
    # it can be configured like on the other settings classes. The default
    # keeps the old behaviour.
    retries: int = 1

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a configuration suitable for running all tasks on single nodes of Polaris.

        Intended to launch one worker per accelerator (4 by default), each
        pinned to a different GPU.

        Args:
            run_dir: Directory in which to store Parsl run files. Used as
                given; no subdirectory is appended.

        Returns:
            A Parsl ``Config`` with one ``HighThroughputExecutor`` backed by
            a ``PBSProProvider``.
        """
        return Config(
            retries=self.retries,  # Allows restarts if jobs are killed by the end of a job
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    heartbeat_period=15,
                    heartbeat_threshold=120,
                    worker_debug=True,
                    available_accelerators=self.available_accelerators,
                    address=address_by_hostname(),
                    cpu_affinity='alternating',
                    prefetch_capacity=0,  # Increase if you have many more tasks than workers
                    provider=PBSProProvider(
                        # Updates to the mpiexec command
                        launcher=MpiExecLauncher(
                            bind_cmd='--cpu-bind',
                            overrides='--depth=64 --ppn 1',
                        ),
                        account=self.account,
                        queue=self.queue,
                        # NOTE(review): fixed at 4 GPUs regardless of
                        # ``available_accelerators`` -- confirm if intended.
                        select_options='ngpus=4',
                        # PBS directives (header lines): for array jobs pass '-J' option
                        scheduler_options=self.scheduler_options,
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        init_blocks=1,
                        min_blocks=0,
                        max_blocks=1,  # Can increase more to have more parallel jobs
                        cpus_per_node=self.cpus_per_node,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            strategy=self.strategy,
        )
class AuroraSettings(BaseComputeSettings):
    """Settings for running on Aurora through a PBS Pro provider."""

    label: str = 'htex'
    worker_init: str = ''
    num_nodes: int = 1
    scheduler_options: str = ''
    account: str
    queue: str
    walltime: str
    retries: int = 0
    # NOTE(review): declared but not forwarded to the provider by
    # config_factory; forwarding it would change the default job request, so
    # it is left as-is -- confirm the intended use.
    cpus_per_node: int = 48  # only 4 cpus per OpenMM job
    strategy: str = 'simple'
    available_accelerators: list[str] = [str(i) for i in range(12)]

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running on Aurora.

        Args:
            run_dir: Directory in which to store Parsl run files. Used as
                given; no subdirectory is appended.

        Returns:
            A Parsl ``Config`` with one ``HighThroughputExecutor`` backed by
            a ``PBSProProvider``.
        """
        return Config(
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    available_accelerators=self.available_accelerators,
                    cpu_affinity='block',  # Assigns cpus in sequential order
                    prefetch_capacity=0,
                    max_workers_per_node=12,
                    cores_per_worker=16,
                    heartbeat_period=30,
                    heartbeat_threshold=300,
                    worker_debug=False,
                    provider=PBSProProvider(
                        # Ensures 1 manager per node and allows it to divide
                        # work among all 208 threads
                        launcher=MpiExecLauncher(
                            bind_cmd='--cpu-bind',
                            overrides='--depth=208 --ppn 1',
                        ),
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        account=self.account,
                        queue=self.queue,
                        walltime=self.walltime,
                        # Previously dropped: user-supplied PBS directives
                        # never reached the submit script.
                        scheduler_options=self.scheduler_options,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            retries=self.retries,
            # Previously dropped: the configured scaling strategy was ignored.
            strategy=self.strategy,
        )