from PyQt6.QtCore import QThread, pyqtSignal, pyqtSlot, QMutexLocker, QMutex, QObject
import cv2
from time import sleep
import numpy as np


class VideoThread(QObject):
    """Worker object that pulls frames from a capture and publishes them.

    ``run()`` loops until ``stop_capture`` is called, reading one frame per
    iteration and emitting it through ``change_pixmap_signal`` as a
    ``(image, frame_index)`` tuple. While paused, the last buffered frame is
    re-emitted instead of reading new ones.

    NOTE(review): despite the name this is a ``QObject``, not a ``QThread``;
    presumably the caller moves it to a ``QThread`` and invokes ``run()``
    there -- confirm against the caller. ``run()`` never returns to an event
    loop while it is looping, so how the input signals get delivered depends
    on that wiring.
    """

    # Rate used initially and whenever the configured fps is not positive.
    DEFAULT_FPS = 24

    # Input signals (connected to the matching slots at the start of run()).
    pause_signal = pyqtSignal()
    stop_signal = pyqtSignal()
    go_to_frame_signal = pyqtSignal(int)
    change_fps_signal = pyqtSignal(int)

    # Output signals.
    change_pixmap_signal = pyqtSignal(tuple)  # (image, frame_index)
    current_frame_signal = pyqtSignal(int)    # capture position

    def __init__(self, cap, type="video"):
        """Store the capture and set up the per-field mutexes.

        Args:
            cap: Capture object providing ``read``/``get``/``set``/``release``
                (presumably a ``cv2.VideoCapture`` -- confirm with caller).
            type: ``"video"`` selects position-based frame indices and enables
                seeking; any other value is treated as a live camera whose
                frames are counted locally.
        """
        super().__init__()
        self.frame_index = 0
        self.cap_mutex = QMutex()
        self.cap = cap
        self.fps = self.DEFAULT_FPS
        self.fps_mutex = QMutex()
        self.pause = False
        self.pause_mutex = QMutex()
        self.stop = False
        self.stop_mutex = QMutex()
        self.type = type
        self.buffered_frame = None
        self.frameskip = 0
        self.frameskip_mutex = QMutex()
        # Bind the reader once so the loop does not re-check the source type.
        if self.type == "video":
            self.get_frame = self.get_frame_video
        else:
            self.get_frame = self.get_frame_camera

    def get_frame_video(self):
        """Read the next frame of a video file.

        Returns:
            ``(ret, image, frame_index)`` where ``frame_index`` is the capture
            position reported just before the read.
        """
        with QMutexLocker(self.cap_mutex):
            frame_index = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
            ret, cv_img = self.cap.read()
        return ret, cv_img, frame_index

    def get_frame_camera(self):
        """Read the next camera frame, numbering frames with a local counter.

        Returns:
            ``(ret, image, frame_index)``; the counter advances on every call,
            whether or not the read succeeds.
        """
        with QMutexLocker(self.cap_mutex):
            frame_index = self.frame_index
            self.frame_index += 1
            ret, cv_img = self.cap.read()
        return ret, cv_img, frame_index

    @pyqtSlot(int)
    def go_to_frame(self, frame):
        """Seek a video capture to ``frame``; ignored for non-video sources.

        Emits ``current_frame_signal`` with the position the capture reports
        after the seek. When paused, the frame at that position is read into
        the pause buffer so the displayed image follows the seek.
        """
        if self.type != "video":
            return
        with QMutexLocker(self.cap_mutex):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame)
            position = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
            self.current_frame_signal.emit(position)
            with QMutexLocker(self.pause_mutex):
                if self.pause:
                    # Keep the index paired with the buffered image; run()
                    # emits both together while paused.
                    self.frame_index = position
                    _, cv_img = self.cap.read()
                    self.buffered_frame = cv_img

    @pyqtSlot(int)
    def change_fps(self, fps):
        """Set the target frame rate used to pace the loop in ``run()``."""
        with QMutexLocker(self.fps_mutex):
            self.fps = fps

    @pyqtSlot(int)
    def change_frameskip(self, frameskip):
        """Set how many frames to skip after each displayed one (0 = none)."""
        with QMutexLocker(self.frameskip_mutex):
            self.frameskip = frameskip

    @pyqtSlot()
    def pause_capture(self):
        """Toggle the paused state.

        On entering pause, one frame is read into the pause buffer (and, for
        video sources, ``frame_index`` is set to that frame's position) so
        ``run()`` has something to keep re-emitting. Nothing is read when
        resuming, so no frame is consumed and thrown away.
        """
        with QMutexLocker(self.cap_mutex):
            with QMutexLocker(self.pause_mutex):
                self.pause = not self.pause
                if not self.pause:
                    return
                if self.type == "video":
                    self.frame_index = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
                _, cv_img = self.cap.read()
                self.buffered_frame = cv_img

    @pyqtSlot()
    def stop_capture(self):
        """Ask ``run()`` to exit at the start of its next iteration."""
        with QMutexLocker(self.stop_mutex):
            self.stop = True

    def _frame_delay(self):
        """Return the sleep time in seconds for one frame at the current fps.

        A non-positive fps would make ``1 / fps`` raise (zero) or make
        ``sleep`` reject the value (negative), so ``DEFAULT_FPS`` is used
        instead.
        """
        with QMutexLocker(self.fps_mutex):
            fps = self.fps
        if fps <= 0:
            fps = self.DEFAULT_FPS
        return 1 / fps

    def _skip_frames(self, frameskip):
        """Advance the capture to the next multiple of ``frameskip + 1``.

        The position is left untouched when it already is such a multiple.
        The read and the seek happen under one ``cap_mutex`` hold so a
        concurrent seek cannot slip in between them.
        """
        step = frameskip + 1
        with QMutexLocker(self.cap_mutex):
            current_frame = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
            remainder = current_frame % step
            if remainder == 0:
                next_frame = current_frame
            else:
                next_frame = current_frame + step - remainder
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, next_frame)

    def run(self):
        """Connect the input signals, then publish frames until stopped.

        Each iteration emits either the buffered frame (paused) or a freshly
        read one plus the capture position, applies the frame skip, and
        sleeps for one frame period. The capture is released on exit.
        """
        self.change_fps_signal.connect(self.change_fps)
        self.go_to_frame_signal.connect(self.go_to_frame)
        self.pause_signal.connect(self.pause_capture)
        self.stop_signal.connect(self.stop_capture)

        while True:
            with QMutexLocker(self.stop_mutex):
                if self.stop:
                    break

            delay = self._frame_delay()

            # Snapshot the pause state, then drop the lock before emitting or
            # sleeping so the slots are never blocked for a frame period.
            frozen = None
            with QMutexLocker(self.pause_mutex):
                paused = self.pause
                if paused and self.buffered_frame is not None:
                    frozen = (self.buffered_frame.copy(), self.frame_index)
            if paused:
                if frozen is not None:
                    self.change_pixmap_signal.emit(frozen)
                sleep(delay)
                continue

            ret, cv_img, frame_count = self.get_frame()
            if ret:
                self.change_pixmap_signal.emit((cv_img, frame_count))
                with QMutexLocker(self.cap_mutex):
                    position = int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))
                self.current_frame_signal.emit(position)

            # Next frame: only a positive skip is meaningful; -1 would give a
            # step of zero and a modulo-by-zero error.
            with QMutexLocker(self.frameskip_mutex):
                frameskip = self.frameskip
            if frameskip > 0:
                self._skip_frames(frameskip)

            sleep(delay)

        with QMutexLocker(self.cap_mutex):
            self.cap.release()