"""Account models""" from collections import Counter from datetime import datetime from typing import List from django.conf import settings from django.contrib.auth.models import AbstractUser, UserManager as BaseUserManager from django.contrib.postgres.indexes import GinIndex from django.contrib.postgres.search import TrigramSimilarity from django.core.mail import send_mail from django.db import models from django.template.loader import render_to_string, get_template from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.html import mark_safe from django.utils.http import urlsafe_base64_encode from django.utils.translation import ugettext_lazy as _ from phonenumber_field.modelfields import PhoneNumberField from rest_framework.authtoken.models import Token from authorization.models import Application from establishment.models import Establishment, EstablishmentSubType from location.models import Country from main.models import SiteSettings from utils.models import GMTokenGenerator from utils.models import ImageMixin, ProjectBaseMixin, PlatformMixin, PhoneModelMixin from utils.tokens import GMRefreshToken class RoleQuerySet(models.QuerySet): def annotate_role_name(self): return self.annotate(role_name=models.Case(*self.model.role_condition_expressions(), output_field=models.CharField())) def annotate_role_counter(self): return self.annotate( role_counter=models.Count('userrole', distinct=True, filter=models.Q(userrole__state=UserRole.VALIDATED))) class Role(ProjectBaseMixin): """Base Role model.""" STANDARD_USER = 1 MODERATOR = 2 COUNTRY_ADMIN = 3 CONTENT_PAGE_MANAGER = 4 ESTABLISHMENT_MANAGER = 5 REVIEW_MANAGER = 6 RESTAURANT_INSPECTOR = 7 SALES_MAN = 8 WINERY_WINE_INSPECTOR = 9 SELLER = 10 DISTILLERY_LIQUOR_INSPECTOR = 11 PRODUCER_FOOD_INSPECTOR = 12 ESTABLISHMENT_ADMINISTRATOR = 13 ARTISAN_INSPECTOR = 14 ROLE_CHOICES = ( (STANDARD_USER, _('Standard user')), (MODERATOR, _('Moderator')), (COUNTRY_ADMIN, _('Country admin')), (CONTENT_PAGE_MANAGER, _('Content page manager')), (ESTABLISHMENT_MANAGER, _('Establishment manager')), (REVIEW_MANAGER, _('Review manager')), (RESTAURANT_INSPECTOR, _('Restaurant inspector')), (SALES_MAN, _('Sales man')), (WINERY_WINE_INSPECTOR, _('Winery & Wine inspector')), (SELLER, _('Seller')), (DISTILLERY_LIQUOR_INSPECTOR, _('Distillery & Liquor inspector')), (PRODUCER_FOOD_INSPECTOR, _('Producer & Food inspector')), (ESTABLISHMENT_ADMINISTRATOR, _('Establishment administrator')), (ARTISAN_INSPECTOR, _('Artisan inspector')), ) role = models.PositiveIntegerField(verbose_name=_('Role'), choices=ROLE_CHOICES, null=False, blank=False) country = models.ForeignKey(Country, verbose_name=_('Country'), null=True, blank=True, on_delete=models.SET_NULL) site = models.ForeignKey(SiteSettings, verbose_name=_('Site settings'), null=True, blank=True, on_delete=models.SET_NULL) establishment_subtype = models.ForeignKey(EstablishmentSubType, verbose_name=_('Establishment subtype'), null=True, blank=True, on_delete=models.SET_NULL) navigation_bar_permission = models.ForeignKey('main.NavigationBarPermission', blank=True, null=True, on_delete=models.SET_NULL, help_text='navigation bar item permission', verbose_name=_('navigation bar permission')) objects = RoleQuerySet.as_manager() @classmethod def role_types(cls): roles = [] for role, display_name in dict(cls.ROLE_CHOICES).items(): roles.append({'role_name': role, 'role_counter': display_name, 'role': 0}) return roles class UserManager(BaseUserManager): """Extended manager for User model.""" use_in_migrations = False def make(self, email: str, password: str, newsletter: bool, username: str = '', email_confirmed=False) -> object: """Register new user""" obj = self.model( username=username, email=email.lower(), newsletter=newsletter, email_confirmed=email_confirmed) obj.set_password(password) obj.save() return obj def invite_for_team(self, email: str, establishment: Establishment, country_code: str): created = False user = User.objects.filter(email=email).first() if user is None: from utils.methods import string_random user = self.make(email, string_random(), True, string_random(), email_confirmed=True) created = True user_role_defaults = {'for_team': True, 'state': UserRole.PENDING} role, is_role_created = Role.objects.get_or_create(role=Role.ESTABLISHMENT_ADMINISTRATOR) user_role, is_user_role_created = UserRole.objects.get_or_create(user=user, establishment=establishment, role=role, defaults=user_role_defaults) if not is_user_role_created: from rest_framework.serializers import ValidationError raise ValidationError({'detail': f'User with this role already exists. State: {user_role.state}'}) if created: from account.tasks import send_team_invite_to_new_user if settings.USE_CELERY: send_team_invite_to_new_user.delay(user.pk, country_code, user_role.pk, establishment.name) else: send_team_invite_to_new_user(user.pk, country_code, user_role.pk, establishment.name) else: from account.tasks import send_team_invite_to_existing_user if settings.USE_CELERY: send_team_invite_to_existing_user.delay(user.pk, country_code, user_role.pk, establishment.name) else: send_team_invite_to_existing_user(user.pk, country_code, user_role.pk, establishment.name) return user class UserQuerySet(models.QuerySet): """Extended queryset for User model.""" def with_base_related(self): """Return QuerySet with base related.""" return self.select_related('last_country', 'last_country__country') def with_extend_related(self): """Return QuerySet with extend related.""" return self.with_base_related().prefetch_related('roles', 'subscriber') def active(self, switcher=True): """Filter only active users.""" return self.filter(is_active=switcher) def by_oauth2_access_token(self, token): """Find users by access token""" return self.filter(oauth2_provider_accesstoken__token=token, oauth2_provider_accesstoken__expires__gt=timezone.now()) def by_oauth2_refresh_token(self, token): """Find users by access token""" return self.filter(oauth2_provider_refreshtoken__token=token, oauth2_provider_refreshtoken__expires__gt=timezone.now()) def by_role(self, role): """Filter by role.""" return self.filter(userrole__role=role) def by_roles(self, roles: list): """Filter by roles.""" return self.filter(userrole__role__in=roles) def establishment_admin(self, establishment): role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER).first() return self.by_role(role).filter(userrole__establishment=establishment) def full_text_search(self, search_value: str): return self.annotate( username_similarity=models.Case( models.When( models.Q(username__isnull=False), then=TrigramSimilarity('username', search_value.lower()) ), default=0, output_field=models.FloatField() ), first_name_similarity=models.Case( models.When( models.Q(first_name__isnull=False), then=TrigramSimilarity('first_name', search_value.lower()) ), default=0, output_field=models.FloatField() ), last_name_similarity=models.Case( models.When( models.Q(last_name__isnull=False), then=TrigramSimilarity('last_name', search_value.lower()) ), default=0, output_field=models.FloatField() ), email_similarity=models.Case( models.When( models.Q(email__isnull=False), then=TrigramSimilarity('email', search_value.lower()) ), default=0, output_field=models.FloatField() ), phone_similarity=models.Case( models.When( models.Q(phone__isnull=False), then=TrigramSimilarity('phone', search_value.lower()) ), default=0, output_field=models.FloatField() ), relevance=( models.F('username_similarity') + models.F('first_name_similarity') + models.F('last_name_similarity') + models.F('email_similarity') + models.F('phone_similarity') ), ).filter(relevance__gte=0.1).order_by('-relevance') def by_role_country_code(self, country_code: str): """Filter by role country code.""" return self.filter(userrole__role__country__code=country_code).distinct() class User(PhoneModelMixin, AbstractUser): """Base user model.""" username = models.CharField( _('username'), max_length=150, blank=True, null=True, default=None, help_text=_('Required. 150 characters or fewer. Letters, digits and ./+/-/_ only.'), error_messages={ 'unique': _("A user with that username already exists."), }, ) image_url = models.URLField(verbose_name=_('Image URL path'), blank=True, null=True, default=None, max_length=500) cropped_image_url = models.URLField(verbose_name=_('Cropped image URL path'), blank=True, null=True, default=None) email = models.EmailField(_('email address'), unique=True, null=True, default=None) unconfirmed_email = models.EmailField(_('unconfirmed email'), blank=True, null=True, default=None) email_confirmed = models.BooleanField(_('email status'), default=False) confirmed_at = models.DateTimeField(_('confirmed at'), null=True, blank=True, default=None) newsletter = models.NullBooleanField(default=True) old_id = models.IntegerField(null=True, blank=True, default=None) locale = models.CharField(max_length=10, blank=True, default=None, null=True, verbose_name=_('User last used locale')) city = models.TextField(default=None, blank=True, null=True, verbose_name=_('User last visited from city')) last_ip = models.GenericIPAddressField(_('last IP address'), blank=True, null=True, default=None) last_country = models.ForeignKey( SiteSettings, verbose_name=_('last site settings'), blank=True, null=True, default=None, on_delete=models.SET_NULL, ) phone = PhoneNumberField(blank=True, null=True, default=None, verbose_name=_('Phone')) EMAIL_FIELD = 'email' USERNAME_FIELD = 'email' REQUIRED_FIELDS = ['username'] roles = models.ManyToManyField( Role, verbose_name=_('Roles'), symmetrical=False, through_fields=('user', 'role'), through='UserRole') objects = UserManager.from_queryset(UserQuerySet)() class Meta: """Meta class.""" verbose_name = _('User') verbose_name_plural = _('Users') indexes = [ GinIndex(fields=['username']), GinIndex(fields=['first_name']), GinIndex(fields=['last_name']), GinIndex(fields=['email']), GinIndex(fields=['phone']), ] def __str__(self): """String method.""" return "%s:%s" % (self.email, self.get_short_name()) def get_user_info(self): """Get information about user""" return { "username": self.username, "first_name": self.first_name if self.first_name else None, "last_name": self.last_name if self.last_name else None, "email": self.email if self.email else None, "newsletter": self.newsletter, "is_active": self.is_active } def change_status(self, switcher: bool = False): """Method to set user status to active or inactive""" self.is_active = switcher self.save() def create_jwt_tokens(self, source: int = None): """Create JWT tokens for user""" token = GMRefreshToken.for_user(self, source) return { 'access_token': str(token.access_token), 'refresh_token': str(token), } def expire_access_tokens(self): """Expire all access tokens""" self.access_tokens.update(expires_at=timezone.now()) def expire_refresh_tokens(self): """Expire all refresh tokens""" self.refresh_tokens.update(expires_at=timezone.now()) def confirm_email(self): """Method to confirm user email address""" if self.unconfirmed_email is not None: self.email = self.unconfirmed_email self.unconfirmed_email = None self.email_confirmed = True self.confirmed_at = datetime.now() self.save() def approve(self): """Set user is_active status to True""" self.is_active = True self.save() def get_body_email_message(self, subject: str, message: str, emails=None): """Prepare the body of the email message""" return { 'subject': subject, 'message': str(message[0]), 'html_message': message[1], 'from_email': settings.EMAIL_HOST_USER, 'recipient_list': emails if emails else [self.email, ], } def send_email(self, subject: str, message: str, emails=None): """Send an email to reset user password""" send_mail(**self.get_body_email_message(subject=subject, message=message, emails=emails)) @property def confirm_email_token(self): """Make a token for finish signup.""" return GMTokenGenerator(purpose=GMTokenGenerator.CONFIRM_EMAIL).make_token(self) @property def change_email_token(self): """Make a token for change email.""" return GMTokenGenerator(purpose=GMTokenGenerator.CHANGE_EMAIL).make_token(self) @property def reset_password_token(self): """Make a token for finish signup.""" return GMTokenGenerator(purpose=GMTokenGenerator.RESET_PASSWORD).make_token(self) @property def get_user_uidb64(self): """Get base64 value for user by primary key identifier""" return urlsafe_base64_encode(force_bytes(self.pk)) def base_template(self, country_code='www', username='', subject=''): """Base email template""" socials = SiteSettings.objects.by_country_code(country_code).first() return { 'title': subject, 'domain_uri': settings.DOMAIN_URI, 'uidb64': self.get_user_uidb64, 'site_name': settings.SITE_NAME, 'year': datetime.now().year, 'twitter_page_url': socials.twitter_page_url if socials else '#', 'instagram_page_url': socials.instagram_page_url if socials else '#', 'facebook_page_url': socials.facebook_page_url if socials else '#', 'contact_email': socials.contact_email if socials else '-', 'send_to': username, } @property def image_tag(self): return mark_safe(f'') @property def cropped_image_tag(self): return mark_safe(f'') def reset_password_template(self, country_code, username, subject): """Get reset password template""" context = {'token': self.reset_password_token, 'country_code': country_code} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.RESETTING_TOKEN_TEMPLATE, context=context), get_template(settings.RESETTING_TOKEN_TEMPLATE).render(context) def invite_new_establishment_member_template(self, country_code, username, subject, restaurant_name, user_role_id): """Template for newly created user establishment team invite""" context = {'token': self.reset_password_token, 'country_code': country_code, 'restaurant_name': restaurant_name, 'user_role_id': user_role_id} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.NEW_USER_FOR_ESTABLISHMENT_TEAM_TEMPLATE, context=context), get_template(settings.NEW_USER_FOR_ESTABLISHMENT_TEAM_TEMPLATE).render(context) def invite_establishment_member_template(self, country_code, username, subject, restaurant_name, user_role_id): """Template existing user establishment team invite""" context = {'token': self.reset_password_token, 'country_code': country_code, 'restaurant_name': restaurant_name, 'user_role_id': user_role_id} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.EXISTING_USER_FOR_ESTABLISHMENT_TEAM_TEMPLATE, context=context), get_template(settings.EXISTING_USER_FOR_ESTABLISHMENT_TEAM_TEMPLATE).render(context) def establishment_team_role_revoked(self, country_code, username, subject, restaurant_name): """Template to notify user that his/her establishment role is revoked""" context = {'token': self.reset_password_token, 'country_code': country_code, 'restaurant_name': restaurant_name} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.ESTABLISHMENT_TEAM_ROLE_REVOKED_TEMPLATE, context=context), get_template(settings.ESTABLISHMENT_TEAM_ROLE_REVOKED_TEMPLATE).render(context) def notify_password_changed_template(self, country_code, username, subject): """Get notification email template""" context = {'country_code': country_code} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.NOTIFICATION_PASSWORD_TEMPLATE, context=context, ), get_template(settings.NOTIFICATION_PASSWORD_TEMPLATE).render(context) def confirm_email_template(self, country_code, username, subject): """Get confirm email template""" context = {'token': self.confirm_email_token, 'country_code': country_code} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.CONFIRM_EMAIL_TEMPLATE, context=context), get_template(settings.CONFIRM_EMAIL_TEMPLATE).render(context) def change_email_template(self, country_code, username, subject): """Get change email template""" context = {'token': self.change_email_token, 'country_code': country_code} context.update(self.base_template(country_code, username, subject)) return render_to_string( template_name=settings.CHANGE_EMAIL_TEMPLATE, context=context), get_template(settings.CHANGE_EMAIL_TEMPLATE).render(context) @property def favorite_establishment_ids(self): """Return establishment IDs that in favorites for current user.""" return self.favorites.by_content_type(app_label='establishment', model='establishment') \ .values_list('object_id', flat=True) @property def favorite_recipe_ids(self): """Return recipe IDs that in favorites for current user.""" return self.favorites.by_content_type(app_label='recipe', model='recipe') \ .values_list('object_id', flat=True) @property def favorite_news_ids(self): """Return news IDs that in favorites for current user.""" return self.favorites.by_content_type( app_label='news', model='news', ).values_list('object_id', flat=True) @property def favorite_product_ids(self): """Return news IDs that in favorites for current user.""" return self.favorites.by_content_type( app_label='product', model='product', ).values_list('object_id', flat=True) @property def subscription_types(self): result = [] for subscription in self.subscriber.all(): for item in subscription.active_subscriptions: result.append(item.id) return set(result) @property def is_country_admin(self): if self.userrole_set: return self.userrole_set.country_admin().exists() @property def is_establishment_manager(self): if self.userrole_set: return self.userrole_set.establishment_manager().exists() @property def is_establishment_administrator(self): if self.userrole_set: return self.userrole_set.establishment_administrator().exists() @property def administrated_country_codes(self) -> list: if self.userrole_set: return list( self.userrole_set .exclude(role__site__isnull=True) .values_list('role__site__country__code', flat=True) .distinct() ) def set_roles(self, ids: List[int]): """ Set user roles :param ids: list of role ids :return: bool """ self.roles.set(Role.objects.filter(id__in=ids)) return self @property def country_name(self): if self.last_country: return self.last_country.country.name_translated return None @property def locale_roles(self): """ Str roles, like "Standard user: at, Moderator: ru" for сsv api method """ result_list = [] roles = self.roles.all() for role in roles: result_list.append(f'{role.get_role_display()}: {role.country.code}') return ', '.join(result_list) class UserRoleQueryset(models.QuerySet): """QuerySet for model UserRole.""" def _role_counter(self, country_code: str = None) -> dict: additional_filters = { 'state': self.model.VALIDATED, } if country_code: additional_filters.update({'role__site__country__code': country_code}) user_roles = ( self.filter(**additional_filters) .distinct('user_id', 'role__role') .values('user_id', 'role__role') ) return dict(Counter([i['role__role'] for i in user_roles])) def country_admin_role(self): return self.filter(role__role=self.model.role.field.target_field.model.COUNTRY_ADMIN, state=self.model.VALIDATED) def aggregate_role_counter(self, country_code: str = None) -> list: _role_choices = dict(Role.ROLE_CHOICES) role_counter = [] # fill existed roles for role, count in self._role_counter(country_code=country_code).items(): role_counter.append({ 'role': role, 'role_name': _role_choices[role], 'count': count, }) # check by roles for role, role_name in _role_choices.items(): if role not in [i['role'] for i in role_counter]: role_counter.append({ 'role': role, 'role_name': _role_choices[role], 'count': 0, }) return role_counter def validated(self): """Filter QuerySet by state.""" return self.filter(state=self.model.VALIDATED) def country_admin(self): """Return status by role and state""" return ( self.filter(role__role=Role.COUNTRY_ADMIN) .validated() ) def establishment_manager(self): """Return status by role and state""" return ( self.filter(role__role=Role.ESTABLISHMENT_MANAGER) .validated() ) def establishment_administrator(self): """Return status by role and state""" return ( self.filter(role__role=Role.ESTABLISHMENT_ADMINISTRATOR) .validated() ) class UserRole(ProjectBaseMixin): """UserRole model.""" VALIDATED = 'validated' PENDING = 'pending' CANCELLED = 'cancelled' REJECTED = 'rejected' STATE_CHOICES = ( (VALIDATED, _('validated')), (PENDING, _('pending')), (CANCELLED, _('cancelled')), (REJECTED, _('rejected')) ) user = models.ForeignKey( 'account.User', verbose_name=_('User'), on_delete=models.CASCADE) role = models.ForeignKey( Role, verbose_name=_('Role'), on_delete=models.SET_NULL, null=True) establishment = models.ForeignKey( Establishment, verbose_name=_('Establishment'), on_delete=models.SET_NULL, null=True, blank=True) state = models.CharField( _('state'), choices=STATE_CHOICES, max_length=10, default=PENDING) requester = models.ForeignKey('account.User', on_delete=models.SET_NULL, blank=True, null=True, default=None, related_name='roles_requested', help_text='A user (REQUESTER) who requests a ' 'role change for a USER') for_team = models.BooleanField(verbose_name=_('is this role for team membership'), default=False) objects = UserRoleQueryset.as_manager() class Meta: unique_together = ['user', 'role', 'establishment', 'state'] class OldRole(models.Model): new_role = models.CharField(verbose_name=_('New role'), max_length=512) old_role = models.CharField(verbose_name=_('Old role'), max_length=512, null=True) class Meta: unique_together = ('new_role', 'old_role')