Source code for aiida_quantumespresso.parsers.open_grid

# -*- coding: utf-8 -*-
from typing import Tuple

from aiida.common import AttributeDict
from aiida.orm import Dict, KpointsData

from aiida_quantumespresso.parsers.base import BaseParser
from aiida_quantumespresso.utils.mapping import get_logging_container


[docs]class OpenGridParser(BaseParser): """``Parser`` implementation for the ``OpenGridCalculation`` calculation job class."""
[docs] class_error_map = { 'incompatible FFT grid': 'ERROR_INCOMPATIBLE_FFT_GRID' }
[docs] def parse(self, **kwargs): """Parse the retrieved files of a completed ``OpenGridCalculation`` into output nodes.""" logs = get_logging_container() stdout, parsed_data, logs = self.parse_stdout_from_retrieved(logs) base_exit_code = self.check_base_errors(logs) if base_exit_code: return self.exit(base_exit_code, logs) self.out('output_parameters', Dict(parsed_data)) for exit_code in self.get_error_map().values(): if exit_code in logs.error: return self.exit(self.exit_codes.get(exit_code), logs) kpoints_mesh, kpoints, logs = self.parse_kpoints(stdout, logs) self.out('kpoints_mesh', kpoints_mesh) self.out('kpoints', kpoints) for exit_code in ['ERROR_OUTPUT_KPOINTS_MISMATCH', 'ERROR_OUTPUT_STDOUT_INCOMPLETE']: if exit_code in logs.error: return self.exit(self.exit_codes.get(exit_code), logs) return self.exit(logs=logs)
@staticmethod
[docs] def parse_kpoints(stdout: str, logs: AttributeDict) -> Tuple[KpointsData, KpointsData, AttributeDict]: """Parse the ``stdout`` for the mesh and explicit list of kpoints.""" lines = stdout.split('\n') kpoints = [] weights = [] found_kpoints = False for line in lines: if 'EXX: q-point mesh:' in line: kmesh = [int(i) for i in line.strip().split()[-3:]] if 'List to be put in the .win file of wannier90' in line: found_kpoints = True continue if found_kpoints: line = line.strip() if line != '': line = [float(i) for i in line.split()] kpoints.append(line[:-1]) weights.append(line[-1]) else: found_kpoints = False kpoints_mesh = KpointsData() kpoints_mesh.set_kpoints_mesh(kmesh) kpoints_list = KpointsData() kpoints_list.set_kpoints(kpoints, cartesian=False, weights=weights) if kmesh[0] * kmesh[1] * kmesh[2] != len(kpoints): logs.error.append('ERROR_OUTPUT_KPOINTS_MISMATCH') return kpoints_mesh, kpoints_list, logs