""" Created on Mon Apr 24 2023 @name: MainWindow.py @desc: Main window of the application @auth: Djalim Simaila @e-mail: djalim.simaila@inrae.fr """ import os from PyQt5 import QtWidgets from PyQt5.QtCore import QThread from PyQt5.QtWidgets import QFileDialog, QWidget from utils.files.input import ScannedObject from utils.files.output import format_data, save_output_file from utils.gui.pyqt.main_window.Workers.AdvancedDataWorker import AdvancedDataWorker from utils.gui.pyqt.settings.Settings import Settings from utils.gui.pyqt.about.AboutThis import AboutThis from utils.math.data_extraction import get_mean, get_radius_from_x_y, get_true_theta_from_x_y from utils.settings.SettingManager import SettingManager from utils.graph2D.visplot_render import cross_section, render2D from utils.graph3D.visplot_render import render3D from utils.gui.pyqt.main_window.UI_MainWindow import Ui_MainWindow from utils.gui.pyqt.main_window.Workers.DiscreteDataWorker import DiscreteDataProcessWorker from utils.gui.pyqt.main_window.Workers.PreProcessWorker import PreProcessWorker from utils.gui.pyqt.main_window.Workers.RawDataWorker import RawDataProcessWorker from utils.gui.pyqt.error_popup.ErrorPopup import ErrorPopup class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow): """ Main window of the application, it contains all the UI elements """ def __init__(self, parent=None): super(MainWindow, self).__init__(parent) # Persist variable to avoid re-computation self.obj = None self.raw_data= None self.discrete_data = None self.advanced_data = None # Retrieve the UI self.setupUi(self) # Setup buttons listeners self.start_analyse_button.clicked.connect(self.start_preprocess) self.input_file_choose_btn.clicked.connect(self.select_file) self.output_folder_choose_btn.clicked.connect(self.select_folder) self.show_graph_checkbox.stateChanged.connect(self.toggle_graphs) self.export_advanced_metrics.clicked.connect(self.export_advanced_data) self.refresh_morphological_indicators.clicked.connect(self.refresh_advanced_data) self.actionOuvrir_un_fichier.triggered.connect(self.select_file) self.actionSauvegarder_le_model_redress.triggered.connect(self.save_model) self.actionPr_f_rennces.triggered.connect(self.show_settings) self.actionQuitter.triggered.connect(self.close) self.actionQ_propos_de_ce_logiciel.triggered.connect(self.show_about) # add default layer combobox value and setup the listenerr when index change self.layer_ComboBox.addItems(['Aucune couche']) self.layer_ComboBox.currentIndexChanged.connect(self.layer_changed) # Prepare available graph type list for the slots combobox self.graph_type = [ "Aucun", "Mesh3D", "Coupe XZ", "Coupe YZ", "Evolution du rayon moyen", "Difference entre le rayon moyen et la moyenne des rayons", "Coupe de la couche", "Difference entre le rayon de chaque points et le rayon moyen de la couche" ] # put all slots combo boxes in a list for conveniance self.combo_boxes = [ self.slot0ComboBox, self.slot1ComboBox, self.slot2ComboBox, self.slot3ComboBox, self.slot4ComboBox, self.slot5ComboBox, self.slot6ComboBox, self.slot7ComboBox, self.slot8ComboBox, self.slot9ComboBox, self.slot10ComboBox ] # Setup all combo boxes with values and listener for combo_box in self.combo_boxes: combo_box.addItems(self.graph_type) combo_box.currentIndexChanged.connect(self.graph_type_changed) # put all slots in a list for conveniance and store their state in a string self.slots = [ [self.slot0,"Aucun"], [self.slot1,"Aucun"], [self.slot2,"Aucun"], [self.slot3,"Aucun"], [self.slot4,"Aucun"], [self.slot5,"Aucun"], [self.slot6,"Aucun"], [self.slot7,"Aucun"], [self.slot8,"Aucun"], [self.slot9,"Aucun"], [self.slot10,"Aucun"] ] # Retrieve the slot previous value from the settings for slot_nb,slot in enumerate(self.slots): slot[1] = SettingManager.get_instance().get_last_graph(slot_nb) self.combo_boxes[slot_nb].setCurrentText(slot[1]) # Graph number indicator self.graph_nb =0 self.graph_type_changed() # Construct sub windows self.settings_window = Settings() self.about_window = AboutThis() # Variable to check if parametters has changed to avoid re-computation self.has_changed = True self.old_discretisation_value = None # Task counter and task number, used to know when the analysis # is considered finished # Current tasks are # - process raw data # - process discrete data # - process morphological indicators (advanced data) self.completed_tasks = 0 self.total_tasks = 3 ############################################################################### # # # # # Input/Setting Management # # # # # ############################################################################### def select_file(self): """ Open a file dialog to select the input file, if the folder_path is empty, it fills it with the files folder. """ file = QFileDialog.getOpenFileName()[0] self.input_file_path.setPlainText(file) self.output_file_prefix.setText(os.path.splitext(os.path.basename(file))[0]) if self.output_folder_path.toPlainText() is None or self.output_folder_path.toPlainText() == "": self.output_folder_path.setPlainText(os.path.dirname(file)) self.has_changed = True def select_folder(self): """ Open a file dialog to select the output folder """ self.output_folder_path.setPlainText(QFileDialog.getExistingDirectory()) self.has_changed = True def check_input_file(self): """ Check if the input file is valid by checking: - if it exists - if its extension is .obj """ authorised_extensions = ScannedObject.authorised_extensions if not os.path.isfile(self.input_file_path.toPlainText()) : ErrorPopup("Fichier d'entrée invalide: Aucun fichier selectionné, ou le fichier n'existe pas", button_label="Choisir un fichier d'entrée", button_callback=self.select_file).show_popup() return False if os.path.splitext(self.input_file_path.toPlainText())[1].lower() not in authorised_extensions: ErrorPopup("Fichier d'entrée invalide: l'extension du fichier est incorrecte ", button_label="Choisir un fichier d'entrée", button_callback=self.select_file).show_popup() return False if os.path.splitext(self.input_file_path.toPlainText())[1].lower() == ".xyz": ErrorPopup("Attention!, Ce programme ne peut pas redresser les fichiers .xyz, assurez vous que l'objet est deja droit. Il sera aussi impossible d'utiliser le graphe 'Mesh3D'").show_popup() return True def check_output_folder(self): """ Check if the output folder is valid by cheking if it exists. """ if not os.path.isdir(self.output_folder_path.toPlainText()): ErrorPopup("Dossier de sortie invalide", button_label="Choisir un dossier de sortie", button_callback=self.select_folder).show_popup() return False return True ############################################################################### # # # # # Data Processing # # # # # ############################################################################### def start_preprocess(self): """ Start the analyse, create the thread and connect the signals. """ if not self.check_input_file(): return if not self.check_output_folder(): return settings = SettingManager.get_instance() self.clear_graphs() self.completed_tasks = 0 if not self.has_changed and self.old_discretisation_value == self.discretisation_value_selector.value() and not settings.has_changed: self.completed_tasks = self.total_tasks -1 self.finish_analyse() return settings.has_changed = False self.has_changed = False self.old_discretisation_value = self.discretisation_value_selector.value() # Create the thread to run the analyse self.preprocess_thread = QThread() self.preprocess_worker = PreProcessWorker("PreProcessWorker", self.input_file_path.toPlainText(), self.discretisation_value_selector.value()) self.preprocess_worker.moveToThread(self.preprocess_thread) # Connect the signals # Start self.preprocess_thread.started.connect(self.preprocess_worker.run) # Progress self.preprocess_worker.status.connect(self.set_status) self.preprocess_worker.progress.connect(self.update_progress_bar) self.preprocess_worker.processed_obj.connect(self.set_obj) self.preprocess_worker.processed_obj.connect(self.process_raw_data) self.preprocess_worker.processed_obj.connect(self.process_discrete_data) # Finished self.preprocess_worker.finished.connect(self.preprocess_thread.quit) self.preprocess_worker.finished.connect(self.preprocess_worker.deleteLater) self.preprocess_thread.finished.connect(self.preprocess_thread.deleteLater) # Start the thread self.preprocess_thread.start() self.start_analyse_button.setEnabled(False) def process_raw_data(self, obj:ScannedObject): """ Start the analyse, create the thread and connect the signals. """ self.processrawdata_thread = QThread() self.processraw_worker = RawDataProcessWorker("RawDataProcessWorker", obj, self.output_folder_path.toPlainText(), self.output_file_prefix.text(), self.discretisation_value_selector.value()) self.processraw_worker.moveToThread(self.processrawdata_thread) # Connect the signals # Start self.processrawdata_thread.started.connect(self.processraw_worker.run) # Progress self.processraw_worker.status.connect(self.set_status) self.processraw_worker.progress.connect(self.update_progress_bar) self.processraw_worker.processedData.connect(self.set_raw_data) # Finished self.processraw_worker.finished.connect(self.finish_analyse) self.processraw_worker.finished.connect(self.processrawdata_thread.quit) self.processraw_worker.finished.connect(self.processraw_worker.deleteLater) self.processrawdata_thread.finished.connect(self.processrawdata_thread.deleteLater) # Start the thread self.processrawdata_thread.start() def process_discrete_data(self, obj:ScannedObject): """ Start the analyse, create the thread and connect the signals. """ self.processdiscrete_thread = QThread() self.processdiscrete_worker = DiscreteDataProcessWorker("DiscreteDataProcessWorker", obj, self.output_folder_path.toPlainText(), self.output_file_prefix.text(), self.discretisation_value_selector.value()) self.processdiscrete_worker.moveToThread(self.processdiscrete_thread) # Connect the signals # Start self.processdiscrete_thread.started.connect(self.processdiscrete_worker.run) # Progress self.processdiscrete_worker.status.connect(self.set_status) self.processdiscrete_worker.progress.connect(self.update_progress_bar) self.processdiscrete_worker.processedData.connect(self.set_discrete_data) self.processdiscrete_worker.processedData.connect(self.process_advanced_data) # Finished self.processdiscrete_worker.finished.connect(self.finish_analyse) self.processdiscrete_worker.finished.connect(self.processdiscrete_thread.quit) self.processdiscrete_worker.finished.connect(self.processdiscrete_worker.deleteLater) self.processdiscrete_thread.finished.connect(self.processdiscrete_thread.deleteLater) # Start the thread self.processdiscrete_thread.start() def process_advanced_data(self, discrete_data:dict, refresh:bool = False): """ Start the analyse, create the thread and connect the signals. """ self.advanced_data_thread = QThread() self.advanced_data_worker = AdvancedDataWorker("AdvancedDataProcessWorker", discrete_data, self.V_scan.value()) self.advanced_data_worker.moveToThread(self.advanced_data_thread) # Connect the signals # Start self.advanced_data_thread.started.connect(self.advanced_data_worker.run) # Progress self.advanced_data_worker.status.connect(self.set_status) self.advanced_data_worker.progress.connect(self.update_progress_bar) self.advanced_data_worker.processedData.connect(self.set_advanced_data) # Finished if not refresh: self.advanced_data_worker.finished.connect(self.finish_analyse) self.advanced_data_worker.finished.connect(self.advanced_data_thread.quit) self.advanced_data_worker.finished.connect(self.advanced_data_worker.deleteLater) self.advanced_data_thread.finished.connect(self.advanced_data_thread.deleteLater) # Start the thread self.advanced_data_thread.start() def refresh_advanced_data(self): """ Start the analyse, create the thread and connect the signals. """ self.process_advanced_data(self.discrete_data, True) def set_obj(self,obj:ScannedObject): """ Persists the pre-processed obj """ self.obj = obj def set_discrete_data(self,discrete_data:dict): """ Persists the calculated discrete data """ self.discrete_data = discrete_data layer = [str(i) for i in range(len(discrete_data["X moy (en mm)"]))] layer.insert(0,"Aucune couche") self.layer_ComboBox.currentIndexChanged.disconnect(self.layer_changed) self.layer_ComboBox.clear() self.layer_ComboBox.addItems(layer) self.layer_ComboBox.currentIndexChanged.connect(self.layer_changed) def set_raw_data(self,raw_data:dict): """ Persists the calculated raw data """ self.raw_data = raw_data def set_advanced_data(self,advanced_data:dict): """ Persists the calculated raw data and show the values """ self.advanced_data = advanced_data self.tortuosity.setValue(advanced_data["Tortuosité"]) self.volume.setValue(advanced_data["Volume en mm3"]) self.surface.setValue(advanced_data["Surface en mm2"]) self.mean_r_mean.setValue(advanced_data[""]) self.sigma_r_mean.setValue(advanced_data["σ_"]) self.sigma_r_tot.setValue(advanced_data["σ_^tot"]) self.MI_l.setValue(advanced_data["MI_l"]) self.MI_p.setValue(advanced_data["MI_p"]) self.MI_mR.setValue(advanced_data["MI_mR"]) self.MI_mH.setValue(advanced_data["MI_mH"]) self.MI_mR_in.setValue(advanced_data["MI_mR_in"]) self.V_scan.setValue(advanced_data["V_scan"]) self.R_V_scan.setValue(advanced_data["R_V_scan"]) self.S_V_scan.setValue(advanced_data["S_V_scan"]) self.R_h.setValue(advanced_data["Rayon hydraulique R_h"]) self.HI.setValue(advanced_data["HI"]) def export_advanced_data(self): """ Export the advanced data to a file """ if self.advanced_data is None: ErrorPopup("Aucune analyse effectuée. Aucune donnée à exporter").show_popup() return file_path = QFileDialog.getSaveFileName(self, "Exporter les indicateurs morphologiques", "./", "Fichier texte (*.txt)") if file_path[0] == "": return advanced_data = {key : [round(value,6),] for key,value in self.advanced_data.items()} string_data = format_data(advanced_data,SettingManager.get_instance().get_setting("output_file_separator")) save_output_file(file_path[0],string_data) def save_model(self): """ Save the model to a file """ if self.obj is None: ErrorPopup("Aucune analyse effectuée. Aucun modèle à sauvegarder").show_popup() return file_path = QFileDialog.getSaveFileName(self, "Sauvegarder le modèle", "./", "Fichier OBJ (*.obj)") self.obj.export_obj(file_path[0]) ############################################################################### # # # # # Graphs management # # # # # ############################################################################### def toggle_graphs(self): """ Show or hide the graphs """ if self.show_graph_checkbox.isChecked(): self.Graphs.show() else: self.Graphs.hide() def renderGraphs(self,obj:ScannedObject,raw_data:dict,discrete_data:dict): """ Render the graphs :param obj: The scanned object :param raw_data: The raw data :param discrete_data: The discrete data """ if not self.show_graph_checkbox.isChecked(): return self.set_status("Rendering graphs... this may take a moment") for slot in self.slots: current_slot = slot[0] graph_type = slot[1] if graph_type == "Mesh3D": if obj.has_faces(): current_slot.addWidget(render3D(obj,False).native) if graph_type == "Coupe XZ": current_slot.addWidget(cross_section(obj.get_x(), obj.get_z(), "Coupe XZ", "X (en mm)", "Z (en mm)", False).native) if graph_type == "Coupe YZ": current_slot.addWidget(cross_section(obj.get_y(), obj.get_z(), "Coupe YZ", "Y (en mm)", "Z (en mm)", False).native) if graph_type == "Evolution du rayon moyen": current_slot.addWidget(render2D(list(zip(discrete_data['Rayon moyen (en mm)'],discrete_data['Z moy (en mm)'])), "Evolution du rayon moyen en fonction de Z", "Rayon moyen (en mm)", "Z (en mm)", False).native) if graph_type == "Difference entre le rayon moyen et la moyenne des rayons": r_mean= get_mean(discrete_data['Rayon moyen (en mm)']) current_slot.addWidget(render2D(list(zip(discrete_data['Rayon moyen (en mm)']-r_mean,discrete_data['Z moy (en mm)'])), "Difference entre le rayon moyen et la moyenne en fonction de Z", "Rayon moyen (en mm)", "Z (en mm)", False).native) self.set_status("Graphs rendered!") def renderDiscreteGraphs(self,obj:ScannedObject,raw_data:dict,discrete_data:dict): """ """ if self.layer_ComboBox.currentText() == 'Aucune couche': return layer_nb = int(self.layer_ComboBox.currentText()) discretisation_value = self.discretisation_value_selector.value() self.interval_size.setValue(self.discrete_data["Discretisation(en mm)"][layer_nb]) self.x_mean.setValue(self.discrete_data["X moy (en mm)"][layer_nb]) self.y_mean.setValue(self.discrete_data["Y moy (en mm)"][layer_nb]) self.z_mean.setValue(self.discrete_data["Z moy (en mm)"][layer_nb]) self.r_mean.setValue(self.discrete_data["Rayon moyen (en mm)"][layer_nb]) self.sigma_r.setValue(self.discrete_data["Rayon ecart type (en mm)"][layer_nb]) if not self.show_graph_checkbox.isChecked(): return self.set_status("Rendering discretes graphs... this may take a moment") for slot in self.slots: current_slot = slot[0] graph_type = slot[1] if graph_type == "Coupe de la couche": self.clear_slot(current_slot) vertices = obj.get_discrete_vertices(discretisation_value)[layer_nb] current_slot.addWidget(cross_section([x for x,y,z in vertices], [y for x,y,z in vertices], "Coupe de la couche", "X en mm", "Y en mm", False).native) if graph_type == "Difference entre le rayon de chaque points et le rayon moyen de la couche": self.clear_slot(current_slot) vertices = obj.get_discrete_vertices(discretisation_value)[layer_nb] x_mean = discrete_data["X moy (en mm)"][layer_nb] y_mean = discrete_data["Y moy (en mm)"][layer_nb] r_mean = discrete_data["Rayon moyen (en mm)"][layer_nb] rs = [get_radius_from_x_y(x,y,x_mean,y_mean) for x,y,z in vertices] θs = [get_true_theta_from_x_y(x,y,x_mean,y_mean) for x,y,z in vertices] min_θ = min(θs) current_slot.addWidget(cross_section([θ-min_θ for θ in θs], [r-r_mean for r in rs], "Difference entre le rayon de chaque points et le rayon moyen de la couche", "Theta en rad", "r - en mm", False).native) self.set_status("Graphs rendered!") def clear_slot(self,slot): for i in reversed(range(slot.count())): slot.itemAt(i).widget().setParent(None) def clear_graphs(self): """ Clear the graphs """ if not self.show_graph_checkbox.isChecked(): return for slot,_ in self.slots: self.clear_slot(slot) ############################################################################### # # # # # User interface updates # # # # # ############################################################################### def finish_analyse(self): """ Finish the analyse """ self.completed_tasks += 1 if self.completed_tasks == self.total_tasks: self.status_text.setText("Done") self.analyse_progress_bar.setValue(100) self.renderGraphs(self.obj,self.raw_data,self.discrete_data) self.renderDiscreteGraphs(self.obj,self.raw_data,self.discrete_data) self.start_analyse_button.setEnabled(True) self.SettingsTab.setCurrentIndex(1) def update_progress_bar(self, value): """ Update the progress bar """ self.analyse_progress_bar.setValue(value) def set_status(self, status:str): """ Set the status of the analyse """ self.status_text.setText(status) def show_settings(self): """ Show the settings window """ self.settings_window.show() def show_about(self): """ Show the about window """ self.about_window.show() def graph_type_changed(self): """ Update the number of graphs """ self.graph_nb = 0 for combo_box in self.combo_boxes: if combo_box.currentIndex() != 0: self.graph_nb += 1 self.graph_nb_spinbox.setValue(self.graph_nb) settings = SettingManager.get_instance() for count,_ in enumerate(self.slots): self.slots[count][1] = self.combo_boxes[count].currentText() settings.set_last_graph(count,self.slots[count][1]) if self.obj is None or self.discrete_data is None or self.raw_data is None: return else: self.clear_graphs() self.renderGraphs(self.obj,self.raw_data,self.discrete_data) self.renderDiscreteGraphs(self.obj,self.raw_data,self.discrete_data) def layer_changed(self): if self.obj is None or self.discrete_data is None or self.raw_data is None: return else: self.renderDiscreteGraphs(self.obj,self.raw_data,self.discrete_data)