591 lines
22 KiB
Python
591 lines
22 KiB
Python
"""Account models"""
|
|
from collections import Counter
from datetime import datetime
from typing import List

from django.conf import settings
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
from django.contrib.postgres.search import TrigramSimilarity
from django.core.mail import send_mail
from django.db import models
from django.template.loader import render_to_string, get_template
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.html import format_html, mark_safe
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import ugettext_lazy as _
from phonenumber_field.modelfields import PhoneNumberField
from rest_framework.authtoken.models import Token

from authorization.models import Application
from establishment.models import Establishment, EstablishmentSubType
from location.models import Country
from main.models import SiteSettings
from utils.models import GMTokenGenerator
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
from utils.tokens import GMRefreshToken
|
|
|
|
|
|
class RoleQuerySet(models.QuerySet):
    """QuerySet for the Role model with annotation helpers."""

    def annotate_role_name(self):
        """Annotate every row with ``role_name`` built from a CASE expression.

        The CASE branches are supplied by
        ``self.model.role_condition_expressions()``.
        NOTE(review): that helper is not defined on Role in this module -
        confirm it is provided elsewhere (e.g. by a mixin).
        """
        conditions = self.model.role_condition_expressions()
        name_case = models.Case(*conditions, output_field=models.CharField())
        return self.annotate(role_name=name_case)

    def annotate_role_counter(self):
        """Annotate every row with ``role_counter``.

        The counter is the number of distinct related ``userrole`` rows whose
        state is ``UserRole.VALIDATED``.
        """
        validated_only = models.Q(userrole__state=UserRole.VALIDATED)
        validated_count = models.Count('userrole', distinct=True, filter=validated_only)
        return self.annotate(role_counter=validated_count)
|
|
|
|
|
|
class Role(ProjectBaseMixin):
    """Base Role model.

    A row couples a role type (one of ``ROLE_CHOICES``) with optional scoping
    objects: country, site settings, establishment subtype and a navigation
    bar permission.
    """

    # Role type identifiers stored in the ``role`` column.
    STANDARD_USER = 1
    MODERATOR = 2
    COUNTRY_ADMIN = 3
    CONTENT_PAGE_MANAGER = 4
    ESTABLISHMENT_MANAGER = 5
    REVIEW_MANAGER = 6
    RESTAURANT_INSPECTOR = 7
    SALES_MAN = 8
    WINERY_WINE_INSPECTOR = 9
    SELLER = 10
    DISTILLERY_LIQUOR_INSPECTOR = 11
    PRODUCER_FOOD_INSPECTOR = 12
    ESTABLISHMENT_ADMINISTRATOR = 13
    ARTISAN_INSPECTOR = 14

    ROLE_CHOICES = (
        (STANDARD_USER, _('Standard user')),
        (MODERATOR, _('Moderator')),
        (COUNTRY_ADMIN, _('Country admin')),
        (CONTENT_PAGE_MANAGER, _('Content page manager')),
        (ESTABLISHMENT_MANAGER, _('Establishment manager')),
        (REVIEW_MANAGER, _('Review manager')),
        (RESTAURANT_INSPECTOR, _('Restaurant inspector')),
        (SALES_MAN, _('Sales man')),
        (WINERY_WINE_INSPECTOR, _('Winery and wine inspector')),
        (SELLER, _('Seller')),
        (DISTILLERY_LIQUOR_INSPECTOR, _('Distillery & Liquor inspector')),
        (PRODUCER_FOOD_INSPECTOR, _('Producer food inspector')),
        (ESTABLISHMENT_ADMINISTRATOR, _('Establishment administrator')),
        (ARTISAN_INSPECTOR, _('Artisan inspector')),
    )

    role = models.PositiveIntegerField(verbose_name=_('Role'), choices=ROLE_CHOICES,
                                       null=False, blank=False)
    country = models.ForeignKey(Country, verbose_name=_('Country'),
                                null=True, blank=True, on_delete=models.SET_NULL)
    site = models.ForeignKey(SiteSettings, verbose_name=_('Site settings'),
                             null=True, blank=True, on_delete=models.SET_NULL)
    establishment_subtype = models.ForeignKey(EstablishmentSubType,
                                              verbose_name=_('Establishment subtype'),
                                              null=True, blank=True, on_delete=models.SET_NULL)
    navigation_bar_permission = models.ForeignKey('main.NavigationBarPermission',
                                                  blank=True, null=True,
                                                  on_delete=models.SET_NULL,
                                                  help_text='navigation bar item permission',
                                                  verbose_name=_('navigation bar permission'))

    objects = RoleQuerySet.as_manager()

    @classmethod
    def role_types(cls):
        """Return one zero-count entry per role type declared in ``ROLE_CHOICES``.

        :return: list of dicts with keys ``role`` (the integer identifier),
            ``role_name`` (the display name) and ``role_counter`` (always 0).

        The values were previously stored under the wrong keys (identifier in
        ``role_name``, display name in ``role_counter``, 0 in ``role``), which
        contradicted the ``role_name`` / ``role_counter`` annotations of
        ``RoleQuerySet`` and the entries built by
        ``UserRoleQueryset.aggregate_role_counter``.
        """
        return [
            {'role': role, 'role_name': display_name, 'role_counter': 0}
            for role, display_name in cls.ROLE_CHOICES
        ]
|
|
|
|
|
|
class UserManager(BaseUserManager):
    """Extended manager for User model."""

    use_in_migrations = False

    def make(self, email: str, password: str, newsletter: bool, username: str = '') -> object:
        """Create, save and return a new user.

        :param email: e-mail address; stored lower-cased
        :param password: raw password, passed through ``set_password``
        :param newsletter: value for the ``newsletter`` field
        :param username: optional username (empty string by default)
        :return: the saved model instance
        """
        field_values = {
            'username': username,
            'email': email.lower(),
            'newsletter': newsletter,
        }
        user = self.model(**field_values)
        user.set_password(password)
        user.save()
        return user
|
|
|
|
|
|
class UserQuerySet(models.QuerySet):
    """Extended queryset for User model."""

    def with_base_related(self):
        """Return QuerySet with base related."""
        return self.select_related('last_country', 'last_country__country')

    def with_extend_related(self):
        """Return QuerySet with extend related."""
        return self.with_base_related().prefetch_related('roles', 'subscriber')

    def active(self, switcher=True):
        """Filter users by ``is_active`` (active users by default)."""
        return self.filter(is_active=switcher)

    def by_oauth2_access_token(self, token):
        """Find users by a not yet expired OAuth2 access token."""
        return self.filter(oauth2_provider_accesstoken__token=token,
                           oauth2_provider_accesstoken__expires__gt=timezone.now())

    def by_oauth2_refresh_token(self, token):
        """Find users by a not yet expired OAuth2 refresh token."""
        return self.filter(oauth2_provider_refreshtoken__token=token,
                           oauth2_provider_refreshtoken__expires__gt=timezone.now())

    def by_role(self, role):
        """Filter by role."""
        return self.filter(userrole__role=role)

    def by_roles(self, roles: list):
        """Filter by roles."""
        return self.filter(userrole__role__in=roles)

    def establishment_admin(self, establishment):
        """Filter users that hold an establishment-manager role for ``establishment``.

        Both conditions are placed in a single ``filter()`` call so that they
        must be satisfied by the same ``UserRole`` row; chained ``filter()``
        calls across a multi-valued relation may match different rows.
        Matching on the role type (``role__role``) also covers every Role row
        of that type instead of only the first one, and avoids filtering on
        ``None`` when no such Role row exists.
        """
        return self.filter(
            userrole__role__role=Role.ESTABLISHMENT_MANAGER,
            userrole__establishment=establishment,
        )

    def full_text_search(self, search_value: str):
        """Rank users by trigram similarity to ``search_value``.

        For each of username, first name, last name, e-mail and phone a
        ``<field>_similarity`` annotation is added (0 when the column is
        NULL). Their sum is annotated as ``relevance``; rows with a relevance
        below 0.1 are dropped and the rest is ordered by descending relevance.

        :param search_value: text to search for; compared lower-cased
        """
        needle = search_value.lower()
        searched_fields = ('username', 'first_name', 'last_name', 'email', 'phone')

        similarities = {
            f'{field}_similarity': models.Case(
                models.When(
                    models.Q(**{f'{field}__isnull': False}),
                    then=TrigramSimilarity(field, needle),
                ),
                default=0,
                output_field=models.FloatField(),
            )
            for field in searched_fields
        }

        # Sum the per-field annotations left to right:
        # F(username_similarity) + F(first_name_similarity) + ...
        aliases = list(similarities)
        relevance = models.F(aliases[0])
        for alias in aliases[1:]:
            relevance = relevance + models.F(alias)

        # ``relevance`` refers to the similarity annotations, so it has to be
        # passed after them; keyword order is preserved.
        return (
            self.annotate(**similarities, relevance=relevance)
            .filter(relevance__gte=0.1)
            .order_by('-relevance')
        )

    def by_role_country_code(self, country_code: str):
        """Filter by role country code."""
        return self.filter(userrole__role__country__code=country_code).distinct()
|
|
|
|
|
|
class User(AbstractUser):
    """Base user model.

    Users are identified by e-mail (``USERNAME_FIELD = 'email'``).
    """

    # NOTE(review): the help text says "Required" and a 'unique' error message
    # is configured, yet the field is blank/null and not declared unique -
    # confirm which behaviour is intended.
    username = models.CharField(
        _('username'),
        max_length=150,
        blank=True, null=True, default=None,
        help_text=_('Required. 150 characters or fewer. Letters, digits and ./+/-/_ only.'),
        error_messages={
            'unique': _("A user with that username already exists."),
        },
    )
    image_url = models.URLField(verbose_name=_('Image URL path'),
                                blank=True, null=True, default=None,
                                max_length=500)
    cropped_image_url = models.URLField(verbose_name=_('Cropped image URL path'),
                                        blank=True, null=True, default=None)
    email = models.EmailField(_('email address'), unique=True,
                              null=True, default=None)
    unconfirmed_email = models.EmailField(_('unconfirmed email'), blank=True, null=True, default=None)
    email_confirmed = models.BooleanField(_('email status'), default=False)
    newsletter = models.NullBooleanField(default=True)
    old_id = models.IntegerField(null=True, blank=True, default=None)
    locale = models.CharField(max_length=10, blank=True, default=None, null=True,
                              verbose_name=_('User last used locale'))
    city = models.TextField(default=None, blank=True, null=True,
                            verbose_name=_('User last visited from city'))
    last_ip = models.GenericIPAddressField(_('last IP address'), blank=True, null=True, default=None)
    last_country = models.ForeignKey(
        SiteSettings,
        verbose_name=_('last site settings'),
        blank=True,
        null=True,
        default=None,
        on_delete=models.SET_NULL,
    )
    phone = PhoneNumberField(blank=True, null=True, default=None,
                             verbose_name=_('Phone'))

    EMAIL_FIELD = 'email'
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

    roles = models.ManyToManyField(
        Role, verbose_name=_('Roles'), symmetrical=False,
        through_fields=('user', 'role'), through='UserRole')
    objects = UserManager.from_queryset(UserQuerySet)()

    class Meta:
        """Meta class."""
        verbose_name = _('User')
        verbose_name_plural = _('Users')

    def __str__(self):
        """Return ``<email>:<short name>``."""
        return f"{self.email}:{self.get_short_name()}"

    def get_user_info(self):
        """Return basic user information as a dict.

        Empty ``first_name``, ``last_name`` and ``email`` are reported as None.
        """
        return {
            "username": self.username,
            "first_name": self.first_name or None,
            "last_name": self.last_name or None,
            "email": self.email or None,
            "newsletter": self.newsletter,
            "is_active": self.is_active
        }

    def change_status(self, switcher: bool = False):
        """Set ``is_active`` to ``switcher`` and save the user."""
        self.is_active = switcher
        self.save()

    def create_jwt_tokens(self, source: int = None):
        """Create JWT tokens for the user.

        :param source: forwarded to ``GMRefreshToken.for_user``
        :return: dict with ``access_token`` and ``refresh_token`` strings
        """
        token = GMRefreshToken.for_user(self, source)
        return {
            'access_token': str(token.access_token),
            'refresh_token': str(token),
        }

    def expire_access_tokens(self):
        """Set ``expires_at`` of all related access tokens to the current time."""
        # ``access_tokens`` is a reverse relation declared outside this module.
        self.access_tokens.update(expires_at=timezone.now())

    def expire_refresh_tokens(self):
        """Set ``expires_at`` of all related refresh tokens to the current time."""
        # ``refresh_tokens`` is a reverse relation declared outside this module.
        self.refresh_tokens.update(expires_at=timezone.now())

    def confirm_email(self):
        """Mark the e-mail as confirmed and save.

        A pending ``unconfirmed_email`` replaces ``email`` and is cleared.
        """
        if self.unconfirmed_email is not None:
            self.email = self.unconfirmed_email
            self.unconfirmed_email = None
        self.email_confirmed = True
        self.save()

    def approve(self):
        """Set user is_active status to True and save."""
        self.is_active = True
        self.save()

    def get_body_email_message(self, subject: str, message: tuple, emails=None):
        """Build the keyword arguments for ``send_mail``.

        :param subject: e-mail subject
        :param message: pair ``(plain, html)`` as returned by the
            ``*_template`` methods; index 0 becomes ``message`` and index 1
            ``html_message``
        :param emails: recipient list; defaults to the user's own e-mail
        :return: dict of ``send_mail`` keyword arguments
        """
        return {
            'subject': subject,
            'message': str(message[0]),
            'html_message': message[1],
            'from_email': settings.EMAIL_HOST_USER,
            'recipient_list': emails if emails else [self.email, ],
        }

    def send_email(self, subject: str, message: tuple, emails=None):
        """Send an e-mail built by ``get_body_email_message``.

        :param subject: e-mail subject
        :param message: pair ``(plain, html)`` of message bodies
        :param emails: recipient list; defaults to the user's own e-mail
        """
        send_mail(**self.get_body_email_message(subject=subject,
                                                message=message,
                                                emails=emails))

    @property
    def confirm_email_token(self):
        """Make a token for finish signup."""
        return GMTokenGenerator(purpose=GMTokenGenerator.CONFIRM_EMAIL).make_token(self)

    @property
    def change_email_token(self):
        """Make a token for change email."""
        return GMTokenGenerator(purpose=GMTokenGenerator.CHANGE_EMAIL).make_token(self)

    @property
    def reset_password_token(self):
        """Make a token for reset password."""
        return GMTokenGenerator(purpose=GMTokenGenerator.RESET_PASSWORD).make_token(self)

    @property
    def get_user_uidb64(self):
        """Get base64 value for user by primary key identifier"""
        return urlsafe_base64_encode(force_bytes(self.pk))

    def base_template(self, country_code='www', username='', subject=''):
        """Return the context shared by all e-mail templates.

        Social page URLs come from the first SiteSettings found for
        ``country_code``; ``'#'`` is used when there is none.
        """
        socials = SiteSettings.objects.by_country_code(country_code).first()
        return {
            'title': subject,
            'domain_uri': settings.DOMAIN_URI,
            'uidb64': self.get_user_uidb64,
            'site_name': settings.SITE_NAME,
            'year': datetime.now().year,
            'twitter_page_url': socials.twitter_page_url if socials else '#',
            'instagram_page_url': socials.instagram_page_url if socials else '#',
            'facebook_page_url': socials.facebook_page_url if socials else '#',
            'send_to': username,
        }

    @property
    def image_tag(self):
        """Return an ``<img>`` tag for ``image_url`` as safe HTML."""
        # format_html escapes the stored URL, so a value containing quotes or
        # markup cannot break out of the attribute.
        return format_html('<img src="{}" />', self.image_url)

    @property
    def cropped_image_tag(self):
        """Return an ``<img>`` tag for ``cropped_image_url`` as safe HTML."""
        # Escaped for the same reason as in ``image_tag``.
        return format_html('<img src="{}" />', self.cropped_image_url)

    def _render_email(self, template_name, country_code, username, subject, **extra):
        """Render an e-mail template and return it as a ``(plain, html)`` pair.

        The context is ``country_code`` plus ``extra``, updated with
        ``base_template``. Both elements are the same rendering: the template
        used to be rendered twice with identical input, so it is rendered once.
        """
        context = {'country_code': country_code, **extra}
        context.update(self.base_template(country_code, username, subject))
        rendered = render_to_string(template_name=template_name, context=context)
        return rendered, rendered

    def reset_password_template(self, country_code, username, subject):
        """Get reset password template"""
        return self._render_email(
            settings.RESETTING_TOKEN_TEMPLATE, country_code, username, subject,
            token=self.reset_password_token)

    def notify_password_changed_template(self, country_code, username, subject):
        """Get notification email template"""
        # The context key used to be misspelled 'contry_code'. The correct
        # 'country_code' key is now provided; the misspelled one is kept so a
        # template that still refers to it keeps working.
        return self._render_email(
            settings.NOTIFICATION_PASSWORD_TEMPLATE, country_code, username, subject,
            contry_code=country_code)

    def confirm_email_template(self, country_code, username, subject):
        """Get confirm email template"""
        return self._render_email(
            settings.CONFIRM_EMAIL_TEMPLATE, country_code, username, subject,
            token=self.confirm_email_token)

    def change_email_template(self, country_code, username, subject):
        """Get change email template"""
        return self._render_email(
            settings.CHANGE_EMAIL_TEMPLATE, country_code, username, subject,
            token=self.change_email_token)

    def _favorite_ids(self, app_label, model):
        """Return ``object_id`` values of the user's favorites of one content type."""
        favorites = self.favorites.by_content_type(app_label=app_label, model=model)
        return favorites.values_list('object_id', flat=True)

    @property
    def favorite_establishment_ids(self):
        """Return establishment IDs that in favorites for current user."""
        return self._favorite_ids('establishment', 'establishment')

    @property
    def favorite_recipe_ids(self):
        """Return recipe IDs that in favorites for current user."""
        return self._favorite_ids('recipe', 'recipe')

    @property
    def favorite_news_ids(self):
        """Return news IDs that in favorites for current user."""
        return self._favorite_ids('news', 'news')

    @property
    def favorite_product_ids(self):
        """Return product IDs that in favorites for current user."""
        return self._favorite_ids('product', 'product')

    @property
    def subscription_types(self):
        """Return the set of ids of all active subscriptions of the user's subscribers."""
        return {
            item.id
            for subscription in self.subscriber.all()
            for item in subscription.active_subscriptions
        }

    # The four properties below used to be wrapped in ``if self.userrole_set:``.
    # A related manager is always truthy, so that guard never took the other
    # branch and has been removed.

    @property
    def is_country_admin(self):
        """Whether the user has a validated country admin role."""
        return self.userrole_set.country_admin().exists()

    @property
    def is_establishment_manager(self):
        """Whether the user has a validated establishment manager role."""
        return self.userrole_set.establishment_manager().exists()

    @property
    def is_establishment_administrator(self):
        """Whether the user has a validated establishment administrator role."""
        return self.userrole_set.establishment_administrator().exists()

    @property
    def administrated_country_codes(self) -> list:
        """Return distinct country codes of the sites attached to the user's roles.

        User roles whose role has no site are excluded.
        """
        return list(
            self.userrole_set
            .exclude(role__site__isnull=True)
            .values_list('role__site__country__code', flat=True)
            .distinct()
        )

    def set_roles(self, ids: List[int]):
        """
        Set user roles
        :param ids: list of role ids
        :return: the user instance itself
        """
        self.roles.set(Role.objects.filter(id__in=ids))
        return self
|
|
|
|
|
|
class UserRoleQueryset(models.QuerySet):
    """QuerySet for model UserRole."""

    def _role_counter(self, country_code: str = None) -> dict:
        """Count users per role type among validated user roles.

        :param country_code: when given, only roles whose site belongs to
            that country code are counted
        :return: dict mapping ``role__role`` to the number of distinct
            ``(user_id, role__role)`` pairs
        """
        filters = {'state': self.model.VALIDATED}
        if country_code:
            filters['role__site__country__code'] = country_code

        user_roles = (
            self.filter(**filters)
            .distinct('user_id', 'role__role')
            .values('user_id', 'role__role')
        )
        return dict(Counter(row['role__role'] for row in user_roles))

    def country_admin_role(self):
        """Filter validated user roles of the country admin type."""
        return self.filter(role__role=Role.COUNTRY_ADMIN,
                           state=self.model.VALIDATED)

    def aggregate_role_counter(self, country_code: str = None) -> list:
        """Return a user count for every role type in ``Role.ROLE_CHOICES``.

        Role types that occur come first, in the order produced by the
        counter; the remaining types follow with a count of 0 in
        ``ROLE_CHOICES`` order.

        :param country_code: optional country code passed to the counter
        :return: list of dicts with keys ``role``, ``role_name`` and ``count``
        """
        role_names = dict(Role.ROLE_CHOICES)
        counts = self._role_counter(country_code=country_code)

        # ``UserRole.role`` is nullable (SET_NULL), so the counter can contain
        # a ``None`` key; such keys and values outside ROLE_CHOICES have no
        # display name and are skipped instead of raising KeyError.
        counted = [
            {'role': role, 'role_name': role_names[role], 'count': count}
            for role, count in counts.items()
            if role in role_names
        ]
        # Fill in the role types nobody holds.
        uncounted = [
            {'role': role, 'role_name': role_name, 'count': 0}
            for role, role_name in role_names.items()
            if role not in counts
        ]
        return counted + uncounted

    def validated(self):
        """Filter QuerySet by state."""
        return self.filter(state=self.model.VALIDATED)

    def country_admin(self):
        """Filter validated user roles of the country admin type."""
        return (
            self.filter(role__role=Role.COUNTRY_ADMIN)
            .validated()
        )

    def establishment_manager(self):
        """Filter validated user roles of the establishment manager type."""
        return (
            self.filter(role__role=Role.ESTABLISHMENT_MANAGER)
            .validated()
        )

    def establishment_administrator(self):
        """Filter validated user roles of the establishment administrator type."""
        return (
            self.filter(role__role=Role.ESTABLISHMENT_ADMINISTRATOR)
            .validated()
        )
|
|
|
|
|
|
class UserRole(ProjectBaseMixin):
    """Link between a user and a role, optionally tied to an establishment.

    Every link carries a state (pending by default) and may record the user
    who requested it.
    """

    # Values stored in the ``state`` column.
    VALIDATED = 'validated'
    PENDING = 'pending'
    CANCELLED = 'cancelled'
    REJECTED = 'rejected'

    STATE_CHOICES = (
        (VALIDATED, _('validated')),
        (PENDING, _('pending')),
        (CANCELLED, _('cancelled')),
        (REJECTED, _('rejected'))
    )

    user = models.ForeignKey(
        'account.User',
        verbose_name=_('User'),
        on_delete=models.CASCADE,
    )
    # Kept (with a NULL role) when the referenced Role row is deleted.
    role = models.ForeignKey(
        Role,
        verbose_name=_('Role'),
        on_delete=models.SET_NULL,
        null=True,
    )
    establishment = models.ForeignKey(
        Establishment,
        verbose_name=_('Establishment'),
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )

    state = models.CharField(
        _('state'),
        choices=STATE_CHOICES,
        max_length=10,
        default=PENDING,
    )
    requester = models.ForeignKey(
        'account.User',
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        default=None,
        related_name='roles_requested',
        help_text='A user (REQUESTER) who requests a '
                  'role change for a USER',
    )

    objects = UserRoleQueryset.as_manager()

    class Meta:
        unique_together = ['user', 'role', 'establishment', 'state']
|
|
|
|
|
|
class OldRole(models.Model):
    """Pair of a new role name and an old role name; each pair is unique."""

    new_role = models.CharField(
        verbose_name=_('New role'),
        max_length=512,
    )
    old_role = models.CharField(
        verbose_name=_('Old role'),
        max_length=512,
        null=True,
    )

    class Meta:
        unique_together = ('new_role', 'old_role')
|