gault-millau/apps/utils/export.py
2020-01-10 13:06:17 +03:00

606 lines
22 KiB
Python

import abc
import csv
import logging
import os
import tempfile
import xml.etree.ElementTree as ET
from smtplib import SMTPException
import docx
import xlsxwriter
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from docx.shared import RGBColor, Pt
from utils.methods import section_name_into_index_name
# Configure the root logger when this module is imported (INFO level,
# "[LEVEL] message" format) and create the module-level logger used by the
# export classes below.
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class DocTemplate:
    """Builder for the Word (.docx) version of a guide export.

    Wraps a python-docx ``Document`` and offers small helpers (headings,
    paragraphs, bullet lists) plus :meth:`template`, which renders every
    establishment followed by a page break.
    """

    DOCUMENT_FONT_NAME = 'Palatino'
    DOCUMENT_FONT_SIZE = Pt(12)
    DOCUMENT_FONT_COLOR = RGBColor(0, 0, 0)

    # Font settings shared by every level-2 section title ("City", "Review", ...).
    _SECTION_HEADING_STYLE = {'size': Pt(13), 'name': 'Arial', 'bold': True}
    # Font settings shared by body text and single-value lines.
    _BODY_STYLE = {'size': Pt(10), 'name': 'Arial'}
    # Grey used for secondary headings (city name, review locale).
    _MUTED_COLOR = (102, 102, 102)

    def __init__(self):
        """Create an empty document and configure its 'Normal' style font."""
        self.document = docx.Document()
        style = self.document.styles['Normal']
        style.font.size = self.DOCUMENT_FONT_SIZE
        style.font.name = self.DOCUMENT_FONT_NAME
        style.font.color.rgb = self.DOCUMENT_FONT_COLOR

    @staticmethod
    def _as_text(value) -> str:
        """Return ``value`` as run text: '' for None, ``str(value)`` otherwise.

        Lets numeric values (marks, toque numbers, price levels) be rendered
        the same way as strings.
        """
        return '' if value is None else str(value)

    def add_page_break(self):
        """Append a page break to the document."""
        self.document.add_page_break()

    def add_empty_line(self):
        """Append an empty paragraph, used as vertical spacing."""
        self.document.add_paragraph()

    def add_horizontal_line(self):
        """Append a paragraph of 50 underscores and return it."""
        return self.document.add_paragraph('_' * 50)

    def add_bullet_list(self, elements: (tuple, list)):
        """Append one 'List Bullet' paragraph (Arial, 10pt) per element.

        :param elements: values to render; ``None`` or an empty sequence
            adds nothing.
        """
        styles = {'name': 'Arial', 'size': Pt(10)}
        for element in elements or ():
            bullet = self.document.add_paragraph(style='List Bullet')
            bullet.paragraph_format.space_before = Pt(10)
            run = bullet.add_run(self._as_text(element))
            self.apply_font_style(run, params=styles)

    def add_heading(self, name: str, level: int = 2, font_style: dict = None,
                    color_rgb: tuple = (0, 0, 0)):
        """Append a heading of the given level containing ``name``.

        :param name: heading text (converted to text if it is not a string).
        :param level: heading level passed to ``Document.add_heading``.
        :param font_style: optional mapping of font attribute name to value.
        :param color_rgb: optional ``(r, g, b)`` tuple; falsy keeps the
            style's colour.
        """
        run = self.document.add_heading(level=level).add_run(self._as_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def add_paragraph(self, name: str, font_style: dict = None, color_rgb: tuple = (0, 0, 0)):
        """Append a paragraph containing ``name`` with 10pt spacing before it.

        :param name: paragraph text (converted to text if it is not a string).
        :param font_style: optional mapping of font attribute name to value.
        :param color_rgb: optional ``(r, g, b)`` tuple; falsy keeps the
            style's colour.
        """
        paragraph = self.document.add_paragraph()
        paragraph.paragraph_format.space_before = Pt(10)
        # Fonts live on runs, not on paragraphs: style the run so the
        # requested font and colour are actually applied.
        run = paragraph.add_run(self._as_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def apply_font_style(self, section, params: dict):
        """Set every ``params`` item as an attribute of ``section.font``.

        Does nothing when ``params`` is empty/None or when ``section`` has no
        ``font`` attribute.
        """
        font = getattr(section, 'font', None)
        if not params or font is None:
            return
        for attr, value in params.items():
            setattr(font, attr, value)

    def apply_font_color(self, section, color_rgb: tuple):
        """Set the font colour of ``section`` from an ``(r, g, b)`` tuple.

        Ignored when the tuple does not have exactly three components or when
        ``section`` exposes no font colour.
        """
        if not color_rgb or len(color_rgb) != 3:
            return
        font = getattr(section, 'font', None)
        color = getattr(font, 'color', None)
        if font is not None and color is not None:
            color.rgb = RGBColor(*color_rgb)

    def _add_section_heading(self, title):
        """Append a level-2 section title in the shared section style."""
        self.add_heading(name=title,
                         font_style=dict(self._SECTION_HEADING_STYLE),
                         level=2)

    def _add_value_section(self, title, value):
        """Append a section title, a single value line and a blank line."""
        self._add_section_heading(title)
        self.add_heading(name=value, font_style=dict(self._BODY_STYLE), level=2)
        self.add_empty_line()

    def _add_bullet_section(self, title, items):
        """Append a section title, a bullet list and a blank line."""
        self._add_section_heading(title)
        self.add_bullet_list(items)
        self.add_empty_line()

    def template(self, data: list):
        """Render every establishment of ``data`` into the document.

        :param data: iterable of mappings (or iterables of key/value pairs).
            ``name`` and ``city_name`` are required; ``section_name``,
            ``review``, ``phones``, ``address``, ``schedule``,
            ``public_mark``, ``toque_number``, ``price_level``, ``services``
            and ``metadata`` are optional and skipped when falsy.
        """
        for raw_entry in data:
            entry = dict(raw_entry)
            index_name = section_name_into_index_name(entry.get('section_name'))

            # Establishment name (level 1) and establishment type.
            self.add_heading(name=entry['name'],
                             font_style={'size': Pt(18), 'name': 'Palatino', 'bold': False},
                             level=1)
            self.add_paragraph(name=index_name,
                               font_style={'size': Pt(8), 'name': 'Palatino',
                                           'bold': False, 'italic': False})
            self.add_empty_line()

            # City title and city name.
            self._add_section_heading('City')
            self.add_heading(name=entry['city_name'],
                             font_style={'size': Pt(12), 'name': 'Arial',
                                         'bold': True, 'italic': True},
                             color_rgb=self._MUTED_COLOR)
            self.add_empty_line()

            # Review: one locale heading (level 6) and one paragraph per locale.
            review = entry.get('review')
            if review:
                self._add_section_heading('Review')
                for locale, text in review.items():
                    self.add_heading(name=locale,
                                     font_style={'size': Pt(11), 'name': 'Arial',
                                                 'underline': True, 'italic': True},
                                     color_rgb=self._MUTED_COLOR,
                                     level=6)
                    self.add_paragraph(name=text, font_style=dict(self._BODY_STYLE))
                self.add_empty_line()

            phones = entry.get('phones')
            if phones:
                self._add_bullet_section('Phones', phones)

            address = entry.get('address')
            if address:
                self._add_section_heading('Address')
                self.add_paragraph(name=address, font_style=dict(self._BODY_STYLE))
                self.add_empty_line()

            # Schedule: one "weekday: hours" bullet per entry.
            schedule = entry.get('schedule')
            if schedule:
                self._add_bullet_section(
                    'Schedule',
                    [f'{weekday}: {working_hours}'
                     for weekday, working_hours in schedule.items()])

            # Single-value sections; falsy values (including 0) are skipped.
            for title, key in (('Mark', 'public_mark'),
                               ('Toque', 'toque_number'),
                               ('Price level', 'price_level')):
                value = entry.get(key)
                if value:
                    self._add_value_section(title, value)

            services = entry.get('services')
            if services:
                self._add_bullet_section('Services', services)

            # Metadata: each item maps a section name to its list of tags.
            for group in entry.get('metadata') or ():
                for section, tags in group.items():
                    self._add_bullet_section(section.capitalize(), tags)

            self.add_page_break()
class SendExportBase:
    """Base class of export and sending data.

    Subclasses provide the headers, rows, file name and recipients; this
    class writes the export file into the system temporary directory and
    e-mails it as an attachment.
    """

    def __init__(self):
        """Initialise e-mail fields, the output path and the writer mapping.

        Calls :meth:`get_file_name`, so subclasses must set whatever that
        method reads *before* calling ``super().__init__()``.
        """
        self.success = False
        self.email_from = settings.EMAIL_HOST_USER
        file_name = self.get_file_name()
        self.email_subject = f'Export panel: {file_name}'
        self.email_body = 'Exported panel data'
        # tempfile.gettempdir() is absolute, so any path joined in front of it
        # is discarded by os.path.join; build the path from it directly.
        self.file_path = os.path.join(tempfile.gettempdir(), file_name)
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xls": self.make_xls_file
        }

    @abc.abstractmethod
    def get_headers(self):
        """Return the header row; implementations set ``self.success``."""
        pass

    @abc.abstractmethod
    def get_emails_to(self):
        """Return the default recipient list (empty); subclasses extend it."""
        return [
            # 'a.feteleu@spider.ru',
            # 'kuzmenko.da@gmail.com',
            # 'sinapsit@yandex.ru'
        ]

    @abc.abstractmethod
    def get_data(self):
        """Return an iterable of rows to export."""
        pass

    @abc.abstractmethod
    def get_file_name(self):
        """Return the export file name (with extension)."""
        return ''

    def make_csv_file(self):
        """Write headers and rows to ``self.file_path`` as CSV.

        Nothing is written when :meth:`get_headers` did not set
        ``self.success``.
        """
        file_header = self.get_headers()
        if not self.success:
            return
        # newline='' lets the csv module control line endings itself.
        with open(self.file_path, 'w', newline='') as f:
            file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)
            file_writer.writerows(self.get_data())

    def make_xls_file(self):
        """Write headers (bold) and rows to ``self.file_path`` with xlsxwriter.

        Every cell value is written as a string. Nothing is written when
        :meth:`get_headers` did not set ``self.success``.
        """
        headings = self.get_headers()
        if not self.success:
            return
        # The context manager closes the workbook; no explicit close() needed.
        with xlsxwriter.Workbook(self.file_path) as workbook:
            worksheet = workbook.add_worksheet()
            bold = workbook.add_format({'bold': True})
            data = self.get_data()
            worksheet.write_row('A1', headings, bold)
            # Data starts on spreadsheet row 2, right under the header row.
            for row_number, row in enumerate(data, start=2):
                worksheet.write_row(f'A{row_number}', [str(cell) for cell in row])

    def make_xml_file(self):
        """Hook for XML export; no-op in the base class."""
        pass

    def make_doc_file(self):
        """Hook for Word export; no-op in the base class."""
        pass

    def send_email(self):
        """E-mail the export file, or an error notice when the export failed.

        SMTP errors are logged and not re-raised.
        """
        msg = EmailMultiAlternatives(
            subject=self.email_subject,
            body=self.email_body,
            from_email=self.email_from,
            to=self.get_emails_to()
        )
        if self.file_path and self.success:
            msg.attach_file(self.file_path)
        else:
            msg.body = 'An error occurred while executing the request.'
        try:
            msg.send()
            logger.debug("COMMUTATOR:Email successfully sent")
        except SMTPException as e:
            logger.error(f"COMMUTATOR:Email connector: {e}")

    @abc.abstractmethod
    def send(self):
        """Build the export file and send it."""
        pass
class SendExport(SendExportBase):
    """Export a panel's data (CSV or XLS) and e-mail it to the user."""

    def __init__(self, panel, user, file_type='csv', **kwargs):
        """Store the export parameters and resolve the file writer.

        :param panel: object providing ``name``, ``get_headers()`` and
            ``get_data()``.
        :param user: recipient; its ``email`` attribute is used.
        :param file_type: key of ``type_mapper`` ('csv' or 'xls').
        :raises KeyError: if ``file_type`` is not supported.
        """
        # The base constructor calls get_file_name(), which reads
        # self.panel and self.file_type, so they must be set first.
        self.panel = panel
        self.user = user
        self.file_type = file_type
        super().__init__()
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<panel name>.<file type>`` (lower-cased, spaces as '_')."""
        name = self.panel.name.replace(' ', '_')
        return f'export_{name.lower()}.{self.file_type}'

    def get_data(self):
        """Return the panel's rows."""
        return self.panel.get_data()

    def get_headers(self):
        """Return the panel's headers and mark the export as successful.

        Any error raised by the panel is logged; ``None`` is returned and
        ``self.success`` is left untouched so the error e-mail is sent.
        """
        try:
            header = self.panel.get_headers()
        except Exception as err:
            logger.info(f'HEADER:{err}')
            return None
        self.success = True
        return header

    def get_file(self):
        """Return the export file opened for binary reading, or ``None``.

        The caller is responsible for closing the returned file object.
        """
        if os.path.isfile(self.file_path):
            return open(self.file_path, 'rb')
        logger.info(f'COMMUTATOR:image file not found dir: {self.file_path}')
        return None

    def send(self):
        """Write the export file, then e-mail it."""
        self.get_file_method()
        logger.info(f'ok: {self.file_path}')
        self.send_email()
class SendGuideExport(SendExportBase):
    """Send guide export.

    Exports the flattened rows of a guide as CSV, XML or Word and e-mails
    the file to the user. Rows are dicts carrying a ``node_name`` key;
    several values (city, advertorial page counts, ...) appear only on some
    rows and are carried forward to the rows that follow.
    """

    # Columns removed for artisan / restaurant guides.
    _PRODUCT_COLUMNS = (
        'product_name',
        'product_review',
        'product_type',
        'product_subtypes',
        'product_address',
        'product_city',
        'product_metadata',
        'wine_color_section_name',
    )
    # Columns removed for wine guides.
    _ESTABLISHMENT_COLUMNS = (
        'public_mark',
        'toque_number',
        'schedule',
        'address',
        'phones',
        'establishment_type',
        'establishment_subtypes',
        'review',
        'price_level',
        'metadata',
        'city_name',
    )

    def __init__(self, data, guide, user, file_type='csv', **kwargs):
        """Store the export parameters and resolve the file writer.

        :param data: list of row dicts (mutated in place by the export).
        :param guide: object providing ``slug``, ``guide_type`` and the
            ``ARTISAN`` / ``RESTAURANT`` / ``WINE`` constants.
        :param user: recipient; its ``email`` attribute is used.
        :param file_type: 'csv', 'xml' or 'doc'.
        :raises KeyError: if ``file_type`` is not supported.
        """
        # The base constructor calls get_file_name(), which reads
        # self.guide and self.file_type, so they must be set first.
        self.guide = guide
        self.data = data
        self.user = user
        self.file_type = file_type
        super().__init__()
        # Set after the base constructor, which installs its own csv/xls
        # mapping on self.type_mapper.
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xml": self.make_xml_file,
            "doc": self.make_doc_file
        }
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<guide slug>.<file type>``."""
        return f'export_{self.guide.slug}.{self.file_type}'

    def get_headers(self):
        """Return the keys of the first row, without ``node_name``.

        Marks the export as successful. With no rows, logs the problem and
        returns an empty list without setting ``self.success``.
        """
        rows = self.get_data()
        if not rows:
            logger.info('HEADER:no data to export')
            return []
        headers = [key for key in rows[0] if key != 'node_name']
        self.success = True
        return headers

    def get_data(self):
        """Drop the columns irrelevant to the guide type and return the rows.

        The rows in ``self.data`` are modified in place.
        """
        excluded = {'guide'}
        guide_type = self.guide.guide_type
        if guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            excluded.update(self._PRODUCT_COLUMNS)
        elif guide_type == self.guide.WINE:
            excluded.update(self._ESTABLISHMENT_COLUMNS)
        for row in self.data:
            for column in excluded:
                row.pop(column, None)
        return self.data

    def get_doc_data(self):
        """Return the establishment rows prepared for :class:`DocTemplate`.

        City, section name and advertorial page counts are carried forward
        from the last row that defined them. Only rows whose ``node_name`` is
        ``'EstablishmentNode'`` are returned, each as ``dict.items()``.
        """
        objects = []
        city_name = None
        section_name = None
        ad_number_of_pages = None
        ad_right_pages = None
        for row in self.get_data():
            row_advertorial = row.pop('advertorial', None)
            if row_advertorial:
                ad_number_of_pages = row_advertorial.get('number_of_pages')
                ad_right_pages = row_advertorial.get('right_pages')
            else:
                row['ad_number_of_pages'] = ad_number_of_pages
                row['ad_right_pages'] = ad_right_pages

            row_city = row.get('city_name')
            if row_city:
                city_name = row_city
            else:
                row['city_name'] = city_name

            node_name = row.pop('node_name', None)
            if (node_name or '').endswith('SectionNode'):
                section_name = node_name
            else:
                row['section_name'] = section_name

            if node_name == 'EstablishmentNode':
                objects.append(row.items())
        return objects

    def send(self):
        """Write the export file, then e-mail it."""
        self.get_file_method()
        logger.info(f'ok: {self.file_path}')
        self.send_email()

    def make_doc_file(self):
        """Render the establishments into a Word document and save it."""
        document = DocTemplate()
        document.template(self.get_doc_data())
        document.document.save(self.file_path)
        self.success = True

    def make_csv_file(self):
        """Write the establishment rows to ``self.file_path`` as CSV.

        City and advertorial page counts are carried forward from the last
        row that defined them; only ``'EstablishmentNode'`` rows are written.
        """
        file_header = self.get_headers()
        if not self.success:
            return
        # newline='' lets the csv module control line endings itself.
        with open(self.file_path, 'w', newline='') as f:
            file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)
            city = None
            ad_number_of_pages = None
            ad_right_pages = None
            for row in self.get_data():
                row_city = row.get('city_name')
                if row_city:
                    city = row_city
                else:
                    row['city_name'] = city

                row_advertorial = row.get('advertorial')
                if row_advertorial:
                    ad_number_of_pages = row_advertorial.get('number_of_pages')
                    ad_right_pages = row_advertorial.get('right_pages')
                else:
                    row['ad_number_of_pages'] = ad_number_of_pages
                    row['ad_right_pages'] = ad_right_pages

                # NOTE(review): the ad_* keys are added after the header was
                # computed, so rows may hold more columns than the header
                # row -- confirm whether the header should list them.
                if row.pop('node_name', None) == 'EstablishmentNode':
                    file_writer.writerow(row.values())

    @staticmethod
    def _set_attributes(element, row):
        """Copy every truthy value of ``row`` onto ``element`` as a string attribute."""
        for key, value in row.items():
            if value:
                element.set(key, str(value))

    def _write_xml(self, root):
        """Serialise ``root`` to ``self.file_path`` and mark the export successful."""
        tree = ET.ElementTree(root)
        with open(self.file_path, 'bw') as f:
            tree.write(f)
        self.success = True

    def _make_wine_xml(self):
        """Write ``<data><products><product .../>...`` for a wine guide."""
        city = None
        wine_color = None
        establishment_name = None
        establishment_address = None
        ad_number_of_pages = None
        ad_right_pages = None

        root = ET.Element('data')
        products = ET.SubElement(root, 'products')
        for row in self.get_data():
            row_advertorial = row.pop('advertorial', None)
            if row_advertorial:
                ad_number_of_pages = row_advertorial.get('number_of_pages')
                ad_right_pages = row_advertorial.get('right_pages')
            else:
                row['ad_number_of_pages'] = ad_number_of_pages
                row['ad_right_pages'] = ad_right_pages

            # NOTE(review): get_data() removes 'address' for wine guides, so
            # this lookup currently never finds a value -- confirm intent.
            row_address = row.get('address')
            if row_address:
                establishment_address = row_address
            else:
                row['establishment_address'] = establishment_address

            row_establishment = row.pop('name', None)
            if row_establishment:
                establishment_name = row_establishment
            else:
                row['establishment_name'] = establishment_name

            row_city = row.get('product_city')
            if row_city:
                city = row_city
            else:
                row['city_name'] = city

            row_wine_color = row.pop('wine_color_section_name', None)
            if row_wine_color:
                wine_color = row_wine_color
            else:
                row['wine_color'] = wine_color

            if row.pop('node_name', None) == 'WineNode':
                self._set_attributes(ET.SubElement(products, 'product'), row)
        self._write_xml(root)

    def _make_establishment_xml(self):
        """Write ``<data><cities><city><establishments>...`` for artisan/restaurant guides."""
        # city name -> establishment rows, in order of first appearance.
        cities = {}
        city = None
        ad_number_of_pages = None
        ad_right_pages = None
        for row in self.get_data():
            row_advertorial = row.pop('advertorial', None)
            if row_advertorial:
                ad_number_of_pages = row_advertorial.get('number_of_pages')
                ad_right_pages = row_advertorial.get('right_pages')
            else:
                row['ad_number_of_pages'] = ad_number_of_pages
                row['ad_right_pages'] = ad_right_pages

            row_city = row.pop('city_name', None)
            if row_city:
                city = row_city
                cities.setdefault(city, [])

            if row.pop('node_name', None) == 'EstablishmentNode':
                cities.setdefault(city, []).append(row)

        root = ET.Element('data')
        cities_element = ET.SubElement(root, 'cities')
        for city_name, establishments in cities.items():
            # Establishments listed before any city get a <city> without name.
            attrib = {} if city_name is None else {'name': str(city_name)}
            city_element = ET.SubElement(cities_element, 'city', attrib=attrib)
            establishments_element = ET.SubElement(city_element, 'establishments')
            for establishment in establishments:
                self._set_attributes(
                    ET.SubElement(establishments_element, 'establishment'),
                    establishment)
        self._write_xml(root)

    def make_xml_file(self):
        """Write the guide as XML, with a layout depending on the guide type.

        Wine guides produce a flat list of products; artisan and restaurant
        guides produce establishments grouped by city. Other guide types
        write nothing and leave ``self.success`` unchanged.
        """
        guide_type = self.guide.guide_type
        if guide_type == self.guide.WINE:
            self._make_wine_xml()
        elif guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            self._make_establishment_xml()