"""Back-office account views: roles, user roles, users and per-user CSV export."""
import csv

from django.http import HttpResponse, HttpResponseNotFound
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import generics, permissions, status
from rest_framework.authtoken.models import Token
from rest_framework.filters import OrderingFilter
from rest_framework.response import Response

from account import filters, models
from account.models import User
from account.serializers import back as serializers
from account.serializers.common import RoleBaseSerializer


class RoleListView(generics.ListCreateAPIView):
    """Role list create view."""

    serializer_class = RoleBaseSerializer
    queryset = models.Role.objects.all()
    filter_class = filters.RoleListFilter


class RoleChoiceListView(generics.GenericAPIView):
    """Return role choices."""

    def get(self, request, *args, **kwargs):
        """Respond with whatever ``Role.role_types()`` returns, status 200."""
        return Response(models.Role.role_types(), status=status.HTTP_200_OK)


class RoleTabRetrieveView(generics.GenericAPIView):
    """Return per-role-name counters; restricted to admin users."""

    permission_classes = [permissions.IsAdminUser]

    def get_queryset(self):
        """Return rows of ``role_name`` / ``role_counter`` values.

        When the requesting user has a country-admin role (as reported by
        ``userrole_set.country_admin_role()``) and the request carries a
        ``country_code`` attribute, only roles of that country are included.
        """
        additional_filters = {}
        user_roles = self.request.user.userrole_set
        if (user_roles.country_admin_role().exists()
                and hasattr(self.request, 'country_code')):
            additional_filters['country__code'] = self.request.country_code

        # The first .values() call precedes the counter annotation, so the
        # counter is presumably aggregated per role_name -- the annotate_*
        # helpers are defined on the Role queryset outside this module.
        return (
            models.Role.objects.filter(**additional_filters)
            .annotate_role_name()
            .values('role_name')
            .annotate_role_counter()
            .values('role_name', 'role_counter')
        )

    def get(self, request, *args, **kwargs):
        """Return the counters, padded with zero rows for missing role names."""
        data = list(self.get_queryset())

        # todo: Need refactoring. Extend data list with non-existent roles.
        # Build the membership set once instead of re-scanning ``data`` for
        # every role name.
        seen_names = {row.get('role_name') for row in data}
        for role_name in models.Role.role_names():
            if role_name in seen_names:
                continue
            seen_names.add(role_name)
            # Queryset rows carry the count under 'role_counter'; padding rows
            # used to expose it only as 'role_number'. Emit the consistent key
            # and keep the legacy one so existing clients keep working.
            data.append({
                'role_name': role_name,
                'role_counter': 0,
                'role_number': 0,
            })
        return Response(data, status=status.HTTP_200_OK)


class UserRoleListView(generics.ListCreateAPIView):
    """User role list create view."""

    serializer_class = serializers.UserRoleSerializer
    queryset = models.UserRole.objects.all()


class UserListView(generics.ListCreateAPIView):
    """User list create view."""

    queryset = User.objects.prefetch_related('roles', 'subscriber')
    serializer_class = serializers.BackUserSerializer
    permission_classes = (permissions.IsAdminUser,)
    filter_class = filters.AccountBackOfficeFilter
    filter_backends = (OrderingFilter, DjangoFilterBackend)
    ordering_fields = (
        'email_confirmed',
        'is_staff',
        'is_active',
        'is_superuser',
        'last_login',
        'date_joined',
    )


class UserRUDView(generics.RetrieveUpdateDestroyAPIView):
    """User RUD view."""

    queryset = User.objects.all()
    serializer_class = serializers.BackDetailUserSerializer
    permission_classes = (permissions.IsAdminUser,)
    lookup_field = 'id'


def _format_user_roles(user):
    """Return the user's roles as one space-separated string ('' if none)."""
    roles = getattr(user, "roles", None)
    if roles is None:
        return ""
    # A Django related manager is not iterable itself; resolve it to a
    # queryset first. Plain iterables are used as they are.
    if hasattr(roles, "all"):
        roles = roles.all()
    try:
        return " ".join(str(role) for role in roles)
    except TypeError:
        # ``roles`` turned out not to be iterable: keep the export best-effort.
        return ""


def get_user_csv(request, id):
    """User CSV file download.

    Looks the user up by ``id`` and returns a two-row CSV attachment (header
    row plus one value row) named after the user's email. Responds with 404
    when no such user exists.

    NOTE(review): no authentication or permission check is visible in this
    function, yet the export contains the stored password value and an API
    token. Confirm that access is restricted where the view is routed.
    """
    # Column set of the legacy export this view mirrors:
    # fields = ["id", "uuid", "nickname", "locale", "country_code", "city", "role", "consent_purpose", "consent_at",
    #           "last_seen_at", "created_at", "updated_at", "email", "is_admin", "ezuser_id", "ez_user_id",
    #           "encrypted_password", "reset_password_token", "reset_password_sent_at", "remember_created_at",
    #           "sign_in_count", "current_sign_in_at", "last_sign_in_at", "current_sign_in_ip", "last_sign_in_ip",
    #           "confirmation_token", "confirmed_at", "confirmation_sent_at", "unconfirmed_email", "webpush_subscription"]
    # uuid == id
    #
    # Not found (no counterpart on the current user model):
    #   consent_purpose
    #   consent_at
    #   ezuser_id
    #   ez_user_id
    #   remember_created_at
    #   sign_in_count
    #   current_sign_in_at
    #   current_sign_in_ip
    #   last_sign_in_ip
    #   confirmed_at
    #   confirmation_sent_at
    #   webpush_subscription
    #
    # country_code cannot be obtained - the client is not tied to a country.
    try:
        user = User.objects.get(id=id)
    except User.DoesNotExist:
        return HttpResponseNotFound("User not found")

    roles = _format_user_roles(user)

    # NOTE(review): this creates an auth token as a side effect of a download
    # if the user has none yet.
    token, _ = Token.objects.get_or_create(user=user)

    fields = {
        "id": user.id,
        "uuid": user.id,
        "username": getattr(user, "username", ""),
        "locale": getattr(user, "locale", ""),
        "city": getattr(user, "city", ""),
        "role": roles,
        "created_at": getattr(user, "date_joined", ""),
        "updated_at": user.last_login,
        "email": user.email,
        "is_admin": user.is_superuser,
        "encrypted_password": user.password,
        "reset_password_token": token.key,
        # TODO: not sure about the purpose of this field, better to check.
        "reset_password_sent_at": token.created,
        "last_sign_in_at": user.last_login,  # Duplicate of updated_at?
        "confirmation_token": user.confirm_email_token,
        "unconfirmed_email": 1 if user.unconfirmed_email else 0,
    }

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{user.email}.csv"'

    writer = csv.writer(response)
    writer.writerow(fields.keys())
    writer.writerow(fields.values())
    return response