import abc import csv import logging import os import tempfile from smtplib import SMTPException import docx import xlsxwriter from django.conf import settings from django.core.mail import EmailMultiAlternatives from docx.blkcntnr import BlockItemContainer from timetable.models import Timetable from docx.shared import RGBColor, Pt import xml.etree.ElementTree as ET from utils.methods import section_name_into_index_name logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) class DocTemplate: DOCUMENT_FONT_NAME = 'Palatino' DOCUMENT_FONT_SIZE = Pt(12) DOCUMENT_FONT_COLOR = RGBColor(0, 0, 0) def __init__(self): self.document = docx.Document() style = self.document.styles['Normal'] style.font.size = self.DOCUMENT_FONT_SIZE style.font.name = self.DOCUMENT_FONT_NAME style.font.color.rgb = self.DOCUMENT_FONT_COLOR def add_page_break(self): self.document.add_page_break() def add_empty_line(self): self.document.add_paragraph() def add_horizontal_line(self): return self.document.add_paragraph(f'{"_" * 50}') def add_bullet_list(self, elements: (tuple, list)): styles = { 'name': 'Arial', 'size': Pt(10) } for element in elements: bullet = self.document.add_paragraph(style='List Bullet') bullet.paragraph_format.space_before = Pt(10) bullet = bullet.add_run(element) self.apply_font_style(bullet, params=styles) def add_heading(self, name: str, level: int = 2, font_style: dict = None, color_rgb: tuple = (0, 0, 0)): heading = self.document.add_heading(level=level).add_run(name) self.apply_font_style(heading, font_style) if color_rgb: self.apply_font_color(heading, color_rgb) def add_paragraph(self, name: str, font_style: dict = None, color_rgb: tuple = (0, 0, 0)): paragraph = self.document.add_paragraph() paragraph.paragraph_format.space_before = Pt(10) paragraph.add_run(name) self.apply_font_style(paragraph, font_style) if color_rgb: self.apply_font_color(paragraph, color_rgb) def apply_font_style(self, section, params: dict): for attr, value in params.items(): if not hasattr(section, 'font'): continue setattr(section.font, attr, value) def apply_font_color(self, section, color_rgb: tuple): if len(color_rgb) == 3: font = getattr(section, 'font', None) color = getattr(font, 'color', None) if font and color: setattr(color, 'rgb', RGBColor(*color_rgb)) def template(self, data: list): for instance in data: instance = dict(instance) element_id = instance.get('id') index_name = section_name_into_index_name(instance.get('section_name')) # ESTABLISHMENT HEADING (LEVEL 1) self.add_heading(name=instance['name'], font_style={'size': Pt(18), 'name': 'Palatino', 'bold': False}, level=1) # ESTABLISHMENT TYPE PARAGRAPH self.add_paragraph(name=index_name, font_style={'size': Pt(8), 'name': 'Palatino', 'bold': False, 'italic': False}) self.add_empty_line() # CITY HEADING (LEVEL 2) self.add_heading(name='City', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # CITY NAME HEADING (LEVEL 3) self.add_heading(name=instance['city_name'], font_style={'size': Pt(12), 'name': 'Arial', 'bold': True, 'italic': True}, color_rgb=(102, 102, 102)) self.add_empty_line() # REVIEW HEADING (LEVEL 2) review = instance.get('review') if review: self.add_heading(name='Review', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) for locale, text in review.items(): # REVIEW LOCALE HEADING (LEVEL 6) self.add_heading(name=locale, font_style={'size': Pt(11), 'name': 'Arial', 'underline': True, 'italic': True}, color_rgb=(102, 102, 102), level=6) # REVIEW TEXT PARAGRAPH self.add_paragraph(name=text, font_style={'size': Pt(10), 'name': 'Arial'}) self.add_empty_line() # PHONE HEADING (LEVEL 2) phones = instance.get('phones') if phones: self.add_heading(name='Phones', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # PHONE NUMBER PARAGRAPH self.add_bullet_list(phones) self.add_empty_line() # ADDRESS HEADING (LEVEL 2) address = instance.get('address') if address: self.add_heading(name='Address', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # ADDRESS DATA PARAGRAPH self.add_paragraph(name=instance.get('address'), font_style={'size': Pt(10), 'name': 'Arial'}) self.add_empty_line() # TIMETABLE HEADING (LEVEL 2) schedule = instance.get('schedule') if schedule: self.add_heading(name='Schedule', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # TIMETABLE ITEMS PARAGRAPH for weekday, working_hours in schedule.items(): bullet = self.document.add_paragraph(style='List Bullet') bullet.paragraph_format.space_before = Pt(10) bullet = bullet.add_run(f'{weekday}: {working_hours}') self.apply_font_style(bullet, {'name': 'Arial', 'size': Pt(10)}) self.add_empty_line() # PUBLIC MARK HEADING (LEVEL 2) public_mark = instance.get('public_mark') if public_mark: self.add_heading(name='Mark', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) self.add_heading(name=public_mark, font_style={'size': Pt(10), 'name': 'Arial'}, level=2) self.add_empty_line() # TOQUE HEADING (LEVEL 2) toque = instance.get('toque_number') if toque: self.add_heading(name='Toque', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) self.add_heading(name=toque, font_style={'size': Pt(10), 'name': 'Arial'}, level=2) self.add_empty_line() # TOQUE HEADING (LEVEL 2) price_level = instance.get('price_level') if price_level: self.add_heading(name='Price level', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) self.add_heading(name=price_level, font_style={'size': Pt(10), 'name': 'Arial'}, level=2) self.add_empty_line() # SERVICES HEADING (LEVEL 2) services = instance.get('services') if services: self.add_heading(name='Services', font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # TIMETABLE ITEMS PARAGRAPH self.add_bullet_list(services) self.add_empty_line() # METADATA HEADING (LEVEL 2) metadata = instance.get('metadata') if metadata: for obj in metadata: for section, tags in obj.items(): section = section.capitalize() self.add_heading(name=section, font_style={'size': Pt(13), 'name': 'Arial', 'bold': True}, level=2) # TIMETABLE ITEMS PARAGRAPH self.add_bullet_list(tags) self.add_empty_line() # PAGE BREAK self.add_page_break() class SendExportBase: """Base class of export and sending data.""" def __init__(self): self.success = False self.email_from = settings.EMAIL_HOST_USER self.email_subject = f'Export panel: {self.get_file_name()}' self.email_body = 'Exported panel data' self.file_path = os.path.join( settings.STATIC_ROOT, 'email', tempfile.gettempdir(), self.get_file_name() ) self.type_mapper = { "csv": self.make_csv_file, "xls": self.make_xls_file } @abc.abstractmethod def get_headers(self): pass @abc.abstractmethod def get_emails_to(self): return [ # 'a.feteleu@spider.ru', 'kuzmenko.da@gmail.com', # 'sinapsit@yandex.ru' ] @abc.abstractmethod def get_data(self): pass @abc.abstractmethod def get_file_name(self): return '' def make_csv_file(self): file_header = self.get_headers() if not self.success: return with open(self.file_path, 'w') as f: file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL) # Write headers to CSV file file_writer.writerow(file_header) for row in self.get_data(): file_writer.writerow(row) def make_xls_file(self): headings = self.get_headers() if not self.success: return with xlsxwriter.Workbook(self.file_path) as workbook: worksheet = workbook.add_worksheet() # Add a bold format to use to highlight cells. bold = workbook.add_format({'bold': True}) # Add the worksheet data that the charts will refer to. data = self.get_data() worksheet.write_row('A1', headings, bold) for n, row in enumerate(data): worksheet.write_row(f'A{n+2}', [str(i) for i in row]) workbook.close() def make_xml_file(self): pass def make_doc_file(self): pass def send_email(self): msg = EmailMultiAlternatives( subject=self.email_subject, body=self.email_body, from_email=self.email_from, to=self.get_emails_to() ) # Create an inline attachment if self.file_path and self.success: msg.attach_file(self.file_path) else: msg.body = 'An error occurred while executing the request.' try: msg.send() logger.debug(f"COMMUTATOR:Email successfully sent") except SMTPException as e: logger.error(f"COMMUTATOR:Email connector: {e}") @abc.abstractmethod def send(self): pass class SendExport(SendExportBase): def __init__(self, panel, user, file_type='csv', **kwargs): super().__init__() self.panel = panel self.user = user self.file_type = file_type self.get_file_method = self.type_mapper[file_type] def get_emails_to(self): return [self.user.email] + super().get_emails_to() def get_file_name(self): name = '_'.join(self.panel.name.split(' ')) return f'export_{name.lower()}.{self.file_type}' def get_data(self): return self.panel.get_data() def get_headers(self): try: header = self.panel.get_headers() self.success = True return header except Exception as err: logger.info(f'HEADER:{err}') def get_file(self): if os.path.exists(self.file_path) and os.path.isfile(self.file_path): with open(self.file_path, 'rb') as export_file: return export_file else: logger.info('COMMUTATOR:image file not found dir: {path}') def send(self): self.get_file_method() print(f'ok: {self.file_path}') self.send_email() class SendGuideExport(SendExportBase): """Send guid export.""" def __init__(self, data, guide, user, file_type='csv', **kwargs): self.type_mapper = { "csv": self.make_csv_file, "xml": self.make_xml_file, "doc": self.make_doc_file } self.guide = guide self.data = data self.user = user self.file_type = file_type self.get_file_method = self.type_mapper[file_type] super().__init__() def get_emails_to(self): return [self.user.email] + super().get_emails_to() def get_file_name(self): name = self.guide.slug return f'export_{name}.{self.file_type}' def make_doc_file(self): document = DocTemplate() document.template(self.get_doc_data()) document.document.save(self.file_path) self.success = True def get_data(self): return self.data def get_doc_data(self): init_data = self.get_data() objects = [] city_name = None section_name = None advertorial_page = None for instance in init_data: row_advertorial_page = instance.get('advertorial_page') if row_advertorial_page: advertorial_page = row_advertorial_page else: instance['advertorial_page'] = advertorial_page row_city = instance.get('city_name') if row_city: city_name = row_city else: instance['city_name'] = city_name row_section = instance.get('node_name') if row_section.endswith('SectionNode'): section_name = row_section else: instance['section_name'] = section_name if instance.pop('node_name', None) == 'EstablishmentNode': objects.append(instance.items()) return objects def send(self): self.get_file_method() print(f'ok: {self.file_path}') self.send_email() def get_headers(self): """Get headers for model Establishment.""" exclude_headers = ['node_name', ] headers = list(self.data[0].keys()) if self.guide.RESTAURANT or self.guide.ARTISAN: exclude_headers.append('product_name', ) if self.guide.WINE: exclude_headers.extend([ 'name', 'public_mark', 'toque_number', 'schedule', 'address', 'phones', 'establishment_type', 'establishment_subtypes', 'review', 'price_level', 'metadata', ]) for name in set(exclude_headers): headers.pop(headers.index(name)) else: self.success = True return headers def make_csv_file(self): file_header = self.get_headers() if not self.success: return with open(self.file_path, 'w') as f: file_writer = csv.writer(f, quotechar='"', quoting=csv.QUOTE_MINIMAL) # Write headers to CSV file file_writer.writerow(file_header) city = None for row in self.get_data(): row_city = row.get('city_name') if row_city: city = row_city else: row['city_name'] = city if row.pop("node_name") == "EstablishmentNode": file_writer.writerow(row.values()) def make_xml_file(self): # create the file structure data = ET.Element('data') items = ET.SubElement(data, 'items') city = None for row in self.get_data(): row_city = row.get('city_name') if row_city: city = row_city else: row['city_name'] = city if row.pop("node_name") == "EstablishmentNode": for key, value in row.items(): item1 = ET.SubElement(items, 'item') item1.set('name', key) item1.text = str(value) # create a new XML file with the results tree = ET.ElementTree(data) with open(self.file_path, 'bw') as f: tree.write(f) self.success = True