feat: add initial implementation of the shitpost scanning application with database integration and various scanners for music, text, speech, and tags extraction
- Introduce `app.py` as the main application file to handle shitpost scanning. - Create `config.py` for configuration settings including scan paths and file types. - Implement database models in `Models.py` for shitposts, songs, speech outputs, and tags. - Add database creation logic in `db.py`. - Develop various scanners (`OcrScanner.py`, `SongScanner.py`, `SpeechScanner.py`, `TagScanner.py`) for extracting information from shitposts. - Implement utility functions in `dateExtractor.py` and `shitpostFactory.py` for handling file metadata and creating shitpost objects. - Include a `pyproject.toml` for project dependencies and configuration.
This commit is contained in:
		
						commit
						03a41b6996
					
				
							
								
								
									
										79
									
								
								app.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										79
									
								
								app.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,79 @@ | ||||
| import os | ||||
| import time | ||||
| from typing import List, Tuple | ||||
| import warnings | ||||
| import asyncio | ||||
| import threading | ||||
| 
 | ||||
| from sqlalchemy import create_engine | ||||
| from sqlalchemy.orm import Session | ||||
| 
 | ||||
| from config import SCAN_PATHS | ||||
| from db.Models import Shitpost | ||||
| from db.db import create_db | ||||
| from scanners.OcrScanner import extractText | ||||
| from scanners.SongScanner import extractSong | ||||
| from scanners.SpeechScanner import extractSpeech | ||||
| from utils.shitpostFactory import ShitpostFactory | ||||
| 
 | ||||
# Silence every warning category for the whole process.
# NOTE(review): presumably meant to quiet noisy third-party libraries, but it
# also hides warnings raised by this project's own code - confirm.
warnings.filterwarnings("ignore")
| 
 | ||||
| 
 | ||||
def scanMusics(shitposts:List[Tuple[Shitpost,threading.Lock]]):
    """Run song recognition on every shitpost, one after another.

    Args:
        shitposts: ``(shitpost, lock)`` pairs; each lock is handed to
            ``extractSong`` together with its shitpost.
    """
    # Unpack the pair directly instead of rebinding the loop variable.
    for shitpost, lock in shitposts:
        extractSong(shitpost, lock)
| 
 | ||||
def scanText(shitposts:List[Tuple[Shitpost,threading.Lock]]):
    """Run text extraction (OCR) on every shitpost, one after another.

    Args:
        shitposts: ``(shitpost, lock)`` pairs; each lock is handed to
            ``extractText`` together with its shitpost.
    """
    # Unpack the pair directly instead of rebinding the loop variable.
    for shitpost, lock in shitposts:
        extractText(shitpost, lock)
| 
 | ||||
def scanSpeech(shitposts:List[Tuple[Shitpost,threading.Lock]]):
    """Run speech transcription on every shitpost, one after another.

    Args:
        shitposts: ``(shitpost, lock)`` pairs; each lock is handed to
            ``extractSpeech`` together with its shitpost.
    """
    # Unpack the pair directly instead of rebinding the loop variable.
    for shitpost, lock in shitposts:
        extractSpeech(shitpost, lock)
| 
 | ||||
| 
 | ||||
async def scanShitposts():
    """Scan every configured path and persist the results.

    Loads the shitposts already stored in ``Shitpost.db``, builds new ones for
    files under ``SCAN_PATHS`` whose path is not known yet, runs the speech,
    text and song scanners in three worker threads, then adds every shitpost
    to the session and commits.

    Files for which ``ShitpostFactory`` raises are skipped and their file name
    is appended to ``failed.txt``.
    """
    engine = create_engine("sqlite:///Shitpost.db", future=True)
    # The context manager closes the session even when a scanner or the
    # commit raises.
    with Session(engine) as session:
        shitposts = []
        known_paths = set()
        known_hashes = set()
        for shitpost in session.query(Shitpost).all():
            shitposts.append((shitpost, threading.Lock()))
            known_paths.add(shitpost.path)
            known_hashes.add(shitpost.hash)

        for scan_root in SCAN_PATHS:
            for dirpath, _dirnames, filenames in os.walk(scan_root):
                for file in filenames:
                    file_path = os.path.join(dirpath, file)
                    if file_path in known_paths:
                        continue
                    print(file)
                    try:
                        new_shitpost = ShitpostFactory(file_path)
                    except Exception as error:
                        # Best effort: record the failure and keep scanning.
                        # ``Exception`` (not a bare except) lets Ctrl-C through.
                        print(f"\tcould not load {file_path}: {error!r}")
                        with open("failed.txt", "a") as failed:
                            failed.write(file + "\n")
                        continue
                    # Remember the path so overlapping scan roots do not
                    # produce the same file twice.
                    known_paths.add(file_path)
                    # ``hash`` is the primary key: a second file with the same
                    # content would make the commit fail, so skip it.
                    if new_shitpost.hash in known_hashes:
                        print(f"\tskipping duplicate content: {file_path}")
                        continue
                    known_hashes.add(new_shitpost.hash)
                    shitposts.append((new_shitpost, threading.Lock()))

        # The three scanners walk the same list concurrently; the per-shitpost
        # lock is what each scanner takes before writing its result.
        await asyncio.gather(
            asyncio.to_thread(scanSpeech, shitposts),
            asyncio.to_thread(scanText, shitposts),
            asyncio.to_thread(scanMusics, shitposts),
        )

        session.add_all(shitpost for shitpost, _lock in shitposts)
        session.commit()
| 
 | ||||
| 
 | ||||
if __name__ == "__main__":
    # Make sure the tables exist before the scan opens its session.
    create_db()
    scan = scanShitposts()
    asyncio.run(scan)
							
								
								
									
										11
									
								
								config.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								config.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,11 @@ | ||||
# Root directories that are walked when looking for shitposts.
SCAN_PATHS = [
    # "./testMedia/",
    "/home/djalim/Vidéos/Shitpost/",
    "/home/djalim/Images/Shitpost et Art/",
]

# File extensions (no leading dot) treated as videos.
VIDEO_FILETYPES = ["mp4", "webm", "mkv"]

# File extensions (no leading dot) treated as images.
IMAGE_FILETYPES = [
    "jpg",
    "jpeg",
    "png",
    "webp",
]
							
								
								
									
										92
									
								
								db/Models.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								db/Models.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,92 @@ | ||||
| from sqlalchemy import Column, ForeignKey, LargeBinary, Table, String, Integer, Boolean # Added String, Integer, Boolean for Table definition | ||||
| from sqlalchemy.orm import declarative_base, Mapped, mapped_column, relationship | ||||
| from typing import List | ||||
| 
 | ||||
| Base = declarative_base() | ||||
| 
 | ||||
# Association table backing the many-to-many link between shitposts and tags.
# ``left_id`` references ``shitposts.hash`` and ``right_id`` references
# ``tags.id``; together they form the composite primary key.
shitposts_tags = Table(
    "shitposts_tags",
    Base.metadata,
    Column("left_id", String, ForeignKey("shitposts.hash"), primary_key=True),
    Column("right_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
| 
 | ||||
class Shitpost(Base):
    """A scanned media file, keyed by ``hash``, with its scanner results."""
    __tablename__ = 'shitposts'
    hash: Mapped[str] = mapped_column(primary_key=True)
    path: Mapped[str] = mapped_column()
    date: Mapped[str] = mapped_column()  # Consider using sqlalchemy.types.DateTime for date fields
    file_type: Mapped[str] = mapped_column()
    # Stored as LargeBinary, so the Python-side value is ``bytes``.
    # ``deferred=True`` keeps the blob out of ordinary row loads.
    thumbnail: Mapped[bytes] = mapped_column(LargeBinary,deferred=True)
    correct_song_match: Mapped[bool] = mapped_column()

    # Scalar relationship to OcrOutput (the attribute is None until a scanner
    # sets it). For this to be truly 1-to-1, OcrOutput.shitpost_id should be
    # unique.
    ocr_output: Mapped["OcrOutput"] = relationship(back_populates="shitpost")

    # Scalar relationship to SpeechOutput (the attribute is None until a
    # scanner sets it). For this to be truly 1-to-1, SpeechOutput.shitpost_id
    # should be unique.
    speech_output: Mapped["SpeechOutput"] = relationship(back_populates="shitpost")

    # Many-to-many relationship with Tags through ``shitposts_tags``.
    tags: Mapped[List["Tags"]] = relationship(
        secondary=shitposts_tags,
        back_populates="shitposts"  # Matches Tags.shitposts
    )

    # Foreign key to SongMatch. The column is nullable (a shitpost might not
    # have a song), so the annotation is optional as well.
    song_id: Mapped[int | None] = mapped_column(ForeignKey('song_match.id'), nullable=True)
    # Many-to-one relationship with SongMatch; None when no song is linked.
    song: Mapped["SongMatch"] = relationship(back_populates="shitposts")
| 
 | ||||
class SongMatch(Base):
    """A song (name and artist) that shitposts can reference via ``song_id``."""
    __tablename__ = 'song_match'
    id: Mapped[int] = mapped_column(primary_key=True)
    song_name: Mapped[str] = mapped_column()
    artist_name: Mapped[str] = mapped_column()

    # One-to-many: every Shitpost whose ``song`` points at this row.
    shitposts: Mapped[List["Shitpost"]] = relationship(back_populates="song")
| 
 | ||||
class SpeechOutput(Base):
    """Speech-scanner result attached to one shitpost."""
    __tablename__ = 'speech_output'
    id: Mapped[int] = mapped_column(primary_key=True)
    # The scanners in this project store a JSON document serialised to text.
    text: Mapped[str] = mapped_column()

    # Foreign key to the owning Shitpost. Shitpost.hash is a string primary
    # key, so this column is a string too.
    shitpost_id: Mapped[str] = mapped_column(ForeignKey('shitposts.hash'))

    # Scalar back-reference paired with Shitpost.speech_output.
    shitpost: Mapped['Shitpost'] = relationship(back_populates='speech_output')
| 
 | ||||
class Tags(Base):
    """A named tag that can be attached to any number of shitposts."""
    __tablename__ = "tags"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()

    # Many-to-many relationship with Shitpost through ``shitposts_tags``.
    shitposts: Mapped[List["Shitpost"]] = relationship(
        secondary=shitposts_tags,
        back_populates="tags"  # Matches Shitpost.tags
    )
| 
 | ||||
class OcrOutput(Base):
    """OCR-scanner result attached to one shitpost."""
    __tablename__ = 'ocr_output'
    id: Mapped[int] = mapped_column(primary_key=True)
    # The scanners in this project store a JSON document serialised to text.
    text: Mapped[str] = mapped_column()

    # Foreign key to the owning Shitpost. Shitpost.hash is a string primary
    # key, so this column is a string too. It is required and indexed.
    shitpost_id: Mapped[str] = mapped_column(
        ForeignKey('shitposts.hash'),
        nullable=False,
        index=True
    )
    # Scalar back-reference paired with Shitpost.ocr_output.
    shitpost: Mapped['Shitpost'] = relationship(back_populates='ocr_output')
							
								
								
									
										7
									
								
								db/db.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										7
									
								
								db/db.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,7 @@ | ||||
| from sqlalchemy import create_engine | ||||
| from db.Models import * | ||||
| 
 | ||||
| 
 | ||||
def create_db(db_url: str = "sqlite:///Shitpost.db"):
    """Create every table declared on ``Base.metadata`` if it does not exist.

    Args:
        db_url: SQLAlchemy database URL. Defaults to the project's SQLite
            file so existing callers keep their behaviour.
    """
    engine = create_engine(db_url, future=True)
    try:
        Base.metadata.create_all(engine)
    finally:
        # This engine is only used for schema creation; release its pool.
        engine.dispose()
							
								
								
									
										2553
									
								
								poetry.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
							
						
						
									
										2553
									
								
								poetry.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
							
								
								
									
										33
									
								
								pyproject.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								pyproject.toml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,33 @@ | ||||
[project]
name = "memedb"
version = "0.1.0"
description = ""
authors = [
    {name = "Djalim Simaila",email = "DjalimS.pro@outlook.fr"}
]
readme = "README.md"
requires-python = ">=3.13,<4.0"
dependencies = [
    "torch (>=2.7.0,<3.0.0)",
    "sqlalchemy (>=2.0.41,<3.0.0)",
    "pillow (>=11.2.1,<12.0.0)",
    "opencv-python (>=4.11.0.86,<5.0.0.0)",
    "faster-whisper (>=1.1.1,<2.0.0)",
    "easyocr (>=1.7.2,<2.0.0)",
    "thumbnail (>=1.5,<2.0)",
    "openai (>=1.84.0,<2.0.0)",
    "lmstudio (>=1.3.1,<2.0.0)"
]

# `package-mode` is a Poetry setting, not a PEP 621 `[project]` field, so it
# lives in the Poetry tool table.
[tool.poetry]
package-mode = false


[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.pyright]
venvPath = "."
venv = ".venv"

# NOTE(review): Poetry appears to read `virtualenvs.in-project` from
# poetry.toml / `poetry config`, not from pyproject.toml - confirm and move.
[virtualenvs]
in-project = true
							
								
								
									
										72
									
								
								scanners/OcrScanner.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								scanners/OcrScanner.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,72 @@ | ||||
| from json import dumps | ||||
| import os | ||||
| import subprocess | ||||
| import tempfile | ||||
| import threading | ||||
| import lmstudio as lms | ||||
| from config import IMAGE_FILETYPES, VIDEO_FILETYPES | ||||
| from db.Models import OcrOutput, Shitpost | ||||
| 
 | ||||
| import time | ||||
| import base64 | ||||
| import easyocr | ||||
# Module-level EasyOCR reader for French and English, created at import time
# and used by extractImage.
reader = easyocr.Reader(['fr', 'en'])
| 
 | ||||
| 
 | ||||
def get_filename_from_path(path):
    """Return the file name of *path* without directory or final extension.

    Only the last extension is removed, so ``"meme.final.png"`` yields
    ``"meme.final"`` instead of being cut at the first dot.
    """
    return os.path.splitext(os.path.basename(path))[0]
| 
 | ||||
def extractImage(filepath:str)->str:
    """OCR the image at *filepath* with the module-level reader.

    Returns:
        Every detected text fragment followed by a newline, concatenated in
        the order the reader reports them (empty string when nothing is found).
    """
    detections = reader.readtext(filepath)
    # Each detection is a (bounding box, text, confidence) triple; only the
    # text is kept.
    return "".join(text + "\n" for (_bbox, text, _prob) in detections)
| 
 | ||||
def extractImageLlm(filepath:str)->str:
    """Ask the vision LLM to extract the text shown in an image.

    The model is asked to wrap the extracted text between two ``%`` signs;
    the text following the first ``%`` (up to the next one) is returned.

    Returns:
        The extracted text, or ``""`` when the response contains no ``%``
        delimiter at all (previously this raised ``IndexError``).
    """
    prompt = """extract the text in the image, put it between two "%" exemple :  %extracted \ntext%, if there is nothing just say %%"""

    image = lms.prepare_image(filepath)
    chat = lms.Chat()
    model = lms.llm("qwen/qwen2.5-vl-7b")
    chat.add_user_message(prompt, images=[image])
    prediction = model.respond(chat)

    parts = prediction.content.split("%")
    if len(parts) < 2:
        # The model ignored the requested format; treat it as "no text".
        return ""
    return parts[1]
| 
 | ||||
| 
 | ||||
def scanImage(shitpost: Shitpost, lock=None):
    """OCR a still image and attach the result to *shitpost*.

    The result is stored as JSON of the form ``{"frames": {0: text}}`` so it
    has the same layout as the per-frame output produced for videos.

    Args:
        shitpost: The shitpost whose ``path`` is read and whose
            ``ocr_output`` is set.
        lock: Optional lock held while ``ocr_output`` is assigned, matching
            what ``scanVideo`` does. ``None`` keeps the unlocked behaviour.
    """
    text = extractImage(shitpost.path)
    output = OcrOutput(text=dumps({"frames": {0: text}}))
    if lock is None:
        shitpost.ocr_output = output
    else:
        with lock:
            shitpost.ocr_output = output
| 
 | ||||
def scanVideo(shitpost:Shitpost,lock:threading.Lock):
    """OCR a video frame by frame and attach the result to *shitpost*.

    ffmpeg dumps frames as JPEG files into a temporary directory, each frame
    is run through ``extractImage``, and the texts are stored as JSON of the
    form ``{"frames": {frame_number: text}}``.

    Args:
        shitpost: The shitpost whose ``path`` is read and whose
            ``ocr_output`` is set.
        lock: Held while ``ocr_output`` is assigned.
    """
    dico = {"frames" : {}}
    with tempfile.TemporaryDirectory() as tmp_path:
        # Argument list with no shell, so quotes or other shell
        # metacharacters in the path cannot break or hijack the command.
        cmd = [
            "ffmpeg", "-loglevel", "quiet",
            "-i", shitpost.path,
            "-r", "1",
            "-f", "image2",
            os.path.join(tmp_path, "frame-%04d.jpg"),
        ]
        subprocess.run(cmd)
        # ffmpeg writes frame-0001.jpg, frame-0002.jpg, ... directly into
        # the temporary directory; sorting keeps the frames in order.
        for file in sorted(os.listdir(tmp_path)):
            if not file.endswith(".jpg"):
                continue
            # "frame-0007.jpg" -> 7
            frame_number = int(file.split("-")[1].split(".")[0])
            dico['frames'][frame_number] = extractImage(os.path.join(tmp_path, file))
    with lock:
        shitpost.ocr_output = OcrOutput(text=dumps(dico))
| 
 | ||||
| 
 | ||||
def extractText(shitpost:Shitpost,lock):
    """Extract on-screen text for *shitpost* unless it already has OCR output.

    Videos go through ``scanVideo`` (with the lock), images through
    ``scanImage``. Start and end of the step are printed with the elapsed time.
    """
    started = time.time()
    print(f"\tstarting to extract text for {shitpost.hash[:4]} aka {shitpost.path}")
    needs_ocr = shitpost.ocr_output is None
    if needs_ocr and shitpost.file_type in VIDEO_FILETYPES:
        scanVideo(shitpost, lock)
    if needs_ocr and shitpost.file_type in IMAGE_FILETYPES:
        scanImage(shitpost)
    print(f"\ttext extraced for {shitpost.hash[:4]} in :{time.time()-started} ")
							
								
								
									
										21
									
								
								scanners/SongScanner.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								scanners/SongScanner.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,21 @@ | ||||
| from time import time | ||||
| from config import VIDEO_FILETYPES | ||||
| from db.Models import Shitpost, SongMatch | ||||
| import subprocess | ||||
| import threading | ||||
| 
 | ||||
def extractSong(shitpost: Shitpost,lock:threading.Lock):
    """Recognise the song in a video with ``songrec`` and attach it.

    Only runs for video files that have no song yet. The command output is
    expected to look like ``Artist - Title``; when the command fails or the
    output has no such separator, the shitpost is left without a song instead
    of raising ``IndexError``.

    Args:
        shitpost: The shitpost whose ``path`` is analysed and whose ``song``
            is set on success.
        lock: Held while ``song`` is assigned.
    """
    t1 = time()
    print(f"\tstarting to extract song for {shitpost.hash[:4]} aka {shitpost.path}")

    if shitpost.song is None and shitpost.file_type in VIDEO_FILETYPES:
        result = subprocess.run(['songrec', 'recognize', shitpost.path], capture_output=True, text=True)
        # Split on the first " - " only, so hyphens inside the artist name
        # or the title survive, and strip the trailing newline / spaces.
        artist, separator, songname = result.stdout.strip().partition(" - ")
        if result.returncode == 0 and separator:
            songmatch = SongMatch(
                    song_name=songname.strip(),
                    artist_name=artist.strip()
            )
            with lock:
                shitpost.song = songmatch
        else:
            print(f"\tno song recognized for {shitpost.hash[:4]}")
    print(f"\tsong extraced for {shitpost.hash[:4]} in :{time()-t1} ")
							
								
								
									
										51
									
								
								scanners/SpeechScanner.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								scanners/SpeechScanner.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,51 @@ | ||||
| import tempfile | ||||
| from time import time | ||||
| 
 | ||||
| from config import VIDEO_FILETYPES | ||||
| from db.Models import Shitpost, SpeechOutput | ||||
| from faster_whisper import WhisperModel | ||||
| 
 | ||||
| 
 | ||||
| import threading | ||||
| import subprocess | ||||
| import json | ||||
| 
 | ||||
def extractSpeech(shitpost: Shitpost,lock:threading.Lock):
    """Transcribe the speech of a video shitpost unless already done.

    Args:
        shitpost: The shitpost to transcribe; skipped when it already has a
            ``speech_output`` or is not a video.
        lock: Passed on to ``fastWhisper``, which holds it while storing
            the result.
    """
    t1 = time()
    print(f"\tstarting to extract speech for {shitpost.hash[:4]} aka {shitpost.path}")
    # Only videos carry an audio track. The previous ``not in`` test was
    # inverted: it sent images to the transcriber and skipped every video.
    if shitpost.speech_output is None and shitpost.file_type in VIDEO_FILETYPES:
        fastWhisper(shitpost, lock)
    print(f"\tspeech extracted for {shitpost.hash[:4]} in :{time()-t1} ")
| 
 | ||||
| 
 | ||||
| 
 | ||||
def whisper(shitpost: Shitpost):
    """Transcribe *shitpost* with the ``whisper`` command-line tool.

    The tool writes ``<file stem>.json`` into a temporary directory; that
    document is re-serialised and stored as the shitpost's ``speech_output``.

    Raises:
        subprocess.CalledProcessError: If the ``whisper`` command fails.
    """
    import os.path

    # Strip only the final extension so multi-dot file names keep their stem.
    filename = os.path.splitext(os.path.basename(shitpost.path))[0]
    with tempfile.TemporaryDirectory() as tmpdir:
        # Argument list with no shell, so quotes in the path cannot break or
        # hijack the command.
        cmd = ["whisper", "--verbose", "False", "-f", "json", "-o", tmpdir, shitpost.path]
        subprocess.run(cmd, check=True)
        with open(os.path.join(tmpdir, f"{filename}.json"), "r") as file:
            data = json.load(file)
    print("extracted speech :", data["text"] )
    shitpost.speech_output = SpeechOutput(
        text=json.dumps(data),
    )
| 
 | ||||
# Lazily created model shared by every fastWhisper call.
_WHISPER_MODEL = None


def _get_whisper_model():
    """Return the shared Whisper model, loading it on first use."""
    # NOTE(review): not guarded by a lock; fine while a single thread runs
    # the speech scanner - confirm if that ever changes.
    global _WHISPER_MODEL
    if _WHISPER_MODEL is None:
        _WHISPER_MODEL = WhisperModel("turbo", device="cpu", compute_type="int8")
    return _WHISPER_MODEL


def fastWhisper(shitpost: Shitpost,lock:threading.Lock):
    """Transcribe *shitpost* with faster-whisper and attach the result.

    The transcript is stored as JSON mapping each segment's start time
    (rounded to two decimals) to the segment text.

    Args:
        shitpost: The shitpost whose ``path`` is transcribed and whose
            ``speech_output`` is set.
        lock: Held while ``speech_output`` is assigned.
    """
    # Loading the model is expensive, so it is done once rather than once
    # per file.
    model = _get_whisper_model()
    segments, _info = model.transcribe(shitpost.path,beam_size=5,language_detection_segments=2)
    dico = {round(segment.start, 2): segment.text for segment in segments}

    with lock:
        shitpost.speech_output = SpeechOutput(
                text=json.dumps(dico),
        )
							
								
								
									
										15
									
								
								scanners/TagScanner.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								scanners/TagScanner.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,15 @@ | ||||
| import base64 | ||||
| import threading | ||||
| import lmstudio as lms | ||||
| 
 | ||||
| from db.Models import Shitpost | ||||
| 
 | ||||
def extractTags(shitpost:Shitpost, lock:threading.Lock):
    """Ask the vision LLM for tags describing the shitpost's image.

    Args:
        shitpost: The shitpost whose ``path`` is sent to the model.
        lock: Currently unused; kept so the signature matches the other
            scanners.

    Returns:
        The raw text of the model's response. The result used to be
        discarded, which made the call pointless; parsing the JSON and
        attaching ``Tags`` rows is still left to the caller.
    """
    # Plain string: the previous f-string had no placeholders.
    prompt = """I will give you an image and a list of tags, you'll have to return a list of tags that accurately describe the image in a json format, you'll try to match any existing tags, before trying to add new one
tags=[]"""

    image = lms.prepare_image(shitpost.path)
    chat = lms.Chat()
    model = lms.llm("qwen/qwen2.5-vl-7b")
    chat.add_user_message(prompt, images=[image])
    prediction = model.respond(chat)
    return prediction.content
							
								
								
									
										61
									
								
								utils/dateExtractor.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										61
									
								
								utils/dateExtractor.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,61 @@ | ||||
| import re | ||||
| import os | ||||
| from datetime import datetime | ||||
| from typing import Optional, Union # For type hinting | ||||
| 
 | ||||
# Filename patterns, tried in order; each captures YYYY, MM, DD, HH, MM, SS.
# e.g. IMG_20210811_141036.jpg, 20210509_005303.jpg, IMG_20190723_211320_065.jpg
_FILENAME_DATE_PATTERNS = (
    # Optional "<prefix>_" before the date; the greedy prefix prefers the last
    # date-like group in the name.
    re.compile(r"(?:.*_)?(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})(?:_.*)?\..+"),
    # No prefix: finds the first date-like group. Only matters when the
    # candidate picked by the first pattern was not a valid date.
    re.compile(r"(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})(?:_.*)?\..+"),
)


def extract_date_from_path(file_path: str) -> Optional[datetime]:
    """
    Extracts a datetime object from a file path.
    It first tries to parse the date and time from the filename
    (``YYYYMMDD_HHMMSS``, optionally surrounded by a prefix and a suffix).
    If unsuccessful, it falls back to the file's last modification timestamp.

    Args:
        file_path (str): The absolute or relative path to the file.

    Returns:
        Optional[datetime]: A datetime object if a date could be extracted,
                            otherwise None.
    """
    filename = os.path.basename(file_path)

    for pattern in _FILENAME_DATE_PATTERNS:
        match = pattern.search(filename)
        if match is None:
            continue
        groups = match.groups()
        try:
            year, month, day, hour, minute, second = map(int, groups)
            return datetime(year, month, day, hour, minute, second)
        except ValueError as e:
            # Digits matched but are not a real date (e.g. month 99); try the
            # next pattern, then the file metadata.
            print(f"Warning: Could not parse date from filename groups {groups} for {filename}: {e}")

    # Ask for the metadata directly instead of checking existence first: the
    # file may disappear between the check and the call.
    try:
        mtime = os.path.getmtime(file_path)
    except FileNotFoundError:
        print(f"Warning: File not found at '{file_path}'. Cannot get metadata.")
        return None
    except (OSError, ValueError) as e:
        print(f"Error: Could not get metadata for {file_path}: {e}")
        return None

    try:
        return datetime.fromtimestamp(mtime)
    except (OverflowError, OSError, ValueError) as e:
        # Timestamp outside the range the platform can convert.
        print(f"Unexpected error getting metadata for {file_path}: {e}")
        return None
							
								
								
									
										55
									
								
								utils/shitpostFactory.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								utils/shitpostFactory.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,55 @@ | ||||
| import hashlib | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| 
 | ||||
| from thumbnail import generate_thumbnail | ||||
| 
 | ||||
| from db.Models import Shitpost | ||||
| from utils.dateExtractor import extract_date_from_path | ||||
| 
 | ||||
# Settings handed to generate_thumbnail for every shitpost.
options = dict(
    trim=False,
    height=300,
    width=300,
    quality=85,
    type='thumbnail',
)
| 
 | ||||
| 
 | ||||
def hashfile(file_path:str)->str:
    """Return the hexadecimal SHA-256 digest of the file at *file_path*."""
    digest = hashlib.sha256()
    with open(file_path, 'rb', buffering=0) as stream:
        # Feed the file in chunks so it never has to fit in memory at once.
        while chunk := stream.read(1 << 16):
            digest.update(chunk)
    return digest.hexdigest()
| 
 | ||||
def ShitpostFactory(file_path:str):
    """Build a ``Shitpost`` for the file at *file_path*.

    Fills in the content hash, path, date (as a timestamp), file type and a
    PNG thumbnail; ``correct_song_match`` starts as ``False``.

    Args:
        file_path: Path of the media file to load.

    Returns:
        The new, not yet persisted ``Shitpost``.

    Raises:
        ValueError: If no date can be determined for the file (previously an
            ``AttributeError`` on ``None``).
    """
    filename = os.path.basename(file_path)
    # Extension without the dot, lower-cased ("" when there is none).
    filetype = os.path.splitext(filename)[1].lower()[1:]
    shitpost_hash = hashfile(file_path)

    shitpost_datetime = extract_date_from_path(file_path)
    if shitpost_datetime is None:
        raise ValueError(f"could not determine a date for {file_path!r}")

    shitpost = Shitpost(
             hash=shitpost_hash,
             path=file_path,
             # The column is declared as a string, so store the timestamp as text.
             date=str(shitpost_datetime.timestamp()),
             file_type=filetype,
             # song match default value
             correct_song_match=False,
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # The thumbnail is generated from a copy named after the hash.
        # NOTE(review): presumably to avoid problems with unusual characters
        # in the original file name - confirm.
        shitpost_cpy = os.path.join(tmpdir, f"{shitpost_hash}.{filetype}")
        shutil.copyfile(file_path, shitpost_cpy)

        # Distinct name: for PNG inputs "<hash>.png" would be the copy itself,
        # making the generator read and write the same file.
        thumpath = os.path.join(tmpdir, f"{shitpost_hash}_thumb.png")
        generate_thumbnail(shitpost_cpy, thumpath, options)

        with open(thumpath, "rb") as thumb:
            shitpost.thumbnail = thumb.read()

    return shitpost
		Loading…
	
		Reference in New Issue
	
	Block a user