"""Convert XML measurement files into CSV files, one per measured ear side."""

import csv
import os
import threading
import xml.etree.ElementTree as et
from collections import deque

# The two third-party packages below are only used by helpers that the CSV
# conversion path never calls, so a missing package must not break the module.
try:
    import xlsxwriter
except ImportError:  # only create_workbook needs it
    xlsxwriter = None

try:
    from scipy import interpolate
except ImportError:  # only get_analytic_continuation needs it
    interpolate = None


class Converter:
    """Read every ``.xml`` file in a directory and write CSV files for it.

    Output goes to ``<out_path>/csv_files``.  ``min_frequency`` and
    ``max_frequency`` bound the frequency band summed in the last row of the
    matrix produced by :meth:`create_matrix_rows`.
    """

    def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
        self.in_path = in_path
        self.out_path = out_path
        self.set_min_frequency(min_frequency)
        self.set_max_frequency(max_frequency)

    def set_min_frequency(self, min_frequency):
        """Set the lower bound (inclusive) of the summed frequency band."""
        self.min_frequency = min_frequency

    def set_max_frequency(self, max_frequency):
        """Set the upper bound (inclusive) of the summed frequency band."""
        self.max_frequency = max_frequency

    def convert(self):
        """Convert every ``.xml`` file found directly inside ``in_path``.

        Each file is parsed in the calling thread and then handed to its own
        worker thread running :meth:`convert_file`.
        """
        dir_path = self.create_directory()
        threads = []
        for file_name in os.listdir(self.in_path):
            plain_file_name, extension = os.path.splitext(file_name)
            if extension != ".xml":
                continue
            root = et.parse(os.path.join(self.in_path, file_name)).getroot()
            uuid = self.get_uuid(root)
            print(plain_file_name + " loading...")
            worker = threading.Thread(
                target=self.convert_file,
                args=(dir_path, plain_file_name, root, uuid),
                name=plain_file_name,
            )
            worker.start()
            threads.append(worker)
        for thread in threads:
            thread.join()
            # NOTE(review): join() does not re-raise exceptions from the
            # worker, so this line is printed even if the conversion failed.
            print(thread.name + " has been succesfully converted")
        print("\nConverting has ended")

    def get_uuid(self, root):
        """Return the tag prefix ``{...}`` used by the document's elements.

        The value is taken from the first root attribute whose name contains
        ``schemaLocation``: everything before the last space, wrapped in
        braces.  Returns ``""`` when no such attribute exists.
        """
        for attrib in root.attrib:
            if "schemaLocation" in attrib:
                return "{" + root.get(attrib).rsplit(" ", 1)[0] + "}"
        return ""

    def create_directory(self):
        """Create ``<out_path>/csv_files`` if needed and return its path."""
        dir_path = os.path.join(self.out_path, "csv_files")
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(dir_path, exist_ok=True)
        return dir_path

    def convert_file(self, dir_path, file_name, root, uuid):
        """Write one CSV per ``Measured`` element of every ``Test`` in *root*.

        Output files are named ``<file_name>-<TestName> (<EarSide>).csv``.
        Every measurement is written by its own thread.
        """
        path_to_format = os.path.join(dir_path, file_name + "-{} ({}).{}")
        threads = []
        for test in root.iter(uuid + "Test"):
            test_type = self.get_type(test, uuid)
            print(" " + file_name + ": " + test_type + " is loading...")
            for i, measured in enumerate(test.iter(uuid + "Measured")):
                path = path_to_format.format(
                    self.get_name(test, uuid),
                    measured.find(uuid + "EarSide").text,
                    "csv",
                )
                worker = threading.Thread(
                    target=self.create_file,
                    args=(test, measured, path, uuid),
                    name=file_name + " " + test_type + "_" + str(i),
                )
                worker.start()
                threads.append(worker)
        for thread in threads:
            thread.join()
            print(thread.name + " is completed")

    def create_file(self, test, measured, path, uuid):
        """Write the CSV file for one ``Measured`` element to *path*.

        The file starts with the test's settings (header + row).  For tests of
        type ``WideBandTympanometry`` it continues with the absorbance matrix
        and the TympData / AbsorbanceData tables; for any other type it
        continues with a flat header + rows dump of *measured*.
        """
        setting_node = self._type_node(test, uuid)
        settings_header = self.extract_names(setting_node, uuid, shallow=True)
        settings_row = self.get_csv_rows(setting_node, uuid, shallow=True)

        if self.get_type(test, uuid) == "WideBandTympanometry":
            # get_csv_matrix_rows, get_analytic_continuation and
            # create_workbook offer alternative outputs (long-format CSV and
            # an xlsx heat map) but are not used by this method.
            frequencies, pressures, absorbances = self.get_data(measured, uuid)
            matrix_rows = self.create_matrix_rows(frequencies, pressures, absorbances)

            tables = [
                self.create_table(
                    list(measured.iter(uuid + "TympData")), uuid, header=False
                )
            ]
            markups = list(measured.find(uuid + "Measurement"))
            for marker in ("TympData", "AbsorbanceData"):
                selected = [m for m in markups if marker in m.tag]
                tables.append(self.create_table(selected, uuid))

            with open(path, "w", newline="") as f:
                self._write_settings(f, settings_header, settings_row)
                csv_writer = csv.writer(f)
                csv_writer.writerow([""])
                csv_writer.writerows(matrix_rows)
                for table in tables:
                    csv_writer.writerow([""])
                    csv_writer.writerows(table)
        else:
            header = self.extract_names(measured, uuid)
            rows = self.get_csv_rows(measured, uuid)
            with open(path, "w", newline="") as f:
                self._write_settings(f, settings_header, settings_row)
                csv.writer(f).writerow([""])
                dict_writer = csv.DictWriter(f, header)
                dict_writer.writeheader()
                dict_writer.writerows(rows)

    @staticmethod
    def _write_settings(f, settings_header, settings_row):
        """Write the settings header line and settings rows to open file *f*."""
        dict_writer = csv.DictWriter(f, settings_header)
        dict_writer.writeheader()
        dict_writer.writerows(settings_row)

    @staticmethod
    def _type_node(test, uuid):
        """Return the first child of the first ``TypeSettings`` under *test*."""
        return list(test.iter(uuid + "TypeSettings"))[0][0]

    def get_type(self, test, uuid):
        """Return the test type: the tag of the first ``TypeSettings`` child."""
        return self._type_node(test, uuid).tag.replace(uuid, "")

    def get_name(self, test, uuid):
        """Return the text of the ``TestName`` child of *test*."""
        return test.find(uuid + "TestName").text

    def get_tag(self, node, uuid):
        """Return the tag of *node* with the *uuid* prefix removed."""
        return node.tag.replace(uuid, "")

    def extract_names(self, node, uuid, do_not_check=(), shallow=False):
        """Collect the distinct column names found below *node*.

        Elements without children contribute their tag; elements with
        children are descended into (breadth first) unless *shallow* is true,
        in which case their own tag is used instead.  Tags listed in
        *do_not_check* are ignored together with their subtree.
        """
        header = []
        seen = set()
        pending = deque([node])
        while pending:
            current = pending.popleft()
            # Children are walked backwards and the result reversed at the
            # end, which fixes the final column order.
            for subnode in reversed(list(current)):
                tag = self.get_tag(subnode, uuid)
                if tag in seen or tag in do_not_check:
                    continue
                if len(subnode) > 0 and not shallow:
                    pending.append(subnode)
                else:
                    header.append(tag)
                    seen.add(tag)
        header.reverse()
        return header

    def get_csv_rows(self, start_node, uuid, do_not_check=(), shallow=False):
        """Flatten the subtree of *start_node* into a list of row dicts.

        See :meth:`get_csv_rows_helper` for how rows are built.
        """
        return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)

    def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
        """Recursive worker of :meth:`get_csv_rows`.

        Childless children of *start_node* are stored in *row* as
        ``tag -> text``.  Every child that has children of its own (and is not
        excluded by *do_not_check* or *shallow*) produces its own rows,
        each starting from a copy of *row*.  Without such children the result
        is ``[row]``.
        """
        to_visit = []
        for subnode in start_node:
            tag = self.get_tag(subnode, uuid)
            if len(subnode) > 0:
                if tag not in do_not_check and not shallow:
                    to_visit.append(subnode)
            else:
                row[tag] = subnode.text
        if not to_visit:
            return [row]
        rows = []
        for node in to_visit:
            rows.extend(
                self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check)
            )
        return rows

    def get_data(self, measured, uuid):
        """Return ``[frequencies, pressures, absorbances]`` read from *measured*.

        ``frequencies`` are the children of ``Frequencies``; ``pressures`` has
        one entry per ``SinglePressureData`` under ``Measurement`` and
        ``absorbances[k]`` holds the ``Absorbances`` children of the k-th one.
        All values are converted to ``float``.
        """
        frequencies = [float(node.text) for node in measured.find(uuid + "Frequencies")]
        pressures = []
        absorbances = []
        measurement = measured.find(uuid + "Measurement")
        for pressure_data in measurement.findall(uuid + "SinglePressureData"):
            pressures.append(float(pressure_data.find(uuid + "Pressure").text))
            absorbances.append(
                [float(node.text) for node in pressure_data.find(uuid + "Absorbances")]
            )
        return [frequencies, pressures, absorbances]

    def create_table(self, markups, uuid, header=True):
        """Build a table with one row per field and one column per markup.

        Field names are the direct child tags of the first markup.  With
        *header* true, a first row with the markups' tags is added.  An empty
        *markups* list yields an empty table; a field missing from a markup
        yields ``None`` (written as an empty cell by ``csv``).
        """
        if not markups:
            return []
        names = self.extract_names(markups[0], uuid, shallow=True)
        rows = []
        if header:
            rows.append([""] + [self.get_tag(markup, uuid) for markup in markups])
        for name in names:
            row = [name]
            for markup in markups:
                cell = markup.find(uuid + name)
                row.append(cell.text if cell is not None else None)
            rows.append(row)
        return rows

    def get_analytic_continuation(self, frequencies, pressures, absorbances):
        """Linearly interpolate the absorbances onto a pressure grid of step 1.

        The grid starts at ``pressures[0]`` and moves towards
        ``pressures[-1]`` (the end point itself is not included).  Returns
        ``[frequencies, grid_pressures, grid_absorbances]`` where
        ``grid_absorbances[k]`` belongs to ``grid_pressures[k]``.

        Raises:
            ImportError: if SciPy is not installed.
        """
        if interpolate is None:
            raise ImportError("scipy is required for get_analytic_continuation")

        inter_frequencies = list(frequencies)
        is_increasing = pressures[0] < pressures[-1]
        min_press = min(pressures[0], pressures[-1])
        max_press = max(pressures[0], pressures[-1])

        steps = range(int(max_press - min_press))
        if is_increasing:
            inter_pressures = [min_press + i for i in steps]
        else:
            inter_pressures = [max_press - i for i in steps]
        inter_absorbances = [[0] * len(frequencies) for _ in inter_pressures]

        for col in range(len(frequencies)):
            vals = [absorbances[row][col] for row in range(len(pressures))]
            f = interpolate.interp1d(pressures, vals)
            # Index by grid position: deriving the row from the pressure value
            # produced negative indices for descending pressures and stored
            # the rows in the wrong order.
            for row, press in enumerate(inter_pressures):
                inter_absorbances[row][col] = float(f(press))
        return [inter_frequencies, inter_pressures, inter_absorbances]

    def create_workbook(self, path, frequencies, pressures, absorbances):
        """Write the matrix to an xlsx file at *path* with a 3-colour scale.

        Frequencies go down the first column, pressures along the first row
        and ``absorbances[i]`` into the column of pressure ``i``.

        Raises:
            ImportError: if XlsxWriter is not installed.
        """
        if xlsxwriter is None:
            raise ImportError("xlsxwriter is required for create_workbook")

        workbook = xlsxwriter.Workbook(path)
        worksheet = workbook.add_worksheet()
        worksheet.write(0, 0, "Absorbances")
        worksheet.write(1, 0, "Frequencies")
        worksheet.write_column(2, 0, frequencies)
        worksheet.write(0, 1, "Pressures")
        worksheet.write_row(0, 2, pressures)
        if len(absorbances) > 0:
            last_row = len(absorbances[0]) + 1
            for i in range(len(absorbances)):
                worksheet.write_column(2, 2 + i, absorbances[i])
                letter = self.column_string(3 + i)
                # NOTE(review): the formula is written onto the last data row
                # and sums a range that skips the first and last values; kept
                # as in the original - confirm whether this is intended.
                sum_formula = "=SUM({}{}:{}{})".format(letter, 4, letter, last_row)
                worksheet.write_formula(last_row, 2 + i, sum_formula)
            worksheet.conditional_format(2, 2, last_row, len(absorbances) + 1, {
                'type': '3_color_scale',
                'min_color': 'green',
                'mid_color': 'yellow',
                'max_color': "red"
            })
        workbook.close()

    def create_matrix_rows(self, frequencies, pressures, absorbances):
        """Build the CSV rows of the frequency x pressure absorbance matrix.

        Layout: a title row, a row with the pressures, one row per frequency
        (frequency, one absorbance per pressure, row sum) and a final row
        holding, per pressure, the sum of the absorbances whose frequency lies
        in ``[min_frequency, max_frequency]``.  The band limits are located by
        scanning from both ends, so *frequencies* is expected to be ascending.
        If no frequency lies in the band the sums are ``0``.
        """
        rows = [
            ["Absorbance", "Pressures",
             "from " + str(self.min_frequency), "to " + str(self.max_frequency)],
            ["Frequencies", *pressures],
        ]
        for i, frequency in enumerate(frequencies):
            values = [absorbances[j][i] for j in range(len(pressures))]
            rows.append([frequency, *values, sum(values)])

        # Bounded scans: the unbounded version ran off the list when every
        # frequency was outside the band.
        count = len(frequencies)
        min_freq_index = 0
        while min_freq_index < count and frequencies[min_freq_index] < self.min_frequency:
            min_freq_index += 1
        max_freq_index = count - 1
        while max_freq_index >= 0 and frequencies[max_freq_index] > self.max_frequency:
            max_freq_index -= 1

        last_row = [""]
        for i in range(len(pressures)):
            last_row.append(sum(absorbances[i][min_freq_index:max_freq_index + 1]))
        rows.append(last_row)
        return rows

    def get_csv_matrix_rows(self, measured, uuid):
        """Return the measurement in long format: one dict per pressure/frequency.

        Every row has ``Pressure``, ``Frequency`` and ``Absorbance`` plus the
        childless fields of *measured* and of the respective
        ``SinglePressureData`` element.
        """
        frequencies = [float(n.text) for n in measured.iter(uuid + "Frequency")]
        info = self.get_csv_rows(measured, uuid, ["Measurement", "Frequencies"])[0]
        spds = []
        pressures = []
        absorbances = []
        for node in measured.find(uuid + "Measurement"):
            # get_tag needs the prefix; calling it without raised TypeError.
            if self.get_tag(node, uuid) != "SinglePressureData":
                continue
            spd = self.get_csv_rows(node, uuid, ["Absorbances"])[0]
            # Pressure becomes its own column, so drop it from the extras.
            pressures.append(spd.pop("Pressure"))
            spds.append(spd)
            absorbances.append(self.get_absorbances(node, uuid))

        template = {"Pressure": "", "Frequency": "", "Absorbance": ""}
        template.update(info)
        rows = []
        for i, press in enumerate(pressures):
            for j, freq in enumerate(frequencies):
                row = template.copy()
                row["Pressure"] = press
                row["Frequency"] = freq
                row["Absorbance"] = absorbances[i][j]
                row.update(spds[i])
                rows.append(row)
        return rows

    def get_absorbances(self, node, uuid):
        """Return the children of ``Absorbances`` under *node* as floats."""
        return [float(n.text) for n in node.find(uuid + "Absorbances")]

    def column_string(self, n):
        """Return the spreadsheet column letters for the 1-based index *n*.

        ``1 -> "A"``, ``26 -> "Z"``, ``27 -> "AA"``; ``n <= 0`` gives ``""``.
        """
        string = ""
        while n > 0:
            n, remainder = divmod(n - 1, 26)
            string = chr(65 + remainder) + string
        return string