"""Async CRUD helpers for the ``users`` MongoDB collection (via Motor)."""

from typing import Optional

import motor.motor_asyncio
from bson.objectid import ObjectId

MONGO_DETAILS = "mongodb://localhost:27017"

client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)

database = client.database

user_collection = database.get_collection("users")


# helpers


def user_helper(user) -> dict:
    """Convert a stored user document into a plain, serializable dict.

    The ``_id`` value is stringified and exposed as ``id``; the remaining
    fields are copied through unchanged.

    Raises:
        KeyError: if the document lacks any of the expected fields.
    """
    return {
        "id": str(user["_id"]),
        "username": user["username"],
        "email": user["email"],
        "password": user["password"],
        "projectname": user["projectname"],
        "projectpath": user["projectpath"],
    }


async def retrieve_users() -> list:
    """Return every user in the collection, each passed through ``user_helper``."""
    return [user_helper(doc) async for doc in user_collection.find()]


async def add_user(user_data: dict) -> dict:
    """Insert ``user_data`` as a new user and return the stored document.

    The document is re-read by its generated ``_id`` after the insert, so the
    returned dict reflects what is actually in the collection.
    """
    insert_result = await user_collection.insert_one(user_data)
    new_user = await user_collection.find_one({"_id": insert_result.inserted_id})
    return user_helper(new_user)


async def retrieve_user(username: str) -> Optional[dict]:
    """Return the user whose ``username`` matches, or ``None`` if there is none."""
    user = await user_collection.find_one({"username": username})
    if user:
        return user_helper(user)
    return None


async def update_user(username: str, data: dict) -> bool:
    """Apply ``data`` as a ``$set`` update to the user named ``username``.

    Returns:
        ``False`` if ``data`` is empty or no user matched ``username``;
        ``True`` if a matching user was found (even when the update left the
        stored values unchanged).
    """
    # Return False if an empty request body is sent.
    if not data:
        return False

    # A single update_one both locates and modifies the document; its
    # matched_count says whether the user existed. This avoids a separate
    # find_one round-trip and the window between lookup and update.
    # NOTE(review): matched_count requires an acknowledged write concern
    # (the driver default) - confirm if the client config ever changes.
    result = await user_collection.update_one(
        {"username": username}, {"$set": data}
    )
    return result.matched_count > 0


async def delete_user(username: str) -> bool:
    """Delete the user named ``username``.

    Returns:
        ``True`` if a user was deleted, ``False`` if no user matched.
    """
    # delete_one reports how many documents it removed, so no prior lookup
    # is needed to know whether the user existed.
    result = await user_collection.delete_one({"username": username})
    return result.deleted_count > 0