"""Common serializer for application authorization""" from django.conf import settings from django.contrib.auth import authenticate from django.contrib.auth import password_validation as password_validators from django.db.models import Q from rest_framework import serializers from rest_framework import validators as rest_validators from account import models as account_models from authorization import tasks from authorization.models import JWTRefreshToken from utils import exceptions as utils_exceptions from utils import methods as utils_methods from utils.serializers import SourceSerializerMixin from utils.tokens import GMRefreshToken # Serializers class SignupSerializer(serializers.ModelSerializer): """Signup serializer serializer mixin""" class Meta: model = account_models.User fields = ( 'username', 'password', 'email', 'newsletter' ) extra_kwargs = { 'username': {'write_only': True}, 'password': {'write_only': True}, 'email': {'write_only': True}, 'newsletter': {'write_only': True} } def validate_username(self, value): """Custom username validation""" valid = utils_methods.username_validator(username=value) if not valid: raise utils_exceptions.NotValidUsernameError() if account_models.User.objects.filter(username__icontains=value).exists(): raise serializers.ValidationError() return value def validate_email(self, value): """Validate email""" if account_models.User.objects.filter(email__icontains=value).exists(): raise serializers.ValidationError() return value def validate_password(self, value): """Custom password validation""" try: password_validators.validate_password(password=value) except serializers.ValidationError as e: raise serializers.ValidationError(str(e)) else: return value def create(self, validated_data): """Override create method""" obj = account_models.User.objects.make( username=validated_data.get('username'), password=validated_data.get('password'), email=validated_data.get('email').lower(), newsletter=validated_data.get('newsletter')) # Send verification link on user email if settings.USE_CELERY: tasks.send_confirm_email.delay( user_id=obj.id, country_code=self.context.get('request').country_code) else: tasks.send_confirm_email( user_id=obj.id, country_code=self.context.get('request').country_code) return obj class LoginByUsernameOrEmailSerializer(SourceSerializerMixin, serializers.ModelSerializer): """Serializer for login user""" # REQUEST username_or_email = serializers.CharField(write_only=True) password = serializers.CharField(write_only=True) # For cookie properties (Max-Age) remember = serializers.BooleanField(write_only=True) # RESPONSE refresh_token = serializers.CharField(read_only=True) access_token = serializers.CharField(read_only=True) class Meta: """Meta-class""" model = account_models.User fields = ( 'username_or_email', 'password', 'remember', 'source', 'refresh_token', 'access_token', ) def validate(self, attrs): """Override validate method""" username_or_email = attrs.pop('username_or_email') password = attrs.pop('password') user_qs = account_models.User.objects.filter(Q(username=username_or_email) | (Q(email=username_or_email))) if not user_qs.exists(): raise utils_exceptions.UserNotFoundError() else: user = user_qs.first() authentication = authenticate(username=user.get_username(), password=password) if not authentication: raise utils_exceptions.UserNotFoundError() self.instance = user return attrs def to_representation(self, instance): """Override to_representation method""" tokens = instance.create_jwt_tokens(source=self.validated_data.get('source')) setattr(instance, 'access_token', tokens.get('access_token')) setattr(instance, 'refresh_token', tokens.get('refresh_token')) return super().to_representation(instance) class LogoutSerializer(SourceSerializerMixin): """Serializer for Logout endpoint.""" class RefreshTokenSerializer(SourceSerializerMixin): """Serializer for refresh token view""" refresh_token = serializers.CharField(read_only=True) access_token = serializers.CharField(read_only=True) def validate(self, attrs): """Override validate method""" cookie_refresh_token = self.context.get('request').COOKIES.get('refresh_token') # Check if refresh_token in COOKIES if not cookie_refresh_token: raise utils_exceptions.NotValidRefreshTokenError() refresh_token = GMRefreshToken(cookie_refresh_token) refresh_token_qs = JWTRefreshToken.objects.valid() \ .by_jti(jti=refresh_token.payload.get('jti')) # Check if the user has refresh token if not refresh_token_qs.exists(): raise utils_exceptions.NotValidRefreshTokenError() old_refresh_token = refresh_token_qs.first() source = old_refresh_token.source user = old_refresh_token.user # Expire existing tokens old_refresh_token.expire() old_refresh_token.access_token.expire() # Create new one for user response = user.create_jwt_tokens(source=source) return response # OAuth class OAuth2Serialzier(SourceSerializerMixin): """Serializer OAuth2 authorization""" token = serializers.CharField(max_length=255) class OAuth2LogoutSerializer(SourceSerializerMixin): """Serializer for logout"""