import calendar from datetime import timedelta import requests from django.conf import settings from django.db.models import F, Count, Sum from django_filters.rest_framework import DjangoFilterBackend from rest_framework import permissions, mixins, status, viewsets from rest_framework.decorators import action from rest_framework.exceptions import NotFound from rest_framework.filters import SearchFilter from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.response import Response from external_api.cdek import CDEKClient, CDEKWebhookTypes, CDEK_STATUS_TO_ORDER_STATUS from external_api.poizon import PoizonClient from utils.exceptions import CRMException, Forbidden from store.filters import GiftFilter, ChecklistFilter from store.models import Checklist, Category, PaymentMethod, Promocode, Gift from core.models import GlobalSettings from store.serializers import (ChecklistSerializer, CategorySerializer, CategoryFullSerializer, PaymentMethodSerializer, PromocodeSerializer, ClientUpdateChecklistSerializer, GiftSerializer, ClientCreateChecklistSerializer, ChecklistListSerializer) from account.permissions import ReadOnly, IsManager, IsAdmin def prepare_external_response(r: requests.Response): data = {"status_code": r.status_code, "response": None} try: data["response"] = r.json() except: data["response"] = r.text return Response(data) """ - managers can create/edit/delete orders - auth users can create/edit orders (managers and clients) - client will have customer field auto-populated - managers can set customer field manually - clients can edit orders with customer.id == self.id - anon users can only get order by id, but can't edit """ class ChecklistAPI(viewsets.ModelViewSet): serializer_class = ChecklistSerializer lookup_field = 'id' filterset_class = ChecklistFilter filter_backends = [DjangoFilterBackend, SearchFilter] search_fields = ['id', 'poizon_tracking', 'buyer_phone'] # TODO: search by full_price def permission_denied(self, request, **kwargs): if request.user.is_authenticated and self.action in ["update", "partial_update", "list", "retrieve"]: raise NotFound() super().permission_denied(request, **kwargs) def get_serializer_class(self): # Managers have a full set of fields for editing if getattr(self.request.user, 'is_manager', False) or self.action == 'retrieve': return ChecklistSerializer # Clients can create drafts with small set of fields if self.action == "create": return ClientCreateChecklistSerializer # Then, clients can update small set of fields and cancel orders elif self.action in ['update', 'partial_update', 'destroy']: return ClientUpdateChecklistSerializer # Simplified list serializer elif self.action == "list": return ChecklistListSerializer # Fallback to error self.permission_denied(self.request, **self.kwargs) def get_permissions(self): if self.action == 'retrieve': self.permission_classes = [AllowAny] elif self.action in ['create', 'list', 'update', 'partial_update', 'destroy']: self.permission_classes = [IsAuthenticated] return super().get_permissions() def perform_destroy(self, obj: Checklist): # Non-managers can cancel orders only in certain statuses if (not getattr(self.request.user, 'is_manager', False) and obj.status not in Checklist.Status.CANCELLABLE_ORDER_STATUSES): raise Forbidden("Can't delete the order") obj.cancel() def get_queryset(self): qs = Checklist.objects.with_base_related() \ .annotate_bonus_used() \ .default_ordering() # Non-managers can list only their own orders if not getattr(self.request.user, 'is_manager', False): qs = qs.filter(customer_id=self.request.user.id) return qs def get_object(self): obj: Checklist = super().get_object() # N time maximum in 'neworder' status -> move to drafts if obj.status == Checklist.Status.NEW and obj.buy_time_remaining is not None: if obj.buy_time_remaining <= timedelta(): obj.status = Checklist.Status.DRAFT obj.save() return obj class CategoryAPI(mixins.ListModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): serializer_class = CategorySerializer permission_classes = [IsManager | ReadOnly] lookup_field = 'id' queryset = Category.objects.all() def list(self, request, *args, **kwargs): categories_qs = Category.objects.root_nodes() return Response(CategoryFullSerializer(categories_qs, many=True).data) class PaymentMethodsAPI(mixins.ListModelMixin, mixins.UpdateModelMixin, viewsets.GenericViewSet): serializer_class = PaymentMethodSerializer permission_classes = [IsAdmin | ReadOnly] lookup_field = 'slug' queryset = PaymentMethod.objects.all() pagination_class = None class PromoCodeAPI(viewsets.ModelViewSet): serializer_class = PromocodeSerializer permission_classes = [IsManager] lookup_field = 'name' def get_queryset(self): return Promocode.objects.all() def perform_destroy(self, instance): instance.is_active = False instance.save() return Response(status=status.HTTP_204_NO_CONTENT) class GiftAPI(viewsets.ModelViewSet): serializer_class = GiftSerializer permission_classes = [IsManager | ReadOnly] filterset_class = GiftFilter def get_queryset(self): if getattr(self.request.user, 'is_manager', False): return Gift.objects.all() # For anonymous users, show only available gifts return Gift.objects.filter(available_count__gt=0) class StatisticsAPI(viewsets.GenericViewSet): permission_classes = [IsAdmin] def get_queryset(self): return Checklist.objects.all() \ .select_related('customer') \ .filter(status=Checklist.Status.COMPLETED) \ .annotate(month=F('status_updated_at__month')) # FIXME: stats_by_orders is broken because of _commission_rub annotation @action(url_path='orders', detail=False, methods=['get']) def stats_by_orders(self, request, *args, **kwargs): global_settings = GlobalSettings.load() yuan_rate = global_settings.full_yuan_rate # Prepare query to collect the stats qs = self.get_queryset() \ .values('month') \ .annotate(total=Count('id'), total_completed=Count('id'), total_bought_yuan=Sum('real_price', default=0), total_bought_rub=F('total_bought_yuan') * yuan_rate, total_commission_rub=Sum('_commission_rub', default=0) ) def _create_stats(data: dict): return { "CountOrders": data.get('total', 0), "CountComplete": data.get('total_completed', 0), "SumCommission": data.get('total_commission_rub', 0), "SumOrders1": data.get('total_bought_yuan', 0), "SumOrders2": data.get('total_bought_rub', 0), } result = {} # Add empty stats for i in range(1, 13): month_name = calendar.month_name[i] result[month_name] = _create_stats(dict()) # Add actual stats for stat in qs: month_name = calendar.month_name[stat['month']] result[month_name] = _create_stats(stat) return Response(result) # FIXME: stats_by_categories is broken because of absence of Category.slug field @action(url_path='categories', detail=False, methods=['get']) def stats_by_categories(self, request, *args, **kwargs): all_categories = Category.objects.values_list('slug', flat=True) categories_dict = {c: 0 for c in all_categories} qs = self.get_queryset() \ .select_related('category') \ .values('month', 'category__slug') \ .annotate(total_orders=Count('id')) result = {} # Add empty stats for i in range(1, 13): month = calendar.month_name[i] result[month] = categories_dict.copy() # Add actual stats for stat in qs: month = calendar.month_name[stat['month']] category_slug = stat['category__slug'] total_orders = stat['total_orders'] result[month][category_slug] = total_orders return Response(result) @action(url_path='clients', detail=False, methods=['get']) def stats_by_clients(self, request, *args, **kwargs): options = { "moreone": 1, "moretwo": 2, "morethree": 3, "morefour": 4, "morefive": 5, "moreten": 10, "moretwentyfive": 25, "morefifty": 50, } def _create_empty_stats(): return {k: [] for k in options.keys()} qs = self.get_queryset() \ .filter(customer__isnull=False) \ .values('month', 'customer_id') \ .annotate(order_count=Count('id')) \ .filter(order_count__gt=1) \ .order_by('month') # mapping customer_id -> customer info customer_mapping = {} for order in Checklist.objects.all().select_related('customer'): customer_mapping[order.customer_id] = { 'phone': str(order.customer.phone), 'name': order.customer.first_name, 'telegram': order.customer.telegram } result = {} # Add empty stats for i in range(1, 13): month_name = calendar.month_name[i] result[month_name] = _create_empty_stats() # Add actual stats for order in qs: month_name = calendar.month_name[order['month']] # Collect data for each order count: 1/2/3/4/5/10/25/50 for key, size in reversed(options.items()): if order['order_count'] > size: client_info = customer_mapping[order['customer_id']] client_info["order_count"] = order['order_count'] result[month_name][key].append(client_info) break return Response(result) # TODO: review permissions class CDEKAPI(viewsets.GenericViewSet): client = CDEKClient(settings.CDEK_CLIENT_ID, settings.CDEK_CLIENT_SECRET) permission_classes = [permissions.AllowAny] @action(url_path='orders', detail=False, methods=['get']) def get_order_info(self, request, *args, **kwargs): im_number = request.query_params.get('im_number') if not im_number: raise CRMException('im_number is required') r = self.client.get_order_info(im_number) return prepare_external_response(r) @get_order_info.mapping.post def create_order(self, request, *args, **kwargs): order_data = request.data if not order_data: raise CRMException('json data is required') r = self.client.create_order(order_data) return prepare_external_response(r) @get_order_info.mapping.patch def edit_order(self, request, *args, **kwargs): order_data = request.data if not order_data: raise CRMException('json data is required') r = self.client.edit_order(order_data) return prepare_external_response(r) @action(url_path='calculator/tariff', detail=False, methods=['post']) def calculate_tariff(self, request, *args, **kwargs): data = request.data if not data: raise CRMException('json data is required') r = self.client.calculate_tariff(data) return prepare_external_response(r) @action(url_path='calculator/tarifflist', detail=False, methods=['post']) def calculate_tarifflist(self, request, *args, **kwargs): data = request.data if not data: raise CRMException('json data is required') r = self.client.calculate_tarifflist(data) return prepare_external_response(r) @action(url_path=f'webhook/{settings.CDEK_WEBHOOK_URL_SALT}', detail=False, methods=['post']) def webhook(self, request, *args, **kwargs): data = request.data response = Response() match data.get("type"): case CDEKWebhookTypes.ORDER_STATUS: cdek_number, cdek_status = self.client.process_orderstatus_webhook(data) if cdek_number is None or cdek_status is None: return response order = Checklist.objects.filter(cdek_tracking=cdek_number).first() if order is None: return response # New status or old one new_order_status = CDEK_STATUS_TO_ORDER_STATUS.get(cdek_status, order.status) # Update status if order.status != new_order_status: print(f'Order [{order.id}] status: {order.status} -> {new_order_status}') order.status = new_order_status order.save() case _: pass return response # TODO: review permissions class PoizonAPI(viewsets.GenericViewSet): client = PoizonClient(settings.POIZON_TOKEN) permission_classes = [permissions.AllowAny] @action(url_path='good', detail=False, methods=['post']) def get_good_info(self, request, *args, **kwargs): spu_id = None if 'url' in request.data: spu_id = self.client.get_spu_id(request.data.get('url')) if 'spuId' in request.data: spu_id = request.data.get('spuId') if spu_id is None: raise CRMException('url or spuId is required') r = self.client.get_good_info(spu_id) return prepare_external_response(r)