import os
import xml.etree.ElementTree as et
import csv
import xlsxwriter
import threading
from scipy import interpolate
class Converter:
def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
self.in_path = in_path
self.out_path = out_path
self.set_min_frequency(min_frequency)
self.set_max_frequency(max_frequency)
def set_min_frequency(self, min_frequency):
self.min_frequency = min_frequency
def set_max_frequency(self, max_frequency):
self.max_frequency = max_frequency
def convert(self):
dir_path = self.create_directory()
files_names = os.listdir(self.in_path)
threads = []
for file_name in files_names:
complete_path = os.path.join(self.in_path, file_name)
if os.path.splitext(complete_path)[1] != ".xml":
continue
tree = et.parse(complete_path)
root = tree.getroot()
uuid = self.get_uuid(root)
plain_file_name = os.path.splitext(file_name)[0]
print(plain_file_name + " loading...")
t = threading.Thread(target=self.convert_file, args=(dir_path, plain_file_name, root, uuid))
t.setName(plain_file_name)
t.start()
threads.append(t)
for thread in threads:
thread.join()
print(thread.name + " has been succesfully converted")
print("\nConverting has ended")
def get_uuid(self, root):
for attrib in root.attrib:
if "schemaLocation" in attrib:
return '{' + root.get(attrib).rsplit(' ', 1)[0] + '}' # getting UUID
return ""
def create_directory(self):
dir_path = os.path.join(self.out_path, "csv_files")
if not os.path.isdir(dir_path):
os.makedirs(dir_path)
return dir_path
else:
'''
toFormat = dir_path + "({})"
i = 1
dir_path = toFormat.format(i)
while os.path.isdir(dir_path):
dir_path = toFormat.format(i)
print(dir_path)
print(toFormat)
i += 1
os.makedirs(dir_path)
'''
return dir_path
def convert_file(self, dir_path, file_name, root, uuid):
path_to_format = os.path.join(dir_path, file_name + '-{} ({}).{}')
tests = list(root.iter(uuid + "Test"))
threads = []
for test in tests:
print(" " + file_name + ": " + self.get_type(test, uuid) + " is loading...")
i = 0
for measured in test.iter(uuid + "Measured"):
path = path_to_format.format(self.get_name(test, uuid), measured.find(uuid + "EarSide").text, "csv")
t = threading.Thread(target=self.create_file, args=(test, measured, path, uuid))
t.setName(file_name + " " + self.get_type(test, uuid) + "_" + str(i))
t.start()
threads.append(t)
i += 1
for thread in threads:
thread.join()
print(thread.name + " is completed")
def create_file(self, test, measured, path, uuid):
setting_node = list(test.iter(uuid + "TypeSettings"))[0][0]
settings_header = self.extract_names(setting_node, uuid, shallow=True)
settings_row = self.get_csv_rows(setting_node, uuid, shallow=True)
if self.get_type(test, uuid) == "WideBandTympanometry":
# csv
'''
full_path = path.format(test_name, ear_side, "csv")
rows = self.get_csv_matrix_rows(measured)
header = rows[0].keys()
with open(full_path, 'w+') as f:
csv_writer = csv.DictWriter(f, header)
csv_writer.writeheader()
csv_writer.writerows(rows)
'''
# csv matrix
data = self.get_data(measured, uuid)
matrix_rows = self.create_matrix_rows(data[0], data[1], data[2])
tables = []
tymp_markups = list(measured.iter(uuid + "TympData"))
tables.append(self.create_table(tymp_markups, uuid, header=False))
markups = list(measured.find(uuid + "Measurement"))
for pred in [lambda m: "TympData" in m.tag,
lambda m: "AbsorbanceData" in m.tag]:
tables.append(self.create_table(list(filter(pred, markups)), uuid))
with open(path, 'w+', newline="") as f:
dict_writer = csv.DictWriter(f, settings_header)
dict_writer.writeheader()
dict_writer.writerows(settings_row)
csv_writer = csv.writer(f)
csv_writer.writerow([""])
csv_writer.writerows(matrix_rows)
for table in tables:
csv_writer.writerow([""])
csv_writer.writerows(table)
'''
##Matrix heat map
data = self.get_analytic_continuation(data[0], data[1], data[2])
full_path = path.format(test_name + " heat map", ear_side, "xlsx")
self.create_workbook(full_path,
data[0],
data[1],
data[2])
'''
else:
header = self.extract_names(measured, uuid)
rows = self.get_csv_rows(measured, uuid)
with open(path, 'w+', newline="") as f:
dict_writer = csv.DictWriter(f, settings_header)
dict_writer.writeheader()
dict_writer.writerows(settings_row)
csv.writer(f).writerow([""])
csv_writer = csv.DictWriter(f, header)
csv_writer.writeheader()
csv_writer.writerows(rows)
def get_type(self, test, uuid):
return list(list(test.iter(uuid + "TypeSettings"))[0])[0].tag.replace(uuid, "")
def get_name(self, test, uuid):
return test.find(uuid + "TestName").text
def get_tag(self, node, uuid):
return node.tag.replace(uuid, "")
def extract_names(self, node, uuid, do_not_check=[], shallow=False):
header = []
used = []
nodes = [node]
while len(nodes) > 0:
current_node = nodes.pop(0)
for subnode in list(current_node)[::-1]:
tag = self.get_tag(subnode, uuid)
if tag in used or tag in do_not_check:
continue
if len(list(subnode)) > 0 and not shallow:
nodes.append(subnode)
else:
header.append(tag)
used.append(tag)
return header[::-1]
def get_csv_rows(self, start_node, uuid, do_not_check=[], shallow=False):
return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)
def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
to_visit = []
for subnode in start_node:
tag = self.get_tag(subnode, uuid)
if len(list(subnode)) > 0:
if tag not in do_not_check and not shallow:
to_visit.append(subnode)
else:
row[tag] = subnode.text
if len(to_visit) == 0:
return [row]
else:
rows = []
for node in to_visit:
new_rows = self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check)
rows.extend(new_rows)
return rows
def get_data(self, measured, uuid):
frequencies = []
pressures = []
absorbances = []
for frequency in measured.find(uuid + "Frequencies"):
frequencies.append(float(frequency.text))
for pressure_data in measured.find(uuid + "Measurement").findall(uuid + "SinglePressureData"):
pressure = float(pressure_data.find(uuid + "Pressure").text)
pressures.append(pressure)
absorbances.append([])
for absorbance in pressure_data.find(uuid + "Absorbances"):
absorbances[-1].append(float(absorbance.text))
return [frequencies, pressures, absorbances]
def create_table(self, markups, uuid, header=True):
names = self.extract_names(markups[0], uuid, shallow=True)
rows = []
if header:
rows.append([""])
rows[-1].extend([self.get_tag(markup, uuid) for markup in markups])
for name in names:
rows.append([name])
for markup in markups:
rows[-1].append(markup.find(uuid + name).text)
return rows
def get_analytic_continuation(self, frequencies, pressures, absorbances):
inter_frequencies = frequencies.copy()
inter_pressures = []
inter_absorbances = []
min_press = pressures[-1]
max_press = pressures[0]
if pressures[0] < pressures[-1]:
is_increasing = True
min_press = max_press
max_press = pressures[-1]
else:
is_increasing = False
for i in range(int(max_press - min_press)):
if is_increasing:
inter_pressures.append(min_press + i)
else:
inter_pressures.append(max_press - i)
inter_absorbances.append([])
for j in range(len(frequencies)):
inter_absorbances[-1].append(0)
for i in range(len(frequencies)):
vals = [absorbances[j][i] for j in range(len(pressures))]
f = interpolate.interp1d(pressures, vals)
for press in inter_pressures:
if is_increasing:
inter_absorbances[int(press - min_press)][i] = f(press).flat[0]
else:
inter_absorbances[int(press - max_press)][i] = f(press).flat[0]
return [inter_frequencies, inter_pressures, inter_absorbances]
def create_workbook(self, path, frequencies, pressures, absorbances):
workbook = xlsxwriter.Workbook(path)
worksheet = workbook.add_worksheet()
worksheet.write(0, 0, "Absorbances")
worksheet.write(1, 0, "Frequencies")
worksheet.write_column(2, 0, frequencies)
worksheet.write(0, 1, "Pressures")
worksheet.write_row(0, 2, pressures)
if (len(absorbances) > 0):
for i in range(0, len(absorbances)):
worksheet.write_column(2, 2 + i, absorbances[i])
letter = self.column_string(3 + i)
sum_formula = "=SUM({}{}:{}{})".format(letter, 4, letter, len(absorbances[0]) + 1)
worksheet.write_formula(len(absorbances[0]) + 1, 2 + i, sum_formula)
worksheet.conditional_format(2, 2, len(absorbances[0]) + 1, len(absorbances) + 1,
{
'type': '3_color_scale',
'min_color': 'green',
'mid_color': 'yellow',
'max_color': "red"
})
workbook.close()
def create_matrix_rows(self, frequencies, pressures, absorbances):
rows = []
row1 = ["Absorbance", "Pressures", "from " + str(self.min_frequency), "to " + str(self.max_frequency)]
row2 = ["Frequencies"]
row2.extend(pressures)
rows.append(row1)
rows.append(row2)
for i in range(len(frequencies)):
new_row = [frequencies[i]]
for j in range(len(pressures)):
new_row.append(absorbances[j][i])
rows.append(new_row)
for i in range(len(frequencies)):
rows[2 + i].append(sum(rows[2 + i][1:]))
last_row = [""]
min_freq_index = 0
max_freq_index = len(frequencies) -1
while(frequencies[min_freq_index] < self.min_frequency):
min_freq_index += 1
while (frequencies[max_freq_index] > self.max_frequency):
max_freq_index -= 1
print(max_freq_index)
print(min_freq_index)
for i in range(len(pressures)):
last_row.append(sum(absorbances[i][min_freq_index:max_freq_index+1 ]))
rows.append(last_row)
return rows
def get_csv_matrix_rows(self, measured, uuid):
frequencies = list(map(lambda n: float(n.text), measured.iter(uuid + "Frequency")))
info = self.get_csv_rows(measured, uuid, ["Measurement", "Frequencies"])[0]
spds = []
pressures = []
absorbances = []
for node in measured.find(uuid + "Measurement"):
tag = self.get_tag(node)
if tag == "SinglePressureData":
spds.append(self.get_csv_rows(node, uuid, ["Absorbances"])[0])
absorbances.append(self.get_absorbances(node, uuid))
pressures.append(spds[-1]["Pressure"])
del spds[-1]["Pressure"]
else:
pass
row = {"Pressure": "", "Frequency": "", "Absorbance": ""}
row.update(info)
rows = []
i = 0
for press in pressures:
j = 0
copy = row.copy()
copy["Pressure"] = press
for freq in frequencies:
copy["Frequency"] = freq
copy["Absorbance"] = absorbances[i][j]
copy.update(spds[i])
rows.append(copy.copy())
j += 1
i += 1
return rows
def get_absorbances(self, node, uuid):
return list(map(lambda n: float(n.text), node.find(uuid + "Absorbances")))
def column_string(self, n):
string = ""
while n > 0:
n, remainder = divmod(n - 1, 26)
string = chr(65 + remainder) + string
return string