import csv
import os
import threading
import xml.etree.ElementTree as et
from collections import deque

try:
    # Not used by the code in this module section; the import is kept so that
    # any code relying on the module-level name still finds it, but it is
    # optional so CSV conversion works when the package is not installed.
    import xlsxwriter  # noqa: F401
except ImportError:
    xlsxwriter = None


class Converter:
    """Convert every ``.xml`` file of a directory into CSV files.

    For each XML file, every ``Measured`` element found under a ``Test``
    element is written to its own CSV file inside ``<out_path>/csv_files``.
    Files and measurements are processed in separate threads.

    Throughout the class the ``uuid`` parameter is the namespace prefix in
    ``{uri}`` form returned by :meth:`get_uuid`; it is prepended to tag names
    when searching the tree and stripped from tags when producing output.
    """

    def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
        """Store the input/output directories and the frequency band.

        Args:
            in_path: Directory that is scanned for ``.xml`` files.
            out_path: Directory in which ``csv_files`` is created.
            min_frequency: Lower bound of the band summed by
                :meth:`create_matrix_rows`.
            max_frequency: Upper bound of the band summed by
                :meth:`create_matrix_rows`.
        """
        self.in_path = in_path
        self.out_path = out_path
        self.set_min_frequency(min_frequency)
        self.set_max_frequency(max_frequency)

    def set_min_frequency(self, min_frequency):
        """Set the lower bound of the summed frequency band."""
        self.min_frequency = min_frequency

    def set_max_frequency(self, max_frequency):
        """Set the upper bound of the summed frequency band."""
        self.max_frequency = max_frequency

    def convert(self):
        """Convert all ``.xml`` files in ``in_path``, one thread per file.

        Each file is parsed in the calling thread; the conversion itself runs
        in a worker thread named after the file. The method returns once all
        workers have finished.
        """
        dir_path = self.create_directory()
        threads = []
        for file_name in os.listdir(self.in_path):
            plain_file_name, extension = os.path.splitext(file_name)
            if extension != ".xml":
                continue
            root = et.parse(os.path.join(self.in_path, file_name)).getroot()
            uuid = self.get_uuid(root)
            print(plain_file_name + " loading...")
            # The name is passed to the constructor: Thread.setName() is
            # deprecated.
            worker = threading.Thread(
                target=self.convert_file,
                args=(dir_path, plain_file_name, root, uuid),
                name=plain_file_name,
            )
            worker.start()
            threads.append(worker)
        for thread in threads:
            thread.join()
            # NOTE: join() does not report exceptions raised in the worker,
            # so this line is printed even when the conversion failed.
            print(thread.name + " has been succesfully converted")
        print("\nConverting has ended")

    def get_uuid(self, root):
        """Return the namespace prefix ``{uri}`` of the document.

        The URI is everything before the last space of the first root
        attribute whose name contains ``schemaLocation``. An empty string is
        returned when the root has no such attribute.
        """
        for attrib in root.attrib:
            if "schemaLocation" in attrib:
                return '{' + root.get(attrib).rsplit(' ', 1)[0] + '}'
        return ""

    def create_directory(self):
        """Create ``<out_path>/csv_files`` if needed and return its path."""
        dir_path = os.path.join(self.out_path, "csv_files")
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(dir_path, exist_ok=True)
        return dir_path

    def convert_file(self, dir_path, file_name, root, uuid):
        """Write one CSV file per ``Measured`` element, one thread each.

        Args:
            dir_path: Directory the CSV files are written to.
            file_name: Source file name without extension; used as the prefix
                of every output file name.
            root: Root element of the parsed XML document.
            uuid: Namespace prefix as returned by :meth:`get_uuid`.
        """
        threads = []
        for test in root.iter(uuid + "Test"):
            test_type = self.get_type(test, uuid)
            test_name = self.get_name(test, uuid)
            print(" " + file_name + ": " + test_type + " is loading...")
            for index, measured in enumerate(test.iter(uuid + "Measured")):
                # Only the final file name is formatted, so braces in the
                # directory or the source file name cannot break str.format.
                csv_name = "{}-{} ({}).{}".format(
                    file_name, test_name, measured.find(uuid + "EarSide").text, "csv"
                )
                worker = threading.Thread(
                    target=self.create_file,
                    args=(test, measured, os.path.join(dir_path, csv_name), uuid),
                    name=file_name + " " + test_type + "_" + str(index),
                )
                worker.start()
                threads.append(worker)
        for thread in threads:
            thread.join()
            print(thread.name + " is completed")

    def create_file(self, test, measured, path, uuid):
        """Write the CSV file for one ``Measured`` element.

        The file starts with the settings of the test (header + row) followed
        by a blank separator row. For a ``WideBandTympanometry`` test the
        absorbance matrix and the markup tables follow; for every other type
        the flattened content of ``measured`` is written as header + rows.

        Args:
            test: The ``Test`` element that owns ``measured``.
            measured: The ``Measured`` element to export.
            path: Destination file path.
            uuid: Namespace prefix as returned by :meth:`get_uuid`.
        """
        setting_node = self._settings_node(test, uuid)
        settings_header = self.extract_names(setting_node, uuid, shallow=True)
        settings_rows = self.get_csv_rows(setting_node, uuid, shallow=True)
        is_wideband = self.get_tag(setting_node, uuid) == "WideBandTympanometry"

        # All data is gathered before the file is opened, so a malformed
        # document does not leave a partially written file behind.
        if is_wideband:
            frequencies, pressures, absorbances = self.get_data(measured, uuid)
            matrix_rows = self.create_matrix_rows(frequencies, pressures, absorbances)
            tables = [
                self.create_table(
                    list(measured.iter(uuid + "TympData")), uuid, header=False
                )
            ]
            markups = list(measured.find(uuid + "Measurement"))
            for marker in ("TympData", "AbsorbanceData"):
                selected = [markup for markup in markups if marker in markup.tag]
                tables.append(self.create_table(selected, uuid))
        else:
            header = self.extract_names(measured, uuid)
            rows = self.get_csv_rows(measured, uuid)

        with open(path, 'w', newline="") as f:
            settings_writer = csv.DictWriter(f, settings_header)
            settings_writer.writeheader()
            settings_writer.writerows(settings_rows)

            plain_writer = csv.writer(f)
            plain_writer.writerow([""])
            if is_wideband:
                plain_writer.writerows(matrix_rows)
                for table in tables:
                    if not table:
                        # Nothing matched this table; skip its separator too.
                        continue
                    plain_writer.writerow([""])
                    plain_writer.writerows(table)
            else:
                data_writer = csv.DictWriter(f, header)
                data_writer.writeheader()
                data_writer.writerows(rows)

    def _settings_node(self, test, uuid):
        """Return the first child of the first ``TypeSettings`` under test."""
        return list(test.iter(uuid + "TypeSettings"))[0][0]

    def get_type(self, test, uuid):
        """Return the tag (without namespace) of the test's settings node."""
        return self.get_tag(self._settings_node(test, uuid), uuid)

    def get_name(self, test, uuid):
        """Return the text of the ``TestName`` child of ``test``."""
        return test.find(uuid + "TestName").text

    def get_tag(self, node, uuid):
        """Return the tag of ``node`` with the namespace prefix removed."""
        return node.tag.replace(uuid, "")

    def extract_names(self, node, uuid, do_not_check=(), shallow=False):
        """Collect the unique column names found below ``node``.

        The tree is walked breadth-first. Elements without children become
        column names; elements with children are descended into, unless
        ``shallow`` is set, in which case every direct child is a column.

        Args:
            node: Element whose descendants are inspected.
            uuid: Namespace prefix stripped from the tags.
            do_not_check: Tags that are ignored entirely.
            shallow: Only look at the direct children of ``node``.

        Returns:
            List of tag names, each at most once.
        """
        header = []
        seen = set()
        queue = deque([node])
        while queue:
            current_node = queue.popleft()
            # Children are visited back to front and the result is reversed
            # at the end, which keeps the established column order.
            for subnode in reversed(list(current_node)):
                tag = self.get_tag(subnode, uuid)
                if tag in seen or tag in do_not_check:
                    continue
                if len(subnode) > 0 and not shallow:
                    queue.append(subnode)
                else:
                    header.append(tag)
                    seen.add(tag)
        header.reverse()
        return header

    def get_csv_rows(self, start_node, uuid, do_not_check=(), shallow=False):
        """Flatten ``start_node`` into a list of ``{tag: text}`` rows.

        See :meth:`get_csv_rows_helper` for how the rows are built.
        """
        return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)

    def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
        """Recursively build the rows for ``start_node``.

        The text of every childless child is stored in ``row`` (which is
        modified in place). If no child has to be descended into, ``[row]`` is
        returned; otherwise each such child is processed with its own copy of
        ``row`` and the resulting rows are concatenated.

        Args:
            start_node: Element being flattened.
            row: Values inherited from the ancestors.
            uuid: Namespace prefix stripped from the tags.
            do_not_check: Tags of nested elements that are not descended into.
            shallow: Do not descend into any nested element.
        """
        to_visit = []
        for subnode in start_node:
            tag = self.get_tag(subnode, uuid)
            if len(subnode) > 0:
                if tag not in do_not_check and not shallow:
                    to_visit.append(subnode)
            else:
                row[tag] = subnode.text
        if not to_visit:
            return [row]
        rows = []
        for node in to_visit:
            rows.extend(
                self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check, shallow)
            )
        return rows

    def get_data(self, measured, uuid):
        """Read frequencies, pressures and absorbances from ``measured``.

        Returns:
            ``[frequencies, pressures, absorbances]`` where ``frequencies``
            holds the children of ``Frequencies`` as floats, ``pressures``
            one float per ``Measurement/SinglePressureData`` element and
            ``absorbances[j]`` the ``Absorbances`` children of the j-th such
            element as floats.
        """
        frequencies = [
            float(frequency.text) for frequency in measured.find(uuid + "Frequencies")
        ]
        pressures = []
        absorbances = []
        measurement = measured.find(uuid + "Measurement")
        for pressure_data in measurement.findall(uuid + "SinglePressureData"):
            pressures.append(float(pressure_data.find(uuid + "Pressure").text))
            absorbances.append(
                [
                    float(absorbance.text)
                    for absorbance in pressure_data.find(uuid + "Absorbances")
                ]
            )
        return [frequencies, pressures, absorbances]

    def create_table(self, markups, uuid, header=True):
        """Build a table with one row per child name and one column per markup.

        The row names are the direct child tags of the first markup.

        Args:
            markups: Elements to tabulate; may be empty.
            uuid: Namespace prefix of the tags.
            header: Prepend a row holding the tag of every markup.

        Returns:
            List of rows; ``[]`` when ``markups`` is empty. A cell is ``None``
            when the markup has no child of that name.
        """
        if not markups:
            return []
        rows = []
        if header:
            rows.append([""] + [self.get_tag(markup, uuid) for markup in markups])
        for name in self.extract_names(markups[0], uuid, shallow=True):
            cells = [name]
            for markup in markups:
                child = markup.find(uuid + name)
                cells.append(child.text if child is not None else None)
            rows.append(cells)
        return rows

    def create_matrix_rows(self, frequencies, pressures, absorbances):
        """Build the absorbance matrix rows for the CSV output.

        Layout of the result:

        * a title row naming the configured frequency band,
        * a ``Frequencies`` row listing the pressures,
        * one row per frequency: the frequency, the absorbance at every
          pressure and the sum of those absorbances,
        * a final row with, per pressure, the sum of the absorbances whose
          frequency lies in ``[min_frequency, max_frequency]`` (both bounds
          inclusive).

        Args:
            frequencies: Frequencies; the band search assumes ascending order.
            pressures: Pressures, one per entry of ``absorbances``.
            absorbances: ``absorbances[j][i]`` is the value for pressure ``j``
                at frequency ``i``.
        """
        rows = [
            [
                "Absorbance",
                "Pressures",
                "from " + str(self.min_frequency),
                "to " + str(self.max_frequency),
            ],
            ["Frequencies"] + list(pressures),
        ]
        pressure_indices = range(len(pressures))
        for i, frequency in enumerate(frequencies):
            values = [absorbances[j][i] for j in pressure_indices]
            rows.append([frequency] + values + [sum(values)])

        # First index at or above the lower bound and last index at or below
        # the upper bound. Both scans are bounded so empty or fully
        # out-of-band input yields an empty band instead of an IndexError.
        count = len(frequencies)
        low = 0
        while low < count and frequencies[low] < self.min_frequency:
            low += 1
        high = count - 1
        while high >= 0 and frequencies[high] > self.max_frequency:
            high -= 1

        # high + 1: the frequency at ``high`` is inside the band and has to be
        # part of the sum.
        rows.append(
            [""] + [sum(absorbances[j][low:high + 1]) for j in pressure_indices]
        )
        return rows