"""Helpers that export panel and guide data to CSV, XLSX, XML or DOCX files and e-mail the result."""
import abc
|
|
import csv
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
import xml.etree.ElementTree as ET
|
|
from smtplib import SMTPException
|
|
|
|
import docx
|
|
import xlsxwriter
|
|
from django.conf import settings
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from docx.shared import RGBColor, Pt
|
|
|
|
from utils.methods import section_name_into_index_name
|
|
|
|
# Configure root logging for this module's messages: "[LEVEL] message" at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='[%(levelname)s] %(message)s',
)

# Module-level logger used by the export classes below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DocTemplate:
    """Build the DOCX guide export on top of a ``python-docx`` document.

    The document is exposed as ``self.document``; ``template()`` appends one
    page per record to it.
    """

    DOCUMENT_FONT_NAME = 'Palatino'
    DOCUMENT_FONT_SIZE = Pt(12)
    DOCUMENT_FONT_COLOR = RGBColor(0, 0, 0)

    # Font settings shared by several parts of the template (read-only).
    _SECTION_HEADING_STYLE = {'size': Pt(13), 'name': 'Arial', 'bold': True}
    _BODY_STYLE = {'name': 'Arial', 'size': Pt(10)}
    _SECONDARY_COLOR = (102, 102, 102)

    def __init__(self):
        """Create an empty document and set the defaults of its 'Normal' style."""
        self.document = docx.Document()
        normal_font = self.document.styles['Normal'].font
        normal_font.size = self.DOCUMENT_FONT_SIZE
        normal_font.name = self.DOCUMENT_FONT_NAME
        normal_font.color.rgb = self.DOCUMENT_FONT_COLOR

    @staticmethod
    def _to_text(value):
        """Return *value* as text for a run; ``None`` becomes an empty string.

        NOTE(review): added so that non-string values (e.g. numeric marks)
        do not reach ``add_run`` unconverted - confirm against real data.
        """
        if value is None:
            return ''
        return value if isinstance(value, str) else str(value)

    def add_page_break(self):
        """Append a page break to the document."""
        self.document.add_page_break()

    def add_empty_line(self):
        """Append an empty paragraph, used as vertical spacing."""
        self.document.add_paragraph()

    def add_horizontal_line(self):
        """Append a paragraph of 50 underscores and return it."""
        return self.document.add_paragraph('_' * 50)

    def add_bullet_list(self, elements: (tuple, list)):
        """Append one 'List Bullet' paragraph (Arial, 10pt) per element."""
        for element in elements:
            paragraph = self.document.add_paragraph(style='List Bullet')
            paragraph.paragraph_format.space_before = Pt(10)
            run = paragraph.add_run(self._to_text(element))
            self.apply_font_style(run, params=self._BODY_STYLE)

    def add_heading(self, name: str, level: int = 2, font_style: dict = None,
                    color_rgb: tuple = (0, 0, 0)):
        """Append a heading of the given level and style its text run.

        :param name: heading text.
        :param level: heading level passed to ``Document.add_heading``.
        :param font_style: mapping of font attribute name -> value, or None.
        :param color_rgb: 3-tuple RGB colour; a falsy value skips colouring.
        """
        run = self.document.add_heading(level=level).add_run(self._to_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def add_paragraph(self, name: str, font_style: dict = None, color_rgb: tuple = (0, 0, 0)):
        """Append a paragraph with 10pt spacing before it and style its text.

        The style is applied to the run that holds the text: the paragraph
        object itself has no ``font``, so styling it was silently a no-op.
        """
        paragraph = self.document.add_paragraph()
        paragraph.paragraph_format.space_before = Pt(10)
        run = paragraph.add_run(self._to_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def apply_font_style(self, section, params: dict):
        """Set every ``params`` item as an attribute of ``section.font``.

        Does nothing when ``params`` is empty/None or ``section`` has no font.
        """
        font = getattr(section, 'font', None)
        if not params or font is None:
            return
        for attr, value in params.items():
            setattr(font, attr, value)

    def apply_font_color(self, section, color_rgb: tuple):
        """Set ``section.font.color.rgb`` when ``color_rgb`` has three components."""
        if len(color_rgb) != 3:
            return
        font = getattr(section, 'font', None)
        color = getattr(font, 'color', None)
        if font is not None and color is not None:
            color.rgb = RGBColor(*color_rgb)

    def _add_section_heading(self, title):
        """Append a level-2 section heading in the shared heading style."""
        self.add_heading(name=title, font_style=self._SECTION_HEADING_STYLE, level=2)

    def _add_value_section(self, title, value):
        """Append a section heading followed by its single value and a spacer."""
        self._add_section_heading(title)
        # The value is rendered as a (small) level-2 heading, as before.
        self.add_heading(name=value, font_style={'size': Pt(10), 'name': 'Arial'}, level=2)
        self.add_empty_line()

    def _render_establishment(self, record: dict):
        """Append the full page for one establishment record."""
        index_name = section_name_into_index_name(record.get('section_name'))

        # Establishment name (level 1) and its index/section name underneath.
        self.add_heading(name=record['name'],
                         font_style={'size': Pt(18), 'name': 'Palatino', 'bold': False},
                         level=1)
        self.add_paragraph(name=index_name,
                           font_style={'size': Pt(8), 'name': 'Palatino',
                                       'bold': False, 'italic': False})
        self.add_empty_line()

        # City: section heading, then the city name in grey (default level 2).
        self._add_section_heading('City')
        self.add_heading(name=record['city_name'],
                         font_style={'size': Pt(12), 'name': 'Arial',
                                     'bold': True, 'italic': True},
                         color_rgb=self._SECONDARY_COLOR)
        self.add_empty_line()

        # Review: one locale heading (level 6) plus text paragraph per locale.
        review = record.get('review')
        if review:
            self._add_section_heading('Review')
            for locale, text in review.items():
                self.add_heading(name=locale,
                                 font_style={'size': Pt(11), 'name': 'Arial',
                                             'underline': True, 'italic': True},
                                 color_rgb=self._SECONDARY_COLOR,
                                 level=6)
                self.add_paragraph(name=text, font_style={'size': Pt(10), 'name': 'Arial'})
            # NOTE(review): spacer placed once after all locales - confirm.
            self.add_empty_line()

        # Phones: bullet list.
        phones = record.get('phones')
        if phones:
            self._add_section_heading('Phones')
            self.add_bullet_list(phones)
            self.add_empty_line()

        # Address: single paragraph.
        address = record.get('address')
        if address:
            self._add_section_heading('Address')
            self.add_paragraph(name=address, font_style={'size': Pt(10), 'name': 'Arial'})
            self.add_empty_line()

        # Schedule: one "weekday: hours" bullet per entry.
        schedule = record.get('schedule')
        if schedule:
            self._add_section_heading('Schedule')
            self.add_bullet_list([f'{weekday}: {working_hours}'
                                  for weekday, working_hours in schedule.items()])
            # NOTE(review): spacer placed once after all bullets - confirm.
            self.add_empty_line()

        # Single-value sections: public mark, toque number, price level.
        public_mark = record.get('public_mark')
        if public_mark:
            self._add_value_section('Mark', public_mark)

        toque = record.get('toque_number')
        if toque:
            self._add_value_section('Toque', toque)

        price_level = record.get('price_level')
        if price_level:
            self._add_value_section('Price level', price_level)

        # Services: bullet list.
        services = record.get('services')
        if services:
            self._add_section_heading('Services')
            self.add_bullet_list(services)
            self.add_empty_line()

        # Metadata: each item maps a section name to its list of tags.
        for group in record.get('metadata') or ():
            for section, tags in group.items():
                self._add_section_heading(section.capitalize())
                self.add_bullet_list(tags or ())
                # NOTE(review): spacer placed after every metadata section - confirm.
                self.add_empty_line()

        # Every establishment ends its own page.
        self.add_page_break()

    def template(self, data: list):
        """Append one page per item of ``data`` to the document.

        :param data: iterable of items accepted by ``dict()`` (mappings or
            key/value pair iterables); each must provide 'name' and
            'city_name', the remaining keys are optional.
        """
        for raw_record in data:
            self._render_establishment(dict(raw_record))
|
|
|
|
|
|
class SendExportBase:
    """Base class of export and sending data.

    Subclasses provide the headers, rows, file name and recipients; this
    class writes the export file and e-mails it as an attachment.

    NOTE(review): the ``abc.abstractmethod`` markers are not enforced because
    the class does not derive from ``abc.ABC``.
    """

    def __init__(self):
        """Initialise e-mail fields, the export file path and the writer map.

        Calls ``self.get_file_name()``, so a subclass must set every attribute
        that method reads before invoking this initialiser.
        """
        file_name = self.get_file_name()
        self.success = False
        self.email_from = settings.EMAIL_HOST_USER
        self.email_subject = f'Export panel: {file_name}'
        self.email_body = 'Exported panel data'
        # The file lives directly in the temp directory. The former
        # STATIC_ROOT/'email' prefix was always discarded by os.path.join
        # because the temp directory is an absolute path (and it raised
        # TypeError when STATIC_ROOT was None).
        self.file_path = os.path.join(tempfile.gettempdir(), file_name)
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xls": self.make_xls_file
        }

    @abc.abstractmethod
    def get_headers(self):
        """Return the header row; implementations set ``self.success`` on success."""
        pass

    @abc.abstractmethod
    def get_emails_to(self):
        """Return the default list of recipients (empty here)."""
        return []

    @abc.abstractmethod
    def get_data(self):
        """Return the iterable of rows to export."""
        pass

    @abc.abstractmethod
    def get_file_name(self):
        """Return the export file name (empty here)."""
        return ''

    def make_csv_file(self):
        """Write headers and rows to ``self.file_path`` as CSV.

        Nothing is written when ``get_headers()`` did not set ``self.success``.
        """
        file_header = self.get_headers()
        if not self.success:
            return
        # newline='' lets the csv module control line endings itself.
        with open(self.file_path, 'w', newline='') as csv_file:
            file_writer = csv.writer(csv_file, quotechar='"', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)
            file_writer.writerows(self.get_data())

    def make_xls_file(self):
        """Write headers (bold) and rows to ``self.file_path`` with xlsxwriter.

        Nothing is written when ``get_headers()`` did not set ``self.success``.
        """
        headings = self.get_headers()
        if not self.success:
            return
        # The context manager closes the workbook; no explicit close() needed.
        with xlsxwriter.Workbook(self.file_path) as workbook:
            worksheet = workbook.add_worksheet()
            bold = workbook.add_format({'bold': True})
            worksheet.write_row('A1', headings, bold)
            # Data starts on the second sheet row; every cell is stored as text.
            for offset, row in enumerate(self.get_data()):
                worksheet.write_row(f'A{offset + 2}', [str(cell) for cell in row])

    def make_xml_file(self):
        """XML export hook; not implemented in the base class."""
        pass

    def make_doc_file(self):
        """DOCX export hook; not implemented in the base class."""
        pass

    def send_email(self):
        """E-mail the export file, or an error notice when the export failed.

        SMTP errors are logged and not re-raised.
        """
        msg = EmailMultiAlternatives(
            subject=self.email_subject,
            body=self.email_body,
            from_email=self.email_from,
            to=self.get_emails_to()
        )

        if self.file_path and self.success:
            msg.attach_file(self.file_path)
        else:
            msg.body = 'An error occurred while executing the request.'

        try:
            msg.send()
            logger.debug("COMMUTATOR:Email successfully sent")
        except SMTPException as e:
            logger.error("COMMUTATOR:Email connector: %s", e)

    @abc.abstractmethod
    def send(self):
        """Build the export file and send it."""
        pass
|
|
|
|
|
|
class SendExport(SendExportBase):
    """Export a panel's data to a file and e-mail it to the requesting user."""

    def __init__(self, panel, user, file_type='csv', **kwargs):
        """Store the export parameters and pick the file writer.

        :param panel: object providing ``name``, ``get_headers()`` and ``get_data()``.
        :param user: recipient; its ``email`` attribute is used.
        :param file_type: key of ``self.type_mapper``.
        :raises KeyError: when ``file_type`` is not in ``self.type_mapper``.
        """
        # These must exist before the base initialiser runs, because it calls
        # get_file_name(), which reads self.panel and self.file_type.
        self.panel = panel
        self.user = user
        self.file_type = file_type
        super().__init__()
        # type_mapper is created by the base initialiser.
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base-class recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<panel name>.<file type>`` (lower case, spaces -> underscores)."""
        name = self.panel.name.replace(' ', '_')
        return f'export_{name.lower()}.{self.file_type}'

    def get_data(self):
        """Return the panel's rows."""
        return self.panel.get_data()

    def get_headers(self):
        """Return the panel's headers and mark the export successful.

        On any error the exception is logged, ``self.success`` stays unchanged
        and ``None`` is returned, so the writers skip file creation.
        """
        try:
            header = self.panel.get_headers()
        except Exception as err:
            logger.exception('HEADER:%s', err)
            return None
        self.success = True
        return header

    def get_file(self):
        """Return the export file opened for binary reading, or ``None``.

        The caller is responsible for closing the returned file object.
        (It used to be returned from inside a ``with`` block, i.e. closed.)
        """
        if os.path.isfile(self.file_path):
            return open(self.file_path, 'rb')
        logger.info('COMMUTATOR:image file not found dir: %s', self.file_path)
        return None

    def send(self):
        """Create the export file with the selected writer and e-mail it."""
        self.get_file_method()
        logger.info('ok: %s', self.file_path)
        self.send_email()
|
|
|
|
|
|
class SendGuideExport(SendExportBase):
    """Send guide export.

    Exports guide rows (dicts with a ``node_name`` key) as CSV, XML or DOCX
    and e-mails the file to the requesting user.
    """

    # Columns dropped from every row, depending on the guide type.
    _COMMON_EXCLUDED = ('guide',)
    _ESTABLISHMENT_GUIDE_EXCLUDED = (
        'product_name',
        'product_review',
        'product_type',
        'product_subtypes',
        'product_address',
        'product_city',
        'product_metadata',
        'wine_color_section_name',
    )
    _WINE_GUIDE_EXCLUDED = (
        'public_mark',
        'toque_number',
        'schedule',
        'address',
        'phones',
        'establishment_type',
        'establishment_subtypes',
        'review',
        'price_level',
        'metadata',
        'city_name',
    )

    def __init__(self, data, guide, user, file_type='csv', **kwargs):
        """Store the export parameters and pick the file writer.

        :param data: list of row dicts; rows are modified in place.
        :param guide: object providing ``slug``, ``guide_type`` and the
            ``ARTISAN`` / ``RESTAURANT`` / ``WINE`` constants.
        :param user: recipient; its ``email`` attribute is used.
        :param file_type: one of ``csv``, ``xml``, ``doc``.
        :raises KeyError: when ``file_type`` is not supported.
        """
        # Needed by get_file_name(), which the base initialiser calls.
        self.guide = guide
        self.data = data
        self.user = user
        self.file_type = file_type
        super().__init__()
        # Installed after the base initialiser, which would otherwise replace
        # this mapping with its own csv/xls one.
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xml": self.make_xml_file,
            "doc": self.make_doc_file
        }
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base-class recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<guide slug>.<file type>``."""
        return f'export_{self.guide.slug}.{self.file_type}'

    def get_headers(self):
        """Return the keys of the first row without ``node_name``.

        Sets ``self.success`` on success. With no rows an empty list is
        returned and ``self.success`` is left unchanged, so no file is written.
        """
        rows = self.get_data()
        if not rows:
            logger.info('HEADER:no data to export')
            return []
        headers = [key for key in rows[0] if key != 'node_name']
        self.success = True
        return headers

    def get_data(self):
        """Remove the columns irrelevant for the guide type and return ``self.data``.

        The rows are modified in place; repeated calls are harmless.
        """
        excluded = list(self._COMMON_EXCLUDED)
        guide_type = self.guide.guide_type
        if guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            excluded.extend(self._ESTABLISHMENT_GUIDE_EXCLUDED)
        elif guide_type == self.guide.WINE:
            excluded.extend(self._WINE_GUIDE_EXCLUDED)

        for row in self.data:
            for column in excluded:
                row.pop(column, None)
        return self.data

    @staticmethod
    def _carry_advertorial(row, advertorial, inherited):
        """Track advertorial page info across consecutive rows.

        A row with its own advertorial replaces the remembered
        ``(number_of_pages, right_pages)`` pair; any other row receives the
        remembered pair as ``ad_number_of_pages`` / ``ad_right_pages``.
        Returns the pair to remember for the following rows.
        """
        if advertorial:
            return advertorial.get('number_of_pages'), advertorial.get('right_pages')
        row['ad_number_of_pages'], row['ad_right_pages'] = inherited
        return inherited

    def get_doc_data(self):
        """Return the establishment rows prepared for ``DocTemplate.template``.

        City, section and advertorial values are remembered from the rows that
        define them and copied onto the following rows that lack them. Only
        ``EstablishmentNode`` rows are returned, each as a dict items view.
        """
        objects = []
        city_name = None
        section_name = None
        ad_pages = (None, None)

        for row in self.get_data():
            ad_pages = self._carry_advertorial(row, row.pop('advertorial', None), ad_pages)

            row_city = row.get('city_name')
            if row_city:
                city_name = row_city
            else:
                row['city_name'] = city_name

            node_name = row.pop('node_name', None)
            if node_name and node_name.endswith('SectionNode'):
                section_name = node_name
            else:
                row['section_name'] = section_name

            if node_name == 'EstablishmentNode':
                objects.append(row.items())
        return objects

    def send(self):
        """Create the export file with the selected writer and e-mail it."""
        self.get_file_method()
        logger.info('ok: %s', self.file_path)
        self.send_email()

    def make_doc_file(self):
        """Render the establishments into a DOCX file at ``self.file_path``."""
        document = DocTemplate()
        document.template(self.get_doc_data())
        document.document.save(self.file_path)
        self.success = True

    def make_csv_file(self):
        """Write the establishment rows to ``self.file_path`` as CSV.

        Nothing is written when ``get_headers()`` did not set ``self.success``.

        NOTE(review): rows without their own advertorial gain the two
        ``ad_*`` cells, which have no matching header column - confirm whether
        the header row should include them.
        """
        file_header = self.get_headers()
        if not self.success:
            return

        # newline='' lets the csv module control line endings itself.
        with open(self.file_path, 'w', newline='') as csv_file:
            file_writer = csv.writer(csv_file, quotechar='"', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)

            city = None
            ad_pages = (None, None)
            for row in self.get_data():
                # City rows define the city for the rows that follow them.
                row_city = row.get('city_name')
                if row_city:
                    city = row_city
                else:
                    row['city_name'] = city

                # The advertorial value itself stays in the row (it has a header).
                ad_pages = self._carry_advertorial(row, row.get('advertorial'), ad_pages)

                if row.pop('node_name', None) == 'EstablishmentNode':
                    file_writer.writerow(row.values())

    def _write_xml(self, root):
        """Serialise ``root`` to ``self.file_path`` and mark the export successful."""
        with open(self.file_path, 'bw') as xml_file:
            ET.ElementTree(root).write(xml_file)
        self.success = True

    @staticmethod
    def _set_attributes(element, values):
        """Copy every truthy item of ``values`` onto ``element`` as a string attribute."""
        for key, value in values.items():
            if value:
                element.set(key, str(value))

    def _build_wine_xml(self):
        """Return ``<data><products>`` with one ``<product>`` per ``WineNode`` row."""
        city = None
        wine_color = None
        establishment_name = None
        establishment_address = None
        ad_pages = (None, None)

        root = ET.Element('data')
        products = ET.SubElement(root, 'products')

        for row in self.get_data():
            ad_pages = self._carry_advertorial(row, row.pop('advertorial', None), ad_pages)

            # Inherit the address from the last row that had one. This used to
            # be written to the 'establishment_name' key by mistake.
            row_address = row.get('address')
            if row_address:
                establishment_address = row_address
            else:
                row['establishment_address'] = establishment_address

            row_establishment = row.pop('name', None)
            if row_establishment:
                establishment_name = row_establishment
            else:
                row['establishment_name'] = establishment_name

            row_city = row.get('product_city')
            if row_city:
                city = row_city
            else:
                row['city_name'] = city

            row_wine_color = row.pop('wine_color_section_name', None)
            if row_wine_color:
                wine_color = row_wine_color
            else:
                row['wine_color'] = wine_color

            if row.pop('node_name', None) == 'WineNode':
                self._set_attributes(ET.SubElement(products, 'product'), row)
        return root

    def _build_establishment_xml(self):
        """Return ``<data><cities>`` with the establishments grouped per city.

        Every row with a city name opens a ``<city>`` entry; establishments
        are attached to the first entry created for their city.
        """
        entries = []        # one entry per city row, in input order
        first_entry = {}    # city name -> first entry created for that city
        city = None
        ad_pages = (None, None)

        for row in self.get_data():
            ad_pages = self._carry_advertorial(row, row.pop('advertorial', None), ad_pages)

            row_city = row.pop('city_name', None)
            if row_city:
                city = row_city
                entry = {'city': city, 'establishments': []}
                entries.append(entry)
                first_entry.setdefault(city, entry)

            if row.pop('node_name', None) == 'EstablishmentNode':
                entry = first_entry.get(city)
                if entry is None:
                    # Establishment seen before any city row: open an entry
                    # for it instead of failing on the lookup.
                    entry = {'city': city, 'establishments': []}
                    entries.append(entry)
                    first_entry[city] = entry
                entry['establishments'].append(row)

        root = ET.Element('data')
        cities_element = ET.SubElement(root, 'cities')
        for entry in entries:
            city_element = ET.SubElement(cities_element, 'city',
                                         attrib={'name': entry['city'] or ''})
            establishments_element = ET.SubElement(city_element, 'establishments')
            for establishment in entry['establishments']:
                self._set_attributes(
                    ET.SubElement(establishments_element, 'establishment'),
                    establishment,
                )
        return root

    def make_xml_file(self):
        """Write the XML export for the guide type to ``self.file_path``.

        Wine guides produce a flat product list; artisan and restaurant guides
        produce establishments grouped by city. Any other guide type writes
        nothing and leaves ``self.success`` unchanged.
        """
        guide_type = self.guide.guide_type
        if guide_type == self.guide.WINE:
            self._write_xml(self._build_wine_xml())
        elif guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            self._write_xml(self._build_establishment_xml())
|