"""Project custom permissions""" from django.contrib.contenttypes.models import ContentType from rest_framework import permissions from rest_framework_simplejwt.tokens import AccessToken from account.models import UserRole, Role from authorization.models import JWTRefreshToken from utils.tokens import GMRefreshToken from establishment.models import EstablishmentSubType from location.models import Address from product.models import Product, ProductType class IsAuthenticatedAndTokenIsValid(permissions.BasePermission): """ Check if user has a valid token and authenticated """ def has_permission(self, request, view): """Check permissions by access token and default REST permission IsAuthenticated""" user = request.user access_token = request.COOKIES.get('access_token') if user.is_authenticated and access_token: access_token = AccessToken(access_token) valid_tokens = user.access_tokens.valid() \ .by_jti(jti=access_token.payload.get('jti')) return valid_tokens.exists() else: return False class IsRefreshTokenValid(permissions.BasePermission): """ Check if user has a valid refresh token and authenticated """ def has_permission(self, request, view): """Check permissions by refresh token and default REST permission IsAuthenticated""" refresh_token = request.COOKIES.get('refresh_token') if refresh_token: refresh_token = GMRefreshToken(refresh_token) refresh_token_qs = JWTRefreshToken.objects.valid() \ .by_jti(jti=refresh_token.payload.get('jti')) return refresh_token_qs.exists() else: return False def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request, # so we'll always allow GET, HEAD or OPTIONS requests. if request.method in permissions.SAFE_METHODS or \ obj.user == request.user or request.user.is_superuser: return True return False class IsGuest(permissions.IsAuthenticatedOrReadOnly): """ Object-level permission to only allow owners of an object to edit it. """ SAFE_METHODS = ('GET', 'HEAD', 'OPTIONS') def has_permission(self, request, view): rules = [ request.user.is_superuser, request.method in permissions.SAFE_METHODS ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ request.user.is_superuser, request.method in permissions.SAFE_METHODS ] return any(rules) class IsStandardUser(IsGuest): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [super().has_permission(request, view), request.user.is_authenticated, hasattr(request, 'user') ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request rules = [super().has_object_permission(request, view, obj), request.user.is_authenticated, hasattr(request, 'user') ] return any(rules) class IsContentPageManager(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if hasattr(request, 'user'): if hasattr(request.data, 'site_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, site_id=request.data.site_id,) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] elif hasattr(request.data, 'country_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, country_id=request.data.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request. if hasattr(obj, 'site_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, site_id=obj.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] elif hasattr(obj, 'country_id'): role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER, country_id=obj.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsCountryAdmin(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user'): if hasattr(request.data, 'site_id'): # Read permissions are allowed to any request. role = Role.objects.filter(role=Role.COUNTRY_ADMIN, site_id=request.data.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] elif hasattr(request.data, 'country_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, country_id=request.data.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): # Read permissions are allowed to any request. if hasattr(obj, 'site_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, site_id=obj.site_id) \ .first() rules = [ super().has_object_permission(request, view, obj) ] elif hasattr(obj, 'country_id'): role = Role.objects.filter(role=Role.COUNTRY_ADMIN, country_id=obj.country_id) \ .first() rules = [ super().has_object_permission(request, view, obj) ] if hasattr(request, 'user') and request.user.is_authenticated: rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj), ] if hasattr(request.data, 'user'): rules = [ UserRole.objects.filter(user=request.data.user, role=role).exists(), super().has_object_permission(request, view, obj), ] return any(rules) class IsCommentModerator(IsStandardUser): """ Object-level permission to only allow owners of an object to edit it. Assumes the model instance has an `owner` attribute. """ def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if any(rules) and hasattr(request.data, 'site_id'): # Read permissions are allowed to any request. role = Role.objects.filter(role=Role.COMMENTS_MODERATOR, site_id=request.data.site_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] if request.user.is_authenticated: role = Role.objects.filter(role=Role.COMMENTS_MODERATOR, site_id=obj.site_id) \ .first() # 'Comments moderator' rules = [ UserRole.objects.filter(user=request.user, role=role).exists() and obj.user != request.user, super().has_object_permission(request, view, obj) ] return any(rules) class IsEstablishmentManager(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if hasattr(request.data, 'user'): if hasattr(request.data, 'establishment_id'): role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.establishment_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): rules = [ # special! super().has_permission(request, view) ] role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \ .first() if hasattr(obj, 'establishment_id'): rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=obj.establishment_id ).exists(), # special! super().has_permission(request, view) ] return any(rules) class IsReviewerManager(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user') and hasattr(request.data, 'site_id'): role = Role.objects.filter(role=Role.REVIEWER_MANGER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.site_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): role = Role.objects.filter(role=Role.REVIEWER_MANGER, country_id=obj.country_id) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsRestaurantReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] # and request.user.email_confirmed, if hasattr(request.data, 'user') and hasattr(request.data, 'object_id'): role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER) \ .first() rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=request.data.object_id ).exists(), super().has_permission(request, view) ] return any(rules) def has_object_permission(self, request, view, obj): content_type = ContentType.objects.get(app_lable='establishment', model='establishment') role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER, country=obj.country_id).first() rules = [ obj.content_type_id == content_type.id and UserRole.objects.filter(user=request.user, role=role, establishment_id=obj.object_id ).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsWineryReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if 'type_id' in request.data and 'address_id' in request.data and request.user: countries = Address.objects.filter(id=request.data['address_id']) est = EstablishmentSubType.objects.filter(establishment_type_id=request.data['type_id']) if est.exists(): role = Role.objects.filter(establishment_subtype_id__in=[est_type.id for est_type in est], role=Role.WINERY_REVIEWER, country_id__in=[country.id for country in countries]) \ .first() rules.append( UserRole.objects.filter(user=request.user, role=role).exists() ) return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] if hasattr(obj, 'type_id') or hasattr(obj, 'establishment_type_id'): type_id: int if hasattr(obj, 'type_id'): type_id = obj.type_id else: type_id = obj.establishment_type_id est = EstablishmentSubType.objects.filter(establishment_type_id=type_id) role = Role.objects.filter(role=Role.WINERY_REVIEWER, establishment_subtype_id__in=[est_type.id for est_type in est], country_id=obj.country_id).first() object_id: int if hasattr(obj, 'object_id'): object_id = obj.object_id else: object_id = obj.establishment_id rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=object_id ).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsWineryReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] if 'type_id' in request.data and 'address_id' in request.data and request.user: countries = Address.objects.filter(id=request.data['address_id']) est = EstablishmentSubType.objects.filter(establishment_type_id=request.data['type_id']) if est.exists(): role = Role.objects.filter(establishment_subtype_id__in=[est_type.id for est_type in est], role=Role.WINERY_REVIEWER, country_id__in=[country.id for country in countries]) \ .first() rules.append( UserRole.objects.filter(user=request.user, role=role).exists() ) return any(rules) def has_object_permission(self, request, view, obj): rules = [ super().has_object_permission(request, view, obj) ] if hasattr(obj, 'type_id') or hasattr(obj, 'establishment_type_id'): type_id: int if hasattr(obj, 'type_id'): type_id = obj.type_id else: type_id = obj.establishment_type_id est = EstablishmentSubType.objects.filter(establishment_type_id=type_id) role = Role.objects.filter(role=Role.WINERY_REVIEWER, establishment_subtype_id__in=[est_type.id for est_type in est], country_id=obj.country_id).first() object_id: int if hasattr(obj, 'object_id'): object_id = obj.object_id else: object_id = obj.establishment_id rules = [ UserRole.objects.filter(user=request.user, role=role, establishment_id=object_id ).exists(), super().has_object_permission(request, view, obj) ] return any(rules) class IsProductReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] pk_object = None roles = None permission = False if 'site_id' in request.data: if request.data['site_id'] is not None: roles = Role.objects.filter(role=Role.PRODUCT_REVIEWER, site_id=request.data['site_id']) if 'pk' in view.kwargs: pk_object = view.kwargs['pk'] if pk_object is not None: product = Product.objects.get(pk=pk_object) if product.site_id is not None: roles = Role.objects.filter(role=Role.PRODUCT_REVIEWER, site_id=product.site_id) if roles is not None: permission = UserRole.objects.filter(user=request.user, role__in=[role for role in roles])\ .exists() rules.append(permission) return any(rules) class IsLiquorReviewer(IsStandardUser): def has_permission(self, request, view): rules = [ super().has_permission(request, view) ] pk_object = None roles = None permission = False if 'site_id' in request.data and 'product_type_id' in request.data: if request.data['site_id'] is not None \ and request.data['product_type_id'] is not None: product_types = ProductType.objects. \ filter(index_name=ProductType.LIQUOR, id=request.data['product_type_id']) if product_types.exists(): roles = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site_id=request.data['site_id']) if 'pk' in view.kwargs: pk_object = view.kwargs['pk'] if pk_object is not None: product = Product.objects.get(pk=pk_object) if product.site_id is not None \ and product.product_type_id is not None: product_types = ProductType.objects. \ filter(index_name=ProductType.LIQUOR, id=product.product_type_id) if product_types.exists(): roles = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site_id=product.site_id) if roles is not None: permission = UserRole.objects.filter(user=request.user, role__in=[role for role in roles])\ .exists() rules.append(permission) return any(rules) # # def has_object_permission(self, request, view, obj): # rules = [ # super().has_object_permission(request, view, obj) # ] # # pk_object = None # # product = None # # permission = False # # # # if 'pk' in view.kwargs: # # pk_object = view.kwargs['pk'] # # # # if pk_object is not None: # # product = Product.objects.get(pk=pk_object) # # # # if product.sites.exists(): # # role = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site__in=[site for site in product.sites]) # # permission = UserRole.objects.filter(user=request.user, role=role).exists() # # # # rules.append(permission) # return any(rules)