Source code for aiida_quantumespresso.workflows.xspectra.base

# -*- coding: utf-8 -*-
"""Workchain to run a Quantum ESPRESSO xspectra.x calculation with automated error handling and restarts."""
from aiida import orm
from aiida.common import AttributeDict
from aiida.common.lang import type_check
from aiida.engine import BaseRestartWorkChain, ProcessHandlerReport, process_handler, while_
from aiida.plugins import CalculationFactory

from aiida_quantumespresso.calculations.functions.create_kpoints_from_distance import create_kpoints_from_distance
from aiida_quantumespresso.workflows.protocols.utils import ProtocolMixin

# Load the calculation plugin through its entry point name rather than importing the class directly, so the
# work chain always wraps whichever class is registered as `quantumespresso.xspectra`.
XspectraCalculation = CalculationFactory('quantumespresso.xspectra')
class XspectraBaseWorkChain(ProtocolMixin, BaseRestartWorkChain):
    """Workchain to run a Quantum ESPRESSO xspectra.x calculation with automated error handling and restarts."""

    # Process class that is submitted inside the restart loop defined in the outline.
    _process_class = XspectraCalculation

    # `delta_factor_time_limit` is the fraction of `max_wallclock_seconds` that is passed to the code as `time_limit`.
    defaults = AttributeDict({'delta_factor_time_limit': 0.90})

    @classmethod
    def define(cls, spec):
        """Define the process specification.

        Exposes the inputs of ``XspectraCalculation`` in the ``xspectra`` namespace (without ``kpoints``), adds the
        top-level k-points inputs, exposes the calculation outputs and declares the outline and exit codes.

        :param spec: the process specification to populate.
        """
        super().define(spec)
        spec.expose_inputs(XspectraCalculation, namespace='xspectra', exclude=('kpoints',))
        spec.input(
            'kpoints',
            valid_type=orm.KpointsData,
            required=False,
            help='An explicit k-points mesh. Either this or `kpoints_distance` has to be provided.'
        )
        spec.input(
            'kpoints_distance',
            valid_type=orm.Float,
            required=False,
            help='The minimum desired distance in 1/Å between k-points in reciprocal space. The explicit k-points will '
            'be generated automatically by a calculation function based on the input structure.'
        )
        spec.input(
            'kpoints_force_parity',
            valid_type=orm.Bool,
            required=False,
            help='Optional input when constructing the k-points based on a desired `kpoints_distance`. Setting this to '
            '`True` will force the k-point mesh to have an even number of points along each lattice vector except '
            'for any non-periodic directions.'
        )
        spec.expose_outputs(XspectraCalculation)
        spec.outline(
            cls.setup,
            cls.validate_kpoints,
            while_(cls.should_run_process)(
                cls.prepare_process,
                cls.run_process,
                cls.inspect_process,
            ),
            cls.results,
        )
        spec.exit_code(
            202,
            'ERROR_INVALID_INPUT_KPOINTS',
            message='Neither the `kpoints` nor the `kpoints_distance` input was specified.'
        )
        spec.exit_code(
            300, 'ERROR_UNRECOVERABLE_FAILURE', message='The calculation failed with an unrecoverable error.'
        )

    @classmethod
    def get_protocol_filepath(cls):
        """Return the ``pathlib.Path`` to the ``.yaml`` file that defines the protocols."""
        from importlib_resources import files

        from ..protocols import xspectra as xs_protocols
        return files(xs_protocols) / 'base.yaml'

    @classmethod
    def get_builder_from_protocol(
        cls, code, core_wfc_data, parent_folder, abs_atom_marker='X', protocol=None, overrides=None, options=None, **_
    ):
        """Return a builder prepopulated with inputs selected according to the chosen protocol.

        :param code: the ``Code`` instance configured for the ``quantumespresso.xspectra`` plugin, or a string
            identifier that is resolved with ``orm.load_code``.
        :param core_wfc_data: a ``SinglefileData`` object for the initial-state core wavefunction, normally derived
            from upf2plotcore.sh, required for the xspectra.x calculation.
        :param parent_folder: a ``RemoteData`` object for the parent calculation (either pw.x or xspectra.x).
        :param abs_atom_marker: the name given to the Kind representing the absorbing atom. Matches to Kind.name for
            the absorbing atom in the structure, used to set the parameter `xiabs` automatically.
        :param protocol: protocol to use, if not specified, the default will be used.
        :param overrides: optional dictionary of inputs to override the defaults of the protocol.
        :param options: A dictionary of options that will be recursively set for the ``metadata.options`` input of all
            the ``CalcJobs`` that are nested in this work chain.
        :return: a process builder instance with all inputs defined ready for launch.
        :raises ValueError: if ``abs_atom_marker`` is not the name of a kind in the parent structure, or if the creator
            of ``parent_folder`` is neither a pw.x nor an xspectra.x calculation.
        """
        from aiida_quantumespresso.workflows.protocols.utils import recursive_merge

        if isinstance(code, str):
            code = orm.load_code(code)

        type_check(code, orm.AbstractCode)

        inputs = cls.get_protocol_inputs(protocol, overrides)
        xspectra_inputs = inputs['xspectra']
        metadata = xspectra_inputs['metadata']
        parameters = xspectra_inputs['parameters']

        if options:
            metadata['options'] = recursive_merge(metadata['options'], options)

        # pylint: disable=no-member
        builder = cls.get_builder()

        parent_calc = parent_folder.creator
        process_type = parent_calc.process_type

        if process_type == 'aiida.calculations:quantumespresso.xspectra':
            # Continuing from a previous xspectra.x run: reuse the k-points that run was launched with.
            builder.kpoints = parent_calc.inputs.kpoints
        elif process_type == 'aiida.calculations:quantumespresso.pw':
            # `xiabs` is the 1-based position of the absorbing kind in the name-sorted list of kinds.
            kinds_present = sorted(kind.name for kind in parent_calc.inputs.structure.kinds)
            if abs_atom_marker not in kinds_present:
                raise ValueError(f'given abs_atom_marker `{abs_atom_marker}` does not match a kind in the structure.')
            parameters['INPUT_XSPECTRA']['xiabs'] = kinds_present.index(abs_atom_marker) + 1

            if 'kpoints' in xspectra_inputs:
                builder.kpoints = xspectra_inputs['kpoints']
            else:
                builder.kpoints_distance = orm.Float(inputs['kpoints_distance'])
                builder.kpoints_force_parity = orm.Bool(inputs['kpoints_force_parity'])
        else:
            raise ValueError(f'process type of `parent_folder` creator `{parent_calc.process_type}` is not supported.')

        builder.xspectra['code'] = code
        builder.xspectra['parent_folder'] = parent_folder
        builder.xspectra['core_wfc_data'] = core_wfc_data
        builder.xspectra['parameters'] = orm.Dict(parameters)
        builder.xspectra['metadata'] = metadata
        if 'settings' in xspectra_inputs:
            builder.xspectra['settings'] = orm.Dict(xspectra_inputs['settings'])
        builder.clean_workdir = orm.Bool(inputs['clean_workdir'])
        builder.max_iterations = orm.Int(inputs['max_iterations'])
        # pylint: enable=no-member

        return builder

    def setup(self):
        """Call the `setup` of the `BaseRestartWorkChain` and then create the inputs dictionary in `self.ctx.inputs`.

        This `self.ctx.inputs` dictionary will be used by the `BaseRestartWorkChain` to submit the calculations in the
        internal loop. The parameters are unwrapped into a plain dictionary so that later steps and the error handlers
        can modify them in place.
        """
        super().setup()
        self.ctx.restart_calc = None
        self.ctx.inputs = AttributeDict(self.exposed_inputs(XspectraCalculation, 'xspectra'))
        self.ctx.inputs.parameters = self.ctx.inputs.parameters.get_dict()

    def validate_kpoints(self):
        """Validate the inputs related to k-points.

        Either an explicit `KpointsData` with given mesh, or a desired k-points distance should be specified. In the
        case of the latter, the `KpointsData` will be constructed for the input `StructureData` using the
        `create_kpoints_from_distance` calculation function.

        :return: the ``ERROR_INVALID_INPUT_KPOINTS`` exit code if neither input was given, otherwise ``None``.
        :raises AttributeError: if the explicit ``kpoints`` input does not define a mesh.
        """
        if 'kpoints' not in self.inputs and 'kpoints_distance' not in self.inputs:
            return self.exit_codes.ERROR_INVALID_INPUT_KPOINTS

        if 'kpoints' in self.inputs:
            kpoints = self.inputs.kpoints
            # xspectra.x can only work with a kpoints mesh, so we check that the KpointsData has a mesh property.
            try:
                kpoints.get_kpoints_mesh()
            except AttributeError as exception:
                raise AttributeError('XSpectra calculations cannot use an explicit kpoints list.') from exception
        else:
            inputs = {
                'structure': self.inputs.xspectra.parent_folder.creator.inputs.structure,
                'distance': self.inputs.kpoints_distance,
                'force_parity': self.inputs.get('kpoints_force_parity', orm.Bool(False)),
                'metadata': {
                    'call_link_label': 'create_kpoints_from_distance'
                }
            }
            kpoints = create_kpoints_from_distance(**inputs)  # pylint: disable=unexpected-keyword-arg

        self.ctx.inputs.kpoints = kpoints
        return None

    def set_max_seconds(self, max_wallclock_seconds):
        """Set the `time_limit` to a fraction of `max_wallclock_seconds` option to prevent out-of-walltime problems.

        :param max_wallclock_seconds: the maximum wallclock time that will be set in the scheduler settings.
        """
        max_seconds = max_wallclock_seconds * self.defaults.delta_factor_time_limit
        # `setdefault` keeps this consistent with the error handlers, which also create the namelist on demand.
        self.ctx.inputs.parameters.setdefault('INPUT_XSPECTRA', {})['time_limit'] = max_seconds

    def prepare_process(self):
        """Prepare the inputs for the next calculation.

        If the `max_wallclock_seconds` option is set and no `time_limit` was specified explicitly, the `time_limit` is
        derived from it. If a `restart_calc` has been set in the context, its `remote_folder` will be used as the
        `parent_folder` input for the next calculation and the `restart_mode` is set to `restart`.
        """
        namelist = self.ctx.inputs.parameters.setdefault('INPUT_XSPECTRA', {})
        max_wallclock_seconds = self.ctx.inputs.metadata.options.get('max_wallclock_seconds', None)

        if max_wallclock_seconds is not None and 'time_limit' not in namelist:
            self.set_max_seconds(max_wallclock_seconds)

        if self.ctx.restart_calc:
            namelist['restart_mode'] = 'restart'
            self.ctx.inputs.parent_folder = self.ctx.restart_calc.outputs.remote_folder

    def report_error_handled(self, calculation, action):
        """Report an action taken for a calculation that has failed.

        This should be called in a registered error handler if its condition is met and an action was taken.

        :param calculation: the failed calculation node
        :param action: a string message with the action taken
        """
        self.report(
            f'{calculation.process_label}<{calculation.pk}> failed with exit status '
            f'{calculation.exit_status}: {calculation.exit_message}'
        )
        self.report(f'Action taken: {action}')

    @process_handler(priority=600)
    def handle_unrecoverable_failure(self, node):
        """Handle calculations with an exit status below 400 which are unrecoverable, so abort the work chain.

        :param node: the calculation node that was just inspected.
        :return: a breaking ``ProcessHandlerReport`` carrying ``ERROR_UNRECOVERABLE_FAILURE`` if the condition is met,
            otherwise ``None``.
        """
        if node.is_failed and node.exit_status < 400:
            self.report_error_handled(node, 'unrecoverable error, aborting...')
            return ProcessHandlerReport(True, self.exit_codes.ERROR_UNRECOVERABLE_FAILURE)
        return None

    @process_handler(priority=610, exit_codes=XspectraCalculation.exit_codes.ERROR_SCHEDULER_OUT_OF_WALLTIME)
    def handle_scheduler_out_of_walltime(self, node):
        """Handle `ERROR_SCHEDULER_OUT_OF_WALLTIME` exit code: decrease the time_limit and restart from scratch.

        :param node: the calculation node that was killed by the scheduler.
        :return: a breaking ``ProcessHandlerReport``. It carries ``ERROR_UNRECOVERABLE_FAILURE`` when there is neither
            a `time_limit` nor a `max_wallclock_seconds` value from which a reduced limit could be computed.
        """
        # Decrease `max_seconds` significantly in order to make sure that the calculation has the time to shut down
        # neatly before reaching the scheduler wall time and one can restart from this calculation.
        factor = 0.5

        namelist = self.ctx.inputs.parameters.setdefault('INPUT_XSPECTRA', {})
        max_seconds = namelist.get('time_limit', None)

        if max_seconds is None:
            max_wallclock_seconds = self.ctx.inputs.metadata.options.get('max_wallclock_seconds', None)
            if max_wallclock_seconds is None:
                # Without any time information there is nothing to reduce: abort instead of failing on `None * float`.
                self.report_error_handled(node, 'no `time_limit` or `max_wallclock_seconds` to reduce, aborting...')
                return ProcessHandlerReport(True, self.exit_codes.ERROR_UNRECOVERABLE_FAILURE)
            max_seconds = max_wallclock_seconds * self.defaults.delta_factor_time_limit

        max_seconds_new = max_seconds * factor

        # The job did not shut down by itself, so do not continue from its output. Clearing `restart_calc` stops
        # `prepare_process` from switching `restart_mode` back to `restart` and keeps the current `parent_folder`.
        self.ctx.restart_calc = None
        namelist['restart_mode'] = 'from_scratch'
        namelist['time_limit'] = max_seconds_new

        action = f'reduced max_seconds from {max_seconds} to {max_seconds_new} and restarting'
        self.report_error_handled(node, action)
        return ProcessHandlerReport(True)

    @process_handler(priority=580, exit_codes=XspectraCalculation.exit_codes.ERROR_OUT_OF_WALLTIME)
    def handle_out_of_walltime(self, node):
        """Handle `ERROR_OUT_OF_WALLTIME` exit code: calculation shut down neatly and we can simply restart.

        :param node: the calculation node that stopped after reaching its time limit.
        :return: a breaking ``ProcessHandlerReport`` without exit code, so that the work chain continues.
        """
        # Marking the node as `restart_calc` makes `prepare_process` continue from its `remote_folder`.
        self.ctx.restart_calc = node
        self.report_error_handled(node, 'simply restart from the last calculation')
        return ProcessHandlerReport(True)