|
|
|
|
@ -1,6 +1,6 @@
|
|
|
|
|
import logging
|
|
|
|
|
import secrets
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
from typing import Any, Dict, Union
|
|
|
|
|
|
|
|
|
|
import jwt
|
|
|
|
|
@ -88,14 +88,14 @@ async def validate_ws_token(
|
|
|
|
|
def create_token(data: dict, secret_key: str, token_type: str = "access") -> str:
|
|
|
|
|
to_encode = data.copy()
|
|
|
|
|
if token_type == "access":
|
|
|
|
|
expire = datetime.utcnow() + timedelta(minutes=15)
|
|
|
|
|
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
|
|
|
|
|
elif token_type == "refresh":
|
|
|
|
|
expire = datetime.utcnow() + timedelta(days=30)
|
|
|
|
|
expire = datetime.now(timezone.utc) + timedelta(days=30)
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError()
|
|
|
|
|
to_encode.update({
|
|
|
|
|
"exp": expire,
|
|
|
|
|
"iat": datetime.utcnow(),
|
|
|
|
|
"iat": datetime.now(timezone.utc),
|
|
|
|
|
"type": token_type,
|
|
|
|
|
})
|
|
|
|
|
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
|
|
|
|
|
|