Facebook
From Hot Pheasant, 4 Years ago, written in Python.
">

A PHP Error was encountered

Severity: Notice

Message: Trying to access array offset on value of type bool

Filename: view/view.php

Line Number: 33

from

A PHP Error was encountered

Severity: Notice

Message: Trying to access array offset on value of type bool

Filename: view/view.php

Line Number: 33

- view diff
Embed
Download Paste or View Raw
Hits: 218
  1. import os
  2. import xml.etree.ElementTree as et
  3. import csv
  4. import xlsxwriter
  5. import threading
  6.  
  7. from scipy import interpolate
  8.  
  9. class Converter:
  10.  
  11.     def __init__(self, in_path, out_path, min_frequency=226, max_frequency=8000):
  12.         self.in_path = in_path
  13.         self.out_path = out_path
  14.         self.set_min_frequency(min_frequency)
  15.         self.set_max_frequency(max_frequency)
  16.  
  17.  
  18.     def set_min_frequency(self, min_frequency):
  19.         self.min_frequency = min_frequency
  20.  
  21.     def set_max_frequency(self, max_frequency):
  22.         self.max_frequency = max_frequency
  23.  
  24.     def convert(self):
  25.         dir_path = self.create_directory()
  26.  
  27.         files_names = os.listdir(self.in_path)
  28.         threads = []
  29.         for file_name in files_names:
  30.             complete_path = os.path.join(self.in_path, file_name)
  31.  
  32.             if os.path.splitext(complete_path)[1] != ".xml":
  33.                 continue
  34.  
  35.             tree = et.parse(complete_path)
  36.             root = tree.getroot()
  37.  
  38.             uuid = self.get_uuid(root)
  39.  
  40.             plain_file_name = os.path.splitext(file_name)[0]
  41.             print(plain_file_name + " loading...")
  42.  
  43.             t = threading.Thread(target=self.convert_file, args=(dir_path, plain_file_name, root, uuid))
  44.             t.setName(plain_file_name)
  45.             t.start()
  46.             threads.append(t)
  47.  
  48.         for thread in threads:
  49.             thread.join()
  50.             print(thread.name + " has been succesfully converted")
  51.         print("\nConverting has ended")
  52.  
  53.     def get_uuid(self, root):
  54.         for attrib in root.attrib:
  55.             if "schemaLocation" in attrib:
  56.                 return '{' + root.get(attrib).rsplit(' ', 1)[0] + '}'  # getting UUID
  57.         return ""
  58.  
  59.  
  60.     def create_directory(self):
  61.         dir_path = os.path.join(self.out_path, "csv_files")
  62.         if not os.path.isdir(dir_path):
  63.             os.makedirs(dir_path)
  64.             return dir_path
  65.         else:
  66.             '''
  67.            toFormat = dir_path + "({})"
  68.            i = 1
  69.            dir_path = toFormat.format(i)
  70.            while os.path.isdir(dir_path):
  71.                dir_path = toFormat.format(i)
  72.                print(dir_path)
  73.                print(toFormat)
  74.                i += 1
  75.            os.makedirs(dir_path)
  76.            '''
  77.         return dir_path
  78.  
  79.     def convert_file(self, dir_path, file_name, root, uuid):
  80.         path_to_format = os.path.join(dir_path, file_name + '-{} ({}).{}')
  81.         tests = list(root.iter(uuid + "Test"))
  82.         threads = []
  83.  
  84.         for test in tests:
  85.             print(" " + file_name + ": " + self.get_type(test, uuid) + " is loading...")
  86.             i = 0
  87.             for measured in test.iter(uuid + "Measured"):
  88.                 path = path_to_format.format(self.get_name(test, uuid), measured.find(uuid + "EarSide").text, "csv")
  89.                
  90.                 t = threading.Thread(target=self.create_file, args=(test, measured, path, uuid))
  91.                 t.setName(file_name + " " + self.get_type(test, uuid) + "_" + str(i))
  92.                 t.start()
  93.                 threads.append(t)
  94.                 i += 1
  95.  
  96.         for thread in threads:
  97.             thread.join()
  98.             print(thread.name + " is completed")
  99.  
  100.     def create_file(self, test, measured, path, uuid):
  101.  
  102.         setting_node = list(test.iter(uuid + "TypeSettings"))[0][0]
  103.         settings_header = self.extract_names(setting_node, uuid, shallow=True)
  104.         settings_row = self.get_csv_rows(setting_node, uuid, shallow=True)
  105.  
  106.         if self.get_type(test, uuid) == "WideBandTympanometry":
  107.             # csv
  108.             '''
  109.            full_path = path.format(test_name, ear_side, "csv")
  110.            rows = self.get_csv_matrix_rows(measured)
  111.            header = rows[0].keys()
  112.            with open(full_path, 'w+') as f:
  113.                csv_writer = csv.DictWriter(f, header)
  114.                csv_writer.writeheader()
  115.                csv_writer.writerows(rows)
  116.            '''
  117.             # csv matrix
  118.             data = self.get_data(measured, uuid)
  119.             matrix_rows = self.create_matrix_rows(data[0], data[1], data[2])
  120.  
  121.             tables = []
  122.  
  123.             tymp_markups = list(measured.iter(uuid + "TympData"))
  124.             tables.append(self.create_table(tymp_markups, uuid, header=False))
  125.             markups = list(measured.find(uuid + "Measurement"))
  126.             for pred in [lambda m: "TympData" in m.tag,
  127.                          lambda m: "AbsorbanceData" in m.tag]:
  128.                 tables.append(self.create_table(list(filter(pred, markups)), uuid))
  129.  
  130.  
  131.             with open(path, 'w+', newline="") as f:
  132.                 dict_writer = csv.DictWriter(f, settings_header)
  133.                 dict_writer.writeheader()
  134.                 dict_writer.writerows(settings_row)
  135.  
  136.                 csv_writer = csv.writer(f)
  137.                 csv_writer.writerow([""])
  138.                 csv_writer.writerows(matrix_rows)
  139.                 for table in tables:
  140.                     csv_writer.writerow([""])
  141.                     csv_writer.writerows(table)
  142.  
  143.             '''
  144.            ##Matrix heat map
  145.            data = self.get_analytic_continuation(data[0], data[1], data[2])
  146.            full_path = path.format(test_name + " heat map", ear_side, "xlsx")
  147.  
  148.            self.create_workbook(full_path,
  149.                                 data[0],
  150.                                 data[1],
  151.                                 data[2])
  152.            '''
  153.         else:
  154.             header = self.extract_names(measured, uuid)
  155.             rows = self.get_csv_rows(measured, uuid)
  156.             with open(path, 'w+', newline="") as f:
  157.                 dict_writer = csv.DictWriter(f, settings_header)
  158.                 dict_writer.writeheader()
  159.                 dict_writer.writerows(settings_row)
  160.  
  161.                 csv.writer(f).writerow([""])
  162.  
  163.                 csv_writer = csv.DictWriter(f, header)
  164.                 csv_writer.writeheader()
  165.                 csv_writer.writerows(rows)
  166.  
  167.     def get_type(self, test, uuid):
  168.         return list(list(test.iter(uuid + "TypeSettings"))[0])[0].tag.replace(uuid, "")
  169.  
  170.     def get_name(self, test, uuid):
  171.         return test.find(uuid + "TestName").text
  172.  
  173.     def get_tag(self, node, uuid):
  174.         return node.tag.replace(uuid, "")
  175.  
  176.     def extract_names(self, node, uuid, do_not_check=[], shallow=False):
  177.         header = []
  178.         used = []
  179.         nodes = [node]
  180.         while len(nodes) > 0:
  181.             current_node = nodes.pop(0)
  182.             for subnode in list(current_node)[::-1]:
  183.                 tag = self.get_tag(subnode, uuid)
  184.                 if tag in used or tag in do_not_check:
  185.                     continue
  186.  
  187.                 if len(list(subnode)) > 0 and not shallow:
  188.                     nodes.append(subnode)
  189.                 else:
  190.                     header.append(tag)
  191.  
  192.                 used.append(tag)
  193.  
  194.         return header[::-1]
  195.  
  196.     def get_csv_rows(self, start_node, uuid, do_not_check=[], shallow=False):
  197.         return self.get_csv_rows_helper(start_node, {}, uuid, do_not_check, shallow)
  198.  
  199.     def get_csv_rows_helper(self, start_node, row, uuid, do_not_check, shallow=False):
  200.         to_visit = []
  201.         for subnode in start_node:
  202.             tag = self.get_tag(subnode, uuid)
  203.             if len(list(subnode)) > 0:
  204.                 if tag not in do_not_check and not shallow:
  205.                     to_visit.append(subnode)
  206.             else:
  207.                 row[tag] = subnode.text
  208.  
  209.         if len(to_visit) == 0:
  210.             return [row]
  211.         else:
  212.             rows = []
  213.             for node in to_visit:
  214.                 new_rows = self.get_csv_rows_helper(node, row.copy(), uuid, do_not_check)
  215.                 rows.extend(new_rows)
  216.  
  217.             return rows
  218.  
  219.     def get_data(self, measured, uuid):
  220.         frequencies = []
  221.         pressures = []
  222.         absorbances = []
  223.  
  224.         for frequency in measured.find(uuid + "Frequencies"):
  225.             frequencies.append(float(frequency.text))
  226.  
  227.         for pressure_data in measured.find(uuid + "Measurement").findall(uuid + "SinglePressureData"):
  228.             pressure = float(pressure_data.find(uuid + "Pressure").text)
  229.             pressures.append(pressure)
  230.  
  231.             absorbances.append([])
  232.             for absorbance in pressure_data.find(uuid + "Absorbances"):
  233.                 absorbances[-1].append(float(absorbance.text))
  234.  
  235.         return [frequencies, pressures, absorbances]
  236.  
  237.     def create_table(self, markups, uuid, header=True):
  238.         names = self.extract_names(markups[0], uuid, shallow=True)
  239.         rows = []
  240.  
  241.         if header:
  242.             rows.append([""])
  243.             rows[-1].extend([self.get_tag(markup, uuid) for markup in markups])
  244.  
  245.         for name in names:
  246.             rows.append([name])
  247.             for markup in markups:
  248.                 rows[-1].append(markup.find(uuid + name).text)
  249.  
  250.         return rows
  251.  
  252.     def get_analytic_continuation(self, frequencies, pressures, absorbances):
  253.         inter_frequencies = frequencies.copy()
  254.         inter_pressures = []
  255.         inter_absorbances = []
  256.  
  257.         min_press = pressures[-1]
  258.         max_press = pressures[0]
  259.  
  260.         if pressures[0] < pressures[-1]:
  261.             is_increasing = True
  262.             min_press = max_press
  263.             max_press = pressures[-1]
  264.         else:
  265.             is_increasing = False
  266.  
  267.  
  268.         for i in range(int(max_press - min_press)):
  269.             if is_increasing:
  270.                 inter_pressures.append(min_press + i)
  271.             else:
  272.                 inter_pressures.append(max_press - i)
  273.  
  274.             inter_absorbances.append([])
  275.             for j in range(len(frequencies)):
  276.                 inter_absorbances[-1].append(0)
  277.  
  278.         for i in range(len(frequencies)):
  279.             vals = [absorbances[j][i] for j in range(len(pressures))]
  280.             f = interpolate.interp1d(pressures, vals)
  281.             for press in inter_pressures:
  282.                 if is_increasing:
  283.                     inter_absorbances[int(press - min_press)][i] = f(press).flat[0]
  284.                 else:
  285.                     inter_absorbances[int(press - max_press)][i] = f(press).flat[0]
  286.         return [inter_frequencies, inter_pressures, inter_absorbances]
  287.  
  288.  
  289.     def create_workbook(self, path, frequencies, pressures, absorbances):
  290.         workbook = xlsxwriter.Workbook(path)
  291.         worksheet = workbook.add_worksheet()
  292.  
  293.         worksheet.write(0, 0, "Absorbances")
  294.         worksheet.write(1, 0, "Frequencies")
  295.         worksheet.write_column(2, 0, frequencies)
  296.         worksheet.write(0, 1, "Pressures")
  297.         worksheet.write_row(0, 2, pressures)
  298.  
  299.         if (len(absorbances) > 0):
  300.             for i in range(0, len(absorbances)):
  301.                 worksheet.write_column(2, 2 + i, absorbances[i])
  302.                 letter = self.column_string(3 + i)
  303.                 sum_formula = "=SUM({}{}:{}{})".format(letter, 4, letter, len(absorbances[0]) + 1)
  304.                 worksheet.write_formula(len(absorbances[0]) + 1, 2 + i, sum_formula)
  305.  
  306.             worksheet.conditional_format(2, 2, len(absorbances[0]) + 1, len(absorbances) + 1,
  307.                                          {
  308.                                              'type': '3_color_scale',
  309.                                              'min_color': 'green',
  310.                                              'mid_color': 'yellow',
  311.                                              'max_color': "red"
  312.                                          })
  313.  
  314.         workbook.close()
  315.  
  316.     def create_matrix_rows(self, frequencies, pressures, absorbances):
  317.         rows = []
  318.         row1 = ["Absorbance", "Pressures", "from " + str(self.min_frequency), "to " + str(self.max_frequency)]
  319.         row2 = ["Frequencies"]
  320.         row2.extend(pressures)
  321.         rows.append(row1)
  322.         rows.append(row2)
  323.  
  324.         for i in range(len(frequencies)):
  325.             new_row = [frequencies[i]]
  326.             for j in range(len(pressures)):
  327.                 new_row.append(absorbances[j][i])
  328.             rows.append(new_row)
  329.  
  330.         for i in range(len(frequencies)):
  331.             rows[2 + i].append(sum(rows[2 + i][1:]))
  332.  
  333.         last_row = [""]
  334.         min_freq_index = 0
  335.         max_freq_index = len(frequencies) -1
  336.         while(frequencies[min_freq_index] < self.min_frequency):
  337.             min_freq_index += 1
  338.         while (frequencies[max_freq_index] > self.max_frequency):
  339.             max_freq_index -= 1
  340.  
  341.         print(max_freq_index)
  342.         print(min_freq_index)
  343.  
  344.         for i in range(len(pressures)):
  345.             last_row.append(sum(absorbances[i][min_freq_index:max_freq_index+1  ]))
  346.  
  347.         rows.append(last_row)
  348.         return rows
  349.  
  350.     def get_csv_matrix_rows(self, measured, uuid):
  351.         frequencies = list(map(lambda n: float(n.text), measured.iter(uuid + "Frequency")))
  352.  
  353.         info = self.get_csv_rows(measured, uuid, ["Measurement", "Frequencies"])[0]
  354.  
  355.         spds = []
  356.         pressures = []
  357.         absorbances = []
  358.         for node in measured.find(uuid + "Measurement"):
  359.             tag = self.get_tag(node)
  360.             if tag == "SinglePressureData":
  361.                 spds.append(self.get_csv_rows(node, uuid, ["Absorbances"])[0])
  362.                 absorbances.append(self.get_absorbances(node, uuid))
  363.                 pressures.append(spds[-1]["Pressure"])
  364.                 del spds[-1]["Pressure"]
  365.             else:
  366.                 pass
  367.  
  368.         row = {"Pressure": "", "Frequency": "", "Absorbance": ""}
  369.         row.update(info)
  370.  
  371.         rows = []
  372.         i = 0
  373.         for press in pressures:
  374.             j = 0
  375.             copy = row.copy()
  376.             copy["Pressure"] = press
  377.             for freq in frequencies:
  378.                 copy["Frequency"] = freq
  379.                 copy["Absorbance"] = absorbances[i][j]
  380.                 copy.update(spds[i])
  381.                 rows.append(copy.copy())
  382.                 j += 1
  383.             i += 1
  384.  
  385.         return rows
  386.  
  387.     def get_absorbances(self, node, uuid):
  388.         return list(map(lambda n: float(n.text), node.find(uuid + "Absorbances")))
  389.  
  390.     def column_string(self, n):
  391.         string = ""
  392.         while n > 0:
  393.             n, remainder = divmod(n - 1, 26)
  394.             string = chr(65 + remainder) + string
  395.         return string