"""Project custom permissions""" from django.contrib.contenttypes.models import ContentType from rest_framework import permissions from rest_framework_simplejwt.tokens import AccessToken from account.models import UserRole, Role from authorization.models import JWTRefreshToken from utils.tokens import GMRefreshToken from establishment.models import EstablishmentSubType from location.models import Address from product.models import Product class IsAuthenticatedAndTokenIsValid(permissions.BasePermission): """ Check if user has a valid token and authenticated """ def has_permission(self, request, view): """Check permissions by access token and default REST permission IsAuthenticated""" user = request.user access_token = request.COOKIES.get('access_token') if user.is_authenticated and access_token: access_token = AccessToken(access_token) valid_tokens = user.access_tokens.valid() \ .by_jti(jti=access_token.payload.get('jti')) return valid_tokens.exists() else: return False class IsRefreshTokenValid(permissions.BasePermission): """ Check if user has a valid refresh token and authenticated """ def has_permission(self, request, view): """Check permissions by refresh token and default REST permission IsAuthenticated""" refresh_token = request.COOKIES.get('refresh_token') if refresh_token: refresh_token = GMRefreshToken(refresh_token) refresh_token_qs = JWTRefreshToken.objects.valid() \ .by_jti(jti=refresh_token.payload.get('jti')) return refresh_token_qs.exists() else: return False def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request, # so we'll always allow GET, HEAD or OPTIONS requests. if request.method in permissions.SAFE_METHODS or \ obj.user == request.user or request.user.is_superuser: return True return False class IsGuest(permissions.IsAuthenticatedOrReadOnly): """ Object-level permission to only allow owners of an object to edit it. """ SAFE_METHODS = ('GET', 'HEAD', 'OPTIONS') def has_permission(self, request, view): rules = [ request.user.is_superuser, request.method in permissions.SAFE_METHODS ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ request.user.is_superuser, request.method in permissions.SAFE_METHODS ] return any(rules) class IsStandardUser(IsGuest): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [super().has_permission(request, view), request.user.is_authenticated, hasattr(request, 'user') ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request rules = [super().has_object_permission(request, view, obj), request.user.is_authenticated, hasattr(request, 'user') ] return any(rules) class IsContentPageManager(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if hasattr(request, 'user'): if hasattr(request.data, 'site_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, site_id=request.data.site_id,) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] elif hasattr(request.data, 'country_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, country_id=request.data.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request. if hasattr(obj, 'site_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, site_id=obj.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] elif hasattr(obj, 'country_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, country_id=obj.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsCountryAdmin(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user'): if hasattr(request.data, 'site_id'): # Read permissions are allowed to any request. role = Role.objects.filter(role=Role.COUNTRY_ADMIN, site_id=request.data.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] elif hasattr(request.data, 'country_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, country_id=request.data.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request. if hasattr(obj, 'site_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, site_id=obj.site_id) \ .first() rules = [ super().has_object_permission(request, view, obj) ] elif hasattr(obj, 'country_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, country_id=obj.country_id) \ .first() rules = [ super().has_object_permission(request, view, obj) ] if hasattr(request, 'user') and request.user.is_authenticated: rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj), ] if hasattr(request.data, 'user'): rules = [ UserRole.objects.filter(user=request.data.user, role=role).exists(), super().has_object_permission(request, view, obj), ] return any(rules) class IsCommentModerator(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if any(rules) and hasattr(request.data, 'site_id'): # Read permissions are allowed to any request. role = Role.objects.filter(role=Role.COMMENTS_MODERATOR, site_id=request.data.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] if request.user.is_authenticated: role = Role.objects.filter(role=Role.COMMENTS_MODERATOR, site_id=obj.site_id) \ .first() # 'Comments moderator' rules = [ UserRole.objects.filter(user=request.user, role=role).exists() and obj.user != request.user, super().has_object_permission(request, view, obj) ] return any(rules) class IsEstablishmentManager(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if hasattr(request.data, 'user'): if hasattr(request.data, 'establishment_id'): role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.establishment_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ # special! super().has_permission(request, view) ] role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \ .first() if hasattr(obj, 'establishment_id'): rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=obj.establishment_id ).exists(), # special! super().has_permission(request, view) ] return any(rules) class IsReviewerManager(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user') and hasattr(request.data, 'site_id'): role = Role.objects.filter(role=Role.REVIEWER_MANGER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.site_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): role = Role.objects.filter(role=Role.REVIEWER_MANGER, country_id=obj.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsRestaurantReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user') and hasattr(request.data, 'object_id'): role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.object_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): content_type = ContentType.objects.get(app_lable='establishment', model='establishment') role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER, country=obj.country_id).first() rules = [ obj.content_type_id == content_type.id and UserRole.objects.filter(user=request.user, role=role, establishment_id=obj.object_id ).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsWineryReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if 'type_id' in request.data and 'address_id' in request.data and request.user: countries = Address.objects.filter(id=request.data['address_id']) est = EstablishmentSubType.objects.filter(establishment_type_id=request.data['type_id']) if est.exists(): role = Role.objects.filter(establishment_subtype_id__in=[type.id for type in est], role=Role.WINERY_REVIEWER, country_id__in=[country.id for country in countries]) \ .first() rules.append( UserRole.objects.filter(user=request.user, role=role).exists() ) return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] if hasattr(obj, 'type_id') or hasattr(obj, 'establishment_type_id'): type_id: int if hasattr(obj, 'type_id'): type_id = obj.type_id else: type_id = obj.establishment_type_id est = EstablishmentSubType.objects.filter(establishment_type_id=type_id) role = Role.objects.filter(role=Role.WINERY_REVIEWER, establishment_subtype_id__in=[id for type.id in est], country_id=obj.country_id).first() object_id: int if hasattr(obj, 'object_id'): object_id = obj.object_id else: object_id = obj.establishment_id rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=object_id ).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsLiquorReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] pk_object = None product = None permission = False if 'pk' in view.kwargs: pk_object = view.kwargs['pk'] if pk_object is not None: product = Product.objects.get(pk=pk_object) if hasattr(product, 'sites') and product.sites.exists(): role = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site__in=[site for site in product.sites]) permission = UserRole.objects.filter(user=request.user, role=role).exists() rules.append(permission) return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] pk_object = None product = None permission = False if 'pk' in view.kwargs: pk_object = view.kwargs['pk'] if pk_object is not None: product = Product.objects.get(pk=pk_object) if product.sites.exists(): role = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site__in=[site for site in product.sites]) permission = UserRole.objects.filter(user=request.user, role=role).exists() rules.append(permission) return any(rules)