Source code for aiida_quantumespresso.parsers.pp

# -*- coding: utf-8 -*-
"""`Parser` implementation for the `PpCalculation` calculation job class."""
import os
import re
from typing import Tuple

from aiida import orm
from aiida.common import AttributeDict
import numpy as np

from aiida_quantumespresso.calculations.pp import PpCalculation
from aiida_quantumespresso.utils.mapping import get_logging_container

from .base import BaseParser


class PpParser(BaseParser):
    """``Parser`` implementation for the ``PpCalculation`` calculation job class."""

    class_error_map = {
        'xml data file not found': 'ERROR_PARENT_XML_MISSING',
    }

    # Lookup: plot_num --> units
    units_dict = {
        0: 'e/bohr^3',  # Electrons, electronic charge density
        1: 'Ry',  # Total potential
        2: 'Ry',  # Ionic potential
        3: 'states/bohr^3',  # Density of states over an energy range
        4: 'Ry/K.bohr^3',  # Local density of electronic entropy
        5: 'states/bohr^3',  # Simulated STM images from LDOS
        6: 'e/bohr^3',  # Spin density
        7: 'e/bohr^3',  # WFN contribution to charge density, assuming collinear spins
        8: '1',  # Electron localization function, dimensionless
        9: 'e/bohr^3',  # Charge density minus superposition of atomic densities
        10: 'states/bohr^3',  # Integrated local density of states (ILDOS)
        11: 'Ry',  # Bare + Hartree potential
        12: 'Ry',  # The sawtooth electric field potential
        13: 'mu_B',  # Noncollinear magnetisation, Bohr magnetons
        17: 'e/bohr^3',  # All-electron charge density
        18: 'T',  # The exchange and correlation magnetic field in the noncollinear case
        19: '1',  # Reduced density gradient, see dx.doi.org/10.1021/ct100641a, Eq. 1; dimensionless
        20: 'e/bohr^5',  # Product of the electron density and the second eigenvalue of the electron-density
        # Hessian matrix, with the sign of the second eigenvalue; see dx.doi.org/10.1021/ct100641a
        21: 'e/bohr^3',  # All-electron charge density, PAW case
        22: 'Ry/bohr^3',  # Kinetic energy density
    }
    def parse(self, **kwargs):
        """Parse the retrieved files of a ``PpCalculation`` into output nodes."""
        logs = get_logging_container()

        stdout, parsed_data, logs = self.parse_stdout_from_retrieved(logs)

        base_exit_code = self.check_base_errors(logs)
        if base_exit_code:
            return self.exit(base_exit_code, logs)

        parsed_pp, logs = self.parse_stdout(stdout, logs)
        parsed_data.update(parsed_pp)

        self.out('output_parameters', orm.Dict(parsed_data))

        if 'ERROR_OUTPUT_STDOUT_INCOMPLETE' in logs.error:
            return self.exit(self.exit_codes.ERROR_OUTPUT_STDOUT_INCOMPLETE, logs)

        retrieve_temporary_list = self.node.base.attributes.get('retrieve_temporary_list', None)

        # If temporary files were specified, check that we have them
        if retrieve_temporary_list:
            try:
                retrieved_temporary_folder = kwargs['retrieved_temporary_folder']
            except KeyError:
                return self.exit(self.exit_codes.ERROR_NO_RETRIEVED_TEMPORARY_FOLDER)

        # Currently all plot output files should start with the `filplot` prefix. If only one file was produced, the
        # prefix is the entire filename, but in the case of multiple files, there will be pairs of two files where the
        # first has the format `{filename_prefix}.{some_random_suffix}` and the second has the same name but with the
        # `filename_suffix` appended.
        filename_prefix = PpCalculation._FILPLOT
        filename_suffix = PpCalculation._FILEOUT

        # How to get the output filenames and how to open them depends on whether they will have been retrieved in
        # the `retrieved` output node, or in the `retrieved_temporary_folder`. Instead of having a conditional with
        # almost the same loop logic in each branch, we apply a somewhat dirty trick to define a `file_opener`, which
        # is a callable that will open a handle to the output file given a certain filename. This works since it is
        # guaranteed that these output files (excluding the standard output) will all either be in the retrieved, or
        # in the retrieved temporary folder.
        if retrieve_temporary_list:
            filenames = os.listdir(retrieved_temporary_folder)
            file_opener = lambda filename: open(os.path.join(retrieved_temporary_folder, filename))
        else:
            filenames = self.retrieved.base.repository.list_object_names()
            file_opener = self.retrieved.base.repository.open

        # The following check should in principle always succeed since the `iflag` should be set by the
        # `PpCalculation` plugin, which only ever sets 0 - 4, but we check in order for the code not to except.
        iflag = self.node.inputs.parameters.base.attributes.get('PLOT')['iflag']
        if iflag not in range(5):
            return self.exit_codes.ERROR_UNSUPPORTED_DATAFILE_FORMAT

        data_parsed = []
        parsers = {
            0: self.parse_gnuplot1D,
            1: self.parse_gnuplot1D,
            2: self.parse_gnuplot2D,
            3: self.parse_gaussian,
            4: self.parse_gnuplot_polar,
        }

        def get_key_from_filename(filename):
            """Determine the output link label for the output file with the given filename."""
            if filename == filename_suffix:
                return filename

            pattern = r'{}_(.*){}'.format(filename_prefix, filename_suffix)
            matches = re.search(pattern, filename)
            return matches.group(1)

        for filename in filenames:
            # Directly parse the retrieved files after reading them to memory (`data_raw`). The raw data of each
            # file is released from memory after parsing, to improve memory usage.
            if filename.endswith(filename_suffix):
                # Read the file to memory
                try:
                    with file_opener(filename) as handle:
                        data_raw = handle.read()
                except OSError:
                    return self.exit_codes.ERROR_OUTPUT_DATAFILE_READ.format(filename=filename)

                # Parse the file
                try:
                    key = get_key_from_filename(filename)
                    data_parsed.append((key, parsers[iflag](data_raw, self.units_dict[parsed_data['plot_num']])))
                    del data_raw
                except Exception as exception:  # pylint: disable=broad-except
                    return self.exit_codes.ERROR_OUTPUT_DATAFILE_PARSE.format(filename=filename, exception=exception)

        # If we don't have any parsed files, we exit. Note that this will not catch the case where there should be
        # more than one file, but the engine did not retrieve all of them. Since we often don't know how many files
        # should be retrieved anyway, there is really no way to check this explicitly.
        if not data_parsed:
            return self.exit_codes.ERROR_OUTPUT_DATAFILE_MISSING.format(filename=filename_prefix)

        # Create output nodes
        if len(data_parsed) == 1:
            self.out('output_data', data_parsed[0][1])
        else:
            self.out('output_data_multiple', dict(data_parsed))

        return self.exit(logs=logs)
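    # A minimal usage sketch of the outputs this ``parse`` method registers, assuming a finished
    # ``PpCalculation``; the pk ``1234`` and the printed values are hypothetical placeholders:
    #
    #     >>> from aiida import orm
    #     >>> calculation = orm.load_node(1234)  # a finished PpCalculation
    #     >>> calculation.outputs.output_parameters.get_dict()['plot_num']
    #     0
    #     >>> calculation.outputs.output_data.get_arraynames()  # contents depend on iflag
    #     ['data', 'data_units', 'voxel', 'coordinates_units']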
    def parse_stdout(self, stdout: str, logs: AttributeDict) -> Tuple[dict, AttributeDict]:
        """Parse the ``stdout`` content of a Quantum ESPRESSO ``pp.x`` calculation."""
        parsed_data = {}

        # Parse useful data from stdout
        for line in stdout.splitlines():
            if 'Check:' in line:  # QE < 6.5
                split_line = line.split('=')
                if 'negative/imaginary' in line:  # QE 6.1 - 6.3
                    parsed_data['negative_core_charge'] = float(split_line[-1].split()[0])
                    parsed_data['imaginary_core_charge'] = float(split_line[-1].split()[-1])
                else:  # QE 6.4
                    parsed_data['negative_core_charge'] = float(split_line[1])

            if 'Min, Max, imaginary charge:' in line:
                split_line = line.split()
                parsed_data['charge_min'] = float(split_line[-3])
                parsed_data['charge_max'] = float(split_line[-2])
                parsed_data['charge_img'] = float(split_line[-1])

            if 'plot_num = ' in line:
                parsed_data['plot_num'] = int(line.split('=')[1])

            if 'Plot Type:' in line:
                parsed_data['plot_type'] = line.split('Output format')[0].split(':')[-1].strip()
                parsed_data['output_format'] = line.split(':')[-1].strip()

        return parsed_data, logs
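    # Illustration with a synthetic stdout fragment (the spacing of real ``pp.x`` output may
    # differ): given the two lines
    #
    #     Plot Type: 3D                Output format: cube
    #     plot_num =  0
    #
    # ``parse_stdout`` would return ``{'plot_type': '3D', 'output_format': 'cube', 'plot_num': 0}``.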
    @staticmethod
    def parse_gnuplot1D(data_file_str, data_units):
        """Parse 1D GNUPlot formatted output.

        :param data_file_str: the data file read in as a single string
        """
        data_lines = data_file_str.splitlines()
        n_col = len(data_lines[0].split())

        # 1D case
        if n_col == 2:
            coords = []
            data = []
            for line in data_lines:
                split_line = line.split()
                coords.append(float(split_line[0]))
                data.append(float(split_line[1]))
            y_data = [data]
            y_names = ['data']
            y_units = [data_units]

        # 1D case with spherical averaging
        if n_col == 3:
            coords = []
            data = []
            data_integral = []
            for line in data_lines:
                split_line = line.split()
                coords.append(float(split_line[0]))
                data.append(float(split_line[1]))
                data_integral.append(float(split_line[2]))
            y_data = [data, data_integral]
            y_names = ['data', 'integrated_data']
            y_units = [data_units, data_units.replace('bohr^3', 'bohr')]

        x_units = 'bohr'
        arraydata = orm.ArrayData()
        arraydata.set_array('x_coordinates', np.array(coords))
        arraydata.set_array('x_coordinates_units', np.array(x_units))
        for name, data, units in zip(y_names, y_data, y_units):
            arraydata.set_array(name, np.array(data))
            arraydata.set_array(name + '_units', np.array(units))

        return arraydata
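    # Doctest-style sketch (synthetic values) of the two-column 1D format this method expects:
    #
    #     >>> sample = '0.0 1.0\n0.5 0.8\n1.0 0.2'
    #     >>> array = PpParser.parse_gnuplot1D(sample, 'e/bohr^3')
    #     >>> array.get_array('x_coordinates')
    #     array([0. , 0.5, 1. ])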
    @staticmethod
    def parse_gnuplot_polar(data_file_str, data_units):
        """Parse 2D Polar GNUPlot formatted, single column output.

        :param data_file_str: the data file read in as a single string
        """
        data_lines = data_file_str.splitlines()
        data_lines.pop(0)  # First line is a header

        data = []
        for line in data_lines:
            data.append(float(line))

        arraydata = orm.ArrayData()
        arraydata.set_array('data', np.array(data))
        arraydata.set_array('data_units', np.array([data_units]))

        return arraydata
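    # Doctest-style sketch (synthetic values): a header line followed by one value per line.
    #
    #     >>> sample = 'polar plot header\n1.0\n0.5'
    #     >>> PpParser.parse_gnuplot_polar(sample, 'e/bohr^3').get_array('data')
    #     array([1. , 0.5])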
    @staticmethod
    def parse_gnuplot2D(data_file_str, data_units):
        """Parse 2D GNUPlot formatted output.

        :param data_file_str: the data file read in as a single string
        """
        data_lines = data_file_str.splitlines()
        coords = []
        data = []

        for line in data_lines:
            stripped = line.strip()
            if stripped == '':
                continue
            split_line = stripped.split()
            coords.append([float(split_line[0]), float(split_line[1])])
            data.append(float(split_line[2]))

        coords_units = 'bohr'
        arraydata = orm.ArrayData()
        arraydata.set_array('xy_coordinates', np.array(coords))
        arraydata.set_array('data', np.array(data))
        arraydata.set_array('xy_coordinates_units', np.array(coords_units))
        arraydata.set_array('data_units', np.array(data_units))

        return arraydata
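    # Doctest-style sketch (synthetic values): three columns per line (x, y, value), with blank
    # lines skipped.
    #
    #     >>> sample = '0.0 0.0 1.0\n\n0.0 0.5 0.8'
    #     >>> array = PpParser.parse_gnuplot2D(sample, 'Ry')
    #     >>> array.get_array('xy_coordinates')
    #     array([[0. , 0. ],
    #            [0. , 0.5]])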
    @staticmethod
    def parse_gaussian(data_file_str, data_units):
        """Parse Gaussian Cube formatted output.

        :param data_file_str: the data file read in as a single string
        """
        lines = data_file_str.splitlines()

        atoms_line = lines[2].split()
        natoms = int(atoms_line[0])  # The number of atoms listed in the file
        origin = np.array(atoms_line[1:], dtype=float)

        header = lines[:6 + natoms]  # Header of the file: comments, the voxel, and the number of atoms and datapoints
        data_lines = lines[6 + natoms:]  # The actual data: atoms and volumetric data

        # Parse the declared dimensions of the volumetric data
        x_line = header[3].split()
        xdim = int(x_line[0])
        y_line = header[4].split()
        ydim = int(y_line[0])
        z_line = header[5].split()
        zdim = int(z_line[0])

        # Get the vectors describing the basis voxel
        voxel_array = np.array(
            [
                [x_line[1], x_line[2], x_line[3]],
                [y_line[1], y_line[2], y_line[3]],
                [z_line[1], z_line[2], z_line[3]],
            ],
            dtype=np.float64,
        )

        # Get the volumetric data
        data_array = np.empty(xdim * ydim * zdim, dtype=float)
        cursor = 0
        for line in data_lines:
            ls = line.split()
            data_array[cursor:cursor + len(ls)] = ls
            cursor += len(ls)
        data_array = data_array.reshape((xdim, ydim, zdim))

        coordinates_units = 'bohr'
        arraydata = orm.ArrayData()
        arraydata.set_array('voxel', voxel_array)
        arraydata.set_array('data', data_array)
        arraydata.set_array('data_units', np.array(data_units))
        arraydata.set_array('coordinates_units', np.array(coordinates_units))

        return arraydata