|
|
|
|
@ -29,8 +29,7 @@ httpbasic = HTTPBasic(auto_error=False)
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_user_from_token(token, secret_key: str, token_type: str = "access",
|
|
|
|
|
raise_on_error: bool = True) -> Union[bool, str]:
|
|
|
|
|
def get_user_from_token(token, secret_key: str, token_type: str = "access") -> str:
|
|
|
|
|
credentials_exception = HTTPException(
|
|
|
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
|
|
|
detail="Could not validate credentials",
|
|
|
|
|
@ -40,28 +39,19 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access",
|
|
|
|
|
payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
|
|
|
|
|
username: str = payload.get("identity", {}).get('u')
|
|
|
|
|
if username is None:
|
|
|
|
|
if raise_on_error:
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
if payload.get("type") != token_type:
|
|
|
|
|
if raise_on_error:
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
|
|
|
|
|
except jwt.PyJWTError:
|
|
|
|
|
if raise_on_error:
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
raise credentials_exception
|
|
|
|
|
return username
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# This should be reimplemented to better realign with the existing tools provided
|
|
|
|
|
# by FastAPI regarding API Tokens
|
|
|
|
|
# https://github.com/tiangolo/fastapi/blob/master/fastapi/security/api_key.py
|
|
|
|
|
async def get_ws_token(
|
|
|
|
|
async def validate_ws_token(
|
|
|
|
|
ws: WebSocket,
|
|
|
|
|
ws_token: Union[str, None] = Query(..., alias="token"),
|
|
|
|
|
api_config: Dict[str, Any] = Depends(get_api_config)
|
|
|
|
|
@ -72,13 +62,16 @@ async def get_ws_token(
|
|
|
|
|
if secrets.compare_digest(secret_ws_token, ws_token):
|
|
|
|
|
# Just return the token if it matches
|
|
|
|
|
return ws_token
|
|
|
|
|
elif user := get_user_from_token(ws_token, secret_jwt_key, raise_on_error=False):
|
|
|
|
|
# If the token is a jwt, and it's valid return the user
|
|
|
|
|
return user
|
|
|
|
|
else:
|
|
|
|
|
logger.info("Denying websocket request")
|
|
|
|
|
# If it doesn't match, close the websocket connection
|
|
|
|
|
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
|
|
|
try:
|
|
|
|
|
user = get_user_from_token(ws_token, secret_jwt_key)
|
|
|
|
|
return user
|
|
|
|
|
# If the token is a jwt, and it's valid return the user
|
|
|
|
|
except HTTPException:
|
|
|
|
|
pass
|
|
|
|
|
logger.info("Denying websocket request")
|
|
|
|
|
# If it doesn't match, close the websocket connection
|
|
|
|
|
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_token(data: dict, secret_key: str, token_type: str = "access") -> str:
|
|
|
|
|
|