Source code for aiida_quantumespresso.parsers.xspectra

# -*- coding: utf-8 -*-
import re
from typing import Tuple

from aiida.common import AttributeDict
from aiida.orm import Dict, XyData
import numpy as np

from aiida_quantumespresso.parsers import QEOutputParsingError
from aiida_quantumespresso.parsers.base import BaseParser
from aiida_quantumespresso.utils.mapping import get_logging_container


[docs]class XspectraParser(BaseParser): """Parser for the ``XSpectraCalculation`` calcjob plugin."""
[docs] class_error_map = { 'Wrong xiabs!!!': 'ERROR_OUTPUT_ABSORBING_SPECIES_WRONG', 'xiabs < 1 or xiabs > ntyp': 'ERROR_OUTPUT_ABSORBING_SPECIES_ZERO', 'Calculation not finished': 'ERROR_OUT_OF_WALLTIME', }
[docs] success_string='END JOB'
[docs] def parse(self, **kwargs): """Parse the contents of the output files stored in the `retrieved` output node.""" logs = get_logging_container() stdout, parsed_data, logs = self.parse_stdout_from_retrieved(logs) base_exit_code = self.check_base_errors(logs) if base_exit_code: return self.exit(base_exit_code, logs) parsed_xspectra, logs = self.parse_stdout(stdout, logs) parsed_data.update(parsed_xspectra) # Parse some additional info which the stdout does not reliably report parameters = self.node.inputs.parameters.base.attributes.get('INPUT_XSPECTRA', {}) xepsilon_defaults = { '1': 0, '2': 0, '3': 1, } raw_xepsilon = [parameters.get(f'xepsilon({n})', xepsilon_defaults[n]) for n in ['1', '2', '3']] parsed_data['xepsilon'] = [float(n) for n in raw_xepsilon] parsed_data['xcoordcrys'] = parameters.get('xcoordcrys', True) parsed_data['xonly_plot'] = parameters.get('xonly_plot', False) self.out('output_parameters', Dict(parsed_data)) for exit_code in list(self.class_error_map.values()) + ['ERROR_OUTPUT_STDOUT_INCOMPLETE']: if exit_code in logs.error: return self.exit(self.exit_codes.get(exit_code), logs) # Check that the spectra data file exists and is readable try: with self.retrieved.base.repository.open(self.node.process_class._Spectrum_FILENAME, 'r') as fil: xspectra_file = fil.readlines() except OSError: return self.exit(self.exit_codes.ERROR_READING_SPECTRUM_FILE, logs) # Check that the data in the spectra file can be read by NumPy try: _ = np.genfromtxt(xspectra_file) except ValueError: return self.exit(self.exit_codes.ERROR_READING_SPECTRUM_FILE_DATA, logs) array_names = [[], []] array_units = [[], []] array_names[0] = ['energy', 'sigma'] # for non-spin-polarised calculations array_units[0] = ['eV', 'n/a'] array_names[1] = ['energy', 'sigma_tot', 'sigma_up', 'sigma_down'] # for spin-polarised calculations array_units[1] = ['eV', 'n/a', 'n/a', 'n/a'] array_data, spin = self.parse_raw_xspectra(xspectra_file, array_names, array_units) energy_units = 'eV' xy_data = XyData() xy_data.set_x(array_data['energy'], 'energy', energy_units) y_arrays = [] y_names = [] y_units = [] if spin: y_arrays += [array_data['sigma_tot']] y_names += ['sigma_tot'] y_arrays += [array_data['sigma_up']] y_arrays += [array_data['sigma_down']] y_names += ['sigma_up'] y_names += ['sigma_down'] y_units += ['n/a'] * 3 else: y_arrays += [array_data['sigma']] y_names += ['sigma'] y_units += ['n/a'] xy_data.set_y(y_arrays, y_names, y_units) self.out('spectra', xy_data) return self.exit(logs=logs)
@staticmethod
[docs] def parse_stdout(stdout: str, logs: AttributeDict) -> Tuple[dict, AttributeDict]: """Parse the ``stdout`` of XSpectra for the core level energy and energy zero of the spectrum.""" from .parse_raw.base import convert_qe_time_to_sec parsed_data = {} # Parse the necessary information for data plotting: core level energy of the # absorbing atom and the energy zero of the spectrum (typically the Fermi level) for line in stdout.split('\n'): if 'From SCF save directory' in line: if '(spin polarized work)' in line: spin = True else: spin = False parsed_data['lsda'] = spin if 'ehomo [eV]' in line: if spin: homo_energy = line.split(':')[-2].split('(')[0].strip() else: homo_energy = line.split(':')[-1].split('(')[0].strip() homo_energy_units = line.split('[')[1].split(':')[0].replace(']', '') parsed_data['highest_occupied_level'] = homo_energy parsed_data['highest_occupied_level_units'] = homo_energy_units if 'elumo [eV]' in line: if spin: lumo_energy = line.split(':')[-2].split('(')[0].strip() else: lumo_energy = line.split(':')[-1].split('(')[0].strip() lumo_energy_units = line.split('[')[1].split(':')[0].replace(']', '') parsed_data['lowest_unoccupied_level'] = lumo_energy parsed_data['lowest_unoccupied_level_units'] = lumo_energy_units parsed_data['lumo_found'] = True elif 'No LUMO value' in line: parsed_data['lumo_found'] = False if 'ef [eV]' in line: ef_energy = line.split(':')[-1].split('(')[0].strip() ef_energy_units = line.split('[')[1].split(':')[0].replace(']', '') parsed_data['fermi_energy'] = ef_energy parsed_data['fermi_energy_units'] = ef_energy_units # parse per-process dynamical RAM estimates if 'Estimated max dynamical RAM per process' in line: value = line.split('>')[-1] match = re.match(r'\s+([+-]?\d+(\.\d*)?|\.\d+([eE][+-]?\d+)?)\s*(Mb|MB|GB)', value) if match: try: parsed_data['estimated_ram_per_process'] = float(match.group(1)) parsed_data['estimated_ram_per_process_units'] = match.group(4) except (IndexError, ValueError): pass # parse total dynamical RAM estimates if 'Estimated total dynamical RAM' in line: value = line.split('>')[-1] match = re.match(r'\s+([+-]?\d+(\.\d*)?|\.\d+([eE][+-]?\d+)?)\s*(Mb|MB|GB)', value) if match: try: parsed_data['estimated_ram_total'] = float(match.group(1)) parsed_data['estimated_ram_total_units'] = match.group(4) except (IndexError, ValueError): pass if 'Core level energy' in line: core_energy_line = line parsed_data['core_level_energy'] = core_energy_line.split('[')[1].split(':')[1].strip() parsed_data['core_level_energy_units'] = core_energy_line.split('[')[1].split(':')[0].replace(']', '') if 'energy-zero' in line: energy_zero_line = line parsed_data['energy_zero'] = energy_zero_line.split('[')[1].split(':')[1].strip() parsed_data['energy_zero_units'] = energy_zero_line.split('[')[1].split(':')[0].replace(']', '') # Parse the walltime # XSpectra does not appear next to the timing data, so we must find 'xanes' instead. if 'xanes' in line and 'WALL' in line: try: time = line.split('CPU')[1].split('WALL')[0].strip() parsed_data['wall_time'] = time except (ValueError, IndexError): break try: parsed_data['wall_time_seconds'] = convert_qe_time_to_sec(time) except ValueError: logs.warnings.append('Unable to convert wall time from `stdout` to seconds.') return parsed_data, logs
@staticmethod
[docs] def parse_raw_xspectra(xspectra_file, array_names, array_units): """Parse the content of the output spectrum. This function takes as input the xspectra_file as a list of filelines along with information on how to give labels and units to the parsed data. :param xspectra_file: xspectra file lines in the form of a list :type xspectra_file: list :param array_names: list of all array names. :type array_names: list :param array_units: list of all array units. :type array_units: list :return array_data: narray, a dictionary for ArrayData type, which contains all parsed xspectra output along with labels and units """ xspectra_header = xspectra_file[:4] xspectra_data = np.genfromtxt(xspectra_file) if len(xspectra_data) == 0: raise QEOutputParsingError('XSpectra file is empty.') if np.isnan(xspectra_data).any(): raise QEOutputParsingError('XSpectra file contains non-numeric elements.') if len(xspectra_data[0]) == 2: array_names = array_names[0] array_units = array_units[0] spin = False elif len(xspectra_data[0]) == 4: array_names = array_names[1] array_units = array_units[1] spin = True else: raise QEOutputParsingError('XSpectra data file in unsuitable format for the parser') i = 0 array_data = {} array_data['header'] = np.array(xspectra_header) while i < len(array_names): array_data[array_names[i]] = xspectra_data[:, i] array_data[array_names[i] + '_units'] = np.array(array_units[i]) i += 1 return array_data, spin