238 lines
7.6 KiB
Python
238 lines
7.6 KiB
Python
"""Serializers for account web"""
|
|
from django.conf import settings
|
|
from django.contrib.auth import password_validation as password_validators
|
|
from django.db.models import Q
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework import serializers
|
|
|
|
from account import models, tasks
|
|
from authorization.models import JWTRefreshToken
|
|
from utils import exceptions as utils_exceptions
|
|
from utils.serializers import SourceSerializerMixin
|
|
from utils.tokens import GMRefreshToken
|
|
|
|
|
|
class PasswordResetSerializer(serializers.ModelSerializer):
    """Serializer that creates a ``ResetPasswordToken`` for a user.

    For an authenticated request the token is created for ``request.user``.
    For an anonymous request the account is looked up by the
    ``username_or_email`` value, matched against either the ``email`` or the
    ``username`` field of ``models.User``.
    """

    username_or_email = serializers.CharField(required=False,
                                              write_only=True,)

    class Meta:
        """Meta class"""
        model = models.ResetPasswordToken
        fields = (
            'username_or_email',
        )

    def validate(self, attrs):
        """Resolve the target user and store it in ``attrs['user']``.

        Raises:
            serializers.ValidationError: the request is anonymous and
                ``username_or_email`` is missing or empty.
            utils_exceptions.UserNotFoundError: no user matches the given
                username or email.
        """
        user = self.context.get('request').user

        if user.is_anonymous:
            username_or_email = attrs.get('username_or_email')
            if not username_or_email:
                raise serializers.ValidationError(
                    _('Username or Email not requested'))

            # Fetch the row in a single query. A separate ``exists()`` check
            # costs an extra round trip and leaves a window in which the
            # user can be removed before ``first()`` runs, which would put
            # ``None`` into ``attrs['user']``.
            user = models.User.objects.filter(
                Q(email=username_or_email) | Q(username=username_or_email)
            ).first()
            if user is None:
                raise utils_exceptions.UserNotFoundError()

        attrs['user'] = user
        return attrs

    def create(self, validated_data, *args, **kwargs):
        """Create the reset token and dispatch the reset-password email task.

        The token records the user resolved in ``validate``, the request's
        ``REMOTE_ADDR`` and the ``WEB`` source. The email task runs through
        Celery when ``settings.USE_CELERY`` is truthy, otherwise it is called
        synchronously.
        """
        user = validated_data.pop('user')
        ip_address = self.context.get('request').META.get('REMOTE_ADDR')

        obj = models.ResetPasswordToken.objects.create(
            user=user,
            ip_address=ip_address,
            source=models.ResetPasswordToken.WEB
        )
        if settings.USE_CELERY:
            tasks.send_reset_password_email.delay(obj.id)
        else:
            tasks.send_reset_password_email(obj.id)
        return obj
|
|
|
|
|
|
class PasswordResetConfirmSerializer(serializers.ModelSerializer):
    """Serializer that sets a new password through a ``ResetPasswordToken``.

    The serializer instance is the reset token; the password is applied to
    ``instance.user``.
    """

    password = serializers.CharField(write_only=True)

    class Meta:
        """Meta class"""
        model = models.ResetPasswordToken
        fields = ('password', )

    def validate(self, attrs):
        """Check the new password against the current one and the validators.

        Raises:
            utils_exceptions.PasswordsAreEqual: the new password equals the
                user's current password (re-wrapped as a
                ``serializers.ValidationError`` if it is a subclass of it).
            serializers.ValidationError: the password is rejected by the
                configured Django password validators.
        """
        # Imported locally so the block does not depend on a file-level
        # import that the original module does not have.
        from django.core.exceptions import (
            ValidationError as DjangoValidationError,
        )

        user = self.instance.user
        password = attrs.get('password')
        try:
            # Compare new password with the old one
            if user.check_password(raw_password=password):
                raise utils_exceptions.PasswordsAreEqual()
            # Pass the user so validators that compare the password with
            # user attributes are able to run.
            password_validators.validate_password(password=password,
                                                  user=user)
        except serializers.ValidationError as e:
            raise serializers.ValidationError(str(e))
        except DjangoValidationError as e:
            # ``validate_password`` raises Django's ValidationError, which
            # DRF does not turn into a validation response by itself.
            raise serializers.ValidationError(list(e.messages)) from e
        return attrs

    def update(self, instance, validated_data):
        """Set the new password on the token's user and call ``overdue()``."""
        # Update user password from instance
        instance.user.set_password(validated_data.get('password'))
        instance.user.save()

        # Overdue instance
        instance.overdue()
        return instance
|
|
|
|
|
|
class ChangePasswordSerializer(serializers.ModelSerializer):
    """Serializer that changes the password of a ``User`` instance."""

    password = serializers.CharField(write_only=True)

    class Meta:
        """Meta class"""
        model = models.User
        fields = ('password', )

    def validate(self, attrs):
        """Check the new password against the current one and the validators.

        Raises:
            utils_exceptions.PasswordsAreEqual: the new password equals the
                user's current password (re-wrapped as a
                ``serializers.ValidationError`` if it is a subclass of it).
            serializers.ValidationError: the password is rejected by the
                configured Django password validators.
        """
        # Imported locally so the block does not depend on a file-level
        # import that the original module does not have.
        from django.core.exceptions import (
            ValidationError as DjangoValidationError,
        )

        password = attrs.get('password')
        try:
            # Compare new password with the old one
            if self.instance.check_password(raw_password=password):
                raise utils_exceptions.PasswordsAreEqual()
            # Pass the user so validators that compare the password with
            # user attributes are able to run.
            password_validators.validate_password(password=password,
                                                  user=self.instance)
        except serializers.ValidationError as e:
            raise serializers.ValidationError(str(e))
        except DjangoValidationError as e:
            # ``validate_password`` raises Django's ValidationError, which
            # DRF does not turn into a validation response by itself.
            raise serializers.ValidationError(list(e.messages)) from e
        return attrs

    def update(self, instance, validated_data):
        """Set the new password, save the user and expire its tokens."""
        # Update user password from instance
        instance.set_password(validated_data.get('password'))
        instance.save()

        # Expire tokens
        instance.expire_access_tokens()
        instance.expire_refresh_tokens()
        return instance
|
|
|
|
|
|
class ChangeEmailSerializer(serializers.ModelSerializer):
    """Serializer used to replace the email address of a ``User``."""

    class Meta:
        """Meta class"""
        model = models.User
        fields = (
            'email',
        )

    def validate_email(self, value):
        """Reject a value identical to the instance's current email."""
        if self.instance.email == value:
            raise serializers.ValidationError()
        return value

    def validate(self, attrs):
        """Reject the change while the current email is not confirmed."""
        if self.instance.email_confirmed:
            return attrs
        raise serializers.ValidationError()

    def update(self, instance, validated_data):
        """Store the new email, reset its confirmation flag and notify.

        After saving, the ``confirm_new_email_address`` task is dispatched
        with the user id: through Celery when ``settings.USE_CELERY`` is
        truthy, otherwise as a direct call.
        """
        new_email = validated_data.get('email')
        instance.email = new_email
        instance.email_confirmed = False
        instance.save()

        # Pick the dispatch mode once, then call it with the user id
        dispatch = tasks.confirm_new_email_address
        if settings.USE_CELERY:
            dispatch = dispatch.delay
        dispatch(instance.id)
        return instance
|
|
|
|
|
|
class ConfirmEmailSerializer(serializers.ModelSerializer):
    """Serializer used to re-dispatch the email confirmation task."""

    class Meta:
        """Meta class"""
        model = models.User
        fields = (
            'email',
        )

    def validate(self, attrs):
        """Reject the request when the email is already confirmed."""
        already_confirmed = self.instance.email_confirmed
        if not already_confirmed:
            return attrs
        raise serializers.ValidationError()

    def update(self, instance, validated_data):
        """Dispatch ``confirm_new_email_address`` for the given user.

        The instance itself is not modified. The task goes through Celery
        when ``settings.USE_CELERY`` is truthy, otherwise it is called
        directly.
        """
        # Pick the dispatch mode once, then call it with the user id
        dispatch = tasks.confirm_new_email_address
        if settings.USE_CELERY:
            dispatch = dispatch.delay
        dispatch(instance.id)
        return instance
|
|
|
|
|
|
class RefreshTokenSerializer(SourceSerializerMixin):
    """Serializer for refresh token view.

    Reads the refresh token from the ``refresh_token`` request cookie,
    expires the stored token pair and issues a new one for the same user
    and source.
    """

    refresh_token = serializers.CharField(read_only=True)
    access_token = serializers.CharField(read_only=True)

    def get_request(self):
        """Return the request from the serializer context (or ``None``)."""
        return self.context.get('request')

    def validate(self, attrs):
        """Rotate the token pair identified by the refresh-token cookie.

        Returns:
            Whatever ``user.create_jwt_tokens(source=...)`` returns for the
            owner of the old refresh token.

        Raises:
            utils_exceptions.NotValidRefreshTokenError: the cookie is
                missing, or no valid stored refresh token matches its
                ``jti``.
        """
        cookie_refresh_token = self.get_request().COOKIES.get('refresh_token')
        # Check if refresh_token in COOKIES
        if not cookie_refresh_token:
            raise utils_exceptions.NotValidRefreshTokenError()

        refresh_token = GMRefreshToken(cookie_refresh_token)
        # Fetch the stored token in a single query. A separate ``exists()``
        # check costs an extra round trip and leaves a window in which the
        # row can disappear before ``first()`` runs.
        old_refresh_token = (
            JWTRefreshToken.objects.valid()
            .by_jti(jti=refresh_token.payload.get('jti'))
            .first()
        )
        # Check if the user has refresh token
        if old_refresh_token is None:
            raise utils_exceptions.NotValidRefreshTokenError()

        source = old_refresh_token.source
        user = old_refresh_token.user

        # Expire existing tokens
        old_refresh_token.expire()
        old_refresh_token.access_token.expire()

        # Create new one for user
        return user.create_jwt_tokens(source=source)
|