51 lines
1.7 KiB
Python
51 lines
1.7 KiB
Python
"""SearchIndex tasks."""
|
|
import logging
|
|
from django.db import models
|
|
from celery.schedules import crontab
|
|
from celery.task import periodic_task
|
|
from django_elasticsearch_dsl.registries import registry
|
|
from django_redis import get_redis_connection
|
|
from establishment.models import Establishment
|
|
from news.models import News
|
|
from product.models import Product
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# NOTE(review): per Celery's crontab semantics, ``crontab(minute=1)`` fires at
# minute 1 of every hour, not every minute. If an every-minute schedule was
# intended, use ``crontab()`` / ``crontab(minute='*')`` -- confirm with owners.
@periodic_task(run_every=crontab(minute=1))
def update_index():
    """Update the ES index from the ids queued in Redis.

    For each of ``Establishment``, ``News`` and ``Product`` the Redis set named
    after the lower-cased model class name (on the ``'es_queue'`` connection)
    is drained in batches of up to 500 ids, and the matching rows are passed
    to the model's registered search document via ``update()``.

    Ids queued for a model that has no registered document are discarded.
    If indexing a batch fails, that batch is put back into the Redis set so
    it can be retried on a later run, the error is logged with its traceback
    and the run stops. The task never raises.
    """
    try:
        cn = get_redis_connection('es_queue')
        for model in [Establishment, News, Product]:
            model_name = model.__name__.lower()
            # The registered document does not depend on the batch, so look it
            # up once per model rather than once per batch.
            doc_cls = next(iter(registry.get_documents([model])), None)
            while True:
                ids = cn.spop(model_name, 500)
                if not ids:
                    break
                if doc_cls is None:
                    # No document registered for this model: keep draining so
                    # the set does not grow without bound.
                    continue
                try:
                    doc_cls().update(model.objects.filter(id__in=ids))
                except Exception:
                    # SPOP already removed these ids from Redis; re-queue them
                    # so a failed update does not silently lose pending work.
                    cn.sadd(model_name, *ids)
                    raise
    except Exception as ex:
        # Top-level task boundary: log (with traceback) instead of raising.
        logger.exception('Updating index failed: %s', ex)
|
|
|
|
|
|
def es_update(obj):
    """Add an object, or every object of a queryset, to the set for indexing.

    ``obj`` may be a model instance or a ``QuerySet`` of ``Establishment``,
    ``News`` or ``Product``. Its id(s) are added to the Redis set named after
    the lower-cased model class name on the ``'es_queue'`` connection, which
    is the same set ``update_index`` drains. Any other object is ignored.

    This is best-effort: any failure is logged as a warning and never raised
    to the caller.
    """
    allowed_models = (Establishment, News, Product)
    try:
        if isinstance(obj, models.QuerySet) and obj.model in allowed_models:
            key = obj.model.__name__.lower()
            ids = list(obj.values_list('id', flat=True))
        elif isinstance(obj, models.Model) and obj.__class__ in allowed_models:
            key = obj.__class__.__name__.lower()
            ids = [obj.id]
        else:
            # Unsupported object or model: nothing to queue.
            return

        if not ids:
            # SADD with no members is rejected by Redis; an empty queryset is
            # not an error, so return quietly instead of logging a failure.
            return

        cn = get_redis_connection('es_queue')
        cn.sadd(key, *ids)
    except Exception as ex:
        logger.warning('Send obj to ES failed: %s', ex)
|