"""Project custom permissions""" from rest_framework import permissions from rest_framework.permissions import SAFE_METHODS as SAFE_HTTP_METHODS from rest_framework_simplejwt.tokens import AccessToken from account.models import UserRole, Role from authorization.models import JWTRefreshToken from establishment.models import Establishment from product.models import Product from utils.tokens import GMRefreshToken class IsAuthenticatedAndTokenIsValid(permissions.BasePermission): """ Check if user has a valid token and authenticated """ def has_permission(self, request, view): """Check permissions by access token and default REST permission IsAuthenticated""" user = request.user access_token = request.COOKIES.get('access_token') if user.is_authenticated and access_token: access_token = AccessToken(access_token) valid_tokens = user.access_tokens.valid() \ .by_jti(jti=access_token.payload.get('jti')) return valid_tokens.exists() else: return False class IsRefreshTokenValid(permissions.BasePermission): """ Check if user has a valid refresh token and authenticated """ def has_permission(self, request, view): """Check permissions by refresh token and default REST permission IsAuthenticated""" refresh_token = request.COOKIES.get('refresh_token') if refresh_token: refresh_token = GMRefreshToken(refresh_token) refresh_token_qs = JWTRefreshToken.objects.valid() \ .by_jti(jti=refresh_token.payload.get('jti')) return refresh_token_qs.exists() else: return False def has_object_permission(self, request, view, obj): # Read permissions are allowed to all request, # so we'll always allow GET, HEAD or OPTIONS requests. if request.method in SAFE_HTTP_METHODS or \ obj.user == request.user or request.user.is_superuser: return True return False class IsGuest(permissions.BasePermission): """ Object-level permission to only allow owners of an object to edit it. """ def has_permission(self, request, view): rules = [ request.user.is_anonymous, request.method in SAFE_HTTP_METHODS ] return all(rules) class IsReadOnly(permissions.BasePermission): """ Allows getting access to resource only if request method in SAFE_METHODs. """ def has_permission(self, request, view): return request.method in SAFE_HTTP_METHODS class IsApprovedUser(IsAuthenticatedAndTokenIsValid): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if request.user.is_authenticated: has_permission = request.user.email_confirmed rules.append(has_permission) return all(rules) class IsContentPageManager(IsApprovedUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.CONTENT_PAGE_MANAGER, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') has_permission = True if user_role.exists() else has_permission rules.append(has_permission) return all(rules) class IsCountryAdmin(IsApprovedUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.COUNTRY_ADMIN, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True) ).only('id') has_permission = True if user_role.exists() else has_permission rules.append(has_permission) return all(rules) class IsModerator(IsApprovedUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.MODERATOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') has_permission = True if user_role.exists() else has_permission rules.append(has_permission) return all(rules) class IsEstablishmentManager(IsApprovedUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): user = request.user role = Role.objects.filter( role=Role.ESTABLISHMENT_MANAGER, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=user, role__id__in=role.values_list('id', flat=True), ).only('id') has_permission = True if user_role.exists() else has_permission rules.append(has_permission) return all(rules) class IsEstablishmentAdministrator(IsApprovedUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.ESTABLISHMENT_ADMINISTRATOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') has_permission = True if user_role.exists() else has_permission rules.append(has_permission) return bool(request.method in SAFE_HTTP_METHODS or all(rules)) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] has_object_permission = False role = Role.objects.filter(role=Role.ESTABLISHMENT_ADMINISTRATOR).only('id') if request.user.is_authenticated and role.exists() and hasattr(obj, 'id'): filters = { 'user': request.user, 'role__id__in': role.values_list('id', flat=True), } if isinstance(obj, Establishment): filters.update({'establishment__id': obj.id}) if isinstance(obj, Product): filters.update({'establishment__products__id': obj.id}) user_role = UserRole.objects.validated().filter(**filters) has_object_permission = True if user_role.exists() else has_object_permission rules.append(has_object_permission) return all(rules) class IsReviewManager(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', 'product_type', 'news', 'recipe', 'user', ], 'WRITE': ['inquiries', 'userrole', 'review', 'establishment', 'product', 'news', 'recipe', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.REVIEW_MANAGER, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules) class IsRestaurantInspector(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', ], 'WRITE': ['inquiries', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.RESTAURANT_INSPECTOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules) class IsArtisanInspector(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', ], 'WRITE': ['inquiries', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.ARTISAN_INSPECTOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules) class IsWineryWineInspector(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', 'product', ], 'WRITE': ['inquiries', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.WINERY_WINE_INSPECTOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules) class IsProducerFoodInspector(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', 'product', ], 'WRITE': ['inquiries', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.PRODUCER_FOOD_INSPECTOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules) class IsDistilleryLiquorInspector(IsApprovedUser): MODEL_PERMISSIONS = { 'READ': ['establishment', 'product', ], 'WRITE': ['inquiries', ] } def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] has_permission = False if (request.user.is_authenticated and hasattr(request, 'country_code') and request.country_code): role = Role.objects.filter( role=Role.DISTILLERY_LIQUOR_INSPECTOR, site__country__code=request.country_code, ).only('id') if role.exists(): user_role = UserRole.objects.validated().filter( user=request.user, role__id__in=role.values_list('id', flat=True), ).only('id') if user_role.exists(): # check model for read model_name = view.get_queryset().model._meta.model_name if ((model_name in self.MODEL_PERMISSIONS.get('READ', []) and request.method in SAFE_HTTP_METHODS) or (model_name in self.MODEL_PERMISSIONS.get('WRITE', []))): has_permission = True rules.append(has_permission) return all(rules)