533 lines
22 KiB
Python
533 lines
22 KiB
Python
"""News app models."""
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
import elasticsearch_dsl
|
|
from django.conf import settings
|
|
from django.contrib.contenttypes import fields as generic
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.postgres.fields import HStoreField
|
|
from django.contrib.postgres.search import TrigramSimilarity
|
|
from django.db import models
|
|
from django.db.models import Case, F, Q, When
|
|
from django.db.models.functions import Cast
|
|
from django.urls.exceptions import NoReverseMatch
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext_lazy as _
|
|
from rest_framework.reverse import reverse
|
|
|
|
from main.models import Carousel
|
|
from rating.models import Rating
|
|
from utils.models import (
|
|
BaseAttributes, FavoritesMixin, GalleryMixin, HasTagsMixin, IntermediateGalleryModelMixin,
|
|
ProjectBaseMixin,
|
|
TJSONField, TranslatedFieldsMixin, TypeDefaultImageMixin,
|
|
)
|
|
from utils.querysets import TranslationQuerysetMixin
|
|
from location.models import Country
|
|
from utils.parsers import NewsSlug
|
|
|
|
|
|
class Agenda(ProjectBaseMixin, TranslatedFieldsMixin):
    """Agenda entry for a news item: when and where an event takes place.

    ``event_name`` and ``content`` are translated JSON fields whose help text
    shows the expected locale-keyed shape, e.g. ``{"en-GB": "some text"}``.
    """

    # Start/end are stored as separate date and time columns; each defaults
    # to the moment the row is created.
    start_date = models.DateField(
        verbose_name=_('Start date'),
        default=timezone.now,
        editable=True,
    )
    start_time = models.TimeField(
        verbose_name=_('Start time'),
        default=timezone.now,
        editable=True,
    )
    end_date = models.DateField(
        verbose_name=_('End date'),
        default=timezone.now,
        editable=True,
    )
    end_time = models.TimeField(
        verbose_name=_('End time'),
        default=timezone.now,
        editable=True,
    )

    # NOTE(review): default=None without null=True -- saving an agenda with no
    # address presumably violates the NOT NULL constraint; confirm and decide
    # between null=True and default=''.
    address = models.TextField(
        verbose_name=_('event address'),
        default=None,
        blank=True,
    )

    event_name = TJSONField(
        verbose_name=_('event name'),
        help_text='{"en-GB":"some text"}',
        blank=True,
        null=True,
        default=None,
    )
    content = TJSONField(
        verbose_name=_('content'),
        help_text='{"en-GB":"some text"}',
        blank=True,
        null=True,
        default=None,
    )
|
|
|
|
|
|
class NewsBanner(ProjectBaseMixin, TranslatedFieldsMixin):
    """Banner attached to a news item: a translated title plus two optional URLs."""

    # Locale-keyed JSON, e.g. {"en-GB": "some text"}.
    title = TJSONField(
        verbose_name=_('title'),
        help_text='{"en-GB":"some text"}',
        blank=True,
        null=True,
        default=None,
    )
    # URL of the banner picture.
    image_url = models.URLField(
        verbose_name=_('Image URL path'),
        blank=True,
        null=True,
        default=None,
    )
    # URL of the content the banner refers to.
    content_url = models.URLField(
        verbose_name=_('Content URL path'),
        blank=True,
        null=True,
        default=None,
    )
|
|
|
|
|
|
class NewsTypeQuerySet(models.QuerySet):
    """Query helpers for the NewsType model."""

    def with_base_related(self):
        """Return qs that also selects the related ``default_image`` row."""
        related_fields = ('default_image',)
        return self.select_related(*related_fields)
|
|
|
|
|
|
class NewsType(ProjectBaseMixin, TypeDefaultImageMixin):
    """NewsType model.

    A named category of news with its allowed tag categories and an optional
    default image.
    """

    name = models.CharField(_('name'), max_length=250)
    tag_categories = models.ManyToManyField('tag.TagCategory',
                                            related_name='news_types')

    # The image row may be deleted independently; the link is then nulled.
    # The label is wrapped in ``_()`` so it is translatable like every other
    # verbose name in this module.
    default_image = models.ForeignKey('gallery.Image', on_delete=models.SET_NULL,
                                      related_name='news_types',
                                      blank=True, null=True, default=None,
                                      verbose_name=_('default image'))

    objects = NewsTypeQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name_plural = _('news types')
        verbose_name = _('news type')

    def __str__(self):
        """Return the news type name."""
        return self.name
|
|
|
|
|
|
class NewsQuerySet(TranslationQuerysetMixin):
    """Chainable query helpers for the News model."""

    def sort_by_start(self):
        """Return qs ordered by publication date, then time, newest first."""
        newest_first = ('-publication_date', '-publication_time')
        return self.order_by(*newest_first)

    def rating_value(self):
        """Annotate ``rating`` with the number of distinct ``ratings__ip`` values."""
        distinct_ips = models.Count('ratings__ip', distinct=True)
        return self.annotate(rating=distinct_ips)

    def with_base_related(self):
        """Return qs with news type and country selected, tags and gallery prefetched."""
        joined = self.select_related('news_type', 'country')
        return joined.prefetch_related('tags', 'tags__translation', 'gallery')

    def with_extended_related(self):
        """Return qs with author, agenda and banner selected."""
        related_fields = ('created_by', 'agenda', 'banner')
        return self.select_related(*related_fields)

    def by_type(self, news_type):
        """Keep only news whose type name equals ``news_type``."""
        lookup = {'news_type__name': news_type}
        return self.filter(**lookup)

    def by_tags(self, tags):
        """Keep only news carrying at least one of ``tags``."""
        lookup = {'tags__in': tags}
        return self.filter(**lookup)

    def by_country_code(self, code):
        """Keep only news whose country has the given code."""
        lookup = {'country__code': code}
        return self.filter(**lookup)

    def recipe_news(self):
        """Keep only news tagged with ``News.RECIPES_TAG_VALUE``."""
        recipes_tag = News.RECIPES_TAG_VALUE
        return self.filter(tags__value=recipes_tag)

    def international_news(self):
        """Keep only news tagged with ``News.INTERNATIONAL_TAG_VALUE``."""
        international_tag = News.INTERNATIONAL_TAG_VALUE
        return self.filter(tags__value=international_tag)

    def published(self):
        """Keep only news that is currently visible.

        A row qualifies when it has both publication date and time set, is in
        one of the model's ``PUBLISHED_STATES``, has no ``end`` or an ``end``
        that is not yet reached, and its publication date/time is not in the
        future.
        """
        current = timezone.now()
        today = current.date()
        time_of_day = current.time()

        lacks_publication_moment = (
            models.Q(publication_date__isnull=True)
            | models.Q(publication_time__isnull=True)
        )
        not_ended = models.Q(models.Q(end__gte=current) | models.Q(end__isnull=True))
        # False when publication is scheduled later than "now"; date and time
        # live in separate columns, so they are compared separately.
        visibility = Case(
            When(publication_date__gt=today, then=False),
            When(Q(publication_date=today) & Q(publication_time__gt=time_of_day), then=False),
            default=True,
            output_field=models.BooleanField(),
        )

        candidates = self.exclude(lacks_publication_moment)
        candidates = candidates.filter(not_ended, state__in=self.model.PUBLISHED_STATES)
        candidates = candidates.annotate(visible_now=visibility)
        return candidates.exclude(visible_now=False)

    # todo: filter by best score
    # todo: filter by country?
    def should_read(self, news, user):
        """Return other published news of the same country and type, in random order.

        The query starts from the model's default manager, not from ``self``.
        """
        others = self.model.objects.exclude(pk=news.pk).published()
        others = others.annotate_in_favorites(user).filter(country=news.country)
        others = others.with_base_related().by_type(news.news_type)
        return others.distinct().order_by('?')

    def same_theme(self, news, user):
        """Return other published news of the same type sharing a tag, newest first.

        The query starts from the model's default manager, not from ``self``.
        """
        others = self.model.objects.exclude(pk=news.pk).published()
        others = others.annotate_in_favorites(user)
        others = others.with_base_related().by_type(news.news_type)
        return others.by_tags(news.tags.all()).distinct().sort_by_start()

    def annotate_in_favorites(self, user):
        """Annotate ``in_favorites``: True for rows listed in the user's favorite ids.

        Unauthenticated users have no favorites, so every row gets False.
        """
        favorite_news_ids = user.favorite_news_ids if user.is_authenticated else []
        favorite_flag = Case(
            When(id__in=favorite_news_ids, then=True),
            default=False,
            output_field=models.BooleanField(default=False),
        )
        return self.annotate(in_favorites=favorite_flag)

    def by_locale(self, locale):
        """Keep only news whose title contains ``locale`` (case-insensitive)."""
        lookup = {'title__icontains': locale}
        return self.filter(**lookup)

    def es_search(self, search_value: str, relevance_order=True):
        """Search news through Elasticsearch and return the matching queryset.

        Every ``<field>.<locale>`` key of title, subtitle and description is
        queried with a fuzzy ``match`` and a ``wildcard`` clause, combined
        with OR. With ``relevance_order`` the queryset keeps the order of the
        Elasticsearch hits.
        """
        from search_indexes.documents import NewsDocument
        from search_indexes.utils import OBJECT_FIELD_PROPERTIES

        search_value = search_value.lower()
        search_fields = ('description', 'title', 'subtitle')
        field_to_boost = {
            'title': 3.0,
            'subtitle': 2.0,
            'description': 1.0,
        }
        search_keys = {
            f'{field}.{locale}': field_to_boost[field]
            for field in search_fields
            for locale in OBJECT_FIELD_PROPERTIES.keys()
        }

        combined = None
        for key, boost in search_keys.items():
            fuzzy_match = elasticsearch_dsl.Q('match', **{
                key: {
                    'query': search_value,
                    'fuzziness': 'auto:2,5',
                    'boost': boost,
                },
            })
            if combined is None:
                combined = fuzzy_match
            else:
                combined |= fuzzy_match
            # Substring hits get a much larger boost than fuzzy matches.
            combined |= elasticsearch_dsl.Q('wildcard', **{
                key: {'value': f'*{search_value}*', 'boost': boost + 30},
            })

        response = NewsDocument.search().query('bool', should=combined)[0:10000].execute()
        ids = [hit.meta.id for hit in response]
        matched = self.filter(id__in=ids)
        if relevance_order:
            # Map each id to its position in the hit list and sort by it.
            position_of_id = Case(*[When(pk=pk, then=position)
                                    for position, pk in enumerate(ids)])
            matched = matched.order_by(position_of_id)
        return matched

    def trigram_search(self, search_value: str):
        """Search with mistakes by description or title or subtitle.

        ``relevance`` is the sum of the three trigram similarities plus 100
        when any field contains ``search_value``; rows below 0.3 are dropped
        and the rest is ordered by descending relevance.
        """
        needle = search_value.lower()

        def similarity_of(field_name):
            # Similarity of the text-cast field, or 0 when the field is NULL.
            return Case(
                When(
                    Q(**{f'{field_name}__isnull': False}),
                    then=TrigramSimilarity(f'{field_name}_str', needle),
                ),
                default=0,
                output_field=models.FloatField(),
            )

        contains_anywhere = (
            Q(description_str__icontains=search_value)
            | Q(title_str__icontains=search_value)
            | Q(subtitle_str__icontains=search_value)
        )
        total_relevance = (
            F('search_contains_match')
            + F('description_similarity')
            + F('title_similarity')
            + F('subtitle_similarity')
        )

        # Keyword order matters: later annotations refer to earlier ones.
        annotated = self.annotate(
            description_str=Cast('description', models.TextField()),
            title_str=Cast('title', models.TextField()),
            subtitle_str=Cast('subtitle', models.TextField()),
            search_contains_match=Case(
                When(contains_anywhere, then=100),
                default=0,
                output_field=models.FloatField(),
            ),
            description_similarity=similarity_of('description'),
            title_similarity=similarity_of('title'),
            subtitle_similarity=similarity_of('subtitle'),
            relevance=total_relevance,
        )
        return annotated.filter(relevance__gte=0.3).order_by('-relevance')

    def available_news(self, user, country_code: str):
        """Return QuerySet with news that user has access.

        Superusers get the queryset unchanged; everyone else is limited to
        news whose site belongs to ``country_code``.
        """
        if user.is_superuser:
            return self
        return self.filter(site__country__code=country_code)
|
|
|
|
|
|
class News(GalleryMixin, BaseAttributes, TranslatedFieldsMixin, HasTagsMixin,
           FavoritesMixin):
    """News model.

    Translated texts (title, subtitle, description) are locale-keyed JSON;
    ``slugs`` and ``locale_to_description_is_active`` are locale-keyed hstore
    columns and are both nullable, so readers must cope with ``None``.
    """

    STR_FIELD_NAME = 'title'

    # TEMPLATE CHOICES
    NEWSPAPER = 0
    MAIN_PDF_ERB = 1
    MAIN = 2

    TEMPLATE_CHOICES = (
        (NEWSPAPER, 'newspaper'),
        (MAIN_PDF_ERB, 'main.pdf.erb'),
        (MAIN, 'main'),
    )

    # STATE CHOICES
    PUBLISHED = 0
    UNPUBLISHED = 1

    PUBLISHED_STATES = [PUBLISHED]

    STATE_CHOICES = (
        (PUBLISHED, _('published')),  # shown everywhere
        (UNPUBLISHED, _('not published')),  # newly created news
    )

    INTERNATIONAL_TAG_VALUE = 'international'
    RECIPES_TAG_VALUE = 'cook'

    old_id = models.PositiveIntegerField(_('old id'), blank=True, null=True, default=None)
    news_type = models.ForeignKey(NewsType, on_delete=models.PROTECT,
                                  verbose_name=_('news type'), related_name='news')
    title = TJSONField(blank=True, null=True, default=None,
                       verbose_name=_('title'),
                       help_text='{"en-GB":"some text"}')
    backoffice_title = models.TextField(null=True, default=None,
                                        verbose_name=_('Title for searching via BO'))
    subtitle = TJSONField(blank=True, null=True, default=None,
                          verbose_name=_('subtitle'),
                          help_text='{"en-GB":"some text"}')
    description = TJSONField(blank=True, null=True, default=None,
                             verbose_name=_('description'),
                             help_text='{"en-GB":"some text"}')
    locale_to_description_is_active = HStoreField(null=True, default=dict, blank=True,
                                                  verbose_name=_('Is description for certain locale active'),
                                                  help_text='{"en-GB": true, "fr-FR": false}')
    publication_date = models.DateField(blank=True, null=True, verbose_name=_('News publication date'),
                                        help_text=_('date since when news item is published'))
    publication_time = models.TimeField(blank=True, null=True, verbose_name=_('News publication time'),
                                        help_text=_('time since when news item is published'))
    end = models.DateTimeField(blank=True, null=True, default=None,
                               verbose_name=_('End'))
    slugs = HStoreField(null=True, blank=True, default=dict,
                        verbose_name=_('Slugs for current news obj'),
                        help_text='{"en-GB":"some slug"}')
    state = models.PositiveSmallIntegerField(default=UNPUBLISHED, choices=STATE_CHOICES,
                                             verbose_name=_('State'))
    is_highlighted = models.BooleanField(default=False,
                                         verbose_name=_('Is highlighted'))
    template = models.PositiveIntegerField(choices=TEMPLATE_CHOICES, default=NEWSPAPER)
    address = models.ForeignKey('location.Address', blank=True, null=True,
                                default=None, verbose_name=_('address'),
                                on_delete=models.SET_NULL)
    country = models.ForeignKey('location.Country', blank=True, null=True,
                                on_delete=models.SET_NULL,
                                verbose_name=_('country'))
    tags = models.ManyToManyField('tag.Tag', related_name='news',
                                  verbose_name=_('Tags'))
    gallery = models.ManyToManyField('gallery.Image', through='news.NewsGallery')
    views_count = models.OneToOneField('rating.ViewCount', blank=True, null=True, on_delete=models.SET_NULL,
                                       related_name='news')
    ratings = generic.GenericRelation(Rating)
    favorites = generic.GenericRelation(to='favorites.Favorites')
    carousels = generic.GenericRelation(to='main.Carousel')
    agenda = models.ForeignKey('news.Agenda', blank=True, null=True,
                               on_delete=models.SET_NULL,
                               verbose_name=_('agenda'))

    banner = models.ForeignKey('news.NewsBanner', blank=True, null=True,
                               on_delete=models.SET_NULL,
                               verbose_name=_('banner'))
    site = models.ForeignKey('main.SiteSettings', blank=True, null=True,
                             on_delete=models.SET_NULL, verbose_name=_('site settings'))
    duplication_date = models.DateTimeField(blank=True, null=True, default=None,
                                            verbose_name=_('Duplication datetime'))
    # Shared by an item and its duplicates; see the ``duplicates`` property.
    duplication_uuid = models.UUIDField(default=uuid.uuid4, editable=True, unique=False,
                                        verbose_name=_('Field to detect doubles'))
    objects = NewsQuerySet.as_manager()

    class Meta:
        """Meta class."""

        verbose_name = _('news')
        verbose_name_plural = _('news')

    def _first_slug(self):
        """Return the first stored slug value, or None when there are no slugs.

        ``slugs`` is nullable and defaults to an empty dict, so both cases
        must be handled instead of raising.
        """
        return next(iter((self.slugs or {}).values()), None)

    def __str__(self):
        """Return ``news: <first slug>``, falling back to the pk without a slug."""
        slug = self._first_slug()
        return f'news: {slug if slug is not None else self.pk}'

    def create_duplicate(self, new_country, view_count_model):
        """Save this instance as a new, unpublished copy for ``new_country``.

        The instance is mutated in place: its pk is cleared so that ``save()``
        inserts a new row, and state, slugs, country, view counter and
        duplication date are overwritten. Each slug is rebuilt from the
        existing slugs sharing the same base value.

        Raises:
            ValueError: if no existing slug matches one of this item's slugs.
        """
        country_codes = list(Country.objects.all().values_list('code', flat=True))
        all_slugs = {
            slug_value
            for slug_dict in News.objects.all().values_list('slugs', flat=True)
            if slug_dict  # ``slugs`` is nullable; skip rows without any
            for slug_value in slug_dict.values()
        }

        new_slugs = {}
        for locale, raw_slug in (self.slugs or {}).items():
            slug = NewsSlug.parse(raw_slug, country_codes)
            similar_slugs = sorted(
                x for x in all_slugs
                if NewsSlug.parse(x, country_codes).value == slug.value
            )
            if not similar_slugs:
                # It is impossible because at least current instance has slug
                raise ValueError('Duplicating unsaved object')

            # NOTE(review): "last" is the lexicographically greatest string,
            # which presumably matches the highest counter -- confirm for
            # counters with more than one digit.
            last_slug = NewsSlug.parse(similar_slugs[-1], country_codes)
            new_slug = NewsSlug(slug.value, new_country.code, last_slug.count)
            if last_slug.country_code is not None:
                new_slug.count += 1

            new_slugs[locale] = str(new_slug)

        self.pk = None
        self.state = self.UNPUBLISHED
        self.slugs = new_slugs
        self.country = new_country
        self.views_count = view_count_model
        self.duplication_date = timezone.now()
        self.save()

    @property
    def must_of_the_week(self) -> bool:
        """Detects whether current item in carousel"""
        kwargs = {
            'content_type': ContentType.objects.get_for_model(self),
            'object_id': self.pk,
            'country': self.country,
        }
        return Carousel.objects.filter(**kwargs).exists()

    @property
    def publication_datetime(self):
        """Represents datetime object combined from `publication_date` & `publication_time` fields

        Returns None when either part is missing.
        """
        try:
            return datetime.combine(date=self.publication_date, time=self.publication_time)
        except TypeError:
            return None

    @property
    def duplicates(self):
        """Duplicates for this news item excluding same country code labeled"""
        return News.objects.filter(duplication_uuid=self.duplication_uuid).exclude(country=self.country)

    @property
    def has_any_desc_active(self):
        """Detects whether news item has any active description

        String values count as active only when they equal ``'true'``
        case-insensitively; other values are taken by truthiness. A NULL
        mapping means nothing is active.
        """
        flags = (self.locale_to_description_is_active or {}).values()
        return any(
            flag.lower() == 'true' if isinstance(flag, str) else flag
            for flag in flags
        )

    @property
    def is_publish(self):
        """Whether the item's state is one of ``PUBLISHED_STATES``."""
        return self.state in self.PUBLISHED_STATES

    @property
    def is_international(self):
        """Whether any tag of the item has the international tag value."""
        return any(tag.value == self.INTERNATIONAL_TAG_VALUE for tag in self.tags.all())

    @property
    def web_url(self):
        """Return the public URL built from the first slug, or None.

        None is returned when the item has no slug or the route cannot be
        reversed.
        """
        slug = self._first_slug()
        if slug is None:
            return None
        try:
            return reverse('web:news:rud', kwargs={'slug': slug})
        except NoReverseMatch:
            return None  # no active links

    def should_read(self, user):
        """Return the first three items of the manager's ``should_read`` query."""
        return self.__class__.objects.should_read(self, user)[:3]

    def same_theme(self, user):
        """Return the first three items of the manager's ``same_theme`` query."""
        return self.__class__.objects.same_theme(self, user)[:3]

    @property
    def main_image(self):
        """Return the image of the most recent main gallery entry, or None.

        Runs a query on every access, so callers should read it once.
        """
        qs = self.news_gallery.main_image()
        image_model = qs.order_by('-id').first()
        if image_model is not None:
            return image_model.image
        return None

    @property
    def image_url(self):
        """Return the URL of the main image file, or None without a main image."""
        image = self.main_image  # read once: the property queries the DB
        return image.image.url if image else None

    @property
    def preview_image_url(self):
        """Return the ``news_preview`` thumbnail URL of the main image, or None."""
        image = self.main_image  # read once: the property queries the DB
        if image:
            return image.get_image_url(thumbnail_key='news_preview')
        return None

    @property
    def view_counter(self):
        """Return the stored view count, or 0 when no counter is attached."""
        if self.views_count:
            return self.views_count.count
        return 0

    def _crop_aliases(self):
        """Return ``(model_name, aliases)`` for thumbnail aliases prefixed with the model name."""
        model_name = self._meta.model_name.lower()
        aliases = [alias for alias in settings.SORL_THUMBNAIL_ALIASES
                   if alias.startswith(model_name)]
        return model_name, aliases

    @staticmethod
    def _crop_representation(image, model_name, aliases):
        """Build the dict describing ``image`` and its crops for ``aliases``."""
        prefix_length = len(f'{model_name}_')
        return {
            'id': image.id,
            'title': image.title,
            'original_url': image.image.url,
            'orientation_display': image.get_orientation_display(),
            # Keys are the alias without its "<model_name>_" prefix.
            'auto_crop_images': {
                f'{alias[prefix_length:]}_url': image.get_image_url(alias)
                for alias in aliases
            },
        }

    # todo: remove in future
    @property
    def crop_gallery(self):
        """Return one crop description dict per gallery image."""
        if not hasattr(self, 'gallery'):
            return None
        model_name, aliases = self._crop_aliases()
        return [self._crop_representation(image, model_name, aliases)
                for image in self.gallery.all()]

    @property
    def crop_main_image(self):
        """Return the crop description dict of the main image, or None."""
        # getattr keeps the original hasattr() tolerance for AttributeError
        # while reading the DB-backed property only once.
        image = getattr(self, 'main_image', None)
        if not image:
            return None
        model_name, aliases = self._crop_aliases()
        return self._crop_representation(image, model_name, aliases)

    @property
    def descriptions(self):
        """Read-only list field for backoffice news representation"""
        if not self.description:
            return []
        slugs = self.slugs or {}
        active_flags = self.locale_to_description_is_active or {}
        titles = self.title or {}
        return [{
            'locale': locale,
            'slug': slugs.get(locale),
            'status': 'active' if active_flags.get(locale) else 'inactive',
            'title': titles.get(locale),
            'text': desc,
        } for locale, desc in self.description.items()]
|
|
|
|
|
|
class NewsGallery(IntermediateGalleryModelMixin):
    """Through model between ``News`` and ``gallery.Image``."""

    news = models.ForeignKey(
        News,
        verbose_name=_('news'),
        related_name='news_gallery',
        on_delete=models.CASCADE,
        null=True,
    )
    image = models.ForeignKey(
        'gallery.Image',
        verbose_name=_('gallery'),
        related_name='news_gallery',
        on_delete=models.CASCADE,
        null=True,
    )

    class Meta:
        """NewsGallery meta class."""

        verbose_name = _('news gallery')
        verbose_name_plural = _('news galleries')
        # An image can be attached to a given news item only once.
        unique_together = [['news', 'image']]
|