gault-millau/apps/authorization/views/common.py

273 lines
10 KiB
Python

"""Common views for application Authorization"""
import json
from braces.views import CsrfExemptMixin
from django.conf import settings
from django.utils.encoding import force_text
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from oauth2_provider.oauth2_backends import OAuthLibCore
from oauth2_provider.settings import oauth2_settings
from oauth2_provider.views.mixins import OAuthLibMixin
from rest_framework import generics
from rest_framework import permissions
from rest_framework import status
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import AccessToken
from rest_framework_social_oauth2.oauth2_backends import KeepRequestCore
from rest_framework_social_oauth2.oauth2_endpoints import SocialTokenServer
from account.models import User
from authorization.models import Application
from authorization.models import JWTAccessToken
from authorization.serializers import common as serializers
from utils import exceptions as utils_exceptions
from utils.models import GMTokenGenerator
from utils.permissions import IsAuthenticatedAndTokenIsValid
from utils.views import JWTGenericViewMixin
# OAuth2
class BaseOAuth2ViewMixin(generics.GenericAPIView):
    """Base mixin for classic auth views."""

    def get_client_credentials(self, source) -> dict:
        """Get application credentials by source.

        Looks up the first ``Application`` whose grant type is
        ``Application.GRANT_PASSWORD`` and whose ``source`` equals the given
        value.

        :param source: value matched against ``Application.source``.
        :return: dict with ``client_id`` and ``client_secret`` keys, or an
            empty dict when no matching application exists.
        """
        # ``first()`` returns ``None`` for an empty queryset, so a separate
        # ``exists()`` round-trip to the database is unnecessary.
        application = Application.objects.filter(
            authorization_grant_type=Application.GRANT_PASSWORD,
            source=source,
        ).first()
        if application is None:
            return {}
        return {
            'client_id': application.client_id,
            'client_secret': application.client_secret,
        }
class OAuth2ViewMixin(CsrfExemptMixin, OAuthLibMixin, BaseOAuth2ViewMixin):
    """Basic mixin for OAuth2 views."""

    server_class = oauth2_settings.OAUTH2_SERVER_CLASS
    validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
    oauthlib_backend_class = OAuthLibCore

    def prepare_request_data(self, validated_data: dict) -> dict:
        """Build the OAuth2 request payload from validated serializer data.

        :param validated_data: mapping read for its ``source`` and ``token``
            keys.
        :return: dict with ``client_id``, ``client_secret`` and ``token``.
        :raises utils_exceptions.ServiceError: when the client id, the client
            secret or the token is missing or empty.
        """
        app_credentials = self.get_client_credentials(
            source=validated_data.get('source'))
        payload = {
            'client_id': app_credentials.get('client_id'),
            'client_secret': app_credentials.get('client_secret'),
            'token': validated_data.get('token'),
        }
        # Every one of the three values is required to build the request.
        if not all(payload.values()):
            raise utils_exceptions.ServiceError(data={
                'detail': _('Validation OAuth2 request data error')
            })
        return payload
# Sign in via Facebook
class OAuth2SignUpView(OAuth2ViewMixin, JWTGenericViewMixin, generics.GenericAPIView):
    """
    Implements an endpoint to convert a provider token to an access token

    The endpoint is used in the following flows:
    * Authorization code
    * Client credentials
    """

    server_class = SocialTokenServer
    oauthlib_backend_class = KeepRequestCore
    permission_classes = (permissions.AllowAny, )
    serializer_class = serializers.OAuth2Serialzier

    def post(self, request, *args, **kwargs):
        """Exchange a social provider token for JWT tokens.

        Validates the request body, injects the OAuth2 client credentials,
        grant type and backend name into the underlying Django request, runs
        the OAuth2 token flow and then issues JWT tokens for the user that
        owns the resulting OAuth2 access token.

        :return: 200 response whose body carries ``access_token`` and
            ``refresh_token``; the same tokens are also handed to the
            ``JWTGenericViewMixin`` cookie helpers.
        :raises utils_exceptions.ServiceError: when the client credentials or
            the token are missing (see ``prepare_request_data``).
        :raises utils_exceptions.OAuth2Error: when the OAuth2 flow does not
            answer with HTTP 200.
        :raises utils_exceptions.UserNotFoundError: when no user matches the
            issued OAuth2 access token.
        """
        # Preparing request data
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        request_data = self.prepare_request_data(serializer.validated_data)
        source = serializer.validated_data.get('source')
        request_data.update({
            'grant_type': settings.OAUTH2_SOCIAL_AUTH_GRANT_TYPE,
            'backend': settings.OAUTH2_SOCIAL_AUTH_BACKEND_NAME,
        })

        # Use the rest framework `.data` to fake the post body of the django request.
        # The POST QueryDict is copied first so that it can be assigned to.
        request._request.POST = request._request.POST.copy()
        for key, value in request_data.items():
            request._request.POST[key] = value

        # OAuth2 authentication process
        url, headers, body, oauth2_status = self.create_token_response(request._request)
        body = json.loads(body)

        # Check OAuth2 response
        if oauth2_status != status.HTTP_200_OK:
            raise utils_exceptions.OAuth2Error(detail=body)

        # Get authenticated user
        user = User.objects.by_oauth2_access_token(
            token=body.get('access_token')).first()
        # ``first()`` yields ``None`` for an empty queryset; fail explicitly
        # instead of hitting an AttributeError on the next line.
        if user is None:
            raise utils_exceptions.UserNotFoundError()

        # Create JWT token
        tokens = user.create_jwt_tokens(source)
        access_token, refresh_token = tokens.get('access_token'), tokens.get('refresh_token')
        response = Response(data={'access_token': access_token,
                                  'refresh_token': refresh_token},
                            status=status.HTTP_200_OK)
        return self._put_cookies_in_response(
            cookies=self._put_data_in_cookies(access_token=access_token,
                                              refresh_token=refresh_token,
                                              permanent=True),
            response=response)
# JWT
# Sign up via username and password
class SignUpView(generics.GenericAPIView):
    """Endpoint for classic sign-up."""

    serializer_class = serializers.SignupSerializer
    permission_classes = (permissions.AllowAny, )

    def post(self, request, *args, **kwargs):
        """Validate the request body, save the serializer and answer 201."""
        signup_serializer = self.get_serializer(data=request.data)
        # Invalid input is reported through the exception raised here.
        signup_serializer.is_valid(raise_exception=True)
        signup_serializer.save()
        return Response(status=status.HTTP_201_CREATED)
class ConfirmationEmailView(JWTGenericViewMixin, generics.GenericAPIView):
    """View for confirmation email"""

    permission_classes = (permissions.AllowAny, )

    def get(self, request, *args, **kwargs):
        """Confirm a user's email from the ``uidb64`` / ``token`` URL kwargs.

        Decodes ``uidb64`` into a primary key, loads the matching user, checks
        the confirmation token, marks the email as confirmed and issues JWT
        tokens that are handed to the ``JWTGenericViewMixin`` cookie helpers.

        :return: empty 200 response produced by ``_put_cookies_in_response``.
        :raises utils_exceptions.UserNotFoundError: when ``uidb64`` cannot be
            decoded or no user has the decoded primary key.
        :raises utils_exceptions.NotValidTokenError: when the token check
            fails for the user.
        """
        uidb64 = kwargs.get('uidb64')
        token = kwargs.get('token')

        # ``uidb64`` comes straight from the URL: malformed base64, undecodable
        # bytes or a value unusable as a primary key must not surface as a
        # server error, so they are treated the same as an unknown user.
        try:
            uid = force_text(urlsafe_base64_decode(uidb64))
            user = User.objects.filter(pk=uid).first()
        except (TypeError, ValueError, OverflowError):
            user = None

        if user is None:
            raise utils_exceptions.UserNotFoundError()

        token_generator = GMTokenGenerator(GMTokenGenerator.CONFIRM_EMAIL)
        if not token_generator.check_token(user, token):
            raise utils_exceptions.NotValidTokenError()

        # Approve email status
        user.confirm_email()
        response = Response(status=status.HTTP_200_OK)

        # Create tokens
        tokens = user.create_jwt_tokens()
        return self._put_cookies_in_response(
            cookies=self._put_data_in_cookies(
                access_token=tokens.get('access_token'),
                refresh_token=tokens.get('refresh_token')),
            response=response)
# Login by username|email + password
class LoginByUsernameOrEmailView(JWTGenericViewMixin, generics.GenericAPIView):
    """Login by username or email and password."""

    serializer_class = serializers.LoginByUsernameOrEmailSerializer
    permission_classes = (permissions.AllowAny,)

    def post(self, request, *args, **kwargs):
        """
        ## Login view.
        ### POST-request data
        ```
        {
            "username_or_email": <str>,
            "password": <str>,
            "remember": <bool>,
            "source": <int> # 0 - Mobile, 1 - Web, 2 - All (by default used: 1)
        }
        ```
        ### Response
        After a successful login, the server sets the access_token and
        refresh_token in cookies.
        The payload of the access token embeds the following information:
        see `User().get_user_info()`.
        ### Description
        The COOKIE Max-age is determined by the `remember` flag:
        if `remember` is `True`, the `Max-age` parameter is taken from
        `settings.COOKIES_MAX_AGE`; otherwise the session COOKIE Max-age is used.
        """
        login_serializer = self.get_serializer(data=request.data)
        login_serializer.is_valid(raise_exception=True)

        # The serialized output is both the response body and the source of
        # the tokens that go into the cookies.
        payload = login_serializer.data
        response = Response(payload, status=status.HTTP_200_OK)
        cookies = self._put_data_in_cookies(
            access_token=payload.get('access_token'),
            refresh_token=payload.get('refresh_token'),
            permanent=login_serializer.validated_data.get('remember'))
        return self._put_cookies_in_response(cookies=cookies, response=response)
# Logout
class LogoutView(JWTGenericViewMixin, generics.GenericAPIView):
    """Log the user out."""

    permission_classes = (IsAuthenticatedAndTokenIsValid, )

    def post(self, request, *args, **kwargs):
        """
        ## Logout view.
        ### POST-request data
        ```
        {}
        ```
        ### Response
        If the user has a *valid* `access_token` in COOKIES, the response
        returns blank response data with `HTTP_STATUS_CODE` *204*.
        ### Description
        For a complete logout, the user must provide a *valid* `access_token`
        (the `access_token` must be kept in `COOKIES`).
        After a successful request with a valid access_token, the token is
        expired to protect against reuse.
        """
        # Resolve the stored access token through the JTI claim of the cookie.
        raw_token = request.COOKIES.get('access_token')
        jti = AccessToken(raw_token).payload.get('jti')
        stored_token = JWTAccessToken.objects.get(jti=jti)

        # Expire the access token first, then its related refresh token.
        stored_token.expire()
        stored_token.refresh_token.expire()
        return Response(status=status.HTTP_204_NO_CONTENT)
# Refresh token
class RefreshTokenView(JWTGenericViewMixin, generics.GenericAPIView):
    """Refresh the access_token."""

    serializer_class = serializers.RefreshTokenSerializer
    permission_classes = (permissions.AllowAny, )

    def post(self, request, *args, **kwargs):
        """Validate the request body and answer 201 with the serialized data.

        The ``access_token`` and ``refresh_token`` values of the serialized
        data are also handed to the ``JWTGenericViewMixin`` cookie helpers.
        """
        refresh_serializer = self.get_serializer(data=request.data)
        refresh_serializer.is_valid(raise_exception=True)

        payload = refresh_serializer.data
        response = Response(payload, status=status.HTTP_201_CREATED)
        cookies = self._put_data_in_cookies(
            access_token=payload.get('access_token'),
            refresh_token=payload.get('refresh_token'))
        return self._put_cookies_in_response(cookies=cookies, response=response)