user.py 2.23 KB
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from bson.objectid import ObjectId
from server.service.jwttoken import create_access_token
from server.service.oauth import get_current_user
from server.service.hashing import Hash
from server.service.database import get_connection
from server.mock.model import User_helper
from server.mock.schemas import OfficeUser

collection = get_connection("Office_User")
user = APIRouter()

@user.get("/users")
async def get_users():
    users = []
    for i in collection.find():
        users.append(User_helper(i))
    return users

@user.get("/users/{id}")
async def get_user_by_id(id:str):
    user = collection.find_one({"_id": ObjectId(id)})
    return User_helper(user)

@user.post("/user/")
async def create_user(user: OfficeUser):
    payload = {
        "username": user.username,
        "email": user.email,
        "password": Hash.bcrypt(user.password)
    }
    user = collection.insert_one(payload)
    new_user = collection.find_one({"_id":user.inserted_id})
    return User_helper(new_user)

@user.patch("/user/{id}")
async def update_user(id:str ,user: OfficeUser):
    payload = {
        "username": user.username,
        "email": user.email,
    }
    collection.update_one({"_id",ObjectId(id)},{"$set": payload })
    update_user = collection.find_one({"_id":ObjectId(id)})
    return User_helper(update_user)

@user.delete("/user/{id}")
async def delete_user(id: str):
    collection.delete_one({"_id":ObjectId(id)})
    users = []
    for i in collection.find():
        users.append(User_helper(i))
    return users

@user.post("/login")
async def login(req:OAuth2PasswordRequestForm = Depends()):
    user = collection.find_one({"username": req.username})
    if user is None:
        return {"error": "User not found"}
    format_user = User_helper(user)
    if not Hash.verify(format_user["password"],req.password):
        return {"error": "Invalid password"}
    access_token = create_access_token({"username": format_user["username"],"email": format_user["email"]})
    return {"access_token":access_token, "token_type": "Bearer"}

@user.get("/verify_user")
async def verify_token_user(current_user: OfficeUser = Depends(get_current_user)):
    return current_user