gault-millau/apps/utils/permissions.py
2020-01-30 10:28:01 +03:00

423 lines
16 KiB
Python

"""Project custom permissions"""
from rest_framework import permissions
from rest_framework.permissions import SAFE_METHODS as SAFE_HTTP_METHODS
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import AccessToken

from account.models import UserRole, Role
from authorization.models import JWTRefreshToken
from establishment.models import Establishment
from product.models import Product
from utils.tokens import GMRefreshToken
class IsAuthenticatedAndTokenIsValid(permissions.BasePermission):
    """
    Grant access only to authenticated users whose `access_token` cookie
    matches one of their own still-valid stored access tokens.
    """

    def has_permission(self, request, view):
        """Check the access token cookie against the user's valid tokens.

        Returns False when the user is not authenticated, when the
        `access_token` cookie is missing or empty, when the cookie cannot be
        parsed/verified as an access token, or when no valid stored token
        with the same `jti` exists for the user.
        """
        user = request.user
        raw_token = request.COOKIES.get('access_token')
        if not (user.is_authenticated and raw_token):
            return False

        try:
            access_token = AccessToken(raw_token)
        except TokenError:
            # A malformed or expired cookie must deny access rather than
            # escape the permission check as an unhandled exception.
            return False

        jti = access_token.payload.get('jti')
        return user.access_tokens.valid().by_jti(jti=jti).exists()
class IsRefreshTokenValid(permissions.BasePermission):
    """
    Grant access when the `refresh_token` cookie corresponds to a stored
    refresh token that is still valid.
    """

    def has_permission(self, request, view):
        """Look up the refresh token from the cookie among valid stored tokens.

        Returns False when the `refresh_token` cookie is missing or empty,
        otherwise whether a valid stored token with the same `jti` exists.
        """
        raw_token = request.COOKIES.get('refresh_token')
        if not raw_token:
            return False

        # NOTE(review): GMRefreshToken presumably raises on a malformed or
        # expired token; that exception is not handled here - confirm.
        jti = GMRefreshToken(raw_token).payload.get('jti')
        return JWTRefreshToken.objects.valid().by_jti(jti=jti).exists()

    def has_object_permission(self, request, view, obj):
        """Allow safe (read-only) methods for everyone; otherwise only the
        object's `user` or a superuser may proceed."""
        if request.method in SAFE_HTTP_METHODS:
            return True
        if obj.user == request.user:
            return True
        return bool(request.user.is_superuser)
class IsGuest(permissions.IsAuthenticatedOrReadOnly):
    """
    Grant access only to anonymous users issuing safe (read-only) requests.
    """

    def has_permission(self, request, view):
        """Return True when the user is anonymous and the method is safe."""
        anonymous = request.user.is_anonymous
        read_only = request.method in SAFE_HTTP_METHODS
        return all((anonymous, read_only))
class IsApprovedUser(IsAuthenticatedAndTokenIsValid):
    """
    Grant access to users who pass the access-token check of the parent
    permission and whose `email_confirmed` flag is truthy.
    """

    def has_permission(self, request, view):
        """Combine the parent token check with the user's email confirmation."""
        token_ok = super().has_permission(request, view)

        # Anonymous users have no confirmation flag to consult.
        if request.user.is_authenticated:
            confirmed = request.user.email_confirmed
        else:
            confirmed = False

        return all((token_ok, confirmed))
class IsContentPageManager(IsApprovedUser):
    """
    Grant access to approved users holding the CONTENT_PAGE_MANAGER role
    for the country code of the current request.
    """

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a country-scoped role lookup.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`; otherwise it counts as failed.
        """
        approved = super().has_permission(request, view)

        has_role = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            country_roles = Role.objects.filter(
                role=Role.CONTENT_PAGE_MANAGER,
                site__country__code=request.country_code,
            ).only('id')
            if country_roles.exists():
                has_role = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=country_roles.values_list('id', flat=True),
                ).only('id').exists()

        return all((approved, has_role))
class IsCountryAdmin(IsApprovedUser):
    """
    Grant access to approved users holding the COUNTRY_ADMIN role for the
    country code of the current request.
    """

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a country-scoped role lookup.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`; otherwise it counts as failed.
        """
        approved = super().has_permission(request, view)

        is_admin = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            admin_roles = Role.objects.filter(
                role=Role.COUNTRY_ADMIN,
                site__country__code=request.country_code,
            ).only('id')
            if admin_roles.exists():
                assignments = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=admin_roles.values_list('id', flat=True),
                ).only('id')
                is_admin = assignments.exists()

        return all((approved, is_admin))
class IsModerator(IsApprovedUser):
    """
    Grant access to approved users holding the MODERATOR role for the
    country code of the current request.
    """

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a country-scoped role lookup.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`; otherwise it counts as failed.
        """
        approved = super().has_permission(request, view)

        moderates = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            moderator_roles = Role.objects.filter(
                role=Role.MODERATOR,
                site__country__code=request.country_code,
            ).only('id')
            if moderator_roles.exists():
                role_ids = moderator_roles.values_list('id', flat=True)
                moderates = UserRole.objects.filter(
                    user=request.user, role__id__in=role_ids,
                ).only('id').exists()

        return all((approved, moderates))
class IsEstablishmentManager(IsApprovedUser):
    """
    Grant access to approved users holding the ESTABLISHMENT_MANAGER role
    for the country code of the current request.
    """

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a country-scoped role lookup.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`; otherwise it counts as failed.
        """
        approved = super().has_permission(request, view)

        manages = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            current_user = request.user
            manager_roles = Role.objects.filter(
                role=Role.ESTABLISHMENT_MANAGER,
                site__country__code=request.country_code,
            ).only('id')
            if manager_roles.exists():
                manages = UserRole.objects.filter(
                    user=current_user,
                    role__id__in=manager_roles.values_list('id', flat=True),
                ).only('id').exists()

        return all((approved, manages))
class IsEstablishmentAdministrator(IsApprovedUser):
    """
    Grant access to approved users holding the ESTABLISHMENT_ADMINISTRATOR
    role, scoped by country at view level and by establishment at object level.
    """

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a country-scoped role lookup.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`; otherwise it counts as failed.
        """
        approved = super().has_permission(request, view)

        administers = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            admin_roles = Role.objects.filter(
                role=Role.ESTABLISHMENT_ADMINISTRATOR,
                site__country__code=request.country_code,
            ).only('id')
            if admin_roles.exists():
                administers = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=admin_roles.values_list('id', flat=True),
                ).only('id').exists()

        return all((approved, administers))

    def has_object_permission(self, request, view, obj):
        """Check that the user holds the administrator role for `obj`.

        For an Establishment the user role must reference that establishment;
        for a Product it must reference an establishment owning that product.
        For any other object type no extra establishment filter is applied.
        """
        inherited = super().has_object_permission(request, view, obj)

        # Unlike has_permission, the role lookup here is not country-scoped.
        admin_roles = Role.objects.filter(role=Role.ESTABLISHMENT_ADMINISTRATOR).only('id')

        owns_object = False
        if request.user.is_authenticated and admin_roles.exists() and hasattr(obj, 'id'):
            lookup = {
                'user': request.user,
                'role__id__in': admin_roles.values_list('id', flat=True),
            }
            if isinstance(obj, Establishment):
                lookup['establishment__id'] = obj.id
            if isinstance(obj, Product):
                lookup['establishment__products__id'] = obj.id
            owns_object = UserRole.objects.filter(**lookup).exists()

        return all((inherited, owns_object))
class IsReviewManager(IsApprovedUser):
    """
    Grant access to approved users holding the REVIEW_MANAGER role for the
    request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    # NOTE(review): Django's `_meta.model_name` is normally the lowercased
    # class name, so 'product_type' may never match - confirm.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', 'product_type', 'news', 'recipe', 'user', ],
        'WRITE': ['inquiries', 'userrole', 'review', 'establishment', 'product', 'news', 'recipe', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            review_roles = Role.objects.filter(
                role=Role.REVIEW_MANAGER,
                site__country__code=request.country_code,
            ).only('id')
            if review_roles.exists():
                assignments = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=review_roles.values_list('id', flat=True),
                ).only('id')
                if assignments.exists():
                    model_name = view.get_queryset().model._meta.model_name
                    writable = model_name in self.MODEL_PERMISSIONS.get('WRITE', [])
                    readable = (model_name in self.MODEL_PERMISSIONS.get('READ', []) and
                                request.method in SAFE_HTTP_METHODS)
                    allowed = readable or writable

        return all((approved, allowed))
class IsRestaurantInspector(IsApprovedUser):
    """
    Grant access to approved users holding the RESTAURANT_INSPECTOR role for
    the request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', ],
        'WRITE': ['inquiries', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            inspector_roles = Role.objects.filter(
                role=Role.RESTAURANT_INSPECTOR,
                site__country__code=request.country_code,
            ).only('id')
            if inspector_roles.exists():
                holds_role = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=inspector_roles.values_list('id', flat=True),
                ).only('id').exists()
                if holds_role:
                    model_name = view.get_queryset().model._meta.model_name
                    read_models = self.MODEL_PERMISSIONS.get('READ', [])
                    write_models = self.MODEL_PERMISSIONS.get('WRITE', [])
                    if model_name in read_models and request.method in SAFE_HTTP_METHODS:
                        allowed = True
                    elif model_name in write_models:
                        allowed = True

        return all((approved, allowed))
class IsArtisanInspector(IsApprovedUser):
    """
    Grant access to approved users holding the ARTISAN_INSPECTOR role for
    the request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', ],
        'WRITE': ['inquiries', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            inspector_roles = Role.objects.filter(
                role=Role.ARTISAN_INSPECTOR,
                site__country__code=request.country_code,
            ).only('id')
            if inspector_roles.exists():
                assignments = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=inspector_roles.values_list('id', flat=True),
                ).only('id')
                if assignments.exists():
                    model_name = view.get_queryset().model._meta.model_name
                    can_read = (model_name in self.MODEL_PERMISSIONS.get('READ', []) and
                                request.method in SAFE_HTTP_METHODS)
                    can_write = model_name in self.MODEL_PERMISSIONS.get('WRITE', [])
                    allowed = can_read or can_write

        return all((approved, allowed))
class IsWineryWineInspector(IsApprovedUser):
    """
    Grant access to approved users holding the WINERY_WINE_INSPECTOR role for
    the request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', 'product', ],
        'WRITE': ['inquiries', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            inspector_roles = Role.objects.filter(
                role=Role.WINERY_WINE_INSPECTOR,
                site__country__code=request.country_code,
            ).only('id')
            if inspector_roles.exists():
                role_ids = inspector_roles.values_list('id', flat=True)
                holds_role = UserRole.objects.filter(
                    user=request.user, role__id__in=role_ids,
                ).only('id').exists()
                if holds_role:
                    model_name = view.get_queryset().model._meta.model_name
                    read_models = self.MODEL_PERMISSIONS.get('READ', [])
                    write_models = self.MODEL_PERMISSIONS.get('WRITE', [])
                    is_safe_read = (model_name in read_models and
                                    request.method in SAFE_HTTP_METHODS)
                    allowed = is_safe_read or model_name in write_models

        return all((approved, allowed))
class IsProducerFoodInspector(IsApprovedUser):
    """
    Grant access to approved users holding the PRODUCER_FOOD_INSPECTOR role for
    the request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', 'product', ],
        'WRITE': ['inquiries', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            inspector_roles = Role.objects.filter(
                role=Role.PRODUCER_FOOD_INSPECTOR,
                site__country__code=request.country_code,
            ).only('id')
            if inspector_roles.exists():
                assignments = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=inspector_roles.values_list('id', flat=True),
                ).only('id')
                if assignments.exists():
                    model_name = view.get_queryset().model._meta.model_name
                    if model_name in self.MODEL_PERMISSIONS.get('READ', []) and \
                            request.method in SAFE_HTTP_METHODS:
                        allowed = True
                    elif model_name in self.MODEL_PERMISSIONS.get('WRITE', []):
                        allowed = True

        return all((approved, allowed))
class IsDistilleryLiquorInspector(IsApprovedUser):
    """
    Grant access to approved users holding the DISTILLERY_LIQUOR_INSPECTOR role
    for the request's country code, restricted to the models in MODEL_PERMISSIONS.
    """

    # Model names (as reported by `_meta.model_name`) this role may access.
    # 'READ' entries are allowed for safe HTTP methods only; 'WRITE' entries
    # are allowed for any method.
    MODEL_PERMISSIONS = {
        'READ': ['establishment', 'product', ],
        'WRITE': ['inquiries', ]
    }

    def has_permission(self, request, view):
        """Combine the IsApprovedUser check with a role and model check.

        The role lookup only runs for authenticated users when the request
        carries a non-empty `country_code`. When the user holds the role, the
        model behind `view.get_queryset()` must be listed in MODEL_PERMISSIONS.
        """
        approved = super().has_permission(request, view)

        allowed = False
        if request.user.is_authenticated and getattr(request, 'country_code', None):
            inspector_roles = Role.objects.filter(
                role=Role.DISTILLERY_LIQUOR_INSPECTOR,
                site__country__code=request.country_code,
            ).only('id')
            if inspector_roles.exists():
                holds_role = UserRole.objects.filter(
                    user=request.user,
                    role__id__in=inspector_roles.values_list('id', flat=True),
                ).only('id').exists()
                if holds_role:
                    model_name = view.get_queryset().model._meta.model_name
                    permitted_for_read = (
                        model_name in self.MODEL_PERMISSIONS.get('READ', []) and
                        request.method in SAFE_HTTP_METHODS
                    )
                    permitted_for_write = model_name in self.MODEL_PERMISSIONS.get('WRITE', [])
                    allowed = permitted_for_read or permitted_for_write

        return all((approved, allowed))