gault-millau/apps/account/models.py
2020-01-29 22:53:28 +03:00

536 lines
20 KiB
Python

"""Account models"""
from datetime import datetime
from django.contrib.postgres.search import TrigramSimilarity
from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.core.mail import send_mail
from django.db import models
from django.template.loader import render_to_string, get_template
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.html import mark_safe
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import ugettext_lazy as _
from rest_framework.authtoken.models import Token
from collections import Counter
from typing import List
from authorization.models import Application
from establishment.models import Establishment, EstablishmentSubType
from location.models import Country
from main.models import SiteSettings
from utils.models import GMTokenGenerator
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
from utils.tokens import GMRefreshToken
from phonenumber_field.modelfields import PhoneNumberField
class RoleQuerySet(models.QuerySet):
def annotate_role_name(self):
return self.annotate(role_name=models.Case(*self.model.role_condition_expressions(),
output_field=models.CharField()))
def annotate_role_counter(self):
return self.annotate(
role_counter=models.Count('userrole',
distinct=True,
filter=models.Q(userrole__state=UserRole.VALIDATED)))
class Role(ProjectBaseMixin):
"""Base Role model."""
STANDARD_USER = 1
COMMENTS_MODERATOR = 2
COUNTRY_ADMIN = 3
CONTENT_PAGE_MANAGER = 4
ESTABLISHMENT_MANAGER = 5
REVIEWER_MANGER = 6
RESTAURANT_REVIEWER = 7
SALES_MAN = 8
WINERY_REVIEWER = 9 # Establishments subtype "winery"
SELLER = 10
LIQUOR_REVIEWER = 11
PRODUCT_REVIEWER = 12
ROLE_CHOICES = (
(STANDARD_USER, _('Standard user')),
(COMMENTS_MODERATOR, _('Comments moderator')),
(COUNTRY_ADMIN, _('Country admin')),
(CONTENT_PAGE_MANAGER, _('Content page manager')),
(ESTABLISHMENT_MANAGER, _('Establishment manager')),
(REVIEWER_MANGER, _('Reviewer manager')),
(RESTAURANT_REVIEWER, _('Restaurant reviewer')),
(SALES_MAN, _('Sales man')),
(WINERY_REVIEWER, _('Winery reviewer')),
(SELLER, _('Seller')),
(LIQUOR_REVIEWER, _('Liquor reviewer')),
(PRODUCT_REVIEWER, _('Product reviewer')),
)
role = models.PositiveIntegerField(verbose_name=_('Role'), choices=ROLE_CHOICES,
null=False, blank=False)
country = models.ForeignKey(Country, verbose_name=_('Country'),
null=True, blank=True, on_delete=models.SET_NULL)
site = models.ForeignKey(SiteSettings, verbose_name=_('Site settings'),
null=True, blank=True, on_delete=models.SET_NULL)
establishment_subtype = models.ForeignKey(EstablishmentSubType,
verbose_name=_('Establishment subtype'),
null=True, blank=True, on_delete=models.SET_NULL)
navigation_bar_permission = models.ForeignKey('main.NavigationBarPermission',
blank=True, null=True,
on_delete=models.SET_NULL,
help_text='navigation bar item permission',
verbose_name=_('navigation bar permission'))
objects = RoleQuerySet.as_manager()
@classmethod
def role_types(cls):
roles = []
for role, display_name in dict(cls.ROLE_CHOICES).items():
roles.append({'role_name': role, 'role_counter': display_name, 'role': 0})
return roles
class UserManager(BaseUserManager):
"""Extended manager for User model."""
use_in_migrations = False
def make(self, email: str, password: str, newsletter: bool, username: str = '') -> object:
"""Register new user"""
obj = self.model(
username=username,
email=email.lower(),
newsletter=newsletter)
obj.set_password(password)
obj.save()
return obj
class UserQuerySet(models.QuerySet):
"""Extended queryset for User model."""
def with_base_related(self):
"""Return QuerySet with base related."""
return self.select_related('last_country', 'last_country__country')
def with_extend_related(self):
"""Return QuerySet with extend related."""
return self.with_base_related().prefetch_related('roles', 'subscriber')
def active(self, switcher=True):
"""Filter only active users."""
return self.filter(is_active=switcher)
def by_oauth2_access_token(self, token):
"""Find users by access token"""
return self.filter(oauth2_provider_accesstoken__token=token,
oauth2_provider_accesstoken__expires__gt=timezone.now())
def by_oauth2_refresh_token(self, token):
"""Find users by access token"""
return self.filter(oauth2_provider_refreshtoken__token=token,
oauth2_provider_refreshtoken__expires__gt=timezone.now())
def by_role(self, role):
"""Filter by role."""
return self.filter(userrole__role=role)
def by_roles(self, roles: list):
"""Filter by roles."""
return self.filter(userrole__role__in=roles)
def establishment_admin(self, establishment):
role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER).first()
return self.by_role(role).filter(userrole__establishment=establishment)
def full_text_search(self, search_value: str):
return self.annotate(
username_similarity=models.Case(
models.When(
models.Q(username__isnull=False),
then=TrigramSimilarity('username', search_value.lower())
),
default=0,
output_field=models.FloatField()
),
first_name_similarity=models.Case(
models.When(
models.Q(first_name__isnull=False),
then=TrigramSimilarity('first_name', search_value.lower())
),
default=0,
output_field=models.FloatField()
),
last_name_similarity=models.Case(
models.When(
models.Q(last_name__isnull=False),
then=TrigramSimilarity('last_name', search_value.lower())
),
default=0,
output_field=models.FloatField()
),
email_similarity=models.Case(
models.When(
models.Q(email__isnull=False),
then=TrigramSimilarity('email', search_value.lower())
),
default=0,
output_field=models.FloatField()
),
phone_similarity=models.Case(
models.When(
models.Q(phone__isnull=False),
then=TrigramSimilarity('phone', search_value.lower())
),
default=0,
output_field=models.FloatField()
),
relevance=(
models.F('username_similarity') +
models.F('first_name_similarity') +
models.F('last_name_similarity') +
models.F('email_similarity') +
models.F('phone_similarity')
),
).filter(relevance__gte=0.1).order_by('-relevance')
def by_role_country_code(self, country_code: str):
"""Filter by role country code."""
return self.filter(userrole__role__country__code=country_code).distinct()
class User(AbstractUser):
"""Base user model."""
username = models.CharField(
_('username'),
max_length=150,
blank=True, null=True, default=None,
help_text=_('Required. 150 characters or fewer. Letters, digits and ./+/-/_ only.'),
error_messages={
'unique': _("A user with that username already exists."),
},
)
image_url = models.URLField(verbose_name=_('Image URL path'),
blank=True, null=True, default=None,
max_length=500)
cropped_image_url = models.URLField(verbose_name=_('Cropped image URL path'),
blank=True, null=True, default=None)
email = models.EmailField(_('email address'), unique=True,
null=True, default=None)
unconfirmed_email = models.EmailField(_('unconfirmed email'), blank=True, null=True, default=None)
email_confirmed = models.BooleanField(_('email status'), default=False)
newsletter = models.NullBooleanField(default=True)
old_id = models.IntegerField(null=True, blank=True, default=None)
locale = models.CharField(max_length=10, blank=True, default=None, null=True,
verbose_name=_('User last used locale'))
city = models.TextField(default=None, blank=True, null=True,
verbose_name=_('User last visited from city'))
last_ip = models.GenericIPAddressField(_('last IP address'), blank=True, null=True, default=None)
last_country = models.ForeignKey(
SiteSettings,
verbose_name=_('last site settings'),
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
)
phone = PhoneNumberField(blank=True, null=True, default=None,
verbose_name=_('Phone'))
EMAIL_FIELD = 'email'
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
roles = models.ManyToManyField(
Role, verbose_name=_('Roles'), symmetrical=False,
through_fields=('user', 'role'), through='UserRole')
objects = UserManager.from_queryset(UserQuerySet)()
class Meta:
"""Meta class."""
verbose_name = _('User')
verbose_name_plural = _('Users')
def __str__(self):
"""String method."""
return "%s:%s" % (self.email, self.get_short_name())
def get_user_info(self):
"""Get information about user"""
return {
"username": self.username,
"first_name": self.first_name if self.first_name else None,
"last_name": self.last_name if self.last_name else None,
"email": self.email if self.email else None,
"newsletter": self.newsletter,
"is_active": self.is_active
}
def change_status(self, switcher: bool = False):
"""Method to set user status to active or inactive"""
self.is_active = switcher
self.save()
def create_jwt_tokens(self, source: int = None):
"""Create JWT tokens for user"""
token = GMRefreshToken.for_user(self, source)
return {
'access_token': str(token.access_token),
'refresh_token': str(token),
}
def expire_access_tokens(self):
"""Expire all access tokens"""
self.access_tokens.update(expires_at=timezone.now())
def expire_refresh_tokens(self):
"""Expire all refresh tokens"""
self.refresh_tokens.update(expires_at=timezone.now())
def confirm_email(self):
"""Method to confirm user email address"""
if self.unconfirmed_email is not None:
self.email = self.unconfirmed_email
self.unconfirmed_email = None
self.email_confirmed = True
self.save()
def approve(self):
"""Set user is_active status to True"""
self.is_active = True
self.save()
def get_body_email_message(self, subject: str, message: str, emails=None):
"""Prepare the body of the email message"""
return {
'subject': subject,
'message': str(message[0]),
'html_message': message[1],
'from_email': settings.EMAIL_HOST_USER,
'recipient_list': emails if emails else [self.email, ],
}
def send_email(self, subject: str, message: str, emails=None):
"""Send an email to reset user password"""
send_mail(**self.get_body_email_message(subject=subject,
message=message,
emails=emails))
@property
def confirm_email_token(self):
"""Make a token for finish signup."""
return GMTokenGenerator(purpose=GMTokenGenerator.CONFIRM_EMAIL).make_token(self)
@property
def change_email_token(self):
"""Make a token for change email."""
return GMTokenGenerator(purpose=GMTokenGenerator.CHANGE_EMAIL).make_token(self)
@property
def reset_password_token(self):
"""Make a token for finish signup."""
return GMTokenGenerator(purpose=GMTokenGenerator.RESET_PASSWORD).make_token(self)
@property
def get_user_uidb64(self):
"""Get base64 value for user by primary key identifier"""
return urlsafe_base64_encode(force_bytes(self.pk))
def base_template(self, country_code='www', username='', subject=''):
"""Base email template"""
socials = SiteSettings.objects.by_country_code(country_code).first()
return {
'title': subject,
'domain_uri': settings.DOMAIN_URI,
'uidb64': self.get_user_uidb64,
'site_name': settings.SITE_NAME,
'year': datetime.now().year,
'twitter_page_url': socials.twitter_page_url if socials else '#',
'instagram_page_url': socials.instagram_page_url if socials else '#',
'facebook_page_url': socials.facebook_page_url if socials else '#',
'send_to': username,
}
@property
def image_tag(self):
return mark_safe(f'<img src="{self.image_url}" />')
@property
def cropped_image_tag(self):
return mark_safe(f'<img src="{self.cropped_image_url}" />')
def reset_password_template(self, country_code, username, subject):
"""Get reset password template"""
context = {'token': self.reset_password_token,
'country_code': country_code}
context.update(self.base_template(country_code, username, subject))
return render_to_string(
template_name=settings.RESETTING_TOKEN_TEMPLATE,
context=context), get_template(settings.RESETTING_TOKEN_TEMPLATE).render(context)
def notify_password_changed_template(self, country_code, username, subject):
"""Get notification email template"""
context = {'contry_code': country_code}
context.update(self.base_template(country_code, username, subject))
return render_to_string(
template_name=settings.NOTIFICATION_PASSWORD_TEMPLATE,
context=context,
), get_template(settings.NOTIFICATION_PASSWORD_TEMPLATE).render(context)
def confirm_email_template(self, country_code, username, subject):
"""Get confirm email template"""
context = {'token': self.confirm_email_token,
'country_code': country_code}
context.update(self.base_template(country_code, username, subject))
return render_to_string(
template_name=settings.CONFIRM_EMAIL_TEMPLATE,
context=context), get_template(settings.CONFIRM_EMAIL_TEMPLATE).render(context)
def change_email_template(self, country_code, username, subject):
"""Get change email template"""
context = {'token': self.change_email_token,
'country_code': country_code}
context.update(self.base_template(country_code, username, subject))
return render_to_string(
template_name=settings.CHANGE_EMAIL_TEMPLATE,
context=context), get_template(settings.CHANGE_EMAIL_TEMPLATE).render(context)
@property
def favorite_establishment_ids(self):
"""Return establishment IDs that in favorites for current user."""
return self.favorites.by_content_type(app_label='establishment',
model='establishment') \
.values_list('object_id', flat=True)
@property
def favorite_recipe_ids(self):
"""Return recipe IDs that in favorites for current user."""
return self.favorites.by_content_type(app_label='recipe',
model='recipe') \
.values_list('object_id', flat=True)
@property
def favorite_news_ids(self):
"""Return news IDs that in favorites for current user."""
return self.favorites.by_content_type(
app_label='news',
model='news',
).values_list('object_id', flat=True)
@property
def favorite_product_ids(self):
"""Return news IDs that in favorites for current user."""
return self.favorites.by_content_type(
app_label='product',
model='product',
).values_list('object_id', flat=True)
@property
def subscription_types(self):
result = []
for subscription in self.subscriber.all():
for item in subscription.active_subscriptions:
result.append(item.id)
return set(result)
def set_roles(self, ids: List[int]):
"""
Set user roles
:param ids: list of role ids
:return: bool
"""
self.roles.set(Role.objects.filter(id__in=ids))
return self
class UserRoleQueryset(models.QuerySet):
"""QuerySet for model UserRole."""
def _role_counter(self, country_code: str = None) -> dict:
additional_filters = {
'state': self.model.VALIDATED,
}
if country_code:
additional_filters.update({'role__site__country__code': country_code})
user_roles = (
self.filter(**additional_filters)
.distinct('user_id', 'role__role')
.values('user_id', 'role__role')
)
return dict(Counter([i['role__role'] for i in user_roles]))
def country_admin_role(self):
return self.filter(role__role=self.model.role.field.target_field.model.COUNTRY_ADMIN,
state=self.model.VALIDATED)
def aggregate_role_counter(self, country_code: str = None) -> list:
_role_choices = dict(Role.ROLE_CHOICES)
role_counter = []
# fill existed roles
for role, count in self._role_counter(country_code=country_code).items():
role_counter.append({
'role': role,
'role_name': _role_choices[role],
'count': count,
})
# check by roles
for role, role_name in _role_choices.items():
if role not in [i['role'] for i in role_counter]:
role_counter.append({
'role': role,
'role_name': _role_choices[role],
'count': 0,
})
return role_counter
class UserRole(ProjectBaseMixin):
"""UserRole model."""
VALIDATED = 'validated'
PENDING = 'pending'
CANCELLED = 'cancelled'
REJECTED = 'rejected'
STATE_CHOICES = (
(VALIDATED, _('validated')),
(PENDING, _('pending')),
(CANCELLED, _('cancelled')),
(REJECTED, _('rejected'))
)
user = models.ForeignKey(
'account.User', verbose_name=_('User'), on_delete=models.CASCADE)
role = models.ForeignKey(
Role, verbose_name=_('Role'), on_delete=models.SET_NULL, null=True)
establishment = models.ForeignKey(
Establishment, verbose_name=_('Establishment'),
on_delete=models.SET_NULL, null=True, blank=True)
state = models.CharField(
_('state'), choices=STATE_CHOICES, max_length=10, default=PENDING)
requester = models.ForeignKey('account.User', on_delete=models.SET_NULL,
blank=True, null=True, default=None,
related_name='roles_requested',
help_text='A user (REQUESTER) who requests a '
'role change for a USER')
objects = UserRoleQueryset.as_manager()
class Meta:
unique_together = ['user', 'role', 'establishment', 'state']
class OldRole(models.Model):
new_role = models.CharField(verbose_name=_('New role'), max_length=512)
old_role = models.CharField(verbose_name=_('Old role'), max_length=512, null=True)
class Meta:
unique_together = ('new_role', 'old_role')