gault-millau/apps/authorization/views/common.py

249 lines
9.6 KiB
Python

"""Common views for application Account"""
import json
from braces.views import CsrfExemptMixin
from django.conf import settings
from django.utils.encoding import force_text
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from oauth2_provider.oauth2_backends import OAuthLibCore
from oauth2_provider.settings import oauth2_settings
from oauth2_provider.views.mixins import OAuthLibMixin
from rest_framework import generics
from rest_framework import permissions
from rest_framework import status
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import AccessToken
from rest_framework_social_oauth2.oauth2_backends import KeepRequestCore
from rest_framework_social_oauth2.oauth2_endpoints import SocialTokenServer
from account.models import User
from authorization.models import Application
from authorization.models import JWTAccessToken
from authorization.serializers import common as serializers
from utils import exceptions as utils_exceptions
from utils.models import GMTokenGenerator
from utils.permissions import IsAuthenticatedAndTokenIsValid
from utils.views import (JWTGenericViewMixin,
JWTCreateAPIView)
# Mixins
# JWTAuthView mixin
class JWTAuthViewMixin(JWTCreateAPIView):
    """Common POST handling shared by authentication views."""

    def post(self, request, *args, **kwargs):
        """Validate the request payload and answer with the serialized tokens.

        The ``access_token`` and ``refresh_token`` values found in the
        serializer output are handed to the cookie helpers of the view, so
        the response carries them both in the body and via those helpers.
        """
        auth_serializer = self.get_serializer(data=request.data)
        auth_serializer.is_valid(raise_exception=True)

        payload = auth_serializer.data
        response = Response(payload, status=status.HTTP_200_OK)
        cookies = self._put_data_in_cookies(
            access_token=payload.get('access_token'),
            refresh_token=payload.get('refresh_token'))
        return self._put_cookies_in_response(cookies=cookies,
                                             response=response)
# OAuth2
class BaseOAuth2ViewMixin(generics.GenericAPIView):
    """Base mixin that looks up OAuth2 application credentials by source."""

    def get_client_id(self, source) -> str:
        """Return the client id of the application registered for ``source``.

        Raises:
            utils_exceptions.ServiceError: no application matches ``source``.
        """
        # A single ``first()`` replaces the former ``exists()`` + ``first()``
        # pair, which issued two queries for the same information.
        application = Application.objects.by_source(source=source).first()
        if application is None:
            raise utils_exceptions.ServiceError(data={
                'detail': _('Application is not found')})
        return application.client_id

    def get_client_secret(self, source) -> str:
        """Return the client secret of the application registered for ``source``.

        Only ``Application.MOBILE`` sources are looked up; for any other
        source nothing is looked up and ``None`` is returned, exactly as
        before.

        Raises:
            utils_exceptions.ServiceError: ``source`` is ``Application.MOBILE``
                but no application matches it.
        """
        if source != Application.MOBILE:
            return None

        # The previous check used ``qs.exists`` without calling it, which is
        # always truthy; a missing application then failed with
        # AttributeError on ``None`` instead of the intended ServiceError.
        application = Application.objects.by_source(source=source).first()
        if application is None:
            raise utils_exceptions.ServiceError(data={
                'detail': _('Not found an application with this source')})
        return application.client_secret
class OAuth2ViewMixin(CsrfExemptMixin, OAuthLibMixin, BaseOAuth2ViewMixin):
    """Basic mixin for OAuth2 views"""

    server_class = oauth2_settings.OAUTH2_SERVER_CLASS
    validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
    oauthlib_backend_class = OAuthLibCore

    def prepare_request_data(self, validated_data: dict) -> dict:
        """Build the OAuth2 request parameters from validated serializer data.

        Args:
            validated_data: mapping read for the ``source`` and ``token`` keys.

        Returns:
            A dict that always holds ``client_id``, plus ``client_secret``
            when the source is ``Application.MOBILE`` and ``token`` when a
            truthy token was supplied.

        Raises:
            utils_exceptions.ServiceError: propagated from the credential
                lookups when no matching application is found.
        """
        source = validated_data.get('source')

        # ``client_id`` is always present, so the result can never be empty;
        # the former "empty dict" error branch was unreachable and is gone.
        request_data = {'client_id': self.get_client_id(source)}

        # Fill client secret parameter by platform
        if source == Application.MOBILE:
            request_data['client_secret'] = self.get_client_secret(source)

        # Fill token parameter if transferred
        token = validated_data.get('token')
        if token:
            request_data['token'] = token

        return request_data
# Sign in via Facebook
class OAuth2SignUpView(OAuth2ViewMixin, JWTGenericViewMixin):
    """
    Implements an endpoint to convert a provider token to an access token

    The endpoint is used in the following flows:
    * Authorization code
    * Client credentials
    """

    server_class = SocialTokenServer
    oauthlib_backend_class = KeepRequestCore
    permission_classes = (permissions.AllowAny, )
    serializer_class = serializers.OAuth2Serialzier

    def post(self, request, *args, **kwargs):
        """Run the OAuth2 token exchange and answer with JWT tokens.

        Raises:
            utils_exceptions.OAuth2Error: the OAuth2 token response status
                is not 200.
            utils_exceptions.UserNotFoundError: no user is linked to the
                OAuth2 access token that was issued.
        """
        # Preparing request data
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        source = serializer.validated_data.get('source')

        request_data = self.prepare_request_data(serializer.validated_data)
        request_data.update({
            'grant_type': settings.OAUTH2_SOCIAL_AUTH_GRANT_TYPE,
            'backend': settings.OAUTH2_SOCIAL_AUTH_BACKEND_NAME
        })

        # Use the rest framework `.data` to fake the post body of the django
        # request. The POST QueryDict is copied first so it can be modified.
        django_request = request._request
        django_request.POST = django_request.POST.copy()
        for key, value in request_data.items():
            django_request.POST[key] = value

        # OAuth2 authentication process
        url, headers, body, oauth2_status = self.create_token_response(
            django_request)

        # Check OAuth2 response before touching the body, so a failed
        # exchange is always reported as OAuth2Error.
        if oauth2_status != status.HTTP_200_OK:
            raise utils_exceptions.OAuth2Error()
        body = json.loads(body)

        # Get authenticated user; ``first()`` yields None when nothing
        # matches, which previously crashed with AttributeError below.
        user = User.objects.by_oauth2_access_token(
            token=body.get('access_token')).first()
        if user is None:
            raise utils_exceptions.UserNotFoundError()

        # Create JWT token
        tokens = user.create_jwt_tokens(source)
        access_token = tokens.get('access_token')
        refresh_token = tokens.get('refresh_token')

        response = Response(data={'access_token': access_token,
                                  'refresh_token': refresh_token},
                            status=status.HTTP_200_OK)
        return self._put_cookies_in_response(
            cookies=self._put_data_in_cookies(access_token=access_token,
                                              refresh_token=refresh_token),
            response=response)
# JWT
# Sign up via username and password
class SignUpView(JWTCreateAPIView):
    """Endpoint for classic signup."""

    permission_classes = (permissions.AllowAny, )
    serializer_class = serializers.SignupSerializer

    def post(self, request, *args, **kwargs):
        """Validate the signup payload, save it and reply 201 with no body."""
        signup_serializer = self.get_serializer(data=request.data)
        # Invalid input is reported by the serializer raising an exception.
        signup_serializer.is_valid(raise_exception=True)
        signup_serializer.save()
        return Response(status=status.HTTP_201_CREATED)
class VerifyEmailConfirmView(JWTGenericViewMixin):
    """View for confirmation email"""

    permission_classes = (permissions.AllowAny, )

    def get(self, request, *args, **kwargs):
        """Confirm the user's email from the ``uidb64`` / ``token`` URL kwargs.

        Raises:
            utils_exceptions.UserNotFoundError: ``uidb64`` is missing, cannot
                be decoded, or does not identify an existing user.
            utils_exceptions.NotValidTokenError: the token check fails for
                the resolved user.
        """
        # Local import: needed only for the lookup guard below.
        from django.core.exceptions import ValidationError

        uidb64 = kwargs.get('uidb64')
        token = kwargs.get('token')
        if not uidb64:
            raise utils_exceptions.UserNotFoundError()

        # This endpoint is open to anyone, so a malformed identifier must be
        # answered as "user not found" rather than an unhandled error.
        try:
            uid = force_text(urlsafe_base64_decode(uidb64))
            user = User.objects.filter(pk=uid).first()
        except (TypeError, ValueError, OverflowError, ValidationError):
            raise utils_exceptions.UserNotFoundError() from None

        if user is None:
            raise utils_exceptions.UserNotFoundError()

        token_generator = GMTokenGenerator(GMTokenGenerator.CONFIRM_EMAIL)
        if not token_generator.check_token(user, token):
            raise utils_exceptions.NotValidTokenError()

        # Approve email status
        user.confirm_email()
        # Set user status as active
        user.approve()
        return Response(status=status.HTTP_200_OK)
# Login by username|email + password
class LoginByUsernameOrEmailView(JWTAuthViewMixin):
    """Login endpoint using the username-or-email login serializer."""

    permission_classes = (permissions.AllowAny,)
    serializer_class = serializers.LoginByUsernameOrEmailSerializer

    def post(self, request, *args, **kwargs):
        """Validate the credentials payload and answer with the tokens.

        Unlike the parent implementation, the validated ``remember`` value
        is forwarded to the cookie helper as ``permanent``.
        """
        login_serializer = self.get_serializer(data=request.data)
        login_serializer.is_valid(raise_exception=True)

        payload = login_serializer.data
        response = Response(payload, status=status.HTTP_200_OK)
        remember = login_serializer.validated_data.get('remember')
        cookies = self._put_data_in_cookies(
            access_token=payload.get('access_token'),
            refresh_token=payload.get('refresh_token'),
            permanent=remember)
        return self._put_cookies_in_response(cookies=cookies,
                                             response=response)
# Logout
class LogoutView(JWTGenericViewMixin):
    """Logout user"""

    permission_classes = (IsAuthenticatedAndTokenIsValid, )

    def post(self, request, *args, **kwargs):
        """Expire the stored access token and its refresh token, reply 204.

        The access token is read from the ``access_token`` cookie and the
        stored token record is looked up by the ``jti`` in its payload.
        """
        # Resolve the stored token record from the cookie's JTI
        cookie_token = request.COOKIES.get('access_token')
        jti = AccessToken(cookie_token).payload.get('jti')
        stored_token = JWTAccessToken.objects.get(jti=jti)

        # Expire the access token first, then the refresh token linked to it
        stored_token.expire()
        stored_token.refresh_token.expire()
        return Response(status=status.HTTP_204_NO_CONTENT)
# Refresh token
class RefreshTokenView(JWTGenericViewMixin):
    """Refresh access_token"""

    permission_classes = (permissions.AllowAny, )
    serializer_class = serializers.RefreshTokenSerializer

    def post(self, request, *args, **kwargs):
        """Validate the refresh payload and reply 201 with the serialized tokens.

        The ``access_token`` and ``refresh_token`` values from the serializer
        output are also passed through the cookie helpers of the view.
        """
        refresh_serializer = self.get_serializer(data=request.data)
        refresh_serializer.is_valid(raise_exception=True)

        payload = refresh_serializer.data
        response = Response(payload, status=status.HTTP_201_CREATED)
        cookies = self._put_data_in_cookies(
            access_token=payload.get('access_token'),
            refresh_token=payload.get('refresh_token'))
        return self._put_cookies_in_response(cookies=cookies,
                                             response=response)