597 lines
21 KiB
Python
597 lines
21 KiB
Python
"""Project custom permissions"""
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
|
from rest_framework import permissions
|
|
from rest_framework_simplejwt.tokens import AccessToken
|
|
|
|
from account.models import UserRole, Role
|
|
from authorization.models import JWTRefreshToken
|
|
from utils.tokens import GMRefreshToken
|
|
from establishment.models import EstablishmentSubType
|
|
from location.models import Address
|
|
from product.models import Product, ProductType
|
|
|
|
|
|
class IsAuthenticatedAndTokenIsValid(permissions.BasePermission):
|
|
"""
|
|
Check if user has a valid token and authenticated
|
|
"""
|
|
|
|
def has_permission(self, request, view):
|
|
"""Check permissions by access token and default REST permission IsAuthenticated"""
|
|
user = request.user
|
|
access_token = request.COOKIES.get('access_token')
|
|
if user.is_authenticated and access_token:
|
|
access_token = AccessToken(access_token)
|
|
valid_tokens = user.access_tokens.valid() \
|
|
.by_jti(jti=access_token.payload.get('jti'))
|
|
return valid_tokens.exists()
|
|
else:
|
|
return False
|
|
|
|
|
|
class IsRefreshTokenValid(permissions.BasePermission):
|
|
"""
|
|
Check if user has a valid refresh token and authenticated
|
|
"""
|
|
|
|
def has_permission(self, request, view):
|
|
"""Check permissions by refresh token and default REST permission IsAuthenticated"""
|
|
refresh_token = request.COOKIES.get('refresh_token')
|
|
if refresh_token:
|
|
refresh_token = GMRefreshToken(refresh_token)
|
|
refresh_token_qs = JWTRefreshToken.objects.valid() \
|
|
.by_jti(jti=refresh_token.payload.get('jti'))
|
|
return refresh_token_qs.exists()
|
|
else:
|
|
return False
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
# Read permissions are allowed to any request,
|
|
# so we'll always allow GET, HEAD or OPTIONS requests.
|
|
if request.method in permissions.SAFE_METHODS or \
|
|
obj.user == request.user or request.user.is_superuser:
|
|
return True
|
|
return False
|
|
|
|
|
|
class IsGuest(permissions.IsAuthenticatedOrReadOnly):
|
|
"""
|
|
Object-level permission to only allow owners of an object to edit it.
|
|
"""
|
|
SAFE_METHODS = ('GET', 'HEAD', 'OPTIONS')
|
|
def has_permission(self, request, view):
|
|
|
|
rules = [
|
|
request.user.is_superuser,
|
|
request.method in permissions.SAFE_METHODS
|
|
]
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
rules = [
|
|
request.user.is_superuser,
|
|
request.method in permissions.SAFE_METHODS
|
|
]
|
|
return any(rules)
|
|
|
|
|
|
class IsStandardUser(IsGuest):
|
|
"""
|
|
Object-level permission to only allow owners of an object to edit it.
|
|
Assumes the model instance has an `owner` attribute.
|
|
"""
|
|
|
|
def has_permission(self, request, view):
|
|
|
|
rules = [super().has_permission(request, view),
|
|
request.user.is_authenticated,
|
|
hasattr(request, 'user')
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
# Read permissions are allowed to any request
|
|
|
|
rules = [super().has_object_permission(request, view, obj),
|
|
request.user.is_authenticated,
|
|
hasattr(request, 'user')
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsContentPageManager(IsStandardUser):
|
|
"""
|
|
Object-level permission to only allow owners of an object to edit it.
|
|
Assumes the model instance has an `owner` attribute.
|
|
"""
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
if hasattr(request, 'user'):
|
|
if hasattr(request.data, 'site_id'):
|
|
role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER,
|
|
site_id=request.data.site_id,) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
elif hasattr(request.data, 'country_id'):
|
|
role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER,
|
|
country_id=request.data.country_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
# Read permissions are allowed to any request.
|
|
if hasattr(obj, 'site_id'):
|
|
role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER,
|
|
site_id=obj.site_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
elif hasattr(obj, 'country_id'):
|
|
role = Role.objects.filter(role=Role.CONTENT_PAGE_MANAGER,
|
|
country_id=obj.country_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsCountryAdmin(IsStandardUser):
|
|
"""
|
|
Object-level permission to only allow owners of an object to edit it.
|
|
Assumes the model instance has an `owner` attribute.
|
|
"""
|
|
def has_permission(self, request, view):
|
|
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
# and request.user.email_confirmed,
|
|
if hasattr(request.data, 'user'):
|
|
if hasattr(request.data, 'site_id'):
|
|
# Read permissions are allowed to any request.
|
|
|
|
role = Role.objects.filter(role=Role.COUNTRY_ADMIN,
|
|
site_id=request.data.site_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
elif hasattr(request.data, 'country_id'):
|
|
|
|
role = Role.objects.filter(role=Role.COUNTRY_ADMIN,
|
|
country_id=request.data.country_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
# Read permissions are allowed to any request.
|
|
if hasattr(obj, 'site_id'):
|
|
role = Role.objects.filter(role=Role.COUNTRY_ADMIN,
|
|
site_id=obj.site_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
elif hasattr(obj, 'country_id'):
|
|
role = Role.objects.filter(role=Role.COUNTRY_ADMIN,
|
|
country_id=obj.country_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
if hasattr(request, 'user') and request.user.is_authenticated:
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_object_permission(request, view, obj),
|
|
]
|
|
|
|
if hasattr(request.data, 'user'):
|
|
rules = [
|
|
UserRole.objects.filter(user=request.data.user, role=role).exists(),
|
|
super().has_object_permission(request, view, obj),
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsCommentModerator(IsStandardUser):
|
|
"""
|
|
Object-level permission to only allow owners of an object to edit it.
|
|
Assumes the model instance has an `owner` attribute.
|
|
"""
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
if any(rules) and hasattr(request.data, 'site_id'):
|
|
# Read permissions are allowed to any request.
|
|
|
|
role = Role.objects.filter(role=Role.COMMENTS_MODERATOR,
|
|
site_id=request.data.site_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
|
|
rules = [
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
if request.user.is_authenticated:
|
|
|
|
role = Role.objects.filter(role=Role.COMMENTS_MODERATOR,
|
|
site_id=obj.site_id) \
|
|
.first() # 'Comments moderator'
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists() and
|
|
obj.user != request.user,
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
return any(rules)
|
|
|
|
|
|
class IsEstablishmentManager(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
if hasattr(request.data, 'user'):
|
|
if hasattr(request.data, 'establishment_id'):
|
|
role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=request.data.establishment_id
|
|
).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
|
|
rules = [
|
|
# special!
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
role = Role.objects.filter(role=Role.ESTABLISHMENT_MANAGER) \
|
|
.first()
|
|
|
|
if hasattr(obj, 'establishment_id'):
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=obj.establishment_id
|
|
).exists(),
|
|
# special!
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsReviewerManager(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
# and request.user.email_confirmed,
|
|
if hasattr(request.data, 'user') and hasattr(request.data, 'site_id'):
|
|
role = Role.objects.filter(role=Role.REVIEWER_MANGER) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=request.data.site_id
|
|
).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
role = Role.objects.filter(role=Role.REVIEWER_MANGER,
|
|
country_id=obj.country_id) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsRestaurantReviewer(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
# and request.user.email_confirmed,
|
|
if hasattr(request.data, 'user') and hasattr(request.data, 'object_id'):
|
|
role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER) \
|
|
.first()
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=request.data.object_id
|
|
).exists(),
|
|
super().has_permission(request, view)
|
|
]
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
content_type = ContentType.objects.get(app_lable='establishment',
|
|
model='establishment')
|
|
|
|
role = Role.objects.filter(role=Role.RESTAURANT_REVIEWER,
|
|
country=obj.country_id).first()
|
|
|
|
rules = [
|
|
obj.content_type_id == content_type.id and
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=obj.object_id
|
|
).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
return any(rules)
|
|
|
|
|
|
class IsWineryReviewer(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
if 'type_id' in request.data and 'address_id' in request.data and request.user:
|
|
countries = Address.objects.filter(id=request.data['address_id'])
|
|
|
|
est = EstablishmentSubType.objects.filter(establishment_type_id=request.data['type_id'])
|
|
if est.exists():
|
|
role = Role.objects.filter(establishment_subtype_id__in=[type.id for type in est],
|
|
role=Role.WINERY_REVIEWER,
|
|
country_id__in=[country.id for country in countries]) \
|
|
.first()
|
|
|
|
rules.append(
|
|
UserRole.objects.filter(user=request.user, role=role).exists()
|
|
)
|
|
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
rules = [
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
if hasattr(obj, 'type_id') or hasattr(obj, 'establishment_type_id'):
|
|
type_id: int
|
|
if hasattr(obj, 'type_id'):
|
|
type_id = obj.type_id
|
|
else:
|
|
type_id = obj.establishment_type_id
|
|
|
|
est = EstablishmentSubType.objects.filter(establishment_type_id=type_id)
|
|
role = Role.objects.filter(role=Role.WINERY_REVIEWER,
|
|
establishment_subtype_id__in=[id for type.id in est],
|
|
country_id=obj.country_id).first()
|
|
|
|
object_id: int
|
|
if hasattr(obj, 'object_id'):
|
|
object_id = obj.object_id
|
|
else:
|
|
object_id = obj.establishment_id
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=object_id
|
|
).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
return any(rules)
|
|
|
|
|
|
class IsWineryReviewer(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
if 'type_id' in request.data and 'address_id' in request.data and request.user:
|
|
countries = Address.objects.filter(id=request.data['address_id'])
|
|
|
|
est = EstablishmentSubType.objects.filter(establishment_type_id=request.data['type_id'])
|
|
if est.exists():
|
|
role = Role.objects.filter(establishment_subtype_id__in=[type.id for type in est],
|
|
role=Role.WINERY_REVIEWER,
|
|
country_id__in=[country.id for country in countries]) \
|
|
.first()
|
|
|
|
rules.append(
|
|
UserRole.objects.filter(user=request.user, role=role).exists()
|
|
)
|
|
|
|
return any(rules)
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
rules = [
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
|
|
if hasattr(obj, 'type_id') or hasattr(obj, 'establishment_type_id'):
|
|
type_id: int
|
|
if hasattr(obj, 'type_id'):
|
|
type_id = obj.type_id
|
|
else:
|
|
type_id = obj.establishment_type_id
|
|
|
|
est = EstablishmentSubType.objects.filter(establishment_type_id=type_id)
|
|
role = Role.objects.filter(role=Role.WINERY_REVIEWER,
|
|
establishment_subtype_id__in=[id for type.id in est],
|
|
country_id=obj.country_id).first()
|
|
|
|
object_id: int
|
|
if hasattr(obj, 'object_id'):
|
|
object_id = obj.object_id
|
|
else:
|
|
object_id = obj.establishment_id
|
|
|
|
rules = [
|
|
UserRole.objects.filter(user=request.user, role=role,
|
|
establishment_id=object_id
|
|
).exists(),
|
|
super().has_object_permission(request, view, obj)
|
|
]
|
|
return any(rules)
|
|
|
|
|
|
class IsProductReviewer(IsStandardUser):
|
|
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
pk_object = None
|
|
roles = None
|
|
permission = False
|
|
|
|
if 'site_id' in request.data:
|
|
if request.data['site_id'] is not None:
|
|
roles = Role.objects.filter(role=Role.PRODUCT_REVIEWER,
|
|
site_id=request.data['site_id'])
|
|
|
|
if 'pk' in view.kwargs:
|
|
pk_object = view.kwargs['pk']
|
|
|
|
if pk_object is not None:
|
|
product = Product.objects.get(pk=pk_object)
|
|
if product.site_id is not None:
|
|
roles = Role.objects.filter(role=Role.PRODUCT_REVIEWER,
|
|
site_id=product.site_id)
|
|
|
|
if roles is not None:
|
|
permission = UserRole.objects.filter(user=request.user, role__in=[role for role in roles])\
|
|
.exists()
|
|
|
|
rules.append(permission)
|
|
return any(rules)
|
|
|
|
|
|
class IsLiquorReviewer(IsStandardUser):
|
|
def has_permission(self, request, view):
|
|
rules = [
|
|
super().has_permission(request, view)
|
|
]
|
|
|
|
pk_object = None
|
|
roles = None
|
|
permission = False
|
|
|
|
if 'site_id' in request.data and 'product_type_id' in request.data:
|
|
if request.data['site_id'] is not None \
|
|
and request.data['product_type_id'] is not None:
|
|
|
|
product_types = ProductType.objects. \
|
|
filter(index_name=ProductType.LIQUOR,
|
|
id=request.data['product_type_id'])
|
|
|
|
if product_types.exists():
|
|
roles = Role.objects.filter(role=Role.LIQUOR_REVIEWER,
|
|
site_id=request.data['site_id'])
|
|
|
|
if 'pk' in view.kwargs:
|
|
pk_object = view.kwargs['pk']
|
|
|
|
if pk_object is not None:
|
|
product = Product.objects.get(pk=pk_object)
|
|
if product.site_id is not None \
|
|
and product.product_type_id is not None:
|
|
|
|
product_types = ProductType.objects. \
|
|
filter(index_name=ProductType.LIQUOR,
|
|
id=product.product_type_id)
|
|
|
|
if product_types.exists():
|
|
roles = Role.objects.filter(role=Role.LIQUOR_REVIEWER,
|
|
site_id=product.site_id)
|
|
|
|
if roles is not None:
|
|
permission = UserRole.objects.filter(user=request.user, role__in=[role for role in roles])\
|
|
.exists()
|
|
|
|
rules.append(permission)
|
|
return any(rules)
|
|
|
|
#
|
|
# def has_object_permission(self, request, view, obj):
|
|
# rules = [
|
|
# super().has_object_permission(request, view, obj)
|
|
# ]
|
|
# # pk_object = None
|
|
# # product = None
|
|
# # permission = False
|
|
# #
|
|
# # if 'pk' in view.kwargs:
|
|
# # pk_object = view.kwargs['pk']
|
|
# #
|
|
# # if pk_object is not None:
|
|
# # product = Product.objects.get(pk=pk_object)
|
|
# #
|
|
# # if product.sites.exists():
|
|
# # role = Role.objects.filter(role=Role.LIQUOR_REVIEWER, site__in=[site for site in product.sites])
|
|
# # permission = UserRole.objects.filter(user=request.user, role=role).exists()
|
|
# #
|
|
# # rules.append(permission)
|
|
# return any(rules) |