Source code for aiida_quantumespresso.parsers.parse_raw.ph

# -*- coding: utf-8 -*-
"""A collection of function that are used to parse the output of Quantum Espresso PHonon.

The function that needs to be called from outside is parse_raw_ph_output(). Ideally, the functions should work even
without aiida and will return a dictionary with parsed keys.
"""
import numpy
from qe_tools import CONSTANTS

from aiida_quantumespresso.parsers import QEOutputParsingError
from aiida_quantumespresso.parsers.parse_raw.base import convert_qe_time_to_sec
from aiida_quantumespresso.parsers.parse_xml.pw.legacy import parse_xml_child_bool, read_xml_card


[docs]def parse_raw_ph_output(stdout, logs, tensors=None, dynamical_matrices=None): """Parses the raw output of a Quantum ESPRESSO `ph.x` calculation. :param stdout: the content of the stdout file as a string :param tensors: the content of the tensors.xml file as a string :param dynamical_matrices: a list of the content of the dynamical matrix files as a string :returns: tuple of two dictionaries, with the parsed data and log messages, respectively """ data_lines = stdout.split('\n') # Parse tensors, if present tensor_data = {} if tensors: try: tensor_data = parse_ph_tensor(tensors) except QEOutputParsingError: logs.warning.append('Error while parsing the tensor files') out_data = parse_ph_text_output(data_lines, logs) # parse dynamical matrices if present dynmat_data = {} if dynamical_matrices: # find lattice parameter for dynmat_counter, dynmat in enumerate(dynamical_matrices): lines = dynmat.split('\n') # check if the file contains frequencies (i.e. is useful) or not dynmat_to_parse = False if not lines: continue try: _ = [float(i) for i in lines[0].split()] except ValueError: dynmat_to_parse = True if not dynmat_to_parse: continue # parse it this_dynmat_data = parse_ph_dynmat(lines, logs) # join it with the previous dynmat info dynmat_data[f'dynamical_matrix_{dynmat_counter}'] = this_dynmat_data # TODO: use the bands format? # join dictionaries, there should not be any twice repeated key for key in out_data.keys(): if key in list(tensor_data.keys()): raise AssertionError(f'{key} found in two dictionaries') if key in list(dynmat_data.keys()): raise AssertionError(f'{key} found in two dictionaries') symmetry_labels = out_data.pop('symmetry_labels', {}) if len(dynmat_data) == 1 and 'dynamical_matrix_0' in dynmat_data: dynmat_data['dynamical_matrix_1'] = dynmat_data.pop('dynamical_matrix_0') parsed_data = dict(list(dynmat_data.items()) + list(out_data.items()) + list(tensor_data.items())) for q_index, q_symlabels in symmetry_labels.items(): if f'dynamical_matrix_{q_index}' in parsed_data: parsed_data[f'dynamical_matrix_{q_index}'].update(q_symlabels) return parsed_data, logs
[docs]def parse_ph_tensor(data): """Parse the xml tensor file of QE v5.0.3 data must be read from the file with the .read() function (avoid readlines)""" from xml.dom.minidom import parseString dom = parseString(data) parsed_data = {} # card EF_TENSORS cardname = 'EF_TENSORS' target_tags = read_xml_card(dom, cardname) tagname = 'DONE_ELECTRIC_FIELD' parsed_data[tagname.lower()] = parse_xml_child_bool(tagname, target_tags) if parsed_data[tagname.lower()]: try: second_tagname = 'DIELECTRIC_CONSTANT' parsed_data[second_tagname.lower()] = parse_xml_matrices(second_tagname, target_tags) except: raise QEOutputParsingError('Failed to parse Dielectric constant') tagname = 'DONE_EFFECTIVE_CHARGE_EU' parsed_data[tagname.lower()] = parse_xml_child_bool(tagname, target_tags) if parsed_data[tagname.lower()]: try: second_tagname = 'EFFECTIVE_CHARGES_EU' dumb_matrix = parse_xml_matrices(second_tagname, target_tags) # separate the elements of the messy matrix, with a matrix 3x3 for each element new_matrix = [] this_at = [] for i in dumb_matrix: this_at.append(i) if len(this_at) == 3: new_matrix.append(this_at) this_at = [] parsed_data[second_tagname.lower()] = new_matrix except: raise QEOutputParsingError('Failed to parse effective charges eu') return parsed_data
[docs]def parse_xml_matrices(tagname, target_tags): """Can be used to load the disordered matrices of the QE XML file.""" a = target_tags.getElementsByTagName(tagname)[0] b = a.childNodes[0] flat_array = b.data.split() # convert to float, then into a list of tuples, then into a list of lists flat_array = [float(i) for i in flat_array] list_tuple = list(zip(*[iter(flat_array)] * 3)) return [list(i) for i in list_tuple]
[docs]def parse_ph_text_output(lines, logs): """Parses the stdout of Quantum ESPRESSO ``ph.x``. :param lines: list of strings, the file as read by readlines() :return: dictionary with parsed values """ def parse_qpoints(lines): """Parse the q-points from the corresponding lines in the stdout.""" return {int(line.split()[0]): [float(coord) for coord in line.split()[1:4]] for line in lines} def parse_mode_symmetries(lines, num_atoms): """Parse the mode symmetries from the block after diagonalization of the dynamical matrix.""" q_data = {} symlabel_q_point = [float(i) for i in lines[2].split('q = (')[-1].split(')')[0].split()] q_data['mode_symmetry'] = [] for line in lines: if 'Mode symmetry' in line: q_data['point_group'] = line.split('Mode symmetry,')[1].split('point group:')[0].strip() if '-->' in line: freq_start = int(line.split('(')[1].split(')')[0].split('-')[0]) freq_end = int(line.split('(')[1].split(')')[0].split('-')[1]) if line.split()[-1] == 'I': symm_label = line.split()[-2] else: symm_label = line.split()[-1] q_data['mode_symmetry'].extend([symm_label] * (freq_end - freq_start + 1)) # If the mode symmetries are not printed, set those for the non-symmetry case if 'point_group' not in q_data: q_data['point_group'] = 'C_1' q_data['mode_symmetry'] = ['A'] * (3 * num_atoms) return symlabel_q_point, q_data parsed_data = {} parsed_data['num_q_found'] = 0 # Parse number of q-points and number of atoms for count, line in enumerate(lines): if 'q-points for this run' in line: try: parsed_data['number_of_qpoints'] = int(line.split('/')[1].split('q-points')[0]) parsed_data['q_points'] = parse_qpoints(lines[count + 2:count + parsed_data['number_of_qpoints'] + 2]) except Exception as exc: logs.warning.append(f'Error while parsing number of q points: {exc}') elif 'q-points)' in line: # case of a 'only_wfc' calculation try: parsed_data['number_of_qpoints'] = int(line.split('q-points')[0].split('(')[1]) parsed_data['q_points'] = parse_qpoints(lines[count + 2:count + parsed_data['number_of_qpoints'] + 2]) except Exception as exc: logs.warning.append(f'Error while parsing number of q points: {exc}') # In case a single q-point is provided at the end of the input file, there is no summary of the q-points at the # start of the stdout. Then we have to parse the q-point further down the output file. elif 'number_of_qpoints' not in parsed_data and 'Calculation of q =' in line: parsed_data['number_of_qpoints'] = 1 parsed_data['q_points'] = {1: [float(coord) for coord in line.split('=')[-1].split()]} elif 'number of atoms/cell' in line: try: num_atoms = int(line.split('=')[1]) parsed_data['number_of_atoms'] = num_atoms except Exception: logs.warning.append('Error while parsing number of atoms.') elif 'irreducible representations' in line: if 'number_of_irr_representations_for_each_q' not in list(parsed_data.keys()): parsed_data['number_of_irr_representations_for_each_q'] = [] try: num_irr_repr = int(line.split('irreducible')[0].split('are')[1]) parsed_data['number_of_irr_representations_for_each_q'].append(num_irr_repr) except Exception: pass elif 'Diagonalizing the dynamical matrix' in line: mode_count = count while not 'Calculation' in lines[mode_count] and not 'CPU' in lines[mode_count]: mode_count += 1 symlabel_q_point, q_data = parse_mode_symmetries(lines[count: mode_count], num_atoms) for q_index, q_point in parsed_data['q_points'].items(): if numpy.isclose( numpy.array(q_point), numpy.array(symlabel_q_point), rtol=0, atol=1e-7 ).all(): parsed_data.setdefault('symmetry_labels', {}) parsed_data['symmetry_labels'][q_index] = q_data parsed_data['num_q_found'] += 1 # Remove the q-points from the parsed data; these are only used to assign the symmetry labels to the right index parsed_data.pop('q_points', None) # Trim the number of irreps to the number of q-points finished in this run parsed_data['number_of_irr_representations_for_each_q'] = parsed_data.get( 'number_of_irr_representations_for_each_q', [] )[:parsed_data.pop('num_q_found')] return parsed_data
[docs]def parse_ph_dynmat(data, logs, lattice_parameter=None, also_eigenvectors=False, parse_header=False): """Parse frequencies and eigenvectors of a single dynamical matrix. :param data: the text read with the function readlines() :param lattice_parameter: the lattice_parameter ('alat' in QE jargon). If None, q_point is kept in 2pi/a coordinates as in the dynmat file. :param also_eigenvectors: if True, return an additional 'eigenvectors' array in output, containing also the eigenvectors. This will be a list of lists, that when converted to a numpy array has 4 indices, with shape Neigenstates x Natoms x 3(xyz) x 2 (re,im) To convert to a complex numpy array, you can use:: ev = np.array(parsed_data['eigenvectors']) ev = ev[:,:,:,0] + 1j * ev[:,:,:,1] :param parse_header: if True, return additional keys in the returned parsed_data dictionary, including information from the header :return: a dictionary with parsed values and units """ parsed_data = {} if 'Dynamical matrix file' not in data[0]: raise QEOutputParsingError('Dynamical matrix is not in the expected format') frequencies = [] eigenvectors = [] starting_line = 1 if parse_header: header_dict = {'warnings': []} try: pieces = data[2].split() if len(pieces) != 9: raise QEOutputParsingError('Wrong # of elements on line 3') try: num_species = int(pieces[0]) num_atoms = int(pieces[1]) header_dict['ibrav'] = int(pieces[2]) header_dict['celldm'] = [float(i) for i in pieces[3:]] # In angstrom alat = header_dict['celldm'][0] * CONSTANTS.bohr_to_ang if abs(alat) < 1.e-5: raise QEOutputParsingError( 'Lattice constant=0! Probably you are using an ' 'old Quantum ESPRESSO version?' ) header_dict['alat'] = alat header_dict['alat_units'] = 'angstrom' except ValueError: raise QEOutputParsingError('Wrong data on line 3') starting_line = 3 if header_dict['ibrav'] == 0: if 'Basis vectors' not in data[3]: raise QEOutputParsingError("Wrong format (no 'Basis vectors' line)") try: v1 = [float(_) * alat for _ in data[4].split()] v2 = [float(_) * alat for _ in data[5].split()] v3 = [float(_) * alat for _ in data[6].split()] if len(v1) != 3 or len(v2) != 3 or len(v3) != 3: raise QEOutputParsingError('Wrong length for basis vectors') header_dict['lattice_vectors'] = [v1, v2, v3] header_dict['lattice_vectors_units'] = 'angstrom' except ValueError: raise QEOutputParsingError('Wrong data for basis vectors') starting_line += 4 species_info = {} species = [] for idx, sp_line in enumerate(data[starting_line:starting_line + num_species], start=1): pieces = sp_line.split("'") if len(pieces) != 3: raise QEOutputParsingError('Wrong # of elements for one of the species') try: if int(pieces[0]) != idx: raise QEOutputParsingError('Error with the indices of the species') species.append([pieces[1].strip(), float(pieces[2]) / CONSTANTS.amu_Ry]) except ValueError: raise QEOutputParsingError('Error parsing the species') masses = dict(species) header_dict['masses'] = masses atoms_coords = [] atoms_labels = [] starting_line += num_species for idx, atom_line in enumerate(data[starting_line:starting_line + num_atoms], start=1): pieces = atom_line.split() if len(pieces) != 5: raise QEOutputParsingError( f'Wrong # of elements for one of the atoms: {len(pieces)}, line {starting_line + idx}: {pieces}' ) try: if int(pieces[0]) != idx: raise QEOutputParsingError(f'Error with the indices of the atoms: {int(pieces[0])} vs {idx}') sp_idx = int(pieces[1]) if sp_idx > len(species): raise QEOutputParsingError(f'Wrong index for the species: {sp_idx}, but max={len(species)}') atoms_labels.append(species[sp_idx - 1][0]) atoms_coords.append([float(pieces[2]) * alat, float(pieces[3]) * alat, float(pieces[4]) * alat]) except ValueError: raise QEOutputParsingError('Error parsing the atoms') except IndexError: raise QEOutputParsingError('Error with the indices in the atoms section') header_dict['atoms_labels'] = atoms_labels header_dict['atoms_coords'] = atoms_coords header_dict['atoms_coords_units'] = 'angstrom' starting_line += num_atoms starting_line += 1 # Got to the next line to check if 'Dynamical' not in data[starting_line]: raise QEOutputParsingError("Wrong format (no 'Dynamical Matrix' line)") ## Here I finish the header parsing except QEOutputParsingError as e: logs.warning.append( 'Problem parsing the header of the matdyn file! (msg: {}). ' 'Storing only the information I managed to retrieve'.format(e.message) ) header_dict['warnings'].append( 'There was some parsing error and this dictionary is ' 'not complete, see the warnings of the top parsed_data dict' ) # I store what I got parsed_data['header'] = header_dict for line_counter, line in enumerate(data[starting_line:], start=starting_line): if 'q = ' in line: # q point is written several times, because it can also be rotated. # I consider only the first point, which is the one computed if 'q_point' not in parsed_data: q_point = [float(i) for i in line.split('(')[1].split(')')[0].split()] if lattice_parameter: parsed_data['q_point'] = [e * 2 * numpy.pi / lattice_parameter for e in q_point] parsed_data['q_point_units'] = 'angstrom-1' else: parsed_data['q_point'] = q_point parsed_data['q_point_units'] = '2pi/lattice_parameter' if 'freq' in line or 'omega' in line: this_freq = line.split('[cm-1]')[0].split('=')[-1] # exception for bad fortran coding: *** could be written instead of the number if '*' in this_freq: frequencies.append(None) logs.warning.append('Wrong fortran formatting found while parsing frequencies') else: frequencies.append(float(this_freq)) this_eigenvectors = [] for new_line in data[line_counter + 1:]: if ( 'freq' in new_line or 'omega' in new_line or '************************************************' in new_line ): break this_things = new_line.split('(')[1].split(')')[0].split() try: this_flatlist = [float(i) for i in this_things] except ValueError: logs.warning.append('Wrong fortran formatting found while parsing eigenvectors') # then save the three (xyz) complex numbers as [None,None] this_eigenvectors.append([[None, None]] * 3) continue list_tuples = list(zip(*[iter(this_flatlist)] * 2)) # I save every complex number as a list of two numbers this_eigenvectors.append([[i[0], i[1]] for i in list_tuples]) eigenvectors.append(this_eigenvectors) parsed_data['frequencies'] = frequencies parsed_data['frequencies_units'] = 'cm-1' # TODO: the eigenvectors should be written in the database according to a parser_opts. # for now, we don't store them, otherwise we get too much stuff # We implement anyway the possibility to get it with an optional parameter if also_eigenvectors: parsed_data['eigenvectors'] = eigenvectors return parsed_data