515 lines
18 KiB
Python
515 lines
18 KiB
Python
"""Main app models."""
|
|
from typing import Iterable
|
|
|
|
from django.conf import settings
|
|
from django.contrib.contenttypes import fields as generic
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.postgres.fields import JSONField
|
|
from django.core.validators import EMPTY_VALUES
|
|
from django.db import connections, connection
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework import exceptions
|
|
|
|
from configuration.models import TranslationSettings
|
|
from location.models import Country
|
|
from main import methods
|
|
from review.models import Review
|
|
from utils.exceptions import UnprocessableEntityError
|
|
from utils.methods import dictfetchall
|
|
from utils.models import (ProjectBaseMixin, TJSONField, URLImageMixin,
|
|
TranslatedFieldsMixin, PlatformMixin)
|
|
|
|
|
|
class Currency(TranslatedFieldsMixin, models.Model):
    """Currency with a translatable name, a sign and unique slug/code."""

    # JSON value; the help text shows the expected shape: locale -> text.
    name = TJSONField(
        _('name'),
        null=True,
        blank=True,
        default=None,
        help_text='{"en-GB":"some text"}',
    )
    sign = models.CharField(_('sign'), max_length=255)
    slug = models.SlugField(max_length=255, unique=True)
    code = models.CharField(
        max_length=5,
        unique=True,
        null=True,
        default=None,
    )

    class Meta:
        verbose_name = _('currency')
        verbose_name_plural = _('currencies')

    def __str__(self):
        """Return the formatted ``name`` value."""
        return '{}'.format(self.name)
|
|
|
|
|
|
class SiteSettingsQuerySet(models.QuerySet):
    """QuerySet helpers for the SiteSettings model."""

    def with_country(self):
        """Keep only site settings that have a country assigned."""
        has_country = Q(country__isnull=False)
        return self.filter(has_country)

    def by_country_code(self, code):
        """Keep only site settings whose country has the given code."""
        matches_code = Q(country__code=code)
        return self.filter(matches_code)
|
|
|
|
|
|
class SiteSettings(ProjectBaseMixin):
    """Settings of a single site, identified by its unique subdomain."""

    subdomain = models.CharField(
        max_length=255,
        db_index=True,
        unique=True,
        verbose_name=_('Subdomain'),
    )
    country = models.OneToOneField(
        Country,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        default=None,
        verbose_name=_('Country'),
    )
    default_site = models.BooleanField(
        default=False,
        verbose_name=_('Default site'),
    )

    # Optional social network pages.
    pinterest_page_url = models.URLField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Pinterest page URL'),
    )
    twitter_page_url = models.URLField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Twitter page URL'),
    )
    facebook_page_url = models.URLField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Facebook page URL'),
    )
    instagram_page_url = models.URLField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Instagram page URL'),
    )

    contact_email = models.EmailField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Contact email'),
    )
    config = JSONField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('Config'),
    )
    ad_config = models.TextField(
        blank=True,
        null=True,
        default=None,
        verbose_name=_('AD config'),
    )
    currency = models.ForeignKey(
        Currency,
        on_delete=models.PROTECT,
        null=True,
        default=None,
    )

    old_id = models.IntegerField(blank=True, null=True)

    objects = SiteSettingsQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('Site setting')
        verbose_name_plural = _('Site settings')

    def __str__(self):
        """Return the record id together with the subdomain."""
        return f'ID: "{self.id}". Site: "{self.subdomain}"'

    @property
    def published_features(self):
        """Features linked to this site through a published SiteFeature."""
        lookup = {
            'sitefeature__site_settings': self,
            'sitefeature__published': True,
        }
        return self.feature_set.filter(**lookup)

    @property
    def published_sitefeatures(self):
        """Published SiteFeature rows whose feature source is WEB or ALL."""
        web_sources = [PlatformMixin.WEB, PlatformMixin.ALL]
        return self.sitefeature_set.filter(
            published=True,
            feature__source__in=web_sources,
        )

    @property
    def site_url(self):
        """URL built by ``methods.site_url`` from settings and the subdomain."""
        return methods.site_url(
            schema=settings.SCHEMA_URI,
            subdomain=self.subdomain,
            domain=settings.SITE_DOMAIN_URI,
        )
|
|
|
|
|
|
class Feature(ProjectBaseMixin, PlatformMixin):
    """Feature model."""

    slug = models.SlugField(max_length=255, unique=True)
    priority = models.IntegerField(unique=True, null=True, default=None)
    route = models.ForeignKey(
        'PageType',
        on_delete=models.PROTECT,
        null=True,
        default=None,
    )
    # Sites are attached through the SiteFeature intermediate model.
    site_settings = models.ManyToManyField(
        SiteSettings,
        through='SiteFeature',
    )
    old_id = models.IntegerField(null=True, blank=True)

    class Meta:
        """Meta class."""

        verbose_name = _('Feature')
        verbose_name_plural = _('Features')

    def __str__(self):
        """Return the feature slug."""
        return '{}'.format(self.slug)
|
|
|
|
|
|
class SiteFeatureQuerySet(models.QuerySet):
    """QuerySet helpers for the SiteFeature model."""

    def published(self, switcher=True):
        """Filter by the ``published`` flag (published rows by default)."""
        return self.filter(Q(published=switcher))

    def by_country_code(self, country_code: str):
        """Filter by the code of the country of the related site settings."""
        return self.filter(Q(site_settings__country__code=country_code))

    def by_sources(self, sources: Iterable[int]):
        """Filter by the ``source`` values of the related feature."""
        return self.filter(Q(feature__source__in=sources))
|
|
|
|
|
|
class SiteFeature(ProjectBaseMixin):
    """Link between a site and a feature, with publication flags."""

    site_settings = models.ForeignKey(
        SiteSettings,
        on_delete=models.CASCADE,
    )
    feature = models.ForeignKey(
        Feature,
        on_delete=models.PROTECT,
    )
    published = models.BooleanField(
        default=False,
        verbose_name=_('Published'),
    )
    main = models.BooleanField(
        default=False,
        verbose_name=_('Main'),
    )
    # Directed (non-symmetrical) relation to other SiteFeature rows.
    nested = models.ManyToManyField('self', symmetrical=False)
    old_id = models.IntegerField(null=True, blank=True)

    objects = SiteFeatureQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('Site feature')
        verbose_name_plural = _('Site features')
        # A feature can be attached to a given site only once.
        unique_together = ('site_settings', 'feature')
|
|
|
|
|
|
class Award(TranslatedFieldsMixin, URLImageMixin, models.Model):
    """Award attached to an arbitrary object through a generic relation."""

    WAITING = 0
    PUBLISHED = 1

    STATE_CHOICES = (
        (WAITING, 'waiting'),
        (PUBLISHED, 'published'),
    )

    award_type = models.ForeignKey(
        'main.AwardType',
        on_delete=models.CASCADE,
    )
    # JSON value; the help text shows the expected shape: locale -> text.
    title = TJSONField(
        _('title'),
        null=True,
        blank=True,
        default=None,
        help_text='{"en-GB":"some text"}',
    )
    vintage_year = models.CharField(
        _('vintage year'),
        max_length=255,
        default='',
    )

    # Generic relation to the awarded object.
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.PositiveIntegerField()
    content_object = generic.GenericForeignKey('content_type', 'object_id')

    state = models.PositiveSmallIntegerField(
        default=WAITING,
        choices=STATE_CHOICES,
        verbose_name=_('State'),
    )

    old_id = models.IntegerField(null=True, blank=True)

    def __str__(self):
        """Return the id plus the title in the default language.

        The literal text ``None`` is used when no title is stored for the
        default language of the translation settings.
        """
        default_lang = TranslationSettings.get_solo().default_language
        if self.title and default_lang in self.title:
            label = self.title[default_lang]
        else:
            label = 'None'
        return f'id:{self.id}-{label}'
|
|
|
|
|
|
class AwardType(models.Model):
    """Named award type that belongs to a country."""

    country = models.ForeignKey(
        'location.Country',
        verbose_name=_('country'),
        on_delete=models.CASCADE,
    )
    name = models.CharField(_('name'), max_length=255)
    old_id = models.IntegerField(null=True, blank=True)

    def __str__(self):
        """Return the award type name."""
        return self.name
|
|
|
|
|
|
class CarouselQuerySet(models.QuerySet):
    """QuerySet helpers for the Carousel model."""

    def is_parsed(self):
        """Keep only carousel objects flagged with ``is_parse``."""
        return self.filter(Q(is_parse=True))

    def active(self):
        """Keep only carousel objects flagged as ``active``."""
        return self.filter(Q(active=True))

    def by_country_code(self, code):
        """Keep only carousel objects whose country has the given code."""
        return self.filter(Q(country__code=code))
|
|
|
|
|
|
class Carousel(models.Model):
    """Carousel item that optionally points at another object.

    Most properties simply forward to the attribute of the same name on the
    generically related object and evaluate to ``None`` when that object is
    missing or does not provide the attribute.
    """

    # Optional generic relation to the object shown in the carousel.
    content_type = models.ForeignKey(
        ContentType,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        default=None,
    )
    object_id = models.PositiveIntegerField(
        blank=True,
        null=True,
        default=None,
    )
    content_object = generic.GenericForeignKey('content_type', 'object_id')

    # Fields whose verbose names are prefixed with "old".
    old_id = models.PositiveIntegerField(
        _('old id'),
        blank=True,
        null=True,
        default=None,
    )
    title = models.CharField(
        _('old title'),
        max_length=255,
        blank=True,
        null=True,
        default=None,
    )
    link = models.CharField(
        _('old link'),
        max_length=255,
        blank=True,
        null=True,
        default=None,
    )
    attachment_suffix_url = models.TextField(
        _('old attachment_suffix_url'),
        blank=True,
        null=True,
        default=None,
    )
    description = models.CharField(
        _('old description'),
        max_length=255,
        blank=True,
        null=True,
        default=None,
    )
    link_title = models.CharField(
        _('old link_title'),
        max_length=255,
        blank=True,
        null=True,
        default=None,
    )
    country = models.ForeignKey(
        Country,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        verbose_name=_('country'),
    )
    active = models.BooleanField(_('old active'), default=False)
    is_parse = models.BooleanField(_('is parse'), default=False)

    objects = CarouselQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('Carousel')
        verbose_name_plural = _('Carousel')

    @property
    def name(self):
        """Name of the related object, else its translated title."""
        target = self.content_object
        if hasattr(target, 'name'):
            return target.name
        if hasattr(target, 'title'):
            return target.title_translated
        return None

    @property
    def awards(self):
        """``awards`` of the related object, if it has any."""
        return getattr(self.content_object, 'awards', None)

    @property
    def vintage_year(self):
        """Vintage of the last review in READY status of the related object."""
        target = self.content_object
        if not hasattr(target, 'reviews'):
            return None
        newest_ready = target.reviews.by_status(Review.READY).last()
        if not newest_ready:
            return None
        return newest_ready.vintage

    @property
    def toque_number(self):
        """``toque_number`` of the related object, if it has one."""
        return getattr(self.content_object, 'toque_number', None)

    @property
    def public_mark(self):
        """``public_mark`` of the related object, if it has one."""
        return getattr(self.content_object, 'public_mark', None)

    @property
    def image_url(self):
        """Image URL built from the stored suffix, else the related object's."""
        if not self.attachment_suffix_url:
            return getattr(self.content_object, 'image_url', None)
        return f'https://s3.eu-central-1.amazonaws.com/gm-test.com/media/{self.attachment_suffix_url}'

    @property
    def slug(self):
        """First value of the related object's ``slugs``, else its ``slug``."""
        target = self.content_object
        if hasattr(target, 'slugs'):
            # An empty mapping yields None instead of raising StopIteration.
            return next(iter(target.slugs.values()), None)
        return getattr(target, 'slug', None)

    @property
    def the_most_recent_award(self):
        """``the_most_recent_award`` of the related object, if it has one."""
        return getattr(self.content_object, 'the_most_recent_award', None)

    @property
    def model_name(self):
        """Kind of content behind this item.

        Establishments report the index name of their establishment type,
        news report the content type model name, and items that only carry a
        non-empty ``link`` are reported as ``'external'``.
        """
        # Imported at call time, as in the original code.
        from establishment.models import Establishment
        from news.models import News

        target = self.content_object
        if isinstance(target, Establishment):
            return target.establishment_type.index_name
        if isinstance(target, News):
            return self.content_type.model
        if self.link not in EMPTY_VALUES:
            return 'external'
        return None
|
|
|
|
|
|
class PageQuerySet(models.QuerySet):
    """QuerySet helpers for the Page model."""

    def by_platform(self, platform: int):
        """Keep pages whose source is the given platform or ``Page.ALL``."""
        accepted_sources = [Page.ALL, platform]
        return self.filter(source__in=accepted_sources)
|
|
|
|
|
|
class Page(URLImageMixin, PlatformMixin, ProjectBaseMixin):
    """Advertisement block of a given size for one source."""

    advertisement = models.ForeignKey(
        'advertisement.Advertisement',
        on_delete=models.PROTECT,
        null=True,
        related_name='pages',
        verbose_name=_('advertisement'),
    )
    # The original inline hints give 300 (width) and 250 (height) as examples.
    width = models.PositiveIntegerField(
        null=True,
        verbose_name=_('Block width'),
    )
    height = models.PositiveIntegerField(
        null=True,
        verbose_name=_('Block height'),
    )

    objects = PageQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('page')
        verbose_name_plural = _('pages')
        # One page per advertisement and source.
        unique_together = ('advertisement', 'source')

    def __str__(self):
        """Return the display label of the ``source`` value."""
        source_label = self.get_source_display()
        return source_label
|
|
|
|
|
|
class PageTypeQuerySet(models.QuerySet):
    """QuerySet for the PageType model.

    It defines no custom filters; it only exists as the base of the
    ``PageType.objects`` manager.
    """
|
|
|
|
|
|
class PageType(ProjectBaseMixin):
    """Page type identified by a unique name."""

    name = models.CharField(
        max_length=255,
        unique=True,
        verbose_name=_('name'),
    )

    objects = PageTypeQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('page type')
        verbose_name_plural = _('page types')

    def __str__(self):
        """Return the page type name."""
        return self.name
|
|
|
|
|
|
class FooterLink(ProjectBaseMixin):
    """Titled URL that can be attached to footers."""

    link = models.URLField(_('link'))
    title = models.CharField(_('title'), max_length=255)
|
|
|
|
|
|
class Footer(ProjectBaseMixin):
    """Footer content (about text, copyright and links) of a site."""

    # NOTE(review): the verbose name of ``site`` is 'footer'; this looks like
    # a copy-paste slip but is kept unchanged here - confirm before renaming.
    site = models.ForeignKey(
        'main.SiteSettings',
        related_name='footers',
        verbose_name=_('footer'),
        on_delete=models.PROTECT,
    )
    about_us = models.TextField(_('about_us'))
    copyright = models.TextField(_('copyright'))
    links = models.ManyToManyField(
        FooterLink,
        verbose_name=_('links'),
        related_name='link_footer',
    )
|
|
|
|
|
|
class PanelQuerySet(models.QuerySet):
    """QuerySet for the Panel model.

    It defines no custom filters; it only exists as the base of the
    ``Panel.objects`` manager.
    """
|
|
|
|
|
|
class Panel(ProjectBaseMixin):
    """Custom panel model with stored SQL query.

    The text in ``query`` is executed as-is on the ``default`` database
    connection. Only the pagination values are passed as bound parameters or
    coerced to integers, so the query itself must come from a trusted author.
    """

    TABLE = 'table'
    # Used to be 'table' as well, which made the two display choices share
    # one stored value and therefore impossible to tell apart.
    # NOTE(review): create a migration for the changed choices and check
    # whether rows saved as 'table' were meant to be mailing panels.
    MAILING = 'mailing'

    DISPLAY_CHOICES = (
        (TABLE, _('table')),
        (MAILING, _('mailing')),
    )
    name = models.CharField(_('name'), max_length=255)
    display = models.CharField(
        _('display'), max_length=255, choices=DISPLAY_CHOICES,
        blank=True, null=True, default=None
    )
    description = models.CharField(
        _('description'), max_length=255, blank=True, null=True, default=None)
    query = models.TextField(_('query'), blank=True, null=True, default=None)
    user = models.ForeignKey(
        'account.User', verbose_name=_('user'), null=True,
        on_delete=models.SET_NULL)
    site = models.ForeignKey(
        'main.SiteSettings', verbose_name=_('site'), null=True,
        on_delete=models.SET_NULL)
    old_id = models.IntegerField(
        _('old id'), null=True, blank=True, default=None)

    objects = PanelQuerySet.as_manager()

    class Meta:
        verbose_name = _('panel')
        verbose_name_plural = _('panels')

    def __str__(self):
        """Return the panel name."""
        return self.name

    @staticmethod
    def _int_param(request, name, default):
        """Read an integer query parameter from the request.

        :param request: object exposing ``query_params`` (mapping-like).
        :param name: name of the query parameter.
        :param default: value used when the parameter is absent.
        :raises rest_framework.exceptions.ParseError: if the supplied value
            cannot be converted to ``int``.
        """
        value = request.query_params.get(name, default)
        try:
            return int(value)
        except (TypeError, ValueError) as er:
            raise exceptions.ParseError(f'Invalid value for "{name}".') from er

    def execute_query(self, request):
        """Run the stored query for one page and return a paginated payload.

        The zero-based ``page`` (default 0) and ``page_size`` (default 10)
        are read from ``request.query_params``.

        :returns: ``None`` when no query is stored, otherwise a dict with the
            keys ``count``, ``next``, ``previous``, ``columns``, ``results``.
        :raises rest_framework.exceptions.ParseError: for a non-integer
            ``page``/``page_size`` or a ``page_size`` below 1.
        :raises rest_framework.exceptions.NotFound: when ``page`` lies
            outside the available range.
        """
        raw = self.query
        page = self._int_param(request, 'page', 0)
        page_size = self._int_param(request, 'page_size', 10)

        if not raw:
            return None
        if page_size < 1:
            raise exceptions.ParseError('Invalid page size.')

        count = self._raw_count(raw)
        # Validate the page before running the (possibly expensive) query.
        next_page = self.get_next_page(count, page, page_size)
        previous_page = self.get_previous_page(count, page)

        with connections['default'].cursor() as cursor:
            cursor.execute(*self.set_limits(page * page_size, page_size))
            results = dictfetchall(cursor)
            columns = self._raw_columns(cursor)

        return {
            "count": count,
            "next": next_page,
            "previous": previous_page,
            "columns": columns,
            "results": results,
        }

    def get_next_page(self, count, page, page_size):
        """Return the index of the page after ``page`` or ``None``.

        Pages are zero-based. The last page may be partially filled, and an
        empty result set still has a valid (empty) page 0.

        :raises rest_framework.exceptions.NotFound: when ``page`` is outside
            ``0..last_page`` or ``page_size`` is not positive.
        """
        if page_size <= 0:
            raise exceptions.NotFound('Invalid page.')
        # Integer ceiling division: number of pages needed for ``count`` rows.
        total_pages = -(-count // page_size)
        last_page = max(total_pages - 1, 0)
        if not 0 <= page <= last_page:
            raise exceptions.NotFound('Invalid page.')
        if page < last_page:
            return page + 1
        return None

    def get_previous_page(self, count, page):
        """Return the index of the page before ``page`` or ``None``.

        ``count`` is accepted for signature compatibility and is not used.
        """
        if page > 0:
            return page - 1
        return None

    @staticmethod
    def _raw_execute(row):
        """Execute a raw SQL statement once on the default connection.

        The cursor is closed when this method returns, so only the return
        value of ``cursor.execute`` is handed back, not the fetched rows.

        :raises UnprocessableEntityError: when the statement fails.
        """
        with connections['default'].cursor() as cursor:
            try:
                return cursor.execute(row)
            except Exception as er:
                # TODO: log
                raise UnprocessableEntityError() from er

    def _raw_count(self, subquery):
        """Return the number of rows produced by ``subquery``.

        Semicolons are removed so the statement can be wrapped in a
        ``SELECT count(*)`` sub-select.
        """
        subquery = subquery.replace(';', '')
        _count_query = f"""SELECT count(*) from ({subquery}) as t;"""
        with connections['default'].cursor() as cursor:
            cursor.execute(_count_query)
            row = cursor.fetchone()
            return row[0]

    @staticmethod
    def _raw_columns(cursor):
        """Return the column names of the last statement run on ``cursor``."""
        return [col[0] for col in cursor.description]

    def get_headers(self):
        """Execute the stored query and return its column names.

        :raises UnprocessableEntityError: when the query fails.
        """
        with connections['default'].cursor() as cursor:
            try:
                cursor.execute(self.query)
            except Exception as er:
                raise UnprocessableEntityError() from er
            return self._raw_columns(cursor)

    def get_data(self):
        """Execute the stored query and return all rows via ``fetchall``."""
        with connections['default'].cursor() as cursor:
            cursor.execute(self.query)
            return cursor.fetchall()

    def _raw_page(self, raw, request):
        """Append ``LIMIT``/``OFFSET`` taken from the request to ``raw``.

        Both values are coerced to ``int`` before being placed into the SQL
        text, so request input can no longer inject SQL. As before, ``page``
        is used directly as the offset and both values default to 0.

        :raises rest_framework.exceptions.ParseError: for non-integer values.
        """
        page = self._int_param(request, 'page', 0)
        page_size = self._int_param(request, 'page_size', 0)
        return f"""{raw} LIMIT {page_size} OFFSET {page}"""

    def set_limits(self, start, limit, params=tuple()):
        """Build the stored query with bound ``LIMIT``/``OFFSET`` parameters.

        :param start: row offset; no ``OFFSET`` is added unless it is > 0.
        :param limit: maximum number of rows; ``None`` means no ``LIMIT``.
        :param params: parameters already bound to the stored query.
        :returns: ``(query, params)`` ready to be passed to ``cursor.execute``.
        """
        limit_offset = ''
        new_params = tuple()
        if start > 0:
            new_params += (start,)
            limit_offset = ' OFFSET %s'
        if limit is not None:
            # LIMIT precedes OFFSET in the SQL, so its value is bound first.
            new_params = (limit,) + new_params
            limit_offset = ' LIMIT %s' + limit_offset
        params = params + new_params
        # A trailing ';' would end up in front of the appended clauses.
        query = self.query.rstrip('; \t\r\n') + limit_offset
        return query, params
|