gault-millau/apps/account/models.py

192 lines
6.7 KiB
Python

from typing import Union
from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.contrib.auth.tokens import default_token_generator
from django.core.mail import send_mail
from django.db import models
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from rest_framework.authtoken.models import Token
from authorization.models import Application
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
class UserManager(BaseUserManager):
"""Extended manager for User model."""
use_in_migrations = False
def make(self, username: str, email: str,
password: str, newsletter: bool) -> object:
"""Register new user"""
obj = self.model(
username=username,
email=email,
newsletter=newsletter
)
obj.set_password(password)
obj.save()
return obj
class UserQuerySet(models.QuerySet):
"""Extended queryset for User model."""
def active(self, switcher=True):
"""Filter only active users."""
return self.filter(is_active=switcher)
class User(ImageMixin, AbstractUser):
"""Base user model."""
email = models.EmailField(_('email address'), blank=True,
null=True, default=None)
newsletter = models.NullBooleanField(default=True)
EMAIL_FIELD = 'email'
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = ['email']
objects = UserManager.from_queryset(UserQuerySet)()
class Meta:
"""Meta class."""
verbose_name = _('User')
verbose_name_plural = _('Users')
def __str__(self):
"""String method."""
return "%s:%s" % (self.email, self.get_short_name())
def change_status(self, switcher: bool = False):
"""Method to set user status to active or inactive"""
self.is_active = switcher
self.save()
def remove_token(self):
Token.objects.filter(user=self).delete()
def remove_access_tokens(self, source: Union[int, Union[tuple, list]]):
"""Method to remove user access tokens"""
source = source if isinstance(source, list) else [source, ]
self.oauth2_provider_accesstoken.filter(application__source__in=source)\
.delete()
def revoke_refresh_tokens(self, source: Union[int, tuple, list]):
"""Method to remove user refresh tokens"""
source = source if isinstance(source, list) else [source, ]
refresh_tokens = self.oauth2_provider_refreshtoken.filter(
application__source__in=source)
if refresh_tokens.exists():
for token in refresh_tokens:
token.revoke()
def get_body_email_message(self, subject: str, message: str):
"""Prepare the body of the email message"""
return {
'subject': subject,
'message': str(message),
'from_email': settings.EMAIL_HOST_USER,
'recipient_list': [self.email, ]
}
def send_email(self, subject: str, message: str):
"""Send an email to reset user password"""
# todo: move to celery as celery task
send_mail(**self.get_body_email_message(subject=subject,
message=message))
class ResetPasswordTokenQuerySet(models.QuerySet):
"""Reset password token query set"""
def expired(self):
"""Show only expired"""
return self.filter(expiry_datetime__gt=timezone.now())
def valid(self):
"""Show only valid"""
return self.filter(expiry_datetime__lt=timezone.now())
def by_user(self, user):
"""Show obj by user"""
return self.filter(user=user)
class ResetPasswordToken(PlatformMixin, ProjectBaseMixin):
"""Reset password model"""
RESETTING_TOKEN_TEMPLATE_NAME = 'account/password_reset_email.html'
user = models.ForeignKey(User,
related_name='password_reset_tokens',
on_delete=models.CASCADE,
verbose_name=_('The User which is associated to '
'this password reset token'))
# Key field, though it is not the primary key of the model
key = models.CharField(max_length=255,
verbose_name=_('Key'))
ip_address = models.GenericIPAddressField(default='',
blank=True, null=True,
verbose_name=_('The IP address of this session'))
expiry_datetime = models.DateTimeField(blank=True, null=True,
verbose_name=_('Expiration datetime'))
objects = ResetPasswordTokenQuerySet.as_manager()
class Meta:
verbose_name = _("Password Reset Token")
verbose_name_plural = _("Password Reset Tokens")
def __str__(self):
return "Password reset token for user {user}".format(user=self.user)
@property
def get_resetting_token_expiration(self):
"""Get resetting token expiration"""
return settings.RESETTING_TOKEN_EXPIRATION
@property
def is_valid(self):
"""Check if valid token or not"""
return timezone.now() > self.expiry_datetime
def generate_key(self):
"""generates a pseudo random code"""
return default_token_generator.make_token(self.user)
def save(self, *args, **kwargs):
"""Override save method"""
if not self.expiry_datetime:
self.expiry_datetime = (
timezone.now() +
timezone.timedelta(hours=self.get_resetting_token_expiration)
)
if not self.key:
self.key = self.generate_key()
return super(ResetPasswordToken, self).save(*args, **kwargs)
def get_reset_password_template(self):
"""Get reset password template"""
return render_to_string(
template_name=self.RESETTING_TOKEN_TEMPLATE_NAME,
context={'token': self.key,
'domain_uri': settings.DOMAIN_URI})
def send_reset_password_request(self):
"""Method to reset user password"""
subject = _('Password resetting')
# Send an email with url for resetting a password
self.user.send_email(subject=subject,
message=self.get_reset_password_template())
def confirm_reset_password_request(self):
"""Method to confirm reset user passwrod request"""
# Remove access token and revoke refresh tokens
self.user.remove_access_tokens(source=[Application.MOBILE,
Application.WEB])