Source code for aiida_quantumespresso.parsers.base

# -*- coding: utf-8 -*-
"""Defines a `Parser` base class for `aiida-quantumespresso`.

All `Parser` implementations in `aiida-quantumespresso` must use this base class, not `aiida.parsers.Parser`.
"""
from __future__ import annotations

import abc
import re
from typing import Optional, Tuple

from aiida.common import AttributeDict
from aiida.engine import ExitCode
from aiida.parsers import Parser

from aiida_quantumespresso.parsers.parse_raw.base import convert_qe_time_to_sec

__all__ = ('BaseParser',)


[docs]class BaseParser(Parser, metaclass=abc.ABCMeta): """Custom ``Parser`` class for ``aiida-quantumespresso`` parser implementations."""
[docs] class_error_map = {}
[docs] class_warning_map = {}
[docs] base_error_map = { 'Maximum CPU time exceeded': 'ERROR_OUT_OF_WALLTIME', }
[docs] base_warning_map = { 'Warning:': None, 'DEPRECATED:': None, }
[docs] success_string = 'JOB DONE'
@classmethod
[docs] def get_error_map(cls): """The full error map of the parser class.""" error_map = cls.base_error_map.copy() error_map.update(cls.class_error_map) return error_map
@classmethod
[docs] def get_warning_map(cls): """The full warning map of the parser class.""" warning_map = cls.base_warning_map.copy() warning_map.update(cls.class_warning_map) return warning_map
[docs] def parse_stdout_from_retrieved(self, logs: AttributeDict) -> Tuple[str, dict, AttributeDict]: """Read and parse the ``stdout`` content of a Quantum ESPRESSO calculation. :param logs: Logging container that will be updated during parsing. :returns: size 3 tuple: (``stdout`` content, parsed data, updated logs). """ filename_stdout = self.node.get_option('output_filename') if filename_stdout not in self.retrieved.base.repository.list_object_names(): logs.error.append('ERROR_OUTPUT_STDOUT_MISSING') return '', {}, logs try: with self.retrieved.open(filename_stdout, 'r') as handle: stdout = handle.read() except OSError as exception: logs.error.append('ERROR_OUTPUT_STDOUT_READ') logs.error.append(exception) return '', {}, logs try: parsed_data, logs = self._parse_stdout_base(stdout, logs) except Exception as exception: logs.error.append('ERROR_OUTPUT_STDOUT_PARSE') logs.error.append(exception) return stdout, {}, logs return stdout, parsed_data, logs
[docs] def emit_logs(self, logs: list[AttributeDict] | tuple[AttributeDict] | AttributeDict, ignore: list = None) -> None: """Emit the messages in one or multiple "log dictionaries" through the logger of the parser. A log dictionary is expected to have the following structure: each key must correspond to a log level of the python logging module, e.g. `error` or `warning` and its values must be a list of string messages. The method will loop over all log dictionaries and emit the messages it contains with the log level indicated by the key. Example log dictionary structure:: logs = { 'warning': ['Could not parse the `etot_threshold` variable from the stdout.'], 'error': ['Self-consistency was not achieved'] } :param logs: log dictionaries :param ignore: list of log messages to ignore """ ignore = ignore or [] if not isinstance(logs, (list, tuple)): logs = [logs] for logs in logs: for level, messages in logs.items(): for message in messages: stripped = message.strip() if stripped in ignore: continue getattr(self.logger, level)(stripped)
[docs] def check_base_errors(self, logs: AttributeDict) -> Optional[ExitCode]: """Check the ``logs`` for the following "basic" parsing error and return a (formatted) version: * ``ERROR_OUTPUT_STDOUT_MISSING`` * ``ERROR_OUTPUT_STDOUT_READ`` * ``ERROR_OUTPUT_STDOUT_PARSE`` These errors mean that there is no ``stdout`` to parse. The ``ERROR_OUTPUT_STDOUT_INCOMPLETE`` error is not checked here because in this case there might still be useful information in the ``stdout``. """ for exit_code in [ 'ERROR_OUTPUT_STDOUT_MISSING', ]: if exit_code in logs.error: return self.exit_codes.get(exit_code) # These exit codes have additional information that needs to be formatted in the message. for exit_code in [ 'ERROR_OUTPUT_STDOUT_READ', 'ERROR_OUTPUT_STDOUT_PARSE' ]: if exit_code in logs.error: exception = logs.error[logs.index(exit_code) + 1] return self.exit_codes.get(exit_code).format(exception=exception)
[docs] def exit(self, exit_code: ExitCode | None = None, logs: AttributeDict | None = None) -> ExitCode: """Log all messages in the ``logs`` as well as the ``exit_code`` message and return the correct exit code. This is a utility function if one wants to return from the parse method and automically add the ``logs`` and exit message associated to and exit code as a log message to the node: e.g. ``return self._exit(self.exit_codes.LABEL))`` If no ``exit_code`` is provided, the method will check if an ``exit_status`` has already been set on the node and return the corresponding ``ExitCode`` in this case. If not, ``ExitCode(0)`` is returned. :param logs: log dictionaries :param exit_code: an ``ExitCode`` :return: The correct exit code """ if logs: self.emit_logs(logs) if exit_code is not None: self.logger.error(exit_code.message) elif self.node.exit_status is not None: exit_code = ExitCode(self.node.exit_status, self.node.exit_message) else: exit_code = ExitCode(0) return exit_code
@classmethod
[docs] def _parse_stdout_base(cls, stdout: str, logs: AttributeDict) -> Tuple[dict, AttributeDict]: """Parse the ``stdout`` content of a Quantum ESPRESSO calculation. This function only checks for basic content like JOB DONE, errors with %%%%% etc, but can be overridden to parse more data from the ``stdout``. :param stdout: the stdout content as a string. :returns: tuple of two dictionaries, with the parsed data and log messages, respectively. """ parsed_data = {} if not re.search(cls.success_string, stdout): logs.error.append('ERROR_OUTPUT_STDOUT_INCOMPLETE') code_match = re.search( r'Program\s(?P<code_name>[A-Z|a-z|\_|\d]+)\sv\.(?P<code_version>[\d\.|a-z|A-Z]+)\s', stdout ) if code_match: code_name = code_match.groupdict()['code_name'] parsed_data['code_version'] = code_match.groupdict()['code_version'] wall_match = re.search(fr'{code_name}\s+:[\s\S]+CPU\s+(?P<wall_time>[\s.\d|s|m|d|h]+)\sWALL', stdout) if wall_match: try: parsed_data['wall_time_seconds'] = convert_qe_time_to_sec(wall_match.groupdict()['wall_time']) except ValueError: logs.warnings.append('Unable to convert wall time from `stdout` to seconds.') # Look for typical Quantum ESPRESSO error messages between %%%%%-lines that are not in our error map if re.search(r'\%\%\%\%\%', stdout): # Note: using e.g. `\%{5}` is significantly slower for error_message in set(re.split(r'\%\%\%\%\%\n', stdout)[1::2]): if not any(error_marker in error_message for error_marker in cls.get_error_map().keys()): logs.error.append(error_message.rstrip('\n%')) # Look for error messages in general for error_marker, error, in cls.get_error_map().items(): if re.search(fr'{error_marker}', stdout): logs.error.append(error) # Look for lines with warnings from the `warning_map` for warning_marker, warning in cls.get_warning_map().items(): for warning_message in set(re.findall(fr'({warning_marker}.+)\n', stdout)): if warning is not None: logs.warning.append(warning) else: logs.warning.append(warning_message) return parsed_data, logs