610 lines
23 KiB
Python
610 lines
23 KiB
Python
"""Account models"""
|
||
from collections import Counter
|
||
from datetime import datetime
|
||
from typing import List
|
||
|
||
from django.conf import settings
|
||
from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager
|
||
from django.contrib.postgres.search import TrigramSimilarity
|
||
from django.core.mail import send_mail
|
||
from django.db import models
|
||
from django.template.loader import render_to_string, get_template
|
||
from django.utils import timezone
|
||
from django.utils.encoding import force_bytes
|
||
from django.utils.html import mark_safe
|
||
from django.utils.http import urlsafe_base64_encode
|
||
from django.utils.translation import ugettext_lazy as _
|
||
from phonenumber_field.modelfields import PhoneNumberField
|
||
from rest_framework.authtoken.models import Token
|
||
|
||
from authorization.models import Application
|
||
from establishment.models import Establishment, EstablishmentSubType
|
||
from location.models import Country
|
||
from main.models import SiteSettings
|
||
from utils.models import GMTokenGenerator
|
||
from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin
|
||
from utils.tokens import GMRefreshToken
|
||
|
||
|
||
class RoleQuerySet(models.QuerySet):
|
||
|
||
def annotate_role_name(self):
|
||
return self.annotate(role_name=models.Case(*self.model.role_condition_expressions(),
|
||
output_field=models.CharField()))
|
||
|
||
def annotate_role_counter(self):
|
||
return self.annotate(
|
||
role_counter=models.Count('userrole',
|
||
distinct=True,
|
||
filter=models.Q(userrole__state=UserRole.VALIDATED)))
|
||
|
||
|
||
class Role(ProjectBaseMixin):
|
||
"""Base Role model."""
|
||
STANDARD_USER = 1
|
||
MODERATOR = 2
|
||
COUNTRY_ADMIN = 3
|
||
CONTENT_PAGE_MANAGER = 4
|
||
ESTABLISHMENT_MANAGER = 5
|
||
REVIEW_MANAGER = 6
|
||
RESTAURANT_INSPECTOR = 7
|
||
SALES_MAN = 8
|
||
WINERY_WINE_INSPECTOR = 9
|
||
SELLER = 10
|
||
DISTILLERY_LIQUOR_INSPECTOR = 11
|
||
PRODUCER_FOOD_INSPECTOR = 12
|
||
ESTABLISHMENT_ADMINISTRATOR = 13
|
||
ARTISAN_INSPECTOR = 14
|
||
|
||
ROLE_CHOICES = (
|
||
(STANDARD_USER, _('Standard user')),
|
||
(MODERATOR, _('Moderator')),
|
||
(COUNTRY_ADMIN, _('Country admin')),
|
||
(CONTENT_PAGE_MANAGER, _('Content page manager')),
|
||
(ESTABLISHMENT_MANAGER, _('Establishment manager')),
|
||
(REVIEW_MANAGER, _('Review manager')),
|
||
(RESTAURANT_INSPECTOR, _('Restaurant inspector')),
|
||
(SALES_MAN, _('Sales man')),
|
||
(WINERY_WINE_INSPECTOR, _('Winery and wine inspector')),
|
||
(SELLER, _('Seller')),
|
||
(DISTILLERY_LIQUOR_INSPECTOR, _('Distillery & Liquor inspector')),
|
||
(PRODUCER_FOOD_INSPECTOR, _('Producer food inspector')),
|
||
(ESTABLISHMENT_ADMINISTRATOR, _('Establishment administrator')),
|
||
(ARTISAN_INSPECTOR, _('Artisan inspector')),
|
||
)
|
||
|
||
role = models.PositiveIntegerField(verbose_name=_('Role'), choices=ROLE_CHOICES,
|
||
null=False, blank=False)
|
||
country = models.ForeignKey(Country, verbose_name=_('Country'),
|
||
null=True, blank=True, on_delete=models.SET_NULL)
|
||
site = models.ForeignKey(SiteSettings, verbose_name=_('Site settings'),
|
||
null=True, blank=True, on_delete=models.SET_NULL)
|
||
establishment_subtype = models.ForeignKey(EstablishmentSubType,
|
||
verbose_name=_('Establishment subtype'),
|
||
null=True, blank=True, on_delete=models.SET_NULL)
|
||
navigation_bar_permission = models.ForeignKey('main.NavigationBarPermission',
|
||
blank=True, null=True,
|
||
on_delete=models.SET_NULL,
|
||
help_text='navigation bar item permission',
|
||
verbose_name=_('navigation bar permission'))
|
||
|
||
objects = RoleQuerySet.as_manager()
|
||
|
||
@classmethod
|
||
def role_types(cls):
|
||
roles = []
|
||
for role, display_name in dict(cls.ROLE_CHOICES).items():
|
||
roles.append({'role_name': role, 'role_counter': display_name, 'role': 0})
|
||
return roles
|
||
|
||
|
||
class UserManager(BaseUserManager):
|
||
"""Extended manager for User model."""
|
||
|
||
use_in_migrations = False
|
||
|
||
def make(self, email: str, password: str, newsletter: bool, username: str = '') -> object:
|
||
"""Register new user"""
|
||
obj = self.model(
|
||
username=username,
|
||
email=email.lower(),
|
||
newsletter=newsletter)
|
||
obj.set_password(password)
|
||
obj.save()
|
||
return obj
|
||
|
||
|
||
class UserQuerySet(models.QuerySet):
|
||
"""Extended queryset for User model."""
|
||
|
||
def with_base_related(self):
|
||
"""Return QuerySet with base related."""
|
||
return self.select_related('last_country', 'last_country__country')
|
||
|
||
def with_extend_related(self):
|
||
"""Return QuerySet with extend related."""
|
||
return self.with_base_related().prefetch_related('roles', 'subscriber')
|
||
|
||
def active(self, switcher=True):
|
||
"""Filter only active users."""
|
||
return self.filter(is_active=switcher)
|
||
|
||
def by_oauth2_access_token(self, token):
|
||
"""Find users by access token"""
|
||
return self.filter(oauth2_provider_accesstoken__token=token,
|
||
oauth2_provider_accesstoken__expires__gt=timezone.now())
|
||
|
||
def by_oauth2_refresh_token(self, token):
|
||
"""Find users by access token"""
|
||
return self.filter(oauth2_provider_refreshtoken__token=token,
|
||
oauth2_provider_refreshtoken__expires__gt=timezone.now())
|
||
|
||
def by_role(self, role):
|
||
"""Filter by role."""
|
||
return self.filter(userrole__role=role)
|
||
|
||
def by_roles(self, roles: list):
|
||
"""Filter by roles."""
|
||
return self.filter(userrole__role__in=roles)
|
||
|
||
def establishment_admin(self, establishment):
|
||
role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER).first()
|
||
return self.by_role(role).filter(userrole__establishment=establishment)
|
||
|
||
def full_text_search(self, search_value: str):
|
||
return self.annotate(
|
||
username_similarity=models.Case(
|
||
models.When(
|
||
models.Q(username__isnull=False),
|
||
then=TrigramSimilarity('username', search_value.lower())
|
||
),
|
||
default=0,
|
||
output_field=models.FloatField()
|
||
),
|
||
first_name_similarity=models.Case(
|
||
models.When(
|
||
models.Q(first_name__isnull=False),
|
||
then=TrigramSimilarity('first_name', search_value.lower())
|
||
),
|
||
default=0,
|
||
output_field=models.FloatField()
|
||
),
|
||
last_name_similarity=models.Case(
|
||
models.When(
|
||
models.Q(last_name__isnull=False),
|
||
then=TrigramSimilarity('last_name', search_value.lower())
|
||
),
|
||
default=0,
|
||
output_field=models.FloatField()
|
||
),
|
||
email_similarity=models.Case(
|
||
models.When(
|
||
models.Q(email__isnull=False),
|
||
then=TrigramSimilarity('email', search_value.lower())
|
||
),
|
||
default=0,
|
||
output_field=models.FloatField()
|
||
),
|
||
phone_similarity=models.Case(
|
||
models.When(
|
||
models.Q(phone__isnull=False),
|
||
then=TrigramSimilarity('phone', search_value.lower())
|
||
),
|
||
default=0,
|
||
output_field=models.FloatField()
|
||
),
|
||
relevance=(
|
||
models.F('username_similarity') +
|
||
models.F('first_name_similarity') +
|
||
models.F('last_name_similarity') +
|
||
models.F('email_similarity') +
|
||
models.F('phone_similarity')
|
||
),
|
||
).filter(relevance__gte=0.1).order_by('-relevance')
|
||
|
||
def by_role_country_code(self, country_code: str):
|
||
"""Filter by role country code."""
|
||
return self.filter(userrole__role__country__code=country_code).distinct()
|
||
|
||
|
||
class User(AbstractUser):
|
||
"""Base user model."""
|
||
username = models.CharField(
|
||
_('username'),
|
||
max_length=150,
|
||
blank=True, null=True, default=None,
|
||
help_text=_('Required. 150 characters or fewer. Letters, digits and ./+/-/_ only.'),
|
||
error_messages={
|
||
'unique': _("A user with that username already exists."),
|
||
},
|
||
)
|
||
image_url = models.URLField(verbose_name=_('Image URL path'),
|
||
blank=True, null=True, default=None,
|
||
max_length=500)
|
||
cropped_image_url = models.URLField(verbose_name=_('Cropped image URL path'),
|
||
blank=True, null=True, default=None)
|
||
email = models.EmailField(_('email address'), unique=True,
|
||
null=True, default=None)
|
||
unconfirmed_email = models.EmailField(_('unconfirmed email'), blank=True, null=True, default=None)
|
||
email_confirmed = models.BooleanField(_('email status'), default=False)
|
||
confirmed_at = models.DateTimeField(_('confirmed at'), null=True, blank=True, default=None)
|
||
newsletter = models.NullBooleanField(default=True)
|
||
old_id = models.IntegerField(null=True, blank=True, default=None)
|
||
locale = models.CharField(max_length=10, blank=True, default=None, null=True,
|
||
verbose_name=_('User last used locale'))
|
||
city = models.TextField(default=None, blank=True, null=True,
|
||
verbose_name=_('User last visited from city'))
|
||
last_ip = models.GenericIPAddressField(_('last IP address'), blank=True, null=True, default=None)
|
||
last_country = models.ForeignKey(
|
||
SiteSettings,
|
||
verbose_name=_('last site settings'),
|
||
blank=True,
|
||
null=True,
|
||
default=None,
|
||
on_delete=models.SET_NULL,
|
||
)
|
||
phone = PhoneNumberField(blank=True, null=True, default=None,
|
||
verbose_name=_('Phone'))
|
||
|
||
EMAIL_FIELD = 'email'
|
||
USERNAME_FIELD = 'email'
|
||
REQUIRED_FIELDS = ['username']
|
||
|
||
roles = models.ManyToManyField(
|
||
Role, verbose_name=_('Roles'), symmetrical=False,
|
||
through_fields=('user', 'role'), through='UserRole')
|
||
objects = UserManager.from_queryset(UserQuerySet)()
|
||
|
||
class Meta:
|
||
"""Meta class."""
|
||
verbose_name = _('User')
|
||
verbose_name_plural = _('Users')
|
||
|
||
def __str__(self):
|
||
"""String method."""
|
||
return "%s:%s" % (self.email, self.get_short_name())
|
||
|
||
def get_user_info(self):
|
||
"""Get information about user"""
|
||
return {
|
||
"username": self.username,
|
||
"first_name": self.first_name if self.first_name else None,
|
||
"last_name": self.last_name if self.last_name else None,
|
||
"email": self.email if self.email else None,
|
||
"newsletter": self.newsletter,
|
||
"is_active": self.is_active
|
||
}
|
||
|
||
def change_status(self, switcher: bool = False):
|
||
"""Method to set user status to active or inactive"""
|
||
self.is_active = switcher
|
||
self.save()
|
||
|
||
def create_jwt_tokens(self, source: int = None):
|
||
"""Create JWT tokens for user"""
|
||
token = GMRefreshToken.for_user(self, source)
|
||
return {
|
||
'access_token': str(token.access_token),
|
||
'refresh_token': str(token),
|
||
}
|
||
|
||
def expire_access_tokens(self):
|
||
"""Expire all access tokens"""
|
||
self.access_tokens.update(expires_at=timezone.now())
|
||
|
||
def expire_refresh_tokens(self):
|
||
"""Expire all refresh tokens"""
|
||
self.refresh_tokens.update(expires_at=timezone.now())
|
||
|
||
def confirm_email(self):
|
||
"""Method to confirm user email address"""
|
||
if self.unconfirmed_email is not None:
|
||
self.email = self.unconfirmed_email
|
||
self.unconfirmed_email = None
|
||
self.email_confirmed = True
|
||
self.confirmed_at = datetime.now()
|
||
self.save()
|
||
|
||
def approve(self):
|
||
"""Set user is_active status to True"""
|
||
self.is_active = True
|
||
self.save()
|
||
|
||
def get_body_email_message(self, subject: str, message: str, emails=None):
|
||
"""Prepare the body of the email message"""
|
||
return {
|
||
'subject': subject,
|
||
'message': str(message[0]),
|
||
'html_message': message[1],
|
||
'from_email': settings.EMAIL_HOST_USER,
|
||
'recipient_list': emails if emails else [self.email, ],
|
||
}
|
||
|
||
def send_email(self, subject: str, message: str, emails=None):
|
||
"""Send an email to reset user password"""
|
||
send_mail(**self.get_body_email_message(subject=subject,
|
||
message=message,
|
||
emails=emails))
|
||
|
||
@property
|
||
def confirm_email_token(self):
|
||
"""Make a token for finish signup."""
|
||
return GMTokenGenerator(purpose=GMTokenGenerator.CONFIRM_EMAIL).make_token(self)
|
||
|
||
@property
|
||
def change_email_token(self):
|
||
"""Make a token for change email."""
|
||
return GMTokenGenerator(purpose=GMTokenGenerator.CHANGE_EMAIL).make_token(self)
|
||
|
||
@property
|
||
def reset_password_token(self):
|
||
"""Make a token for finish signup."""
|
||
return GMTokenGenerator(purpose=GMTokenGenerator.RESET_PASSWORD).make_token(self)
|
||
|
||
@property
|
||
def get_user_uidb64(self):
|
||
"""Get base64 value for user by primary key identifier"""
|
||
return urlsafe_base64_encode(force_bytes(self.pk))
|
||
|
||
def base_template(self, country_code='www', username='', subject=''):
|
||
"""Base email template"""
|
||
socials = SiteSettings.objects.by_country_code(country_code).first()
|
||
return {
|
||
'title': subject,
|
||
'domain_uri': settings.DOMAIN_URI,
|
||
'uidb64': self.get_user_uidb64,
|
||
'site_name': settings.SITE_NAME,
|
||
'year': datetime.now().year,
|
||
'twitter_page_url': socials.twitter_page_url if socials else '#',
|
||
'instagram_page_url': socials.instagram_page_url if socials else '#',
|
||
'facebook_page_url': socials.facebook_page_url if socials else '#',
|
||
'send_to': username,
|
||
}
|
||
|
||
@property
|
||
def image_tag(self):
|
||
return mark_safe(f'<img src="{self.image_url}" />')
|
||
|
||
@property
|
||
def cropped_image_tag(self):
|
||
return mark_safe(f'<img src="{self.cropped_image_url}" />')
|
||
|
||
def reset_password_template(self, country_code, username, subject):
|
||
"""Get reset password template"""
|
||
context = {'token': self.reset_password_token,
|
||
'country_code': country_code}
|
||
context.update(self.base_template(country_code, username, subject))
|
||
return render_to_string(
|
||
template_name=settings.RESETTING_TOKEN_TEMPLATE,
|
||
context=context), get_template(settings.RESETTING_TOKEN_TEMPLATE).render(context)
|
||
|
||
def notify_password_changed_template(self, country_code, username, subject):
|
||
"""Get notification email template"""
|
||
context = {'contry_code': country_code}
|
||
context.update(self.base_template(country_code, username, subject))
|
||
return render_to_string(
|
||
template_name=settings.NOTIFICATION_PASSWORD_TEMPLATE,
|
||
context=context,
|
||
), get_template(settings.NOTIFICATION_PASSWORD_TEMPLATE).render(context)
|
||
|
||
def confirm_email_template(self, country_code, username, subject):
|
||
"""Get confirm email template"""
|
||
context = {'token': self.confirm_email_token,
|
||
'country_code': country_code}
|
||
context.update(self.base_template(country_code, username, subject))
|
||
return render_to_string(
|
||
template_name=settings.CONFIRM_EMAIL_TEMPLATE,
|
||
context=context), get_template(settings.CONFIRM_EMAIL_TEMPLATE).render(context)
|
||
|
||
def change_email_template(self, country_code, username, subject):
|
||
"""Get change email template"""
|
||
context = {'token': self.change_email_token,
|
||
'country_code': country_code}
|
||
context.update(self.base_template(country_code, username, subject))
|
||
return render_to_string(
|
||
template_name=settings.CHANGE_EMAIL_TEMPLATE,
|
||
context=context), get_template(settings.CHANGE_EMAIL_TEMPLATE).render(context)
|
||
|
||
@property
|
||
def favorite_establishment_ids(self):
|
||
"""Return establishment IDs that in favorites for current user."""
|
||
return self.favorites.by_content_type(app_label='establishment',
|
||
model='establishment') \
|
||
.values_list('object_id', flat=True)
|
||
|
||
@property
|
||
def favorite_recipe_ids(self):
|
||
"""Return recipe IDs that in favorites for current user."""
|
||
return self.favorites.by_content_type(app_label='recipe',
|
||
model='recipe') \
|
||
.values_list('object_id', flat=True)
|
||
|
||
@property
|
||
def favorite_news_ids(self):
|
||
"""Return news IDs that in favorites for current user."""
|
||
return self.favorites.by_content_type(
|
||
app_label='news',
|
||
model='news',
|
||
).values_list('object_id', flat=True)
|
||
|
||
@property
|
||
def favorite_product_ids(self):
|
||
"""Return news IDs that in favorites for current user."""
|
||
return self.favorites.by_content_type(
|
||
app_label='product',
|
||
model='product',
|
||
).values_list('object_id', flat=True)
|
||
|
||
@property
|
||
def subscription_types(self):
|
||
result = []
|
||
for subscription in self.subscriber.all():
|
||
for item in subscription.active_subscriptions:
|
||
result.append(item.id)
|
||
return set(result)
|
||
|
||
@property
|
||
def is_country_admin(self):
|
||
if self.userrole_set:
|
||
return self.userrole_set.country_admin().exists()
|
||
|
||
@property
|
||
def is_establishment_manager(self):
|
||
if self.userrole_set:
|
||
return self.userrole_set.establishment_manager().exists()
|
||
|
||
@property
|
||
def is_establishment_administrator(self):
|
||
if self.userrole_set:
|
||
return self.userrole_set.establishment_administrator().exists()
|
||
|
||
@property
|
||
def administrated_country_codes(self) -> list:
|
||
if self.userrole_set:
|
||
return list(
|
||
self.userrole_set
|
||
.exclude(role__site__isnull=True)
|
||
.values_list('role__site__country__code', flat=True)
|
||
.distinct()
|
||
)
|
||
|
||
def set_roles(self, ids: List[int]):
|
||
"""
|
||
Set user roles
|
||
:param ids: list of role ids
|
||
:return: bool
|
||
"""
|
||
self.roles.set(Role.objects.filter(id__in=ids))
|
||
return self
|
||
|
||
@property
|
||
def country_name(self):
|
||
if self.last_country:
|
||
return self.last_country.country.name_translated
|
||
return None
|
||
|
||
@property
|
||
def locale_roles(self):
|
||
"""
|
||
Str roles, like "Standard user: at, Moderator: ru" for сsv api method
|
||
"""
|
||
result_list = []
|
||
roles = self.roles.all()
|
||
for role in roles:
|
||
result_list.append(f'{role.get_role_display()}: {role.country.code}')
|
||
return ', '.join(result_list)
|
||
|
||
|
||
class UserRoleQueryset(models.QuerySet):
|
||
"""QuerySet for model UserRole."""
|
||
|
||
def _role_counter(self, country_code: str = None) -> dict:
|
||
additional_filters = {
|
||
'state': self.model.VALIDATED,
|
||
}
|
||
|
||
if country_code:
|
||
additional_filters.update({'role__site__country__code': country_code})
|
||
|
||
user_roles = (
|
||
self.filter(**additional_filters)
|
||
.distinct('user_id', 'role__role')
|
||
.values('user_id', 'role__role')
|
||
)
|
||
return dict(Counter([i['role__role'] for i in user_roles]))
|
||
|
||
def country_admin_role(self):
|
||
return self.filter(role__role=self.model.role.field.target_field.model.COUNTRY_ADMIN,
|
||
state=self.model.VALIDATED)
|
||
|
||
def aggregate_role_counter(self, country_code: str = None) -> list:
|
||
_role_choices = dict(Role.ROLE_CHOICES)
|
||
role_counter = []
|
||
|
||
# fill existed roles
|
||
for role, count in self._role_counter(country_code=country_code).items():
|
||
role_counter.append({
|
||
'role': role,
|
||
'role_name': _role_choices[role],
|
||
'count': count,
|
||
})
|
||
|
||
# check by roles
|
||
for role, role_name in _role_choices.items():
|
||
if role not in [i['role'] for i in role_counter]:
|
||
role_counter.append({
|
||
'role': role,
|
||
'role_name': _role_choices[role],
|
||
'count': 0,
|
||
})
|
||
return role_counter
|
||
|
||
def validated(self):
|
||
"""Filter QuerySet by state."""
|
||
return self.filter(state=self.model.VALIDATED)
|
||
|
||
def country_admin(self):
|
||
"""Return status by role and state"""
|
||
return (
|
||
self.filter(role__role=Role.COUNTRY_ADMIN)
|
||
.validated()
|
||
)
|
||
|
||
def establishment_manager(self):
|
||
"""Return status by role and state"""
|
||
return (
|
||
self.filter(role__role=Role.ESTABLISHMENT_MANAGER)
|
||
.validated()
|
||
)
|
||
|
||
def establishment_administrator(self):
|
||
"""Return status by role and state"""
|
||
return (
|
||
self.filter(role__role=Role.ESTABLISHMENT_ADMINISTRATOR)
|
||
.validated()
|
||
)
|
||
|
||
|
||
class UserRole(ProjectBaseMixin):
|
||
"""UserRole model."""
|
||
VALIDATED = 'validated'
|
||
PENDING = 'pending'
|
||
CANCELLED = 'cancelled'
|
||
REJECTED = 'rejected'
|
||
|
||
STATE_CHOICES = (
|
||
(VALIDATED, _('validated')),
|
||
(PENDING, _('pending')),
|
||
(CANCELLED, _('cancelled')),
|
||
(REJECTED, _('rejected'))
|
||
)
|
||
|
||
user = models.ForeignKey(
|
||
'account.User', verbose_name=_('User'), on_delete=models.CASCADE)
|
||
role = models.ForeignKey(
|
||
Role, verbose_name=_('Role'), on_delete=models.SET_NULL, null=True)
|
||
establishment = models.ForeignKey(
|
||
Establishment, verbose_name=_('Establishment'),
|
||
on_delete=models.SET_NULL, null=True, blank=True)
|
||
|
||
state = models.CharField(
|
||
_('state'), choices=STATE_CHOICES, max_length=10, default=PENDING)
|
||
requester = models.ForeignKey('account.User', on_delete=models.SET_NULL,
|
||
blank=True, null=True, default=None,
|
||
related_name='roles_requested',
|
||
help_text='A user (REQUESTER) who requests a '
|
||
'role change for a USER')
|
||
|
||
objects = UserRoleQueryset.as_manager()
|
||
|
||
class Meta:
|
||
unique_together = ['user', 'role', 'establishment', 'state']
|
||
|
||
|
||
class OldRole(models.Model):
|
||
new_role = models.CharField(verbose_name=_('New role'), max_length=512)
|
||
old_role = models.CharField(verbose_name=_('Old role'), max_length=512, null=True)
|
||
|
||
class Meta:
|
||
unique_together = ('new_role', 'old_role')
|