gault-millau/apps/account/models.py

281 lines
9.7 KiB
Python

"""Account models"""
from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.contrib.auth.tokens import default_token_generator as password_token_generator
from django.core.mail import send_mail
from django.db import models
from django.template.loader import render_to_string
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import ugettext_lazy as _
from easy_thumbnails.fields import ThumbnailerImageField
from rest_framework.authtoken.models import Token
from authorization.models import Application
from utils.methods import image_path
from utils.models import GMTokenGenerator
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
from utils.tokens import GMRefreshToken
class UserManager(BaseUserManager):
"""Extended manager for User model."""
use_in_migrations = False
def make(self, username: str, email: str, password: str,
newsletter: bool, is_active: bool = False) -> object:
"""Register new user"""
obj = self.model(
username=username,
email=email,
newsletter=newsletter,
is_active=is_active
)
obj.set_password(password)
obj.save()
return obj
class UserQuerySet(models.QuerySet):
"""Extended queryset for User model."""
def active(self, switcher=True):
"""Filter only active users."""
return self.filter(is_active=switcher)
def by_oauth2_access_token(self, token):
"""Find users by access token"""
return self.filter(oauth2_provider_accesstoken__token=token,
oauth2_provider_accesstoken__expires__gt=timezone.now())
def by_oauth2_refresh_token(self, token):
"""Find users by access token"""
return self.filter(oauth2_provider_refreshtoken__token=token,
oauth2_provider_refreshtoken__expires__gt=timezone.now())
def by_username(self, username: str):
"""Filter users by username."""
return self.filter(username=username)
class User(ImageMixin, AbstractUser):
"""Base user model."""
cropped_image = ThumbnailerImageField(upload_to=image_path,
blank=True, null=True, default=None,
verbose_name=_('Crop image'))
email = models.EmailField(_('email address'), blank=True,
null=True, default=None)
email_confirmed = models.BooleanField(_('email status'), default=False)
newsletter = models.NullBooleanField(default=True)
EMAIL_FIELD = 'email'
USERNAME_FIELD = 'username'
REQUIRED_FIELDS = ['email']
objects = UserManager.from_queryset(UserQuerySet)()
class Meta:
"""Meta class."""
verbose_name = _('User')
verbose_name_plural = _('Users')
def __str__(self):
"""String method."""
return "%s:%s" % (self.email, self.get_short_name())
def get_user_info(self):
"""Get information about user"""
return {
"username": self.username,
"first_name": self.first_name if self.first_name else None,
"last_name": self.last_name if self.last_name else None,
"email": self.email if self.email else None,
"newsletter": self.newsletter,
"is_active": self.is_active
}
def change_status(self, switcher: bool = False):
"""Method to set user status to active or inactive"""
self.is_active = switcher
self.save()
def create_jwt_tokens(self, source: int):
"""Create JWT tokens for user"""
token = GMRefreshToken.for_user_by_source(self, source)
return {
'access_token': str(token.access_token),
'refresh_token': str(token),
}
def expire_access_tokens(self):
"""Expire all access tokens"""
self.access_tokens.update(expires_at=timezone.now())
def expire_refresh_tokens(self):
"""Expire all refresh tokens"""
self.refresh_tokens.update(expires_at=timezone.now())
def confirm_email(self):
"""Method to confirm user email address"""
self.email_confirmed = True
self.save()
def approve(self):
"""Set user is_active status to True"""
self.is_active = True
self.save()
def get_body_email_message(self, subject: str, message: str):
"""Prepare the body of the email message"""
return {
'subject': subject,
'message': str(message),
'from_email': settings.EMAIL_HOST_USER,
'recipient_list': [self.email, ]
}
def send_email(self, subject: str, message: str):
"""Send an email to reset user password"""
send_mail(**self.get_body_email_message(subject=subject,
message=message))
@property
def confirm_email_token(self):
"""Make a token for finish signup."""
return GMTokenGenerator(purpose=GMTokenGenerator.CONFIRM_EMAIL).make_token(self)
@property
def change_email_token(self):
"""Make a token for change email."""
return GMTokenGenerator(purpose=GMTokenGenerator.CHANGE_EMAIL).make_token(self)
@property
def reset_password_token(self):
"""Make a token for finish signup."""
return GMTokenGenerator(purpose=GMTokenGenerator.RESET_PASSWORD).make_token(self)
@property
def get_user_uidb64(self):
"""Get base64 value for user by primary key identifier"""
return urlsafe_base64_encode(force_bytes(self.pk))
@property
def confirm_email_template(self):
"""Get confirm email template"""
return render_to_string(
template_name=settings.CONFIRM_EMAIL_TEMPLATE,
context={'token': self.confirm_email_token,
'uidb64': self.get_user_uidb64,
'domain_uri': settings.DOMAIN_URI,
'site_name': settings.SITE_NAME})
@property
def change_email_template(self):
"""Get change email template"""
return render_to_string(
template_name=settings.CHANGE_EMAIL_TEMPLATE,
context={'token': self.change_email_token,
'uidb64': self.get_user_uidb64,
'domain_uri': settings.DOMAIN_URI,
'site_name': settings.SITE_NAME})
@property
def fullname(self):
fullname = []
if self.first_name: fullname.append(self.first_name)
if self.last_name: fullname.append(self.last_name)
return ' '.join(fullname)
class ResetPasswordTokenQuerySet(models.QuerySet):
"""Reset password token query set"""
def expired(self):
"""Show only expired"""
return self.filter(expiry_datetime__lt=timezone.now())
def valid(self):
"""Show only valid"""
return self.filter(expiry_datetime__gt=timezone.now())
def by_user(self, user):
"""Show obj by user"""
return self.filter(user=user)
class ResetPasswordToken(PlatformMixin, ProjectBaseMixin):
"""Reset password model"""
user = models.ForeignKey(User,
related_name='password_reset_tokens',
on_delete=models.CASCADE,
verbose_name=_('The User which is associated to '
'this password reset token'))
# Key field, though it is not the primary key of the model
key = models.CharField(max_length=255,
verbose_name=_('Key'))
ip_address = models.GenericIPAddressField(default='',
blank=True, null=True,
verbose_name=_('The IP address of this session'))
expiry_datetime = models.DateTimeField(blank=True, null=True,
verbose_name=_('Expiration datetime'))
objects = ResetPasswordTokenQuerySet.as_manager()
class Meta:
verbose_name = _("Password Reset Token")
verbose_name_plural = _("Password Reset Tokens")
def __str__(self):
return "Password reset token for user {user}".format(user=self.user)
def save(self, *args, **kwargs):
"""Override save method"""
if not self.expiry_datetime:
self.expiry_datetime = (
timezone.now() +
timezone.timedelta(hours=self.get_resetting_token_expiration)
)
if not self.key:
self.key = self.generate_token
return super(ResetPasswordToken, self).save(*args, **kwargs)
@property
def get_resetting_token_expiration(self):
"""Get resetting token expiration"""
return settings.RESETTING_TOKEN_EXPIRATION
@property
def is_valid(self):
"""Check if valid token or not"""
return timezone.now() > self.expiry_datetime
@property
def generate_token(self):
"""Generates a pseudo random code"""
return password_token_generator.make_token(self.user)
@property
def reset_password_template(self):
"""Get reset password template"""
return render_to_string(
template_name=settings.RESETTING_TOKEN_TEMPLATE,
context={'token': self.key,
'uidb64': self.user.get_user_uidb64,
'domain_uri': settings.DOMAIN_URI,
'site_name': settings.SITE_NAME})
@staticmethod
def token_is_valid(user, token):
"""Check if token is valid"""
return password_token_generator.check_token(user, token)
def overdue(self):
"""Overdue instance"""
self.expiry_datetime = timezone.now()
self.save()