import sys import secrets import time from typing import Union,List from pymongo import MongoClient from Models.BaseModels.Clipboard import Clipboard from Models.BaseModels.Notification import Notification from Models.BaseModels.DB import BaseDb client = MongoClient('simailadjalim.fr:27017', username='ANOTHER_USER', password='PASS', authSource='FullSync', authMechanism='SCRAM-SHA-256') try: client.admin.command('ping') print("Pinged your deployment. You successfully connected to MongoDB!") except Exception as e: print(e) sys.exit("Could not connect to MongoDB.") db = client["FullSync"] class Mongo(BaseDb): def first_run(self)->None: db.create_collection("User") db.create_collection("Clipboard") db.create_collection("Notification") ############################################################################### def authentificate_user(self,username, password) -> Union[bool, str]: user = db.User.find_one({"username": username, "password": password}) if user is None: return False else: return user["token"] def register(self,username, password) -> bool: # check for username already existing user = db.User.find_one({"username": username, "password": password}) if user is not None: print("username already taken") return False # add user in db token = secrets.token_urlsafe(64) try: db.User.insert_one({"username": username, "password": password, "token": token}) return True except: return False ############################################################################### def add_notification(self,token, title, content, device_name) -> bool: # check if token is valid user = db.User.find_one({"token": token}) if user is None: print("no user") return False # add notification in db db.Notification.insert_one({"timestamp": time.time(), "device_name": device_name, "token": token, "title": title, "content": content}) return True def get_notification(self,token) -> Union[bool, List[Notification]]: # check if token is valid user = db.User.find_one({"token": token}) if user is None: print("no user") return False # get all notification in db for this token db_notifications = db.Notification.find({"token": token}) notifications = [] for notification in db_notifications: notifications.append(Notification(0, notification["timestamp"], notification["device_name"], notification["token"], notification["title"], notification["content"])) return notifications def get_notification_by_id(self,token, id) -> Union[bool, Notification]: # check if token is valid user = db.User.find_one({"token": token}) if user is None: print("no user") return False # get notification in db for this token and this notification id db_notifications = db.Notification.find_one({"_id":id,"token": token}) notification = Notification(0, db_notifications["timestamp"], db_notifications["device_name"], db_notifications["token"], db_notifications["title"], db_notifications["content"]) return notification ############################################################################### def add_clipboard(self,token, content, device_name) -> bool: user = db.User.find_one({"token": token}) if user is None: print("no user") return False db.Clipboard.insert_one({"timestamp": time.time(), "device_name": device_name, "token": token, "content": content}) return True def get_clipboard(self,token) -> Union[bool, List[Clipboard]]: #check if token is valid user = db.User.find_one({"token": token}) if user is None: print("no user") return False db_clipboards = db.Clipboard.find({"token": token}) clipboards = [] for clipboard in db_clipboards: clipboards.append(Clipboard(0, clipboard["timestamp"], clipboard["device_name"], clipboard["token"], clipboard["content"])) return clipboards def get_clipboard_by_id(self,token, id) -> Union[bool, Clipboard]: #check if token is valid user = db.User.find_one({"token": token}) if user is None: print("no user") return False # get clipboard in db for this token and this clipboard id db_clipboard = db.ClipBoard.find_one({"_id":id,"token": token}) clipboard = Clipboard(0, db_clipboard["timestamp"], db_clipboard["device_name"], db_clipboard["token"], db_clipboard["content"]) return clipboard