176 lines
6.2 KiB
Python
176 lines
6.2 KiB
Python
"""Common serializer for application authorization"""
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate
|
|
from django.contrib.auth import password_validation as password_validators
|
|
from django.db.models import Q
|
|
from rest_framework import serializers
|
|
from rest_framework import validators as rest_validators
|
|
|
|
from account import models as account_models
|
|
from authorization import tasks
|
|
from authorization.models import JWTRefreshToken
|
|
from utils import exceptions as utils_exceptions
|
|
from utils import methods as utils_methods
|
|
from utils.serializers import SourceSerializerMixin
|
|
from utils.tokens import GMRefreshToken
|
|
|
|
|
|
# Serializers
|
|
class SignupSerializer(serializers.ModelSerializer):
    """Serializer that validates signup data and creates a new user.

    Every declared field is write-only, so none of them appear in the
    serialized response.
    """

    # REQUEST
    username = serializers.CharField(write_only=True)
    password = serializers.CharField(write_only=True)
    email = serializers.EmailField(write_only=True)
    newsletter = serializers.BooleanField(write_only=True)

    class Meta:
        model = account_models.User
        fields = (
            'username',
            'password',
            'email',
            'newsletter',
        )

    def validate_username(self, value):
        """Check that the username is well-formed and not already taken.

        Raises:
            utils_exceptions.NotValidUsernameError: when
                ``utils_methods.username_validator`` returns a falsy result.
            serializers.ValidationError: when another user already has this
                username (compared case-insensitively).
        """
        if not utils_methods.username_validator(username=value):
            raise utils_exceptions.NotValidUsernameError()
        # Exact, case-insensitive comparison. A substring lookup (icontains)
        # would reject "ann" merely because a user "joanna" exists.
        if account_models.User.objects.filter(username__iexact=value).exists():
            raise serializers.ValidationError()
        return value

    def validate_email(self, value):
        """Check that no existing user already has this email address.

        Raises:
            serializers.ValidationError: when the email is already in use
                (compared case-insensitively, since ``create`` stores the
                lowercased address).
        """
        # Exact, case-insensitive comparison instead of a substring match,
        # so "a@x.io" is not blocked by an existing "aa@x.io".
        if account_models.User.objects.filter(email__iexact=value).exists():
            raise serializers.ValidationError()
        return value

    def validate_password(self, value):
        """Run Django's configured password validators against the value.

        Raises:
            serializers.ValidationError: carrying the messages reported by
                the password validators.
        """
        # Imported locally so this block does not depend on a new
        # module-level import.
        from django.core.exceptions import (
            ValidationError as DjangoValidationError,
        )

        try:
            password_validators.validate_password(password=value)
        except DjangoValidationError as exc:
            # Django's validators raise django.core.exceptions.ValidationError,
            # not the DRF one; convert it explicitly to a DRF error.
            raise serializers.ValidationError(exc.messages) from exc
        return value

    def create(self, validated_data):
        """Create the user and send the email-confirmation message.

        The user is built with ``User.objects.make`` using the validated
        username, password, lowercased email and newsletter flag.

        Returns:
            The object returned by ``User.objects.make``.
        """
        user = account_models.User.objects.make(
            username=validated_data.get('username'),
            password=validated_data.get('password'),
            email=validated_data.get('email').lower(),
            newsletter=validated_data.get('newsletter'))

        # Send verification link on user email: dispatched through
        # ``.delay`` when USE_CELERY is set, otherwise called inline.
        if settings.USE_CELERY:
            send_confirm_email = tasks.send_confirm_email.delay
        else:
            send_confirm_email = tasks.send_confirm_email
        # NOTE(review): assumes the view passes ``request`` in the serializer
        # context and that the request exposes ``country_code`` - confirm.
        send_confirm_email(
            user_id=user.id,
            country_code=self.context.get('request').country_code)
        return user
|
|
|
|
|
|
class LoginByUsernameOrEmailSerializer(SourceSerializerMixin,
                                       serializers.ModelSerializer):
    """Serializer that logs a user in by username or email.

    Accepts the credentials plus a ``remember`` flag and responds with a
    freshly created refresh/access token pair.
    """

    # REQUEST
    username_or_email = serializers.CharField(write_only=True)
    password = serializers.CharField(write_only=True)

    # For cookie properties (Max-Age)
    remember = serializers.BooleanField(write_only=True)

    # RESPONSE
    refresh_token = serializers.CharField(read_only=True)
    access_token = serializers.CharField(read_only=True)

    class Meta:
        """Meta-class"""
        model = account_models.User
        # NOTE(review): 'source' is not declared in this class; presumably it
        # is provided by SourceSerializerMixin - confirm.
        fields = (
            'username_or_email',
            'password',
            'remember',
            'source',
            'refresh_token',
            'access_token',
        )

    def validate(self, attrs):
        """Resolve the user from the credentials and authenticate them.

        ``username_or_email`` and ``password`` are popped from ``attrs``, so
        they are absent from the returned data. On success the matched user
        is stored on ``self.instance``.

        Raises:
            utils_exceptions.UserNotFoundError: when no user has the given
                value as username or email.
            utils_exceptions.WrongAuthCredentials: when ``authenticate``
                rejects the password.
        """
        username_or_email = attrs.pop('username_or_email')
        password = attrs.pop('password')

        # One query: ``first()`` returns None when nothing matches, so a
        # separate ``exists()`` round-trip is unnecessary.
        user = account_models.User.objects.filter(
            Q(username=username_or_email) | Q(email=username_or_email)
        ).first()
        if user is None:
            raise utils_exceptions.UserNotFoundError()

        # Authenticate with the stored username, because the client may have
        # supplied an email address instead.
        if not authenticate(username=user.get_username(), password=password):
            raise utils_exceptions.WrongAuthCredentials()

        self.instance = user
        return attrs

    def to_representation(self, instance):
        """Create JWT tokens for the user and expose them in the output.

        The tokens are attached to ``instance`` as attributes so the
        read-only ``access_token`` / ``refresh_token`` fields can pick them up.
        """
        tokens = instance.create_jwt_tokens(
            source=self.validated_data.get('source'))
        instance.access_token = tokens.get('access_token')
        instance.refresh_token = tokens.get('refresh_token')
        return super().to_representation(instance)
|
|
|
|
|
|
class LogoutSerializer(SourceSerializerMixin):
    """Request serializer for the logout endpoint.

    Declares no fields of its own; everything comes from
    ``SourceSerializerMixin``.
    """
|
|
|
|
|
|
class RefreshTokenSerializer(SourceSerializerMixin):
    """Serializer for the refresh-token view.

    Reads the refresh token from the request cookies, expires the stored
    token pair and issues a new one.
    """

    refresh_token = serializers.CharField(read_only=True)
    access_token = serializers.CharField(read_only=True)

    def validate(self, attrs):
        """Rotate the token pair identified by the ``refresh_token`` cookie.

        Returns:
            Whatever ``user.create_jwt_tokens`` returns for the owner of the
            old refresh token, created with that token's ``source``.

        Raises:
            utils_exceptions.NotValidRefreshTokenError: when the cookie is
                missing or empty, or when no valid stored refresh token
                matches the cookie's ``jti``.
        """
        # NOTE(review): assumes the view passes ``request`` in the serializer
        # context - confirm.
        cookie_refresh_token = (
            self.context.get('request').COOKIES.get('refresh_token'))
        # Check if refresh_token in COOKIES
        if not cookie_refresh_token:
            raise utils_exceptions.NotValidRefreshTokenError()

        refresh_token = GMRefreshToken(cookie_refresh_token)

        # One query: ``first()`` returns None when nothing matches, so a
        # separate ``exists()`` round-trip is unnecessary.
        old_refresh_token = (
            JWTRefreshToken.objects.valid()
            .by_jti(jti=refresh_token.payload.get('jti'))
            .first()
        )
        # Check if the user has refresh token
        if old_refresh_token is None:
            raise utils_exceptions.NotValidRefreshTokenError()

        # Read these before expiring, matching the original statement order.
        source = old_refresh_token.source
        user = old_refresh_token.user

        # Expire existing tokens
        old_refresh_token.expire()
        old_refresh_token.access_token.expire()

        # Create new one for user
        return user.create_jwt_tokens(source=source)
|
|
|
|
|
|
# OAuth
|
|
class OAuth2Serialzier(SourceSerializerMixin):
    """Request serializer for OAuth2 authorization.

    Adds a single ``token`` field to whatever ``SourceSerializerMixin``
    provides.
    """

    # NOTE(review): the class name is misspelled ("Serialzier"); it is kept
    # unchanged here because it is the name other modules import.

    # Token string supplied by the client, limited to 255 characters.
    token = serializers.CharField(max_length=255)
|
|
|
|
|
|
class OAuth2LogoutSerializer(SourceSerializerMixin):
    """Request serializer for the OAuth2 logout endpoint.

    Declares no fields of its own; everything comes from
    ``SourceSerializerMixin``.
    """
|