255 lines
7.1 KiB
Python
255 lines
7.1 KiB
Python
"""Utils app method."""
|
|
import logging
|
|
import pathlib
|
|
import random
|
|
import re
|
|
import string
|
|
from collections import namedtuple
|
|
from functools import reduce
|
|
from io import BytesIO
|
|
from operator import or_
|
|
|
|
import requests
|
|
from PIL import Image
|
|
from django.conf import settings
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.gis.geos import Point
|
|
from django.http.request import HttpRequest
|
|
from django.utils.timezone import datetime
|
|
from rest_framework import status
|
|
from rest_framework.request import Request
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_code(digits=6, string_output=True):
    """Generate a random positive integer with exactly ``digits`` digits.

    The value is drawn from ``10 ** (digits - 1)`` to ``10 ** digits - 1``
    inclusive, so the result never starts with a zero.

    :param digits: number of decimal digits in the result, must be >= 1
    :param string_output: return the code as ``str`` when true, else as ``int``
    :return: the generated code
    :raises ValueError: if ``digits`` is less than 1
    """
    if digits < 1:
        # For digits < 1 the lower bound 10 ** (digits - 1) becomes a float
        # and randint() fails with an unrelated message; fail clearly instead.
        raise ValueError('digits must be a positive integer')
    min_value = 10 ** (digits - 1)
    max_value = 10 ** digits - 1
    # NOTE(review): ``random`` is not a cryptographically secure source; if
    # these codes guard anything sensitive, consider ``secrets`` instead.
    value = random.randint(min_value, max_value)
    return str(value) if string_output else value
|
|
|
|
|
|
def get_token_from_cookies(request):
    """Build a bearer authorization value from the ``access_token`` cookie.

    :param request: object exposing a ``COOKIES`` mapping
    :return: ``b'Bearer <token>'``, or ``None`` when the cookie is missing
        or empty
    """
    access_token = request.COOKIES.get('access_token')
    if not access_token:
        return None
    return f'Bearer {access_token}'.encode()
|
|
|
|
|
|
def get_token_from_request(request):
    """Extract the access token from the ``Authorization`` request header.

    :param request: a Django ``HttpRequest`` or a DRF ``Request``
    :return: the last space-separated part of the header value, or ``None``
        when the header is absent or the request is of neither type
    """
    if 'Authorization' not in request.headers:
        return None
    if not isinstance(request, (HttpRequest, Request)):
        return None
    # "<scheme> <token>" -> "<token>"
    return request.headers.get('Authorization').split(' ')[-1]
|
|
|
|
|
|
def username_validator(username: str) -> bool:
    """Check that a username contains no whitespace, ``@`` or comma.

    :param username: candidate username
    :return: ``True`` when none of the forbidden characters occur
    """
    forbidden = re.search(pattern=r'[\s@,]', string=username)
    return forbidden is None
|
|
|
|
|
|
def string_random():
    """Generate a random 8-character username.

    The first character is a lowercase letter; the remaining seven are a
    shuffle of four lowercase letters and the digits of a number in 100..999.
    """
    letters = ''.join(random.choice(string.ascii_lowercase) for _ in range(4))
    number = random.randrange(100, 1000)
    shuffled = list('{letters}{digits}'.format(letters=letters, digits=number))
    random.shuffle(shuffled)
    leading = random.choice(string.ascii_lowercase)
    return '{first}{username}'.format(first=leading, username=''.join(shuffled))
|
|
|
|
|
|
def file_path(instance, filename):
    """Determine the storage path for an uploaded file.

    The original base name is replaced by a random numeric code; only the
    extension of ``filename`` is kept.

    :param instance: object whose ``_meta.model_name`` names the sub-folder
    :param filename: original file name, used only for its extension
    :return: ``files/<model_name>/<date>/<code>[.<extension>]`` where the
        date is formatted with ``settings.REST_DATE_FORMAT``
    """
    extension = pathlib.Path(filename).suffix.lstrip('.')
    code = generate_code()
    # Append the dot only when there is an extension, so a file uploaded
    # without one does not end up with a dangling "." in its name.
    new_name = '%s.%s' % (code, extension) if extension else code
    return 'files/%s/%s/%s' % (
        instance._meta.model_name,
        datetime.now().strftime(settings.REST_DATE_FORMAT),
        new_name,
    )
|
|
|
|
|
|
def image_path(instance, filename):
    """Determine the storage path for an image.

    The incoming ``filename`` is ignored: the stored name is always a random
    numeric code with a ``.jpeg`` extension.

    :param instance: object whose ``_meta.model_name`` names the sub-folder
    :param filename: original file name (unused)
    :return: ``image/<model_name>/<date>/<code>.jpeg``
    """
    new_name = '%s.jpeg' % generate_code()
    model_name = instance._meta.model_name
    date_part = datetime.now().strftime(settings.REST_DATE_FORMAT)
    return 'image/%s/%s/%s' % (model_name, date_part, new_name)
|
|
|
|
|
|
def svg_image_path(instance, filename):
    """Determine the storage path for an SVG image.

    The incoming ``filename`` is ignored: the stored name is always a random
    numeric code with a ``.svg`` extension.

    :param instance: object whose ``_meta.model_name`` names the sub-folder
    :param filename: original file name (unused)
    :return: ``svg/<model_name>/<date>/<code>.svg``
    """
    new_name = '%s.svg' % generate_code()
    model_name = instance._meta.model_name
    date_part = datetime.now().strftime(settings.REST_DATE_FORMAT)
    return 'svg/%s/%s/%s' % (model_name, date_part, new_name)
|
|
|
|
|
|
def get_user_ip(request):
    """Get the client IP address from the request metadata.

    :param request: object exposing a ``META`` mapping
    :return: the first comma-separated entry of ``HTTP_X_FORWARDED_FOR`` when
        that key is set and non-empty, otherwise the ``REMOTE_ADDR`` value
    """
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if not forwarded:
        return request.META.get('REMOTE_ADDR')
    return forwarded.split(',')[0]
|
|
|
|
|
|
def generate_string_code(size=64,
                         chars=string.ascii_lowercase + string.ascii_uppercase + string.digits):
    """Generate a random string code.

    :param size: number of characters in the result
    :param chars: alphabet to draw characters from (ASCII letters and digits
        by default)
    :return: string of ``size`` characters picked with ``random.SystemRandom``
    """
    # One generator for the whole string instead of a new one per character.
    rng = random.SystemRandom()
    return ''.join(rng.choice(chars) for _ in range(size))
|
|
|
|
|
|
def get_contenttype(app_label: str, model: str):
    """Get a ContentType instance by app_label and model.

    :param app_label: value matched against ``ContentType.app_label``
    :param model: value matched against ``ContentType.model``
    :return: the first matching ``ContentType``, or ``None`` if there is none
    """
    # ``first()`` already returns None for an empty queryset, so a separate
    # ``exists()`` check would only add a second database query.
    return ContentType.objects.filter(app_label=app_label, model=model).first()
|
|
|
|
|
|
def image_url_valid(url: str, timeout: float = 10) -> bool:
    """
    Check if requested URL is valid.

    A URL is considered valid when it starts with ``http`` and a HEAD request
    to it answers with status 200.

    :param url: string
    :param timeout: seconds to wait for the server before giving up
    :return: boolean, ``False`` for a malformed URL or a failed request
    """
    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would let any value through to requests.
    if not isinstance(url, str) or not url.startswith('http'):
        return False
    try:
        # Without a timeout an unresponsive host would block the caller forever.
        response = requests.request('head', url, timeout=timeout)
    except (requests.RequestException, ValueError) as e:
        # ValueError also covers URL parsing/encoding errors raised before
        # any network traffic happens.
        logger.info('ConnectionError: %s', e)
        return False
    return response.status_code == status.HTTP_200_OK
|
|
|
|
|
|
def absolute_url_decorator(func):
    """Decorate a ``(self, obj)`` method that returns an image URL path.

    The wrapper post-processes the value returned by ``func``:

    * a falsy value yields ``None``;
    * a path starting with ``/media/`` is prefixed with ``settings.MEDIA_URL``
      and suffixed with ``/``;
    * any other value is returned unchanged.

    :param func: method taking ``(self, obj)`` and returning a string or a
        falsy value
    :return: the wrapping function
    """
    # Local import: the module top only imports ``reduce`` from functools.
    from functools import wraps

    @wraps(func)  # keep the wrapped method's name and docstring
    def get_absolute_image_url(self, obj):
        url_path = func(self, obj)
        if not url_path:
            return None
        if url_path.startswith('/media/'):
            return f'{settings.MEDIA_URL}{url_path}/'
        return url_path

    return get_absolute_image_url
|
|
|
|
|
|
def get_point_from_coordinates(latitude: str, longitude: str):
    """Build a ``Point`` (SRID 4326) from a latitude/longitude pair.

    :param latitude: latitude, passed to ``Point`` as ``y``
    :param longitude: longitude, passed to ``Point`` as ``x``
    :return: the ``Point``, or ``None`` when either coordinate is missing
        (``None`` or an empty string)
    """
    missing = (None, '')
    # Compare against explicit "missing" markers rather than truthiness, so a
    # legitimate zero coordinate (equator / prime meridian) is not dropped.
    if latitude in missing or longitude in missing:
        return None
    return Point(x=longitude, y=latitude, srid=4326)
|
|
|
|
|
|
def namedtuplefetchall(cursor):
    """Return every remaining row of a cursor as a ``Result`` namedtuple.

    Field names are taken from the first item of each entry in
    ``cursor.description``.

    :param cursor: object exposing ``description`` and ``fetchall()``
    :return: list of namedtuples, one per fetched row
    """
    column_names = [column[0] for column in cursor.description]
    row_type = namedtuple('Result', column_names)
    return list(map(row_type._make, cursor.fetchall()))
|
|
|
|
|
|
def dictfetchall(cursor):
    """Return every remaining row of a cursor as a dict.

    Keys are taken from the first item of each entry in
    ``cursor.description``.

    :param cursor: object exposing ``description`` and ``fetchall()``
    :return: list of ``{column_name: value}`` dicts, one per fetched row
    """
    keys = [column[0] for column in cursor.description]
    rows = []
    for record in cursor.fetchall():
        rows.append(dict(zip(keys, record)))
    return rows
|
|
|
|
|
|
def transform_into_readable_str(raw_string: str):
    """
    Join the words of a slug into one capitalized string, i.e:
    "EffervescentRoseDeSaignee"
    from
    "effervescent-rose-de-saignee"

    Words are runs of ``\\w`` characters; each one is passed through
    ``str.capitalize``. Returns ``None`` when the input has no such run.
    """
    words = re.findall(r'[\w]+', raw_string)
    if not words:
        return None
    return ''.join(word.capitalize() for word in words)
|
|
|
|
|
|
def transform_into_section_name(raw_string: str, postfix: str = 'SectionNode'):
    """
    Transform slug into section name, i.e:
    "EffervescentRoseDeSaigneeSectionNode"
    from
    "effervescent-rose-de-saignee"

    :param raw_string: slug handed to ``transform_into_readable_str``
    :param postfix: text appended to the readable name
    :return: readable name followed by ``postfix``

    NOTE(review): when the slug contains no word characters the helper
    returns ``None`` and the result is ``'None' + postfix`` - confirm whether
    callers can hit that case.
    """
    readable_name = transform_into_readable_str(raw_string)
    return f'{readable_name}{postfix}'
|
|
|
|
|
|
def transform_camelcase_to_underscore(raw_string: str):
    """
    Transform str, i.e:
    from
    "ContentPage"
    to
    "content_page"

    The string is split in front of every ASCII uppercase letter, the pieces
    are lowercased and joined with underscores. A leading piece that does not
    start with an uppercase letter ("contentPage") is kept as its own piece
    instead of being dropped.

    :param raw_string: camel-case string
    :return: underscore-separated lowercase string; the input itself when it
        is empty
    """
    # Second alternative captures a leading run without uppercase letters,
    # which ``[A-Z][^A-Z]*`` alone would silently discard.
    words = re.findall(r'[A-Z][^A-Z]*|[^A-Z]+', raw_string)
    if not words:
        return raw_string
    return '_'.join(word.lower() for word in words)
|
|
|
|
|
|
def section_name_into_index_name(section_name: str):
    """
    Transform section name into index name, i.e:
    "Restaurant"
    from
    "RestaurantSectionNode"

    The name is split into words that start with an ASCII uppercase letter,
    the last two words are dropped and the rest are joined with spaces.
    Returns ``None`` when the name contains no such word.
    """
    words = re.findall(r'[A-Z][^A-Z]*', section_name)
    if not words:
        return None
    kept = words[:-2]  # drop the trailing two words ("Section", "Node")
    if kept:
        kept[0] = kept[0].capitalize()
    return ' '.join(kept)
|
|
|
|
|
|
def get_url_images_in_text(text):
    """Find image urls in text.

    Collects the double-quoted ``src`` attribute value of every ``<img ...>``
    tag, in order of appearance.

    :param text: HTML fragment to scan
    :return: list of ``src`` values, empty when there is no matching tag
    """
    # Matching is confined to a single tag by ``[^>]*`` (instead of a greedy
    # ``.+``), so several tags on one line are all found and a tag that ends
    # right after the src attribute still matches. ``src`` must be preceded
    # by whitespace so that attributes such as ``data-src`` are not picked up.
    pattern = r'<img\s+(?:[^>]*?\s)?src="([^"]+)"[^>]*>'
    return re.findall(pattern, text, flags=re.IGNORECASE)
|
|
|
|
|
|
def get_image_meta_by_url(url) -> (int, int, int):
    """Returns image size (bytes, width, height).

    Downloads the image with ``requests.get`` and opens it with PIL to read
    its dimensions.

    :param url: address of the image to download
    :return: tuple ``(size_in_bytes, width, height)``; the byte size is the
        ``content-length`` response header when present, otherwise the length
        of the downloaded body
    """
    image_raw = requests.get(url)
    image = Image.open(BytesIO(image_raw.content))
    width, height = image.size
    content_length = image_raw.headers.get('content-length')
    if content_length is None:
        # The header is optional (e.g. chunked responses); int(None) would
        # raise TypeError, so measure the body that was actually received.
        size = len(image_raw.content)
    else:
        size = int(content_length)
    return size, width, height
|
|
|
|
|
|
def get_permission_classes(*args) -> list:
    """Return a one-item list holding the OR-combination of permissions.

    ``IsCountryAdmin``, ``IsAdminUser`` and ``IsReadOnly`` are combined with
    ``|``, followed by any extra permission classes passed in ``args``.

    :param args: additional permission classes to OR into the result
    :return: list containing the single combined permission
    """
    # Imported inside the function, as in the original code.
    from rest_framework.permissions import IsAdminUser
    from utils.permissions import IsCountryAdmin, IsReadOnly

    candidates = [IsCountryAdmin, IsAdminUser, IsReadOnly] + list(args)
    combined = candidates[0]
    for permission in candidates[1:]:
        combined = or_(combined, permission)
    return [combined]
|