Facebook
From Abrupt Hummingbird, 9 Months ago, written in Python.
This paste is a reply to converter from ProxPxD - view diff
Embed
Download Paste or View Raw
Hits: 96
  1. import os
  2. import xml.etree.ElementTree as et
  3. import csv
  4. import xlsxwriter
  5. import threading
  6.  
  7. class Converter:
  8.  
  9.     def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
  10.         self.in_path = in_path
  11.         self.out_path = out_path
  12.         self.set_min_frequency(min_frequency)
  13.         self.set_max_frequency(max_frequency)
  14.  
  15.     def set_min_frequency(self, min_frequency):
  16.         self.min_frequency = min_frequency
  17.  
  18.     def set_max_frequency(self, max_frequency):
  19.         self.max_frequency = max_frequency
  20.  
  21.     def convert(self):
  22.         dir_path = self.create_directory()
  23.  
  24.         files_names = os.listdir(self.in_path)
  25.         threads = []
  26.         for file_name in files_names:
  27.             complete_path = os.path.join(self.in_path, file_name)
  28.  
  29.             if os.path.splitext(complete_path)[1] != ".xml":
  30.                 continue
  31.  
  32.             tree = et.parse(complete_path)
  33.             root = tree.getroot()
  34.  
  35.             uuid = self.get_uuid(root)
  36.  
  37.             plain_file_name = os.path.splitext(file_name)[0]
  38.             print(plain_file_name + " loading...")
  39.  
  40.             t = threading.Thread(target=self.convert_file, args=(dir_path, plain_file_name, root, uuid))
  41.             t.setName(plain_file_name)
  42.             t.start()
  43.             threads.append(t)
  44.  
  45.         for thread in threads:
  46.             thread.join()
  47.             print(thread.name + " has been succesfully converted")
  48.         print("nConverting has ended")
  49.  
  50.     def get_uuid(self, root):
  51.         for attrib in root.attrib:
  52.             if "schemaLocation" in attrib:
  53.                 return '{' + root.get(attrib).rsplit(' ', 1)[0] + '}'  # getting UUID
  54.         return ""
  55.  
  56.     def create_directory(self):
  57.         dir_path = os.path.join(self.out_path, "csv_files")
  58.         if not os.path.isdir(dir_path):
  59.             os.makedirs(dir_path)
  60.             return dir_path
  61.         else:
  62.             '''
  63.            toFormat = dir_path + "({})"
  64.            i = 1
  65.            dir_path = toFormat.format(i)
  66.            while os.path.isdir(dir_path):
  67.                dir_path = toFormat.format(i)
  68.                print(dir_path)
  69.                print(toFormat)
  70.                i += 1
  71.            os.makedirs(dir_path)
  72.            '''
  73.         return dir_path
  74.  
  75.     def convert_file(self, dir_path, file_name, root, uuid):
  76.         path_to_format = os.path.join(dir_path, file_name + '-{} ({}).{}')
  77.         tests = list(root.iter(uuid + "Test"))
  78.         threads = []
  79.  
  80.         for test in tests:
  81.             print(" " + file_name + ": " + self.get_type(test, uuid) + " is loading...")
  82.             i = 0
  83.             for measured in test.iter(uuid + "Measured"):
  84.                 path = path_to_format.format(self.get_name(test, uuid), measured.find(uuid + "EarSide").text, "csv")
  85.  
  86.                 t = threading.Thread(target=self.create_file, args=(test, measured, path, uuid))
  87.                 t.setName(file_name + " " + self.get_type(test, uuid) + "_" + str(i))
  88.                 t.start()
  89.                 threads.append(t)
  90.                 i += 1
  91.  
  92.         for thread in threads:
  93.             thread.join()
  94.             print(thread.name + " is completed")
  95.  
  96.     def create_file(self, test, measured, path, uuid):
  97.  
  98.         setting_node = list(test.iter(uuid + "TypeSettings"))[0][0]
  99.         settings_header = self.extract_names(setting_node, uuid, shallow=True)
  100.         settings_row = self.get_csv_rows(setting_node, uuid, shallow=True)
  101.  
  102.         if self.get_type(test, uuid) == "WideBandTympanometry":
  103.             # csv matrix
  104.             data = self.get_data(measured, uuid)
  105.             matrix_rows = self.create_matrix_rows(data[0], data[1], data[2])
  106.  
  107.             tables = []
  108.  
  109.             tymp_markups = list(measured.iter(uuid + "TympData"))
  110.             tables.append(self.create_table(tymp_markups, uuid, header=False))
  111.  
  112.             markups = list(measured.find(uuid + "Measurement"))
  113.             for pred in [lambda m: "TympData" in m.tag,
  114.                          lambda m: "AbsorbanceData" in m.tag]:
  115.                 tables.append(self.create_table(list(filter(pred, markups)), uuid))
  116.  
  117.             with open(path, 'w+', newline="") as f:
  118.                 dict_writer = csv.DictWriter(f, settings_header)
  119.                 dict_writer.writeheader()
  120.                 dict_writer.writerows(settings_row)
  121.  
  122.                 csv_writer = csv.writer(f)
  123.                 csv_writer.writerow([""])
  124.                 csv_writer.writerows(matrix_rows)
  125.                 for table in tables:
  126.                     csv_writer.writerow([""])
  127.                     csv_writer.writerows(table)
  128.         else:
  129.             header = self.extract_names(measured, uuid)
  130.             rows = self.get_csv_rows(measured, uuid)
  131.             with open(path, 'w+', newline="") as f:
  132.                 dict_writer = csv.DictWriter(f, settings_header)
  133.                 dict_writer.writeheader()
  134.                 dict_writer.writerows(settings_row)
  135.  
  136.                 csv.writer(f).writerow([""])
  137.  
  138.                 csv_writer = csv.DictWriter(f, header)
  139.                 csv_writer.writeheader()
  140.                 csv_writer.writerows(rows)
  141.  
  142.     def get_type(self, test, uuid):
  143.         return list(list(test.iter(uuid + "TypeSettings"))[0])[0].tag.replace(uuid, "")
  144.  
  145.     def get_name(self, test, uuid):
  146.         return test.find(uuid + "TestName").text
  147.  
  148.     def get_tag(self, node, uuid):
  149.         return node.tag.replace(uuid, "")
  150.  
  151.     def extract_names(self, node, uuid, do_not_check=[], shallow=False):
  152.         header = []
  153.         used = []
  154.         nodes = [node]
  155.         while len(nodes) > 0:
  156.             current_node = nodes.pop(0)
  157.             for subnode in list(current_node)[::-1]:
  158.                 tag = self.get_tag(subnode, uuid)
  159.                 if tag in used or tag in do_not_check:
  160.                     continue
  161.  
  162.                 if len(list(subnode)) > 0 and not shallow:
  163.                     nodes.append(subnode)
  164.                 else:
  165.                     header.append(tag)
  166.  
  167.                 used.append(tag)
  168.  
  169.         return header[::-1]
  170.  
  171.     def get_csv_rows(self, start_node, uuid, do_not_check=[], shallow=False):
  172.         return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)
  173.  
  174.     def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
  175.         to_visit = []
  176.         for subnode in start_node:
  177.             tag = self.get_tag(subnode, uuid)
  178.             if len(list(subnode)) > 0:
  179.                 if tag not in do_not_check and not shallow:
  180.                     to_visit.append(subnode)
  181.             else:
  182.                 row[tag] = subnode.text
  183.  
  184.         if len(to_visit) == 0:
  185.             return [row]
  186.         else:
  187.             rows = []
  188.             for node in to_visit:
  189.                 new_rows = self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check)
  190.                 rows.extend(new_rows)
  191.  
  192.             return rows
  193.  
  194.     def get_data(self, measured, uuid):
  195.         frequencies = []
  196.         pressures = []
  197.         absorbances = []
  198.  
  199.         for frequency in measured.find(uuid + "Frequencies"):
  200.             frequencies.append(float(frequency.text))
  201.  
  202.         for pressure_data in measured.find(uuid + "Measurement").findall(uuid + "SinglePressureData"):
  203.             pressure = float(pressure_data.find(uuid + "Pressure").text)
  204.             pressures.append(pressure)
  205.  
  206.             absorbances.append([])
  207.             for absorbance in pressure_data.find(uuid + "Absorbances"):
  208.                 absorbances[-1].append(float(absorbance.text))
  209.  
  210.         return [frequencies, pressures, absorbances]
  211.  
  212.     def create_table(self, markups, uuid, header=True):
  213.         names = self.extract_names(markups[0], uuid, shallow=True)
  214.         rows = []
  215.  
  216.         if header:
  217.             rows.append([""])
  218.             rows[-1].extend([self.get_tag(markup, uuid) for markup in markups])
  219.  
  220.         for name in names:
  221.             rows.append([name])
  222.             for markup in markups:
  223.                 rows[-1].append(markup.find(uuid + name).text)
  224.  
  225.         return rows
  226.  
  227.  
  228.     def create_matrix_rows(self, frequencies, pressures, absorbances):
  229.         rows = []
  230.         row1 = ["Absorbance", "Pressures", "from " + str(self.min_frequency), "to " + str(self.max_frequency)]
  231.         row2 = ["Frequencies"]
  232.         row2.extend(pressures)
  233.         rows.append(row1)
  234.         rows.append(row2)
  235.  
  236.         for i in range(len(frequencies)):
  237.             new_row = [frequencies[i]]
  238.             for j in range(len(pressures)):
  239.                 new_row.append(absorbances[j][i])
  240.             rows.append(new_row)
  241.  
  242.         for i in range(len(frequencies)):
  243.             rows[2 + i].append(sum(rows[2 + i][1:]))
  244.  
  245.         last_row = [""]
  246.         min_freq_index = 0
  247.         max_freq_index = len(frequencies) - 1
  248.         while (frequencies[min_freq_index] < self.min_frequency):
  249.             min_freq_index += 1
  250.         while (frequencies[max_freq_index] > self.max_frequency):
  251.             max_freq_index -= 1
  252.  
  253.         for i in range(len(pressures)):
  254.             last_row.append(sum(absorbances[i][min_freq_index:max_freq_index]))
  255.  
  256.         rows.append(last_row)
  257.         return rows