import os
import xml.etree.ElementTree as et
import csv
import xlsxwriter
import threading
class Converter:
    """Convert the ``.xml`` files of a directory into CSV files.

    Every ``Measured`` element of every ``Test`` element found in an input
    file is written to its own CSV file inside ``<out_path>/csv_files``.
    Files and measurements are processed in separate threads.
    """

    def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
        """Store the directories and the frequency band used for summing.

        Args:
            in_path: Directory that is scanned for ``.xml`` files.
            out_path: Directory in which the ``csv_files`` folder is created.
            min_frequency: Lower bound (inclusive) of the band summed in the
                last row of the absorbance matrix.
            max_frequency: Upper bound (inclusive) of that band.
        """
        self.in_path = in_path
        self.out_path = out_path
        self.set_min_frequency(min_frequency)
        self.set_max_frequency(max_frequency)

    def set_min_frequency(self, min_frequency):
        """Set the lower bound of the summed frequency band."""
        self.min_frequency = min_frequency

    def set_max_frequency(self, max_frequency):
        """Set the upper bound of the summed frequency band."""
        self.max_frequency = max_frequency

    def convert(self):
        """Convert every ``.xml`` file of ``in_path``, one thread per file.

        Each file is parsed in the calling thread; the conversion itself runs
        in a worker thread named after the file. The method returns once all
        workers have finished.
        """
        dir_path = self.create_directory()
        threads = []
        for file_name in os.listdir(self.in_path):
            plain_file_name, extension = os.path.splitext(file_name)
            if extension != ".xml":
                continue
            root = et.parse(os.path.join(self.in_path, file_name)).getroot()
            uuid = self.get_uuid(root)
            print(plain_file_name + " loading...")
            # The name is passed to the constructor; Thread.setName() is
            # deprecated.
            worker = threading.Thread(
                target=self.convert_file,
                name=plain_file_name,
                args=(dir_path, plain_file_name, root, uuid),
            )
            worker.start()
            threads.append(worker)
        for worker in threads:
            worker.join()
            print(worker.name + " has been succesfully converted")
        print("\nConverting has ended")

    def get_uuid(self, root):
        """Return the tag prefix ``'{namespace}'`` used by the document.

        The namespace is taken from the first attribute of *root* whose name
        contains ``schemaLocation``: everything before the last space of its
        value. An empty string is returned when no such attribute exists.
        """
        for attrib, value in root.attrib.items():
            if "schemaLocation" in attrib:
                return "{" + value.rsplit(" ", 1)[0] + "}"
        return ""

    def create_directory(self):
        """Ensure ``<out_path>/csv_files`` exists and return its path."""
        dir_path = os.path.join(self.out_path, "csv_files")
        # exist_ok avoids the check-then-create race of a separate isdir test.
        os.makedirs(dir_path, exist_ok=True)
        return dir_path

    def convert_file(self, dir_path, file_name, root, uuid):
        """Write one CSV file per ``Measured`` element found under *root*.

        Args:
            dir_path: Directory receiving the CSV files.
            file_name: Input file name without extension; used as prefix of
                the output file names and of the worker thread names.
            root: Root element of the parsed XML document.
            uuid: Tag prefix as returned by :meth:`get_uuid`.
        """
        threads = []
        for test in root.iter(uuid + "Test"):
            test_type = self.get_type(test, uuid)
            print(" " + file_name + ": " + test_type + " is loading...")
            for index, measured in enumerate(test.iter(uuid + "Measured")):
                # The name is formatted on its own and joined afterwards so
                # that braces in dir_path or file_name cannot be mistaken for
                # format fields.
                csv_name = "{}-{} ({}).{}".format(
                    file_name,
                    self.get_name(test, uuid),
                    measured.find(uuid + "EarSide").text,
                    "csv",
                )
                worker = threading.Thread(
                    target=self.create_file,
                    name=file_name + " " + test_type + "_" + str(index),
                    args=(test, measured, os.path.join(dir_path, csv_name), uuid),
                )
                worker.start()
                threads.append(worker)
        for worker in threads:
            worker.join()
            print(worker.name + " is completed")

    def create_file(self, test, measured, path, uuid):
        """Write the settings of *test* and the data of *measured* to *path*.

        The file starts with the settings as a header line plus value rows,
        followed by a blank separator row. For a ``WideBandTympanometry``
        test the absorbance matrix and the markup tables follow; for any
        other type the flattened content of *measured* follows as a table.
        All data is gathered before the file is opened, so a failure while
        reading the XML does not leave a partial file behind.
        """
        setting_node = list(test.iter(uuid + "TypeSettings"))[0][0]
        settings_header = self.extract_names(setting_node, uuid, shallow=True)
        settings_rows = self.get_csv_rows(setting_node, uuid, shallow=True)
        is_wideband = self.get_type(test, uuid) == "WideBandTympanometry"

        if is_wideband:
            frequencies, pressures, absorbances = self.get_data(measured, uuid)
            matrix_rows = self.create_matrix_rows(frequencies, pressures, absorbances)
            # First table: every TympData element at any depth, without a
            # header row; then one table per keyword built from the direct
            # children of Measurement whose tag contains that keyword.
            tables = [
                self.create_table(
                    list(measured.iter(uuid + "TympData")), uuid, header=False
                )
            ]
            markups = list(measured.find(uuid + "Measurement"))
            for keyword in ("TympData", "AbsorbanceData"):
                selected = [markup for markup in markups if keyword in markup.tag]
                tables.append(self.create_table(selected, uuid))
        else:
            header = self.extract_names(measured, uuid)
            rows = self.get_csv_rows(measured, uuid)

        with open(path, "w", newline="") as f:
            settings_writer = csv.DictWriter(f, settings_header)
            settings_writer.writeheader()
            settings_writer.writerows(settings_rows)
            plain_writer = csv.writer(f)
            plain_writer.writerow([""])
            if is_wideband:
                plain_writer.writerows(matrix_rows)
                for table in tables:
                    plain_writer.writerow([""])
                    plain_writer.writerows(table)
            else:
                data_writer = csv.DictWriter(f, header)
                data_writer.writeheader()
                data_writer.writerows(rows)

    def get_type(self, test, uuid):
        """Return the tag (without prefix) of the first child of the first
        ``TypeSettings`` element found under *test*."""
        setting_node = list(test.iter(uuid + "TypeSettings"))[0][0]
        return setting_node.tag.replace(uuid, "")

    def get_name(self, test, uuid):
        """Return the text of the ``TestName`` child of *test*."""
        return test.find(uuid + "TestName").text

    def get_tag(self, node, uuid):
        """Return the tag of *node* with the *uuid* prefix removed."""
        return node.tag.replace(uuid, "")

    def extract_names(self, node, uuid, do_not_check=(), shallow=False):
        """Collect the unique column names found below *node*.

        Args:
            node: Element whose descendants are inspected.
            uuid: Tag prefix stripped from every tag.
            do_not_check: Tags that are ignored entirely.
            shallow: When true only the direct children of *node* are
                considered and each of them yields a name, even if it has
                children of its own.

        Returns:
            The names of the elements without children (or, when *shallow*,
            of all direct children). Elements are examined level by level,
            last sibling first; the collected list is returned reversed, so
            deeper levels come first. Every container element is descended
            into, so the result covers the leaf names of all repeated
            containers.
        """
        header = []
        seen = set()
        level = [node]
        while level:
            next_level = []
            for current_node in level:
                for subnode in reversed(list(current_node)):
                    tag = self.get_tag(subnode, uuid)
                    if tag in do_not_check:
                        continue
                    if len(subnode) > 0 and not shallow:
                        next_level.append(subnode)
                    elif tag not in seen:
                        seen.add(tag)
                        header.append(tag)
            level = next_level
        return header[::-1]

    def get_csv_rows(self, start_node, uuid, do_not_check=(), shallow=False):
        """Flatten the subtree of *start_node* into a list of row dicts.

        See :meth:`get_csv_rows_helper` for the flattening rules.
        """
        return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)

    def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
        """Recursive worker of :meth:`get_csv_rows`.

        The text of every childless child of *start_node* is stored in *row*
        (which is modified in place) under the child's tag. Children that
        have children themselves are descended into, each with its own copy
        of *row*, unless *shallow* is true or their tag is in
        *do_not_check*.

        Returns:
            ``[row]`` when nothing is descended into, otherwise the
            concatenated rows of all descended children.
        """
        to_visit = []
        for subnode in start_node:
            tag = self.get_tag(subnode, uuid)
            if len(subnode) > 0:
                if tag not in do_not_check and not shallow:
                    to_visit.append(subnode)
            else:
                row[tag] = subnode.text
        if not to_visit:
            return [row]
        rows = []
        for node in to_visit:
            rows.extend(self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check))
        return rows

    def get_data(self, measured, uuid):
        """Read frequencies, pressures and absorbances from *measured*.

        Returns:
            ``[frequencies, pressures, absorbances]`` where ``frequencies``
            holds the children of ``Frequencies`` as floats, ``pressures``
            the ``Pressure`` value of every ``SinglePressureData`` child of
            ``Measurement``, and ``absorbances[j]`` the ``Absorbances``
            values belonging to ``pressures[j]``.
        """
        frequencies = [
            float(frequency.text) for frequency in measured.find(uuid + "Frequencies")
        ]
        pressures = []
        absorbances = []
        measurement = measured.find(uuid + "Measurement")
        for pressure_data in measurement.findall(uuid + "SinglePressureData"):
            pressures.append(float(pressure_data.find(uuid + "Pressure").text))
            absorbances.append(
                [float(value.text) for value in pressure_data.find(uuid + "Absorbances")]
            )
        return [frequencies, pressures, absorbances]

    def create_table(self, markups, uuid, header=True):
        """Build a table with one column per markup and one row per field.

        Args:
            markups: Elements to show side by side; the field names are
                taken from the direct children of the first one.
            uuid: Tag prefix.
            header: When true the first row holds an empty cell followed by
                the tag of every markup.

        Returns:
            A list of rows; each data row starts with the field name. An
            empty list is returned when *markups* is empty. A markup that
            lacks a field contributes ``None`` for it.
        """
        if not markups:
            return []
        rows = []
        if header:
            rows.append([""] + [self.get_tag(markup, uuid) for markup in markups])
        for name in self.extract_names(markups[0], uuid, shallow=True):
            cells = [name]
            for markup in markups:
                field = markup.find(uuid + name)
                cells.append(field.text if field is not None else None)
            rows.append(cells)
        return rows

    def create_matrix_rows(self, frequencies, pressures, absorbances):
        """Build the rows of the absorbance matrix.

        Layout of the result:
            * a title row naming the configured frequency band,
            * a row ``"Frequencies"`` followed by all pressures,
            * one row per frequency: the frequency, the absorbance at every
              pressure and, last, the sum of these absorbances,
            * a final row: an empty cell followed, per pressure, by the sum
              of the absorbances whose frequency index lies between the
              first frequency ``>= min_frequency`` and the last frequency
              ``<= max_frequency`` (both included).

        Args:
            frequencies: Frequencies, one per matrix row.
            pressures: Pressures, one per matrix column.
            absorbances: ``absorbances[j][i]`` is the value for
                ``pressures[j]`` and ``frequencies[i]``.
        """
        pressure_indices = range(len(pressures))
        rows = [
            [
                "Absorbance",
                "Pressures",
                "from " + str(self.min_frequency),
                "to " + str(self.max_frequency),
            ],
            ["Frequencies"] + list(pressures),
        ]
        for i, frequency in enumerate(frequencies):
            values = [absorbances[j][i] for j in pressure_indices]
            rows.append([frequency] + values + [sum(values)])

        # Locate the band without running off either end of the list: with
        # no matching frequency the slice is simply empty.
        first = next(
            (i for i, frequency in enumerate(frequencies)
             if frequency >= self.min_frequency),
            len(frequencies),
        )
        last = next(
            (i for i in range(len(frequencies) - 1, -1, -1)
             if frequencies[i] <= self.max_frequency),
            -1,
        )
        # "last" is itself inside the band, hence the + 1.
        band = slice(first, last + 1)
        rows.append([""] + [sum(absorbances[j][band]) for j in pressure_indices])
        return rows