47 lines
1.6 KiB
Python
47 lines
1.6 KiB
Python
"""Custom authentication based on JWTAuthentication class"""
|
|
from rest_framework import HTTP_HEADER_ENCODING
|
|
from rest_framework_simplejwt.authentication import JWTAuthentication
|
|
from rest_framework_simplejwt.settings import api_settings
|
|
|
|
from utils.methods import get_token_from_cookies
|
|
|
|
# The setting may be a single header type or a list/tuple of them; always
# expose it as a sequence so the code below can iterate over it.
AUTH_HEADER_TYPES = (
    api_settings.AUTH_HEADER_TYPES
    if isinstance(api_settings.AUTH_HEADER_TYPES, (list, tuple))
    else (api_settings.AUTH_HEADER_TYPES,)
)

# Byte-encoded form of every configured header type.
AUTH_HEADER_TYPE_BYTES = {
    header_type.encode(HTTP_HEADER_ENCODING)
    for header_type in AUTH_HEADER_TYPES
}
|
|
|
|
|
|
class GMJWTAuthentication(JWTAuthentication):
    """
    An authentication plugin that authenticates requests through a JSON web
    token provided in the request cookies.
    """

    def authenticate(self, request):
        """
        Authenticate ``request`` using the token read from its cookies.

        The token is obtained with ``get_token_from_cookies``, converted with
        ``get_raw_token``, validated with ``get_validated_token`` and resolved
        to a user with ``get_user``. Finally the user's ``access_tokens`` are
        queried for a valid record matching the token's ``jti`` claim.

        Returns:
            ``(user, None)`` when every step succeeds, otherwise ``None`` so
            the request is treated as non-authorized.
        """
        try:
            token = get_token_from_cookies(request)
            # Explicit checks instead of ``assert``: assertions are removed
            # when Python runs with -O, which would silently disable them.
            if not token:
                # No token in cookies -> non-authorized user.
                return None

            raw_token = self.get_raw_token(token)
            if not raw_token:
                # Raw token could not be extracted -> non-authorized user.
                return None

            validated_token = self.get_validated_token(raw_token)
            user = self.get_user(validated_token)

            # Check that a matching valid record exists in the DB.
            jti = validated_token.payload.get('jti')
            if not user.access_tokens.valid().by_jti(jti=jti).exists():
                return None
        except Exception:
            # Deliberately best-effort: any error raised while running the
            # checks means the request is non-authorized. ``Exception`` (not a
            # bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
            return None

        return user, None