commit atomique

This commit is contained in:
Djalim Simaila 2023-10-20 16:05:52 +02:00
commit fcfa71d832
7 changed files with 903 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*__pycache__
*.vscode

20
main.py Normal file
View File

@ -0,0 +1,20 @@
import os
from utils.CollectionManager import OsuDatabase
OSUPATH = "path/to/osu!"
db = OsuDatabase(OSUPATH+"osu!.db")
print(db.version)
print(db.folder_count)
print(db.account_unlocked)
print(db.player_name)
print(db.number_of_beatmaps)
for beatmap in db.beatmaps:
print(beatmap)
print(beatmap.beatmap_id)

447
utils/CollectionManager.py Executable file
View File

@ -0,0 +1,447 @@
import os
import pickle
import shutil
from typing import Any, Union, List
from enum import Enum, auto
from difflib import SequenceMatcher
from utils.mapDownloader import download_beatmap
from utils.parsedb import parse_collection_db, parse_osu_db
def find_file_in_case_sensitive_fs(filepath:str,start_at:str="")->Union[str,"FileStatus"]:
"""returns the true path to a file or folder in a case sensitive filesystem
Args:
filepath (str): the path to the file or folder
Returns:
str: the true path to the file or folder
"""
root = "/"
path_element = filepath.split(os.sep)
if start_at != "":
for index,element in enumerate(path_element):
if element == start_at:
root = os.path.join(root,element)
path_element = path_element[index+1:]
break
else:
root = os.path.join(root,element)
element_to_find = path_element.pop(0)
if os.path.exists(root):
for file in os.listdir(root):
if file.lower() == element_to_find.lower():
temp = os.path.join(root,file)
#if there is no subfolder, return the path
if len(path_element) == 0:
return temp
temp = os.path.join(temp,*path_element)
path = find_file_in_case_sensitive_fs(temp,file)
if isinstance(path,str):
return path
else:
return FileStatus.not_found
else:
return FileStatus.not_found
SONG_FOLDER_PATH = "/Volumes/SharedStorage/Jeux/osu!/Songs"
class RankedStatus(Enum):
""" This enum represent the ranked status of a beatmap
Values:
unknown (int): 0
unsubmitted (int): 1
pending_wip_graveyard (int): 2
unused (int): 3
ranked (int): 4
approved (int): 5
qualified (int): 6
loved (int): 7
"""
unknown = 0
unsubmitted = 1
pending_wip_graveyard = 2
unused = 3
ranked = 4
approved = 5
qualified = 6
loved = 7
def __str__(self):
return self.name
def __repr__(self):
return self.name
@staticmethod
def from_int(value:int):
""" This function return a RankedStatus from an integer
Args:
value (int): the integer to convert
Returns:
RankedStatus: the RankedStatus corresponding to the integer
"""
return RankedStatus(value)
@staticmethod
def from_str(value:str):
""" This function return a RankedStatus from a string
Args:
value (str): the string to convert
Returns:
RankedStatus: the RankedStatus corresponding to the string
"""
match value:
case "unsubmitted":
return RankedStatus.unsubmitted
case "pending/wip/graveyard":
return RankedStatus.pending_wip_graveyard
case "unused":
return RankedStatus.unused
case "ranked":
return RankedStatus.ranked
case "approved":
return RankedStatus.approved
case "qualified":
return RankedStatus.qualified
case "loved":
return RankedStatus.loved
case "unknown":
return RankedStatus.unknown
case _:
return RankedStatus.unknown
class GameplayMode(Enum):
""" This enum represent the gameplay mode of a beatmap
Values:
osu_standard (int): 0
taiko (int): 1
ctb (int): 2
mania (int): 3
"""
osu_standard = 0
taiko = 1
ctb = 2
mania = 3
def __str__(self):
return self.name
def __repr__(self):
return self.name
@staticmethod
def from_int(value:int):
""" This function return a GameplayMode from an integer
Args:
value (int): the integer to convert
Returns:
GameplayMode: the GameplayMode corresponding to the integer
"""
return GameplayMode(value)
@staticmethod
def from_str(value:str):
""" This function return a GameplayMode from a string
Args:
value (str): the string to convert
Returns:
GameplayMode: the GameplayMode corresponding to the string
"""
match value:
case "osu!standard":
return GameplayMode.osu_standard
case "taiko":
return GameplayMode.taiko
case "ctb":
return GameplayMode.ctb
case "mania":
return GameplayMode.mania
case _:
return GameplayMode.osu_standard
class UserPermissions(Enum):
""" This enum represent the user permissions of a user
Values:
none (int): 0
normal (int): 1
moderator (int): 2
supporter (int): 4
friend (int): 8
peppy (int): 16
tournament (int): 32
"""
none = 0
normal = 1
moderator = 2
supporter = 4
friend = 8
peppy = 16
tournament = 32
def __str__(self):
return self.name
def __repr__(self):
return self.name
@staticmethod
def from_int(value:int):
""" This function return a UserPermissions from an integer
Args:
value (int): the integer to convert
Returns:
UserPermissions: the UserPermissions corresponding to the integer
"""
return UserPermissions(value)
@staticmethod
def from_str(value:str):
""" This function return a UserPermissions from a string
Args:
value (str): the string to convert
Returns:
UserPermissions: the UserPermissions corresponding to the string
"""
match value:
case "none":
return UserPermissions.none
case "normal":
return UserPermissions.normal
case "moderator":
return UserPermissions.moderator
case "supporter":
return UserPermissions.supporter
case "friend":
return UserPermissions.friend
case "peppy":
return UserPermissions.peppy
case "tournament":
return UserPermissions.tournament
case _:
return UserPermissions.none
class FileStatus(Enum):
""" This enum represent the status of a file
Values:
not_checked (int): 0
not_found (int): 1
corrupted (int): 2
found (int): 3
"""
not_checked = 0
not_found = 1
corrupted = 2
found = 3
def __str__(self):
return self.name
def __repr__(self):
return self.name
@staticmethod
def from_int(value:int):
return FileStatus(value)
@staticmethod
def from_str(value:str):
match value:
case "not_checked":
return FileStatus.not_checked
case "not_found":
return FileStatus.not_found
case "corrupted":
return FileStatus.corrupted
case "found":
return FileStatus.found
class OsuBeatmap:
"""This class represent a beatmap from the osu!.db file
Args:
beatmap_dict (dict): a dictionary containing the data of the beatmap
Attributes:
artist_name (str): The artist name
artist_name_unicode (str): The artist name in unicode
song_name (str): The song name
song_name_unicode (str): The song name in unicode
creator_name (str): The creator name
difficulty (str): The difficulty name
audio_file_name (str): The audio file name
md5_hash (str): The md5 hash of the beatmap
osu_file_name (str): The osu file name
ranked_status (RankedStatus): The ranked status of the beatmap
ranked_status_text (str): The ranked status of the beatmap in text
hitcircle_number (int): The number of hitcircles
slider_number (int): The number of sliders
spinner_number (int): The number of spinners
last_modification_time (datetime): The last modification time
approach_rate (float): The approach rate
circle_size (float): The circle size
hp_drain (float): The hp drain
overall_difficulty (float): The overall difficulty
slider_velocity (float): The slider velocity
osu_standard_stars_ratings (float): The osu standard stars ratings
taiko_stars_ratings (float): The taiko stars ratings
ctb_stars_ratings (float): The ctb stars ratings
mania_stars_ratings (float): The mania stars ratings
drain_time (int): The drain time
total_time (int): The total time
audio_preview_time (int): The audio preview time
timing_points (list[tuple[float,float,bool]]): The timing points
difficulty_id (int): The difficulty id
beatmap_id (int): The beatmap id
thread_id (int): The thread id
standard_grade (int): The standard grade
taiko_grade (int): The taiko grade
ctb_grade (int): The ctb grade
mania_grade (int): The mania grade
local_offset (int): The local offset
stack_leniency (float): The stack leniency
gameplay_mode (GameplayMode): The gameplay mode
gameplay_mode_text (str): The gameplay mode in text
song_source (str): The song source
song_tags (str): The song tags
online_offset (int): The online offset
font_used (str): The font used
is_unplayed (bool): Is unplayed
last_play_time (datetime): The last play time
is_osz2 (bool): Is osz2
folder_name (str): The folder name
last_check_time (datetime): The last check time
ignore_beatmap_sounds (bool): Ignore beatmap sounds
ignore_beatmap_skin (bool): Ignore beatmap skin
disable_storyboard (bool): Disable storyboard
disable_video (bool): Disable video
visual_override (bool): Visual override
mania_scroll_speed (int): The mania scroll speed
isCorrupted (bool): Is the beatmap corrupted
isFound (bool): Is the beatmap found in the specified osu! folder
"""
def __init__(self, beatmap_dict:dict,song_folder_path:str = None, check_file_status:bool = False):
self.artist_name = beatmap_dict["Artist name"]
self.artist_name_unicode = beatmap_dict["Artist name (unicode)"]
self.song_name = beatmap_dict["Song name"]
self.song_name_unicode = beatmap_dict["Song name (unicode)"]
self.creator_name = beatmap_dict["Creator name"]
self.difficulty = beatmap_dict["Difficulty"]
self.audio_file_name = beatmap_dict["Audio file name"]
self.md5_hash = beatmap_dict["MD5 hash"]
self.osu_file_name = beatmap_dict[".osu filename"]
self.ranked_status = RankedStatus.from_int(beatmap_dict["Ranked status"])
self.ranked_status_text = beatmap_dict["Ranked status text"]
self.hitcircle_number = beatmap_dict["Hitcircle number"]
self.slider_number = beatmap_dict["Slider number"]
self.spinner_number = beatmap_dict["Spinner number"]
self.last_modification_time = beatmap_dict["Last modification time"]
self.approach_rate = beatmap_dict["Approach rate"]
self.circle_size = beatmap_dict["Circle size"]
self.hp_drain = beatmap_dict["HP drain"]
self.overall_difficulty = beatmap_dict["Overall difficulty"]
self.slider_velocity = beatmap_dict["Slider velocity"]
self.osu_standard_stars_ratings = beatmap_dict["Osu!Standard stars ratings"]
self.taiko_stars_ratings = beatmap_dict["Taiko stars ratings"]
self.ctb_stars_ratings = beatmap_dict["CTB stars ratings"]
self.mania_stars_ratings = beatmap_dict["Mania stars ratings"]
self.drain_time = beatmap_dict["Drain time"]
self.total_time = beatmap_dict["Total time"]
self.audio_preview_time = beatmap_dict["Audio preview time"]
self.timing_points = beatmap_dict["Timing points"]
self.difficulty_id = beatmap_dict["Difficulty ID"]
self.beatmap_id = beatmap_dict["Beatmap ID"]
self.thread_id = beatmap_dict["Thread ID"]
self.standard_grade = beatmap_dict["Standard grade"]
self.taiko_grade = beatmap_dict["Taiko grade"]
self.ctb_grade = beatmap_dict["CTB grade"]
self.mania_grade = beatmap_dict["Mania grade"]
self.local_offset = beatmap_dict["Local offset"]
self.stack_leniency = beatmap_dict["Stack leniency"]
self.gameplay_mode = GameplayMode.from_int(beatmap_dict["Gameplay mode"])
self.gameplay_mode_text = beatmap_dict["Gameplay mode text"]
self.song_source = beatmap_dict["Song source"]
self.song_tags = beatmap_dict["Song tags"]
self.online_offset = beatmap_dict["Online offset"]
self.font_used = beatmap_dict["Font used"]
self.is_unplayed = beatmap_dict["Is unplayed"]
self.last_play_time = beatmap_dict["Last play time"]
self.is_osz2 = beatmap_dict["Is osz2"]
self.folder_name = beatmap_dict["Folder name"]
self.last_check_time = beatmap_dict["Last check time"]
self.ignore_beatmap_sounds = beatmap_dict["Ignore beatmap sounds"]
self.ignore_beatmap_skin = beatmap_dict["Ignore beatmap skin"]
self.disable_storyboard = beatmap_dict["Disable storyboard"]
self.disable_video = beatmap_dict["Disable video"]
self.visual_override = beatmap_dict["Visual override"]
self.mania_scroll_speed = beatmap_dict["Mania scroll speed"]
self.file_status = FileStatus.not_checked
if song_folder_path: self.path = os.path.join(song_folder_path, self.folder_name, self.osu_file_name)
if check_file_status and song_folder_path:
f = None
try :
f = open(self.path, "r", encoding="utf-8")
self.file_status = FileStatus.found
except FileNotFoundError:
self.file_status = FileStatus.not_found
if f is not None:
try:
lines = f.readlines()
lines = [line for line in lines if not line.startswith("//")]
for index,line in enumerate(lines):
if line.startswith("0,0"):
self.background_file_name = line.split(",")[2].strip()
except UnicodeDecodeError:
self.file_status = FileStatus.corrupted
def __str__(self):
return self.artist_name + " - " + self.song_name + " [" + self.difficulty + "]"
class OsuCollection:
"""This class represent a collection from the collection.db file
Args:
name (str): The name of the collection
beatmaps (list[OsuBeatmap]): The beatmaps in the collection
Attributes:
name (str): The name of the collection
beatmaps (list[OsuBeatmap]): The beatmaps in the collection
"""
def __init__(self, name:str, beatmaps: list[OsuBeatmap]):
self.name = name
self.beatmaps = beatmaps
def __str__(self):
return self.name
def __repr__(self):
return self.name
def __eq__(self, other):
if isinstance(other, OsuCollection):
return self.name == other.name
else:
return False
def __hash__(self):
return hash(self.name)
def add_beatmap(self, beatmap):
self.beatmaps.append(beatmap)
def remove_beatmap(self, beatmap):
self.beatmaps.remove(beatmap)
def get_beatmaps(self):
return self.beatmaps
def get_name(self):
return self.name
def get_beatmap_count(self):
return len(self.beatmaps)
def get_beatmap_folder_names(self):
return [beatmap.folder_name for beatmap in self.beatmaps]
def get_beatmap_ids(self):
return [beatmap.beatmap_id for beatmap in self.beatmaps]
def get_beatmap_audio_file_path(self):
return [beatmap.folder_name + beatmap.audio_file_name for beatmap in self.beatmaps]
class OsuDatabase:
def __init__(self, osu_db_file_path:str) -> None:
db_dict = parse_osu_db(osu_db_file_path)
self.version = db_dict["version"]
self.folder_count = db_dict["folder count"]
self.account_unlocked = db_dict["account blocked"]
self.account_unlock_date = db_dict["unblock date"]
self.player_name = db_dict["username"]
self.number_of_beatmaps = db_dict["beatmap count"]
self.beatmaps: List[OsuBeatmap] = []
number = 0
beatmap_count = len(db_dict["beatmaps"])
for beatmap_dict in db_dict["beatmaps"]:
self.beatmaps.append(OsuBeatmap(beatmap_dict,song_folder_path=None, check_file_status=True))
number_of_dash = int(((number+1)/beatmap_count)*50)
string = f"[{'#'*number_of_dash}{' '*(50 - number_of_dash)}] {number+1}/{beatmap_count}"
print(string,end="\r")
number += 1
self.user_permissions = db_dict["User permissions"]
def __str__(self):
return "OsuDatabase(version=" + str(self.version) + ", folder_count=" + str(self.folder_count) + ", account_unlocked=" + str(self.account_unlocked) + ", account_unlock_date=" + str(self.account_unlock_date) + ", player_name=" + str(self.player_name) + ", number_of_beatmaps=" + str(self.number_of_beatmaps) + ", beatmaps=" + str(self.beatmaps) + ", user_permissions=" + str(self.user_permissions) + ")"

35
utils/mapDownloader.py Executable file
View File

@ -0,0 +1,35 @@
import requests
import re
import os
import shutil
from typing import Union
def download_beatmap(beatmap_id:Union[int,str], output_dir:str,session:str):
"""Download a beatmap from osu! website
Args:
beatmap_id (int,str): the beatmap id
output_dir (str): the output directory
session (str): the session token
Raises:
Exception: if the request failed
"""
headers = {
"referer": f"https://osu.ppy.sh/beatmapsets/{beatmap_id}"
}
cookie = {
"osu_session": session
}
link = f"https://osu.ppy.sh/beatmapsets/{beatmap_id}/download"
response = requests.get(link, cookies = cookie ,headers=headers, allow_redirects=True)
if response.status_code != 200:
raise Exception(f"Request failed with status code {response.status_code}")
fname = ''
if "Content-Disposition" in response.headers.keys():
fname = re.findall("filename=(.+)", response.headers["Content-Disposition"])[0].strip('"')
else:
fname = str(beatmap_id) + ".osz"
fname = re.sub(r'[^\w_. -]', '_', fname)
with open(f"./{output_dir}/{fname}", "wb") as f:
f.write(response.content)

43
utils/mapDownloader_obsolete.py Executable file
View File

@ -0,0 +1,43 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import os
import shutil
def download(beatmap_id, download_directory, session):
beatmap_id = str(beatmap_id)
os.makedirs(f"/tmp/osudd/{beatmap_id}/", exist_ok=True)
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_experimental_option('excludeSwitches', ['enable-logging'])
prefs = {"profile.default_content_settings.popups": 0,
"download.default_directory":f"/tmp/osudd/{beatmap_id}/", ### Set the path accordingly
"download.prompt_for_download": False, ## change the downpath accordingly
"download.directory_upgrade": True}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)
driver.get("https://osu.ppy.sh/")
new_cookie = {'name': 'osu_session',
'value': session,
'domain': 'osu.ppy.sh'
}
driver.add_cookie(new_cookie)
driver.get(f"https://osu.ppy.sh/beatmapsets/{beatmap_id}")
time.sleep(2)
element = driver.find_element(By.XPATH,"//*[contains(text(), 'Download')]")
parent_a_tag = element.find_element(By.XPATH,'ancestor::a[1]')
parent_a_tag.click()
timeout = 60
elapsed_time = 0
while not any(file.endswith(".osz") for file in os.listdir(f"/tmp/osudd/{beatmap_id}/")):
time.sleep(1)
elapsed_time += 1
if elapsed_time > timeout:
break
shutil.move(f"/tmp/osudd/{beatmap_id}/", f"{download_directory}/{beatmap_id}/")
driver.quit()

191
utils/parsedb.py Executable file
View File

@ -0,0 +1,191 @@
#!/usr/bin/env python3
import datetime
from typing import IO
from utils.readfunctions import *
from enum import Enum, auto
import json
def parse_osu_db_beatmaps(file: IO[any],beatmap_count: int,version:int) -> list[dict]:
""" This function parse the beatmaps from the osu!.db file and returns the
data as a list of dictionaries, each dictionary represent a beatmap
Args:
file (IO[any]): the osu!.db file to read from
beatmap_count (_type_): the number of beatmaps to read
version (_type_): the version of the osu!.db file
Returns:
list[dict]: a list of dictionaries, each dictionary represent a beatmap
"""
beatmaps = []
for number in range(beatmap_count):
number_of_dash = int(((number+1)/beatmap_count)*50)
string = f"[{'#'*number_of_dash}{' '*(50 - number_of_dash)}] {number+1}/{beatmap_count}"
print(string,end="\r")
beatmap = {}
if version < 20191106:
beatmap["Data byte size"] = read_int(file)
beatmap["Artist name"] = read_string(file)
beatmap["Artist name (unicode)"] = read_string(file)
beatmap["Song name"] = read_string(file)
beatmap["Song name (unicode)"] = read_string(file)
beatmap["Creator name"] = read_string(file)
beatmap["Difficulty"] = read_string(file)
beatmap["Audio file name"] = read_string(file)
beatmap["MD5 hash"] = read_string(file)
beatmap[".osu filename"] = read_string(file)
ranked_status = read_byte(file)
ranked_status_str = ""
match ranked_status:
case 0:
ranked_status_str = "unknown"
case 1:
ranked_status_str = "unsubmitted"
case 2:
ranked_status_str = "pending/wip/graveyard"
case 3:
ranked_status_str = "unused"
case 4:
ranked_status_str = "ranked"
case 5:
ranked_status_str = "approved"
case 6:
ranked_status_str = "qualified"
case 7:
ranked_status_str = "loved"
case _:
ranked_status_str = "unknown"
beatmap["Ranked status"] = ranked_status
beatmap["Ranked status text"] = ranked_status_str
beatmap["Hitcircle number"] = read_short(file)
beatmap["Slider number"] = read_short(file)
beatmap["Spinner number"] = read_short(file)
beatmap["Last modification time"] = read_long(file)
beatmap["Approach rate"] = read_byte(file) if version <20140609 else read_single(file)
beatmap["Circle size"] = read_byte(file) if version <20140609 else read_single(file)
beatmap["HP drain"] = read_byte(file) if version <20140609 else read_single(file)
beatmap["Overall difficulty"] = read_byte(file) if version <20140609 else read_single(file)
beatmap["Slider velocity"] = read_double(file)
if version >= 20140609:
beatmap["Osu!Standard stars ratings"] = []
pair_number = read_int(file)
for i in range(pair_number):
beatmap["Osu!Standard stars ratings"].append(read_int_double_pair(file))
if version >= 20140609:
beatmap["Taiko stars ratings"] = []
pair_number = read_int(file)
for i in range(pair_number):
beatmap["Taiko stars ratings"].append(read_int_double_pair(file))
if version >= 20140609:
beatmap["CTB stars ratings"] = []
pair_number = read_int(file)
for i in range(pair_number):
beatmap["CTB stars ratings"].append(read_int_double_pair(file))
if version >= 20140609:
beatmap["Mania stars ratings"] = []
pair_number = read_int(file)
for i in range(pair_number):
beatmap["Mania stars ratings"].append(read_int_double_pair(file))
beatmap["Drain time"] = read_int(file)
beatmap["Total time"] = read_int(file)
beatmap["Audio preview time"] = read_int(file)
beatmap["Timing points"] = []
timing_point_number = read_int(file)
for i in range(timing_point_number):
beatmap["Timing points"].append(read_timing_point(file))
beatmap["Difficulty ID"] = read_int(file)
beatmap["Beatmap ID"] = read_int(file)
beatmap["Thread ID"] = read_int(file)
beatmap["Standard grade"] = read_byte(file)
beatmap["Taiko grade"] = read_byte(file)
beatmap["CTB grade"] = read_byte(file)
beatmap["Mania grade"] = read_byte(file)
beatmap["Local offset"] = read_short(file)
beatmap["Stack leniency"] = read_single(file)
game_mode = read_byte(file)
beatmap["Gameplay mode"] = game_mode
game_mode_str = ""
match game_mode:
case 0:
game_mode_str = "Osu!Standard"
case 1:
game_mode_str = "Taiko"
case 2:
game_mode_str = "CTB"
case 3:
game_mode_str = "Mania"
case _:
game_mode_str = "unknown"
beatmap["Gameplay mode text"] = game_mode_str
beatmap["Song source"] = read_string(file)
beatmap["Song tags"] = read_string(file)
beatmap["Online offset"] = read_short(file)
beatmap["Font used"] = read_string(file)
beatmap["Is unplayed"] = read_bool(file)
beatmap["Last play time"] = read_long(file)
beatmap["Is osz2"] = read_bool(file)
beatmap["Folder name"] = read_string(file)
beatmap["Last check time"] = read_long(file)
beatmap["Ignore beatmap sounds"] = read_bool(file)
beatmap["Ignore beatmap skin"] = read_bool(file)
beatmap["Disable storyboard"] = read_bool(file)
beatmap["Disable video"] = read_bool(file)
beatmap["Visual override"] = read_bool(file)
if version < 20140609:
beatmap["Unknown"] = read_byte(file)
beatmap["Last modification time"] = read_int(file)
beatmap["Mania scroll speed"] = read_byte(file)
beatmaps.append(beatmap)
print(80*" ",end="\r")
return beatmaps
def parse_osu_db(filepath:str)->dict:
""" This function parse the osu!.db file and returns the data as a dictionary
Args:
filepath (str): the path to the osu!.db file
Returns:
dict: a dictionary containing the data from the osu!.db file
"""
data = {}
with open(filepath,'rb') as db_file:
version = read_int(db_file)
data["version"] = version
data['folder count'] = read_int(db_file)
data['account blocked'] = read_bool(db_file)
data['unblock date'] = read_date_time(db_file).strftime("%d/%m/%Y, %H:%M:%S")
data['username'] = read_string(db_file)
beatmap_count = read_int(db_file)
data['beatmap count'] = beatmap_count
data['beatmaps'] = parse_osu_db_beatmaps(db_file,beatmap_count,version)
data['User permissions'] = read_int(db_file)
return data
def parse_collection_db(filepath:str)->dict:
""" This function parse the collection.db file and returns the data as a dictionary
Args:
filepath (str): the path to the collection.db file
Returns:
dict: a dictionary containing the data from the collection.db file
"""
data = {}
with open(filepath,'rb') as db_file:
version = read_int(db_file)
data["version"] = version
collection_count = read_int(db_file)
data["collection count"] = collection_count
data["collections"] = []
for i in range(collection_count):
collection = {}
collection["Name"] = read_string(db_file)
beatmap_count = read_int(db_file)
collection["Beatmap count"] = beatmap_count
collection["Beatmaps"] = []
for j in range(beatmap_count):
collection["Beatmaps"].append(read_string(db_file))
data["collections"].append(collection)
return data

165
utils/readfunctions.py Normal file
View File

@ -0,0 +1,165 @@
from typing import IO, Tuple
import struct
import datetime
def read_byte(file: IO[any])->int:
"""
This function read a byte from a file and return it as an integer
Args:
file (IO[any]): The file to read from
Returns:
int: The byte read from the file
"""
return int.from_bytes(file.read(1),byteorder='little')
def read_short(file: IO[any]) -> int:
"""
This function read a short from a file and return it as an integer
Args:
file (IO[any]): The file to read from
Returns:
int: The short read from the file
"""
return int.from_bytes(file.read(2),byteorder='little')
def read_int(file) -> int:
"""
This function read an integer from a file and return it as an integer
Args:
file (IO[any]): The file to read from
Returns:
int: The integer read from the file
"""
return int.from_bytes(file.read(4),byteorder='little')
def read_long(file) ->int:
"""
This function read a long from a file and return it as an integer
Args:
file (IO[any]): The file to read from
Returns:
int: The long read from the file
"""
return int.from_bytes(file.read(8),byteorder='little')
def read_LEB128(file) -> int:
"""
This function read a LEB128 from a file and return it as an integer
Args:
file (IO[any]): The file to read from
Returns:
int: The LEB128 read from the file
"""
result = 0
shift = 0
while True:
byte = read_byte(file)
result |= (byte & 0x7f) << shift
shift += 7
if byte & 0x80 == 0:
break
return result
def read_single(file) -> float:
"""
This function read a single from a file and return it as a float
Args:
file (IO[any]): The file to read from
Returns:
float: The single read from the file
"""
return struct.unpack('f',file.read(4))
def read_double(file) -> float:
"""
This function read a double from a file and return it as a float
Args:
file (IO[any]): The file to read from
Returns:
float: The double read from the file
"""
return struct.unpack('d',file.read(8))
def read_bool(file)-> bool:
"""
This function read a boolean from a file and return it as a boolean
Args:
file (IO[any]): The file to read from
Returns:
bool: The boolean read from the file
"""
return read_byte(file) != 0
def read_string(file)->str:
"""
This function read a string from a file and return it as a string
Args:
file (IO[any]): The file to read from
Returns:
str: The string read from the file
"""
byte = file.read(1)[0]
if byte == 0x00:
return ""
elif byte == 0x0b:
length = read_LEB128(file)
return file.read(length).decode('utf-8')
def read_int_double_pair(file)->tuple[int,float]:
"""
This function read an int-double pair from a file and return it as a tuple
Args:
file (IO[any]): The file to read from
Returns:
tuple[int,float]: The int-double pair read from the file
"""
file.read(1)
int = read_int(file)
file.read(1)
double = read_double(file)
return (int,double)
def read_timing_point(file)->tuple[float,float,bool]:
"""
This function read a timing point from a file and return it as a tuple
Args:
file (IO[any]): The file to read from
Returns:
tuple[float,float,bool]: The timing point read from the file
"""
return (read_double(file),read_double(file),read_bool(file))
def read_date_time(file)-> datetime.datetime:
"""
This function read a datetime from a file and return it as a datetime
Args:
file (IO[any]): The file to read from
Returns:
datetime.datetime: The datetime read from the file
"""
return datetime.datetime.fromtimestamp(read_long(file))