"""Main app models.""" from typing import Iterable from django.conf import settings from django.contrib.contenttypes import fields as generic from django.contrib.contenttypes.models import ContentType from django.contrib.postgres.fields import JSONField, ArrayField from django.core.validators import EMPTY_VALUES, MinValueValidator from django.db import connections from django.db import models from django.db.models import Q from django.utils.translation import gettext_lazy as _ from mptt.models import MPTTModel from rest_framework import exceptions from configuration.models import TranslationSettings from location.models import Country from main import methods from review.models import Review from tag.models import Tag from utils.exceptions import UnprocessableEntityError from utils.methods import dictfetchall from utils.models import ( ProjectBaseMixin, TJSONField, URLImageMixin, TranslatedFieldsMixin, PlatformMixin, ) class Currency(TranslatedFieldsMixin, models.Model): """Currency model.""" name = TJSONField( _('name'), null=True, blank=True, default=None, help_text='{"en-GB":"some text"}') sign = models.CharField(_('sign'), max_length=255) slug = models.SlugField(max_length=255, unique=True) code = models.CharField(max_length=5, unique=True, null=True, default=None) class Meta: verbose_name = _('currency') verbose_name_plural = _('currencies') def __str__(self): return f'{self.name}' class SiteSettingsQuerySet(models.QuerySet): """Extended queryset for SiteSettings model.""" def with_country(self): return self.filter(country__isnull=False) def by_country_code(self, code): return self.filter(country__code=code) class SiteSettings(ProjectBaseMixin): subdomain = models.CharField(max_length=255, db_index=True, unique=True, verbose_name=_('Subdomain')) country = models.OneToOneField(Country, on_delete=models.PROTECT, null=True, blank=True, default=None, verbose_name=_('Country')) default_site = models.BooleanField(default=False, verbose_name=_('Default site')) pinterest_page_url = models.URLField(blank=True, null=True, default=None, verbose_name=_('Pinterest page URL')) twitter_page_url = models.URLField(blank=True, null=True, default=None, verbose_name=_('Twitter page URL')) facebook_page_url = models.URLField(blank=True, null=True, default=None, verbose_name=_('Facebook page URL')) instagram_page_url = models.URLField(blank=True, null=True, default=None, verbose_name=_('Instagram page URL')) contact_email = models.EmailField(blank=True, null=True, default=None, verbose_name=_('Contact email')) config = JSONField(blank=True, null=True, default=None, verbose_name=_('Config')) ad_config = models.TextField(blank=True, null=True, default=None, verbose_name=_('AD config')) currency = models.ForeignKey(Currency, on_delete=models.PROTECT, null=True, default=None) old_id = models.IntegerField(blank=True, null=True) objects = SiteSettingsQuerySet.as_manager() class Meta: """Meta class.""" verbose_name = _('Site setting') verbose_name_plural = _('Site settings') def __str__(self): return f'ID: "{self.id}". Site: "{self.subdomain}"' @property def published_features(self): return self.feature_set.filter(sitefeature__site_settings=self, sitefeature__published=True) @property def published_sitefeatures(self): return self.sitefeature_set. \ filter(Q(published=True) & Q(feature__source__in=[PlatformMixin.WEB, PlatformMixin.ALL])) @property def site_url(self): return methods.site_url(schema=settings.SCHEMA_URI, subdomain=self.subdomain, domain=settings.SITE_DOMAIN_URI) @property def country_code(self): return self.country.code class Feature(ProjectBaseMixin, PlatformMixin): """Feature model.""" slug = models.SlugField(max_length=255, unique=True) priority = models.IntegerField(blank=True, null=True, default=None) route = models.ForeignKey('PageType', on_delete=models.PROTECT, null=True, default=None) site_settings = models.ManyToManyField(SiteSettings, through='SiteFeature') old_id = models.IntegerField(null=True, blank=True) chosen_tags = generic.GenericRelation(to='tag.ChosenTag') class Meta: """Meta class.""" verbose_name = _('Feature') verbose_name_plural = _('Features') def __str__(self): return f'{self.slug}' @property def get_chosen_tags(self): return Tag.objects.filter(chosen_tags__in=self.chosen_tags.all()).distinct() class SiteFeatureQuerySet(models.QuerySet): """Extended queryset for SiteFeature model.""" def published(self, switcher=True): return self.filter(published=switcher) def by_country_code(self, country_code: str): return self.filter(site_settings__country__code=country_code) def by_sources(self, sources: Iterable[int]): return self.filter(feature__source__in=sources) class SiteFeature(ProjectBaseMixin): """SiteFeature model.""" site_settings = models.ForeignKey(SiteSettings, on_delete=models.CASCADE) feature = models.ForeignKey(Feature, on_delete=models.PROTECT) published = models.BooleanField(default=False, verbose_name=_('Published')) main = models.BooleanField(default=False, help_text='shows on main page', verbose_name=_('Main'), ) backoffice = models.BooleanField(default=False, help_text='shows on backoffice page', verbose_name=_('backoffice'), ) nested = models.ManyToManyField('self', blank=True, symmetrical=False) old_id = models.IntegerField(null=True, blank=True) objects = SiteFeatureQuerySet.as_manager() class Meta: """Meta class.""" verbose_name = _('Site feature') verbose_name_plural = _('Site features') unique_together = ('site_settings', 'feature') class AwardQuerySet(models.QuerySet): def with_base_related(self): return self.prefetch_related('award_type') class Award(TranslatedFieldsMixin, URLImageMixin, models.Model): """Award model.""" WAITING = 0 PUBLISHED = 1 STATE_CHOICES = ( (WAITING, 'waiting'), (PUBLISHED, 'published') ) award_type = models.ForeignKey('main.AwardType', on_delete=models.CASCADE) title = TJSONField( _('title'), null=True, blank=True, default=None, help_text='{"en-GB":"some text"}') vintage_year = models.CharField(_('vintage year'), max_length=255, default='') content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = generic.GenericForeignKey('content_type', 'object_id') state = models.PositiveSmallIntegerField(default=WAITING, choices=STATE_CHOICES, verbose_name=_('State')) old_id = models.IntegerField(null=True, blank=True) objects = AwardQuerySet.as_manager() def __str__(self): title = 'None' lang = TranslationSettings.get_solo().default_language if self.title and lang in self.title: title = self.title[lang] return f'id:{self.id}-{title}' class AwardTypeQuerySet(models.QuerySet): """QuerySet for model AwardType.""" def by_country_code(self, country_code: str): """Filter QuerySet by country code.""" return self.filter(country__code=country_code) class AwardType(models.Model): """AwardType model.""" country = models.ForeignKey( 'location.Country', verbose_name=_('country'), on_delete=models.CASCADE) name = models.CharField(_('name'), max_length=255) old_id = models.IntegerField(null=True, blank=True) years = ArrayField( models.IntegerField(validators=[MinValueValidator(1980)]), verbose_name=_('years'), blank=True, null=True, ) objects = AwardTypeQuerySet.as_manager() def __str__(self): return self.name class CarouselQuerySet(models.QuerySet): """Carousel QuerySet.""" def is_parsed(self): """Parsed carousel objects.""" return self.filter(is_parse=True) def active(self): """Active carousel objects.""" return self.filter(active=True) def by_country_code(self, code): """Filter collection by country code.""" if code in settings.INTERNATIONAL_COUNTRY_CODES: return self.filter(is_international=True) return self.filter(country__code=code) def get_international(self): return self.filter(is_international=True) class Carousel(models.Model): """Carousel model.""" content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, blank=True, null=True, default=None) object_id = models.PositiveIntegerField(blank=True, null=True, default=None) content_object = generic.GenericForeignKey('content_type', 'object_id') is_international = models.BooleanField(default=False, verbose_name=_('is international')) old_id = models.PositiveIntegerField(_('old id'), blank=True, null=True, default=None) title = models.CharField(_('old title'), max_length=255, blank=True, null=True, default=None) link = models.CharField(_('old link'), max_length=255, blank=True, null=True, default=None) attachment_suffix_url = models.TextField(_('old attachment_suffix_url'), blank=True, null=True, default=None) description = models.CharField(_('old description'), max_length=255, blank=True, null=True, default=None) link_title = models.CharField(_('old link_title'), max_length=255, blank=True, null=True, default=None) country = models.ForeignKey( Country, blank=True, null=True, on_delete=models.SET_NULL, verbose_name=_('country') ) active = models.BooleanField(_('old active'), default=False) is_parse = models.BooleanField(_('is parse'), default=False) objects = CarouselQuerySet.as_manager() class Meta: """Meta class.""" verbose_name = _('Carousel') verbose_name_plural = _('Carousel') @property def name(self): # Check if Generic obj has name or title if hasattr(self.content_object, 'name'): return self.content_object.name if hasattr(self.content_object, 'title'): return self.content_object.title_translated @property def awards(self): if hasattr(self.content_object, 'awards'): return self.content_object.awards @property def vintage_year(self): if hasattr(self.content_object, 'reviews'): last_review = self.content_object.reviews.by_status(Review.READY).last() if last_review: return last_review.vintage @property def toque_number(self): if hasattr(self.content_object, 'toque_number'): return self.content_object.toque_number @property def public_mark(self): if hasattr(self.content_object, 'public_mark'): return self.content_object.public_mark @property def image_url(self): if self.attachment_suffix_url: return f'https://s3.eu-central-1.amazonaws.com/gm-test.com/media/{self.attachment_suffix_url}' if hasattr(self.content_object, 'image_url'): return self.content_object.image_url @property def slug(self): if hasattr(self.content_object, 'slugs'): try: return next(iter(self.content_object.slugs.values())) except StopIteration: return None if hasattr(self.content_object, 'slug'): return self.content_object.slug @property def the_most_recent_award(self): if hasattr(self.content_object, 'the_most_recent_award'): return self.content_object.the_most_recent_award @property def model_name(self): from establishment.models import Establishment from news.models import News if isinstance(self.content_object, Establishment): return self.content_object.establishment_type.index_name elif isinstance(self.content_object, News): return self.content_type.model elif self.link not in EMPTY_VALUES: return 'external' return None class PageQuerySet(models.QuerySet): """QuerySet for model Page.""" def by_platform(self, platform: int): """Filter by platform.""" return self.filter(source__in=[Page.ALL, platform]) class Page(URLImageMixin, PlatformMixin, ProjectBaseMixin): """Page model.""" advertisement = models.ForeignKey('advertisement.Advertisement', on_delete=models.PROTECT, null=True, related_name='pages', verbose_name=_('advertisement')) width = models.PositiveIntegerField(null=True, verbose_name=_('Block width')) # 300 height = models.PositiveIntegerField(null=True, verbose_name=_('Block height')) # 250 objects = PageQuerySet.as_manager() class Meta: """Meta class.""" verbose_name = _('page') verbose_name_plural = _('pages') unique_together = ('advertisement', 'source') def __str__(self): """Overridden dunder method.""" return self.get_source_display() class PageTypeQuerySet(models.QuerySet): """QuerySet for model PageType.""" class PageType(ProjectBaseMixin): """Page type model.""" name = models.CharField(max_length=255, unique=True, verbose_name=_('name')) objects = PageTypeQuerySet.as_manager() class Meta: """Meta class.""" verbose_name = _('page type') verbose_name_plural = _('page types') def __str__(self): """Overridden dunder method.""" return self.name class FooterLink(ProjectBaseMixin): link = models.URLField(_('link')) title = models.CharField(_('title'), max_length=255) class Footer(ProjectBaseMixin): site = models.ForeignKey( 'main.SiteSettings', related_name='footers', verbose_name=_('footer'), on_delete=models.PROTECT ) about_us = models.TextField(_('about_us')) copyright = models.TextField(_('copyright')) links = models.ManyToManyField(FooterLink, verbose_name=_('links'), related_name='link_footer') class PanelQuerySet(models.QuerySet): """Panels QuerySet.""" class Panel(ProjectBaseMixin): """Custom panel model with stored SQL query.""" TABLE = 'table' MAILING = 'table' DISPLAY_CHOICES = ( (TABLE, _('table')), (MAILING, _('mailing')) ) name = models.CharField(_('name'), max_length=255) display = models.CharField( _('display'), max_length=255, choices=DISPLAY_CHOICES, blank=True, null=True, default=None ) description = models.CharField( _('description'), max_length=255, blank=True, null=True, default=None) query = models.TextField(_('query'), blank=True, null=True, default=None) user = models.ForeignKey( 'account.User', verbose_name=_('user'), null=True, on_delete=models.SET_NULL) site = models.ForeignKey( 'main.SiteSettings', verbose_name=_('site'), null=True, on_delete=models.SET_NULL) old_id = models.IntegerField( _('old id'), null=True, blank=True, default=None) objects = PanelQuerySet.as_manager() class Meta: verbose_name = _('panel') verbose_name_plural = _('panels') def __str__(self): return self.name def execute_query(self, request): """Execute query""" raw = self.query page = int(request.query_params.get('page', 0)) page_size = int(request.query_params.get('page_size', 10)) if raw: data = { "count": 0, "next": 2, "previous": None, "columns": None, "results": [] } with connections['default'].cursor() as cursor: count = self._raw_count(raw) start = page * page_size cursor.execute(*self.set_limits(start, page_size)) data["count"] = count data["next"] = self.get_next_page(count, page, page_size) data["previous"] = self.get_previous_page(count, page) data["results"] = dictfetchall(cursor) data["columns"] = self._raw_columns(cursor) return data def get_next_page(self, count, page, page_size): max_page = count / page_size - 1 if not 0 <= page <= max_page: raise exceptions.NotFound('Invalid page.') if max_page > page: return page + 1 return None def get_previous_page(self, count, page): if page > 0: return page - 1 return None @staticmethod def _raw_execute(row): with connections['default'].cursor() as cursor: try: cursor.execute(row) return cursor.execute(row) except Exception as er: # TODO: log raise UnprocessableEntityError() def _raw_count(self, subquery): if ';' in subquery: subquery = subquery.replace(';', '') _count_query = f"""SELECT count(*) from ({subquery}) as t;""" # cursor = self._raw_execute(_count_query) with connections['default'].cursor() as cursor: cursor.execute(_count_query) row = cursor.fetchone() return row[0] @staticmethod def _raw_columns(cursor): columns = [col[0] for col in cursor.description] return columns def get_headers(self): with connections['default'].cursor() as cursor: try: cursor.execute(self.query) except Exception as er: raise UnprocessableEntityError() return self._raw_columns(cursor) def get_data(self): with connections['default'].cursor() as cursor: cursor.execute(self.query) return cursor.fetchall() def _raw_page(self, raw, request): page = request.query_params.get('page', 0) page_size = request.query_params.get('page_size', 0) raw = f"""{raw} LIMIT {page_size} OFFSET {page}""" return raw def set_limits(self, start, limit, params=tuple()): limit_offset = '' new_params = tuple() if start > 0: new_params += (start,) limit_offset = ' OFFSET %s' if limit is not None: new_params = (limit,) + new_params limit_offset = ' LIMIT %s' + limit_offset params = params + new_params query = self.query + limit_offset return query, params class NavigationBarPermission(ProjectBaseMixin): """Model for navigation bar item permissions.""" READ = 0 WRITE = 1 PERMISSION_MODES = ( (READ, _('read')), (WRITE, _('write')), ) sections = models.ManyToManyField('main.SiteFeature', verbose_name=_('sections')) permission_mode = models.PositiveSmallIntegerField(choices=PERMISSION_MODES, default=READ, help_text='READ - allows only retrieve data,' 'WRITE - allows to perform any ' 'operations over the object', verbose_name=_('permission mode')) class Meta: """Meta class.""" verbose_name = _('Navigation bar item permission') verbose_name_plural = _('Navigation bar item permissions')