gault-millau/apps/account/models.py

234 lines
8.0 KiB
Python

"""Account models"""
from datetime import timedelta
from typing import Union

from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.core.mail import send_mail
from django.db import models
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.encoding import force_bytes, force_text
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.translation import ugettext_lazy as _
from rest_framework.authtoken.models import Token

from authorization.models import Application
from utils.models import gm_token_generator
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
class UserManager(BaseUserManager):
    """Manager for the User model that adds a registration helper."""

    use_in_migrations = False

    def make(self, username: str, email: str,
             password: str, newsletter: bool) -> object:
        """Create, save and return a new user.

        The password is applied through ``set_password`` instead of being
        handed to the model constructor.

        :param username: login name of the new user.
        :param email: e-mail address of the new user.
        :param password: raw password to set on the new user.
        :param newsletter: newsletter subscription flag.
        :return: the saved model instance.
        """
        user = self.model(username=username, email=email, newsletter=newsletter)
        user.set_password(password)
        user.save()
        return user
class UserQuerySet(models.QuerySet):
    """QuerySet for the User model: status and OAuth2 token lookups."""

    def active(self, switcher=True):
        """Return users whose ``is_active`` flag equals ``switcher``."""
        return self.filter(is_active=switcher)

    def by_oauth2_access_token(self, token):
        """Return users owning ``token`` as a not-yet-expired access token."""
        lookup = {
            'oauth2_provider_accesstoken__token': token,
            'oauth2_provider_accesstoken__expires__gt': timezone.now(),
        }
        return self.filter(**lookup)

    def by_oauth2_refresh_token(self, token):
        """Return users owning ``token`` as a not-yet-expired refresh token."""
        # NOTE(review): this filters on an ``expires`` field of the refresh
        # token model - confirm that the model in use actually defines it.
        lookup = {
            'oauth2_provider_refreshtoken__token': token,
            'oauth2_provider_refreshtoken__expires__gt': timezone.now(),
        }
        return self.filter(**lookup)
class User(ImageMixin, AbstractUser):
    """Base user model.

    Extends the stock Django user with an optional e-mail address, a
    newsletter flag and helpers for token handling and e-mail sending.
    """

    email = models.EmailField(_('email address'), blank=True,
                              null=True, default=None)
    newsletter = models.NullBooleanField(default=True)

    EMAIL_FIELD = 'email'
    USERNAME_FIELD = 'username'
    REQUIRED_FIELDS = ['email']

    objects = UserManager.from_queryset(UserQuerySet)()

    class Meta:
        """Meta class."""

        verbose_name = _('User')
        verbose_name_plural = _('Users')

    def __str__(self):
        """String method."""
        return "%s:%s" % (self.email, self.get_short_name())

    @staticmethod
    def _as_source_list(source) -> list:
        """Normalize a single source or a list/tuple of sources to a list."""
        if isinstance(source, (list, tuple)):
            return list(source)
        return [source]

    def get_user_info(self):
        """Get information about user.

        Empty ``first_name``, ``last_name`` and ``email`` values are
        reported as ``None``.
        """
        return {
            "username": self.username,
            "first_name": self.first_name or None,
            "last_name": self.last_name or None,
            "email": self.email or None,
            "newsletter": self.newsletter,
            "is_active": self.is_active
        }

    def change_status(self, switcher: bool = False):
        """Set ``is_active`` to ``switcher`` and save the user."""
        self.is_active = switcher
        self.save()

    def remove_token(self):
        """Delete the REST framework auth token(s) belonging to this user."""
        Token.objects.filter(user=self).delete()

    def remove_access_tokens(self, source: Union[int, tuple, list]):
        """Delete this user's OAuth2 access tokens for the given source(s).

        :param source: a single application source or a list/tuple of them.
        """
        # A tuple must be treated as a collection too; wrapping it into a
        # one-element list would break the ``__in`` lookup.
        sources = self._as_source_list(source)
        self.oauth2_provider_accesstoken.filter(
            application__source__in=sources
        ).delete()

    def revoke_refresh_tokens(self, source: Union[int, tuple, list]):
        """Revoke this user's OAuth2 refresh tokens for the given source(s).

        Only refresh tokens that still reference an access token are revoked.

        :param source: a single application source or a list/tuple of them.
        """
        sources = self._as_source_list(source)
        refresh_tokens = self.oauth2_provider_refreshtoken.filter(
            application__source__in=sources,
            access_token__isnull=False
        )
        # Iterating an empty queryset is a no-op, so no ``exists()`` pre-check
        # (and no extra query) is needed.
        for token in refresh_tokens:
            token.revoke()

    @property
    def get_signup_finish_token(self):
        """Make a token for finish signup."""
        return gm_token_generator.make_token(self)

    @property
    def get_user_uid(self):
        """Get the URL-safe base64 encoding of the user's primary key."""
        return urlsafe_base64_encode(force_bytes(self.pk))

    def get_confirm_signup_template(self):
        """Render the confirm-signup e-mail template for this user."""
        return render_to_string(
            template_name=settings.CONFIRM_SIGNUP_TEMPLATE,
            context={'token': self.get_signup_finish_token,
                     'uid': self.get_user_uid,
                     'domain_uri': settings.DOMAIN_URI})

    def get_body_email_message(self, subject: str, message: str):
        """Build the keyword arguments for ``send_mail``.

        :param subject: e-mail subject.
        :param message: e-mail body; converted with ``str()``.
        :return: dict with subject, message, sender and recipient list.
        """
        return {
            'subject': subject,
            'message': str(message),
            'from_email': settings.EMAIL_HOST_USER,
            'recipient_list': [self.email, ]
        }

    def send_email(self, subject: str, message: str):
        """Send an e-mail with the given subject and message to this user."""
        # NOTE(review): ``email`` is nullable; a user without an address is
        # not filtered out here - confirm callers guarantee one.
        send_mail(**self.get_body_email_message(subject=subject,
                                                message=message))
class ResetPasswordTokenQuerySet(models.QuerySet):
    """QuerySet helpers for password reset tokens."""

    def expired(self):
        """Return tokens whose expiry datetime is earlier than now."""
        now = timezone.now()
        return self.filter(expiry_datetime__lt=now)

    def valid(self):
        """Return tokens whose expiry datetime is later than now."""
        now = timezone.now()
        return self.filter(expiry_datetime__gt=now)

    def by_user(self, user):
        """Return tokens that belong to ``user``."""
        return self.filter(user=user)
class ResetPasswordToken(PlatformMixin, ProjectBaseMixin):
    """Password reset token bound to a user, with an expiry datetime."""

    user = models.ForeignKey(User,
                             related_name='password_reset_tokens',
                             on_delete=models.CASCADE,
                             verbose_name=_('The User which is associated to '
                                            'this password reset token'))
    # Key field, though it is not the primary key of the model
    key = models.CharField(max_length=255,
                           verbose_name=_('Key'))

    # NOTE(review): the default is '' although the field is nullable -
    # confirm this is intended before touching it (needs a migration).
    ip_address = models.GenericIPAddressField(default='',
                                              blank=True, null=True,
                                              verbose_name=_('The IP address of this session'))

    expiry_datetime = models.DateTimeField(blank=True, null=True,
                                           verbose_name=_('Expiration datetime'))

    objects = ResetPasswordTokenQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _("Password Reset Token")
        verbose_name_plural = _("Password Reset Tokens")

    def __str__(self):
        """String method."""
        return "Password reset token for user {user}".format(user=self.user)

    @property
    def get_resetting_token_expiration(self):
        """Get the token lifetime from settings (used as hours in ``save``)."""
        return settings.RESETTING_TOKEN_EXPIRATION

    @property
    def is_valid(self):
        """Check whether the token has not expired yet.

        Mirrors ``ResetPasswordTokenQuerySet.valid``: the token is valid
        while its expiry datetime lies in the future. A token without an
        expiry datetime (not saved yet) is reported as not valid.
        """
        expiry = self.expiry_datetime
        if expiry is None:
            return False
        return expiry > timezone.now()

    @property
    def generate_token(self):
        """Generate a token for the associated user."""
        return gm_token_generator.make_token(self.user)

    @staticmethod
    def token_is_valid(user, token):
        """Check ``token`` against ``user`` with the project token generator."""
        return gm_token_generator.check_token(user, token)

    def overdue(self):
        """Expire the token now by setting its expiry datetime and saving."""
        self.expiry_datetime = timezone.now()
        self.save()

    def save(self, *args, **kwargs):
        """Fill in a missing expiry datetime and key, then save.

        The expiry defaults to now plus ``RESETTING_TOKEN_EXPIRATION`` hours;
        the key defaults to a freshly generated token for the user.
        """
        if not self.expiry_datetime:
            lifetime = timedelta(hours=self.get_resetting_token_expiration)
            self.expiry_datetime = timezone.now() + lifetime
        if not self.key:
            self.key = self.generate_token
        return super().save(*args, **kwargs)

    def get_reset_password_template(self):
        """Render the reset-password e-mail template for this token."""
        return render_to_string(
            template_name=settings.RESETTING_TOKEN_TEMPLATE_NAME,
            context={'token': self.key,
                     'uid': self.user.get_user_uid,
                     'domain_uri': settings.DOMAIN_URI})