1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from bson.objectid import ObjectId
from fastapi import APIRouter, Depends
from fastapi import HTTPException
from fastapi.security import OAuth2PasswordRequestForm

from server.mock.model import User_helper
from server.mock.schemas import OfficeUser
from server.service.database import get_connection
from server.service.hashing import Hash
from server.service.jwttoken import create_access_token
from server.service.oauth import get_current_user
# Handle returned by get_connection for "Office_User"; the route handlers in
# this module call find/find_one/insert_one/update_one/delete_one on it.
# NOTE(review): presumably a PyMongo collection -- confirm in server.service.database.
collection = get_connection("Office_User")
# Router on which every endpoint in this module is registered.
user = APIRouter()
@user.get("/users")
async def get_users():
    """Return every document in the collection, each passed through ``User_helper``."""
    return [User_helper(document) for document in collection.find()]
@user.get("/users/{id}")
async def get_user_by_id(id: str):
    """Return the user whose ``_id`` matches ``id``.

    Args:
        id: String form of the document's ObjectId. The name is fixed by the
            ``{id}`` placeholder in the route path.

    Returns:
        The matching document passed through ``User_helper``.

    Raises:
        HTTPException: 400 when ``id`` is not a valid ObjectId string,
            404 when no document has that ``_id``.
    """
    # ObjectId() raises on malformed input; report that as a client error
    # instead of letting the exception escape the handler.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=400, detail="Invalid user id")

    document = collection.find_one({"_id": ObjectId(id)})
    # find_one yields None when nothing matches; do not hand that to User_helper.
    if document is None:
        raise HTTPException(status_code=404, detail="User not found")
    return User_helper(document)
@user.post("/user/")
async def create_user(user: OfficeUser):
    """Insert a new user and return the stored document via ``User_helper``.

    The password is run through ``Hash.bcrypt`` before it is written; the
    username and email are stored as received.
    """
    document = dict(
        username=user.username,
        email=user.email,
        password=Hash.bcrypt(user.password),
    )
    insert_result = collection.insert_one(document)
    # Read the record back by its generated id so the response reflects what
    # is actually stored.
    stored = collection.find_one({"_id": insert_result.inserted_id})
    return User_helper(stored)
@user.patch("/user/{id}")
async def update_user(id: str, user: OfficeUser):
    """Update a user's username and email and return the refreshed document.

    Only ``username`` and ``email`` are written; the stored password is not
    touched by this endpoint.

    Args:
        id: String form of the document's ObjectId. The name is fixed by the
            ``{id}`` placeholder in the route path.
        user: Request body carrying the new ``username`` and ``email``.

    Returns:
        The updated document passed through ``User_helper``.

    Raises:
        HTTPException: 400 when ``id`` is not a valid ObjectId string,
            404 when no document has that ``_id``.
    """
    # ObjectId() raises on malformed input; report that as a client error.
    if not ObjectId.is_valid(id):
        raise HTTPException(status_code=400, detail="Invalid user id")

    query = {"_id": ObjectId(id)}
    changes = {
        "username": user.username,
        "email": user.email,
    }
    # The filter must be a mapping ({"_id": ...}); the previous code built a
    # set literal ({"_id", ...}) by using a comma instead of a colon.
    collection.update_one(query, {"$set": changes})

    updated = collection.find_one(query)
    # find_one yields None when nothing matches; do not hand that to User_helper.
    if updated is None:
        raise HTTPException(status_code=404, detail="User not found")
    return User_helper(updated)
@user.delete("/user/{id}")
async def delete_user(id: str):
    """Delete the document whose ``_id`` matches ``id`` and return the remaining users.

    The response is the full collection after the delete, each document
    passed through ``User_helper``.
    """
    target = {"_id": ObjectId(id)}
    collection.delete_one(target)
    return [User_helper(document) for document in collection.find()]
@user.post("/login")
async def login(req: OAuth2PasswordRequestForm = Depends()):
    """Check the submitted credentials and return an access token.

    Looks the user up by ``req.username`` and checks the password with
    ``Hash.verify``. On success, returns a token whose payload carries the
    username and email.

    Returns:
        ``{"access_token": ..., "token_type": "Bearer"}`` on success, or an
        ``{"error": ...}`` dict when the user is unknown or the password
        does not verify.
    """
    # NOTE(review): failures are returned as ordinary dicts rather than raised
    # as HTTP errors -- confirm whether clients rely on this response shape.
    record = collection.find_one({"username": req.username})
    if record is None:
        return {"error": "User not found"}

    account = User_helper(record)
    password_ok = Hash.verify(account["password"], req.password)
    if not password_ok:
        return {"error": "Invalid password"}

    claims = {"username": account["username"], "email": account["email"]}
    token = create_access_token(claims)
    return {"access_token": token, "token_type": "Bearer"}
@user.get("/verify_user")
async def verify_token_user(
    current_user: OfficeUser = Depends(get_current_user),
):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user