Source code for aiida_quantumespresso.parsers.parse_raw.cp
# -*- coding: utf-8 -*-
from xml.dom.minidom import parseString
from xml.etree import ElementTree
from aiida_quantumespresso.parsers import QEOutputParsingError
from aiida_quantumespresso.parsers.parse_xml.cp.legacy import parse_cp_xml_output
from aiida_quantumespresso.parsers.parse_xml.parse import parse_xml_post_6_2
from aiida_quantumespresso.parsers.parse_xml.pw.legacy import parse_xml_child_integer
from aiida_quantumespresso.parsers.parse_xml.versions import QeXmlVersion, get_xml_file_version
[docs]def parse_cp_traj_stanzas(num_elements, splitlines, prepend_name, rescale=1.):
"""
num_elements: Number of lines (with three elements) between lines with two only
elements (containing step number and time in ps).
num_elements is 3 for cell, and the number of atoms for coordinates and positions.
splitlines: a list of lines of the file, already split in pieces using string.split
prepend_name: a string to be prepended to the name of keys returned
in the return dictionary.
rescale: the values in each stanza are multiplied by this factor, for units conversion
"""
steps = []
times = []
stanzas = []
this_stanza = []
start_stanza = False
linenum = -1
try:
for linenum, l in enumerate(splitlines):
if len(l) == 2:
steps.append(int(l[0]))
if set(l[1]) == {'*'}:
times.append(-1.0)
else:
times.append(float(l[1]))
start_stanza = True
if len(this_stanza) != 0:
raise ValueError('Wrong position of short line.')
elif len(l) == 3:
if len(this_stanza) == 0 and not start_stanza:
raise ValueError('Wrong position of long line.')
start_stanza = False
this_stanza.append([float(l[0]) * rescale, float(l[1]) * rescale, float(l[2]) * rescale])
if len(this_stanza) == num_elements:
stanzas.append(this_stanza)
this_stanza = []
else:
raise ValueError(f'Wrong line length ({len(l)})')
if len(this_stanza) != 0:
raise ValueError(f'Wrong length of last block ({len(this_stanza)} lines instead of 0).')
if len(steps) != len(stanzas):
raise ValueError('Length mismatch between number of steps and number of defined stanzas.')
return {
f'{prepend_name}_steps': steps,
f'{prepend_name}_times': times,
f'{prepend_name}_data': stanzas,
}
except Exception as e:
e.message = f'At line {linenum + 1}: {e}'
raise e
[docs]def parse_cp_text_output(data, xml_data):
"""data must be a list of strings, one for each lines, as returned by readlines().
On output, a dictionary with parsed values
"""
# TODO: uniform readlines() and read() usage for passing input to the parser
parsed_data = {}
parsed_data['warnings'] = []
conjugate_gradient = False
for count, line in enumerate(data):
if 'conjugate gradient' in line.lower():
conjugate_gradient = True
parsed_data['conjugate_gradient'] = True
if 'warning' in line.lower():
parsed_data['warnings'].append(line)
elif 'bananas' in line:
parsed_data['warnings'].append('Bananas from the ortho.')
#when the cp does a cg, the output is different and the parser below does not work
#TODO: understand what the cg prints out and parse it (it is undocumented)
if not conjugate_gradient:
for count, line in enumerate(reversed(data)):
if 'nfi' in line and 'ekinc' in line and 'econs' in line:
this_line = data[len(data) - count]
try:
parsed_data['ekinc'] = [float(this_line.split()[1])]
except ValueError:
pass
try:
parsed_data['temph'] = [float(this_line.split()[2])]
except ValueError:
pass
try:
parsed_data['tempp'] = [float(this_line.split()[3])]
except ValueError:
pass
try:
parsed_data['etot'] = [float(this_line.split()[4])]
except ValueError:
pass
try:
parsed_data['enthal'] = [float(this_line.split()[5])]
except ValueError:
pass
try:
parsed_data['econs'] = [float(this_line.split()[6])]
except ValueError:
pass
try:
parsed_data['econt'] = [float(this_line.split()[7])]
except ValueError:
pass
try:
parsed_data['vnhh'] = [float(this_line.split()[8])]
except (ValueError, IndexError):
pass
try:
parsed_data['xnhh0'] = [float(this_line.split()[9])]
except (ValueError, IndexError):
pass
try:
parsed_data['vnhp'] = [float(this_line.split()[10])]
except (ValueError, IndexError):
pass
try:
parsed_data['xnhp0'] = [float(this_line.split()[11])]
except (ValueError, IndexError):
pass
return parsed_data
[docs]def parse_cp_xml_counter_output(data):
"""Parse xml file print_counter.xml data must be a single string, as returned by file.read() (notice the difference
with parse_text_output!) On output, a dictionary with parsed values."""
dom = parseString(data)
parsed_data = {}
cardname = 'LAST_SUCCESSFUL_PRINTOUT'
card1 = [_ for _ in dom.childNodes if _.nodeName == 'PRINT_COUNTER'][0]
card2 = [_ for _ in card1.childNodes if _.nodeName == 'LAST_SUCCESSFUL_PRINTOUT'][0]
tagname = 'STEP'
parsed_data[cardname.lower().replace('-', '_')] = parse_xml_child_integer(tagname, card2)
return parsed_data
[docs]def parse_cp_counter_output(data):
"""Parse file print_counter data must be a single string, as returned by file.read() (notice the difference
with parse_text_output!) On output, a dictionary with parsed values."""
parsed_data = {}
cardname = 'LAST_SUCCESSFUL_PRINTOUT'
tagname = 'STEP'
numbers = [int(s) for s in data.split() if s.isdigit()]
if numbers:
parsed_data[cardname.lower().replace('-', '_')] = numbers[0]
return parsed_data
[docs]def parse_cp_raw_output(stdout, output_xml=None, xml_counter_file=None, print_counter_xml=True):
parser_warnings = []
# analyze the xml
if output_xml is not None:
xml_parsed = ElementTree.ElementTree(element=ElementTree.fromstring(output_xml))
xml_file_version = get_xml_file_version(xml_parsed)
if xml_file_version == QeXmlVersion.POST_6_2:
xml_data, logs = parse_xml_post_6_2(xml_parsed)
elif xml_file_version == QeXmlVersion.PRE_6_2:
xml_data = parse_cp_xml_output(output_xml)
else:
parser_warnings.append('Skipping the parsing of the xml file.')
xml_data = {}
# analyze the counter file, which keeps info on the steps
if xml_counter_file is not None:
if print_counter_xml:
xml_counter_data = parse_cp_xml_counter_output(xml_counter_file)
else:
xml_counter_data = parse_cp_counter_output(xml_counter_file)
else:
xml_counter_data = {}
stdout = stdout.split('\n')
# understand if the job ended smoothly
job_successful = any('JOB DONE' in line for line in reversed(stdout))
out_data = parse_cp_text_output(stdout, xml_data)
for key in out_data.keys():
if key in list(xml_data.keys()):
raise AssertionError(f'{key} found in both dictionaries')
if key in list(xml_counter_data.keys()):
raise AssertionError(f'{key} found in both dictionaries')
# out_data keys take precedence and overwrite xml_data keys,
# if the same key name is shared by both (but this should not happen!)
final_data = {
**xml_data,
**out_data,
**xml_counter_data,
}
if parser_warnings:
final_data['parser_warnings'] = parser_warnings
# TODO: parse the trajectory and save them in a reasonable format
return final_data, job_successful