gault-millau/apps/utils/export.py
2019-12-27 15:59:15 +03:00

435 lines
15 KiB
Python

import abc
import csv
import logging
import os
import tempfile
from smtplib import SMTPException
import docx
import xlsxwriter
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from docx.blkcntnr import BlockItemContainer
from timetable.models import Timetable
from docx.shared import RGBColor, Pt
from utils.methods import section_name_into_index_name
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class DocTemplate:
DOCUMENT_FONT_NAME = 'Palatino'
DOCUMENT_FONT_SIZE = Pt(12)
DOCUMENT_FONT_COLOR = RGBColor(0, 0, 0)
def __init__(self):
self.document = docx.Document()
style = self.document.styles['Normal']
style.paragraph_format.space_before = Pt(12)
style.font.size = self.DOCUMENT_FONT_SIZE
style.font.name = self.DOCUMENT_FONT_NAME
style.font.color.rgb = self.DOCUMENT_FONT_COLOR
def add_page_break(self):
self.document.add_page_break()
def add_empty_line(self):
self.document.add_paragraph()
self.document.add_paragraph()
def add_horizontal_line(self):
return self.document.add_paragraph(f'{"_" * 50}')
def add_bullet_list(self, elements: (tuple, list)):
styles = {
'name': 'Arial',
'size': Pt(10)
}
for element in elements:
bullet = self.document.add_paragraph(style='List Bullet').add_run(element)
self.apply_font_style(bullet, params=styles)
def add_heading(self, name: str, level: int = 2, font_style: dict = None,
color_rgb: tuple = (0, 0, 0)):
heading = self.document.add_heading(level=level).add_run(name)
self.apply_font_style(heading, font_style)
if color_rgb:
self.apply_font_color(heading, color_rgb)
def add_paragraph(self, name: str, font_style: dict = None, color_rgb: tuple = (0, 0, 0)):
paragraph = self.document.add_paragraph().add_run(name)
self.apply_font_style(paragraph, font_style)
if color_rgb:
self.apply_font_color(paragraph, color_rgb)
def apply_font_style(self, section, params: dict):
for attr, value in params.items():
if not hasattr(section, 'font'):
continue
setattr(section.font, attr, value)
def apply_font_color(self, section, color_rgb: tuple):
if len(color_rgb) == 3:
font = getattr(section, 'font', None)
color = getattr(font, 'color', None)
if font and color:
setattr(color, 'rgb', RGBColor(*color_rgb))
def template(self, data: list):
for instance in data:
instance = dict(instance)
index_name = section_name_into_index_name(instance.get('section_name'))
# ESTABLISHMENT HEADING (LEVEL 1)
self.add_heading(name=instance['name'],
font_style={'size': Pt(18), 'name': 'Palatino', 'bold': False},
level=1)
# ESTABLISHMENT TYPE PARAGRAPH
self.add_paragraph(name=index_name,
font_style={'size': Pt(8), 'name': 'Palatino', 'bold': False, 'italic': False})
self.add_empty_line()
# CITY HEADING (LEVEL 2)
self.add_heading(name='City',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# CITY NAME HEADING (LEVEL 3)
self.add_heading(name=instance['city_name'],
font_style={'size': Pt(12), 'name': 'Arial', 'bold': True, 'italic': True},
color_rgb=(102, 102, 102))
self.add_empty_line()
# REVIEW HEADING (LEVEL 2)
review = instance.get('review')
if review:
self.add_heading(name='Review',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
for locale, text in review.items():
# REVIEW LOCALE HEADING (LEVEL 6)
self.add_heading(name=locale,
font_style={'size': Pt(11), 'name': 'Arial', 'underline': True, 'italic': True},
color_rgb=(102, 102, 102),
level=6)
# REVIEW TEXT PARAGRAPH
self.add_paragraph(name=text,
font_style={'size': Pt(10), 'name': 'Arial'})
# PHONE HEADING (LEVEL 2)
phones = instance.get('phones')
if phones:
self.add_heading(name='Phones',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# PHONE NUMBER PARAGRAPH
self.add_bullet_list(phones)
self.add_empty_line()
# ADDRESS HEADING (LEVEL 2)
address = instance.get('address')
if address:
self.add_heading(name='Address',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# ADDRESS DATA PARAGRAPH
self.add_paragraph(name=instance.get('address'),
font_style={'size': Pt(10), 'name': 'Arial'})
self.add_empty_line()
# TIMETABLE HEADING (LEVEL 2)
schedule = instance.get('schedule')
if schedule:
self.add_heading(name='Schedule',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# TIMETABLE ITEMS PARAGRAPH
for weekday, working_hours in schedule.items():
bullet = self.document.add_paragraph(style='List Bullet').add_run(f'{weekday}: {working_hours}')
self.apply_font_style(bullet, {'name': 'Arial', 'size': Pt(10)})
self.add_empty_line()
# PUBLIC MARK HEADING (LEVEL 2)
public_mark = instance.get('public_mark')
if public_mark:
self.add_heading(name='Mark',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
self.add_heading(name=public_mark,
font_style={'size': Pt(10), 'name': 'Arial'},
level=2)
# TOQUE HEADING (LEVEL 2)
toque = instance.get('toque_number')
if toque:
self.add_heading(name='Toque',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
self.add_heading(name=toque,
font_style={'size': Pt(10), 'name': 'Arial'},
level=2)
# TOQUE HEADING (LEVEL 2)
price_level = instance.get('price_level')
if price_level:
self.add_heading(name='Price level',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
self.add_heading(name=price_level,
font_style={'size': Pt(10), 'name': 'Arial'},
level=2)
# SERVICES HEADING (LEVEL 2)
services = instance.get('services')
if services:
self.add_heading(name='Services',
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# TIMETABLE ITEMS PARAGRAPH
self.add_bullet_list(services)
self.add_empty_line()
# METADATA HEADING (LEVEL 2)
metadata = instance.get('metadata')
if metadata:
for obj in metadata:
for section, tags in obj.items():
self.add_heading(name=section,
font_style={'size': Pt(13), 'name': 'Arial', 'bold': True},
level=2)
# TIMETABLE ITEMS PARAGRAPH
self.add_bullet_list(tags)
self.add_empty_line()
# PAGE BREAK
self.add_page_break()
class SendExportBase:
"""Base class of export and sending data."""
def __init__(self):
self.success = False
self.email_from = settings.EMAIL_HOST_USER
self.email_subject = f'Export panel: {self.get_file_name()}'
self.email_body = 'Exported panel data'
self.file_path = os.path.join(
settings.STATIC_ROOT,
'email', tempfile.gettempdir(),
self.get_file_name()
)
self.type_mapper = {
"csv": self.make_csv_file,
"xls": self.make_xls_file
}
@abc.abstractmethod
def get_headers(self):
pass
@abc.abstractmethod
def get_emails_to(self):
return [
'a.feteleu@spider.ru',
# 'kuzmenko.da@gmail.com',
# 'sinapsit@yandex.ru'
]
@abc.abstractmethod
def get_data(self):
pass
@abc.abstractmethod
def get_file_name(self):
return ''
def make_csv_file(self):
file_header = self.get_headers()
if not self.success:
return
with open(self.file_path, 'w') as f:
file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)
# Write headers to CSV file
file_writer.writerow(file_header)
for row in self.get_data():
file_writer.writerow(row)
def make_xls_file(self):
headings = self.get_headers()
if not self.success:
return
with xlsxwriter.Workbook(self.file_path) as workbook:
worksheet = workbook.add_worksheet()
# Add a bold format to use to highlight cells.
bold = workbook.add_format({'bold': True})
# Add the worksheet data that the charts will refer to.
data = self.get_data()
worksheet.write_row('A1', headings, bold)
for n, row in enumerate(data):
worksheet.write_row(f'A{n+2}', [str(i) for i in row])
workbook.close()
def make_xml_file(self):
pass
def make_doc_file(self):
pass
def send_email(self):
msg = EmailMultiAlternatives(
subject=self.email_subject,
body=self.email_body,
from_email=self.email_from,
to=self.get_emails_to()
)
# Create an inline attachment
if self.file_path and self.success:
msg.attach_file(self.file_path)
else:
msg.body = 'An error occurred while executing the request.'
try:
msg.send()
logger.debug(f"COMMUTATOR:Email successfully sent")
except SMTPException as e:
logger.error(f"COMMUTATOR:Email connector: {e}")
@abc.abstractmethod
def send(self):
pass
class SendExport(SendExportBase):
def __init__(self, panel, user, file_type='csv', **kwargs):
super().__init__()
self.panel = panel
self.user = user
self.file_type = file_type
self.get_file_method = self.type_mapper[file_type]
def get_emails_to(self):
return [self.user.email] + super().get_emails_to()
def get_file_name(self):
name = '_'.join(self.panel.name.split(' '))
return f'export_{name.lower()}.{self.file_type}'
def get_data(self):
return self.panel.get_data()
def get_headers(self):
try:
header = self.panel.get_headers()
self.success = True
return header
except Exception as err:
logger.info(f'HEADER:{err}')
def get_file(self):
if os.path.exists(self.file_path) and os.path.isfile(self.file_path):
with open(self.file_path, 'rb') as export_file:
return export_file
else:
logger.info('COMMUTATOR:image file not found dir: {path}')
def send(self):
self.get_file_method()
print(f'ok: {self.file_path}')
self.send_email()
class SendGuideExport(SendExportBase):
"""Send guid export."""
def __init__(self, data, guide, user, file_type='csv', **kwargs):
self.type_mapper = {
"csv": self.make_csv_file,
"xml": self.make_xml_file,
"doc": self.make_doc_file
}
self.guide = guide
self.data = data
self.user = user
self.file_type = file_type
self.get_file_method = self.type_mapper[file_type]
super().__init__()
def get_emails_to(self):
return [self.user.email] + super().get_emails_to()
def get_file_name(self):
name = self.guide.slug
return f'export_{name}.{self.file_type}'
def make_doc_file(self):
document = DocTemplate()
document.template(self.get_doc_data())
document.document.save(self.file_path)
self.success = True
def get_headers(self):
headers = list(self.data[0].keys())
headers.pop(headers.index('node_name'))
self.success = True
return headers
def get_data(self):
return self.data
def get_doc_data(self):
init_data = self.get_data()
objects = []
city_name = None
section_name = None
for instance in init_data:
row_city = instance.get('city_name')
if row_city:
city_name = row_city
else:
instance['city_name'] = city_name
row_section = instance.get('node_name')
if row_section.endswith('SectionNode'):
section_name = row_section
else:
instance['section_name'] = section_name
if instance.pop('node_name', None) == 'EstablishmentNode':
objects.append(instance.items())
return objects
def send(self):
self.get_file_method()
print(f'ok: {self.file_path}')
self.send_email()
def make_csv_file(self):
file_header = self.get_headers()
if not self.success:
return
with open(self.file_path, 'w') as f:
file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL)
# Write headers to CSV file
file_writer.writerow(file_header)
city = None
for row in self.get_data():
row_city = row.get('city_name')
if row_city:
city = row_city
else:
row['city_name'] = city
if row.pop("node_name") == "EstablishmentNode":
file_writer.writerow(row.values())