"""Common views for application Account""" import json from braces.views import CsrfExemptMixin from django.conf import settings from django.utils.encoding import force_text from django.utils.http import urlsafe_base64_decode from django.utils.translation import gettext_lazy as _ from oauth2_provider.oauth2_backends import OAuthLibCore from oauth2_provider.settings import oauth2_settings from oauth2_provider.views.mixins import OAuthLibMixin from rest_framework import generics from rest_framework import permissions from rest_framework import status from rest_framework.response import Response from rest_framework_simplejwt.tokens import AccessToken from rest_framework_social_oauth2.oauth2_backends import KeepRequestCore from rest_framework_social_oauth2.oauth2_endpoints import SocialTokenServer from account.models import User from authorization.models import Application from authorization.models import JWTAccessToken from authorization.serializers import common as serializers from utils import exceptions as utils_exceptions from utils.models import GMTokenGenerator from utils.permissions import IsAuthenticatedAndTokenIsValid from utils.views import JWTGenericViewMixin # OAuth2 class BaseOAuth2ViewMixin(generics.GenericAPIView): """BaseMixin for classic auth views""" def get_client_credentials(self, source) -> dict: """Get application credentials by source.""" credentials = {} qs = Application.objects.filter(authorization_grant_type=Application.GRANT_PASSWORD, source=source) if qs.exists(): application = qs.first() credentials = dict(client_id=application.client_id, client_secret=application.client_secret) return credentials class OAuth2ViewMixin(CsrfExemptMixin, OAuthLibMixin, BaseOAuth2ViewMixin): """Basic mixin for OAuth2 views""" server_class = oauth2_settings.OAUTH2_SERVER_CLASS validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS oauthlib_backend_class = OAuthLibCore def prepare_request_data(self, validated_data: dict) -> dict: """Preparing request data""" source = validated_data.get('source') credentials = self.get_client_credentials(source=source) client_id = credentials.get('client_id') client_secret = credentials.get('client_secret') token = validated_data.get('token') if client_id and client_secret and token: return { 'client_id': client_id, 'client_secret': client_secret, 'token': token, } else: raise utils_exceptions.ServiceError(data={ 'detail': _('Validation OAuth2 request data error') }) # Sign in via Facebook class OAuth2SignUpView(OAuth2ViewMixin, JWTGenericViewMixin, generics.GenericAPIView): """ Implements an endpoint to convert a provider token to an access token The endpoint is used in the following flows: * Authorization code * Client credentials """ server_class = SocialTokenServer oauthlib_backend_class = KeepRequestCore permission_classes = (permissions.AllowAny, ) serializer_class = serializers.OAuth2Serialzier def post(self, request, *args, **kwargs): # Preparing request data serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) request_data = self.prepare_request_data(serializer.validated_data) source = serializer.validated_data.get('source') request_data.update({ 'grant_type': settings.OAUTH2_SOCIAL_AUTH_GRANT_TYPE, 'backend': settings.OAUTH2_SOCIAL_AUTH_BACKEND_NAME, }) # Use the rest framework `.data` to fake the post body of the django request. request._request.POST = request._request.POST.copy() for key, value in request_data.items(): request._request.POST[key] = value # OAuth2 authentication process url, headers, body, oauth2_status = self.create_token_response(request._request) body = json.loads(body) # Check OAuth2 response if oauth2_status != status.HTTP_200_OK: raise utils_exceptions.OAuth2Error(detail=body) # Get authenticated user user = User.objects.by_oauth2_access_token(token=body.get('access_token'))\ .first() # Create JWT token tokens = user.create_jwt_tokens(source) access_token, refresh_token = tokens.get('access_token'), tokens.get('refresh_token') response = Response(data={'access_token': access_token, 'refresh_token': refresh_token}, status=status.HTTP_200_OK) return self._put_cookies_in_response( cookies=self._put_data_in_cookies(access_token=access_token, refresh_token=refresh_token, permanent=True), response=response) # JWT # Sign in via username and password class SignUpView(generics.GenericAPIView): """View for classic signup""" permission_classes = (permissions.AllowAny, ) serializer_class = serializers.SignupSerializer def post(self, request, *args, **kwargs): """Implement POST-method""" serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) serializer.save() return Response(status=status.HTTP_201_CREATED) class ConfirmationEmailView(JWTGenericViewMixin, generics.GenericAPIView): """View for confirmation email""" permission_classes = (permissions.AllowAny, ) def get(self, request, *args, **kwargs): """Implement GET-method""" uidb64 = kwargs.get('uidb64') token = kwargs.get('token') uid = force_text(urlsafe_base64_decode(uidb64)) user_qs = User.objects.filter(pk=uid) if user_qs.exists(): user = user_qs.first() if not GMTokenGenerator(GMTokenGenerator.CONFIRM_EMAIL).check_token( user, token): raise utils_exceptions.NotValidTokenError() # Approve email status user.confirm_email() response = Response(status=status.HTTP_200_OK) # Create tokens tokens = user.create_jwt_tokens() return self._put_cookies_in_response( cookies=self._put_data_in_cookies( access_token=tokens.get('access_token'), refresh_token=tokens.get('refresh_token')), response=response) else: raise utils_exceptions.UserNotFoundError() # Login by username|email + password class LoginByUsernameOrEmailView(JWTGenericViewMixin, generics.GenericAPIView): """Login by email and password""" permission_classes = (permissions.AllowAny,) serializer_class = serializers.LoginByUsernameOrEmailSerializer def post(self, request, *args, **kwargs): """ ## Login view. ### POST-request data ``` { "username_or_email": , "password": , "remember": , "source": # 0 - Mobile, 1 - Web, 2 - All (by default used: 1) } ``` ### Response After a successful login, server side set up access_token and refresh token to cookies. In a payload of access token, the following information is being embed: see `User().get_user_info()`. ### Description COOKIE Max-age are determined by `remember` flag: if `remember` is `True` then `Max-age` parameter taken from `settings.COOKIES_MAX_AGE` otherwise using session COOKIE Max-age. """ serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) response = Response(serializer.data, status=status.HTTP_200_OK) access_token = serializer.data.get('access_token') refresh_token = serializer.data.get('refresh_token') is_permanent = serializer.validated_data.get('remember') return self._put_cookies_in_response( cookies=self._put_data_in_cookies(access_token=access_token, refresh_token=refresh_token, permanent=is_permanent), response=response) # Logout class LogoutView(JWTGenericViewMixin, generics.GenericAPIView): """Logout user""" permission_classes = (IsAuthenticatedAndTokenIsValid, ) def post(self, request, *args, **kwargs): """ ## Logout view. ### POST-request data ``` {} ``` ### Response If user has *valid* `access_token` in COOKIES, then response return blank response data with `HTTP_STATUS_CODE` *204*. ### Description For complete logout, user must provide *valid* `access_token` (`access_token` must be kept in `COOKIES`). After successful request with valid access_token, token would be expired, for reuse protection. """ # Get access token objs by JTI access_token = AccessToken(request.COOKIES.get('access_token')) access_token_obj = JWTAccessToken.objects.get(jti=access_token.payload.get('jti')) # Expire tokens access_token_obj.expire() access_token_obj.refresh_token.expire() return Response(status=status.HTTP_204_NO_CONTENT) # Refresh token class RefreshTokenView(JWTGenericViewMixin, generics.GenericAPIView): """Refresh access_token""" permission_classes = (permissions.AllowAny, ) serializer_class = serializers.RefreshTokenSerializer def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) response = Response(serializer.data, status=status.HTTP_201_CREATED) access_token = serializer.data.get('access_token') refresh_token = serializer.data.get('refresh_token') return self._put_cookies_in_response( cookies=self._put_data_in_cookies(access_token=access_token, refresh_token=refresh_token), response=response)