import abc
import csv
import logging
import os
import tempfile
import xml.etree.ElementTree as ET
from smtplib import SMTPException

import docx
import xlsxwriter
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from docx.shared import RGBColor, Pt

from utils.methods import section_name_into_index_name

logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)


def _as_text(value):
    """Return *value* unchanged when it is ``None`` or a string, else ``str(value)``.

    Document runs are created from text; coercing here lets numeric values
    be rendered instead of failing inside the docx library.
    """
    if value is None or isinstance(value, str):
        return value
    return str(value)


class DocTemplate:
    """Builder of the ``.docx`` guide export.

    Wraps a ``docx.Document`` whose ``Normal`` style is preset from the
    ``DOCUMENT_FONT_*`` constants and offers small helpers (headings,
    paragraphs, bullet lists) used by ``template`` to render establishments.
    """

    DOCUMENT_FONT_NAME = 'Palatino'
    DOCUMENT_FONT_SIZE = Pt(12)
    DOCUMENT_FONT_COLOR = RGBColor(0, 0, 0)

    # Font presets shared by the sections rendered in ``template``.
    _SECTION_FONT = {'size': Pt(13), 'name': 'Arial', 'bold': True}
    _VALUE_FONT = {'size': Pt(10), 'name': 'Arial'}
    _GREY = (102, 102, 102)

    def __init__(self):
        self.document = docx.Document()
        normal_font = self.document.styles['Normal'].font
        normal_font.size = self.DOCUMENT_FONT_SIZE
        normal_font.name = self.DOCUMENT_FONT_NAME
        normal_font.color.rgb = self.DOCUMENT_FONT_COLOR

    def add_page_break(self):
        """Append a page break to the document."""
        self.document.add_page_break()

    def add_empty_line(self):
        """Append an empty paragraph, used as vertical spacing."""
        self.document.add_paragraph()

    def add_horizontal_line(self):
        """Append a paragraph of 50 underscores and return it."""
        return self.document.add_paragraph('_' * 50)

    def add_bullet_list(self, elements: (tuple, list)):
        """Append one ``List Bullet`` paragraph per element (Arial, 10pt).

        Args:
            elements: Items to render; non-string items are converted
                with ``str``.
        """
        for element in elements:
            bullet = self.document.add_paragraph(style='List Bullet')
            bullet.paragraph_format.space_before = Pt(10)
            run = bullet.add_run(_as_text(element))
            self.apply_font_style(run, params=self._VALUE_FONT)

    def add_heading(self, name: str, level: int = 2, font_style: dict = None,
                    color_rgb: tuple = (0, 0, 0)):
        """Append a heading of the given level.

        Args:
            name: Heading text; non-string values are converted with ``str``.
            level: Heading level passed to ``Document.add_heading``.
            font_style: Optional mapping of font attribute name to value.
            color_rgb: Optional ``(r, g, b)`` font colour; falsy skips it.
        """
        run = self.document.add_heading(level=level).add_run(_as_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def add_paragraph(self, name: str, font_style: dict = None,
                      color_rgb: tuple = (0, 0, 0)):
        """Append a paragraph with 10pt spacing before it.

        Args:
            name: Paragraph text; non-string values are converted with ``str``.
            font_style: Optional mapping of font attribute name to value.
            color_rgb: Optional ``(r, g, b)`` font colour; falsy skips it.
        """
        paragraph = self.document.add_paragraph()
        paragraph.paragraph_format.space_before = Pt(10)
        # Style the run, not the paragraph: only the run exposes ``font``,
        # so styling the paragraph object would be silently ignored.
        run = paragraph.add_run(_as_text(name))
        self.apply_font_style(run, font_style)
        if color_rgb:
            self.apply_font_color(run, color_rgb)

    def apply_font_style(self, section, params: dict):
        """Set each ``params`` item as an attribute of ``section.font``.

        Does nothing when ``params`` is empty/``None`` or when ``section``
        has no ``font`` attribute.
        """
        if not params:
            return
        font = getattr(section, 'font', None)
        if font is None:
            return
        for attr, value in params.items():
            setattr(font, attr, value)

    def apply_font_color(self, section, color_rgb: tuple):
        """Set the font colour of ``section`` from a 3-item ``(r, g, b)`` tuple.

        Does nothing when the tuple does not have exactly three items or
        when ``section`` exposes no ``font.color``.
        """
        if not color_rgb or len(color_rgb) != 3:
            return
        font = getattr(section, 'font', None)
        color = getattr(font, 'color', None)
        if font is None or color is None:
            return
        color.rgb = RGBColor(*color_rgb)

    def _add_section_heading(self, title):
        """Append a level-2 section heading in the shared section font."""
        self.add_heading(name=title, font_style=self._SECTION_FONT, level=2)

    def _add_bullet_section(self, title, elements):
        """Append a section heading, a bullet list and a spacer line."""
        self._add_section_heading(title)
        self.add_bullet_list(elements)
        self.add_empty_line()

    def _add_establishment(self, obj):
        """Render one establishment mapping, ending with a page break."""
        index_name = section_name_into_index_name(obj.get('section_name'))

        # Establishment name (level-1 heading) and its section index name.
        self.add_heading(
            name=obj['name'],
            font_style={'size': Pt(18), 'name': 'Palatino', 'bold': False},
            level=1,
        )
        self.add_paragraph(
            name=index_name,
            font_style={'size': Pt(8), 'name': 'Palatino', 'bold': False,
                        'italic': False},
        )
        self.add_empty_line()

        # City: section heading plus the city name (default heading level).
        self._add_section_heading('City')
        self.add_heading(
            name=obj['city_name'],
            font_style={'size': Pt(12), 'name': 'Arial', 'bold': True,
                        'italic': True},
            color_rgb=self._GREY,
        )
        self.add_empty_line()

        # Review: one locale heading (level 6) and text paragraph per locale.
        review = obj.get('review')
        if review:
            self._add_section_heading('Review')
            for locale, text in review.items():
                self.add_heading(
                    name=locale,
                    font_style={'size': Pt(11), 'name': 'Arial',
                                'underline': True, 'italic': True},
                    color_rgb=self._GREY,
                    level=6,
                )
                self.add_paragraph(name=text, font_style=self._VALUE_FONT)
            self.add_empty_line()

        # Phones: bullet list of phone numbers.
        phones = obj.get('phones')
        if phones:
            self._add_bullet_section('Phones', phones)

        # Address: single paragraph.
        address = obj.get('address')
        if address:
            self._add_section_heading('Address')
            self.add_paragraph(name=address, font_style=self._VALUE_FONT)
            self.add_empty_line()

        # Schedule: one "weekday: hours" bullet per entry.
        schedule = obj.get('schedule')
        if schedule:
            self._add_bullet_section(
                'Schedule',
                [f'{weekday}: {working_hours}'
                 for weekday, working_hours in schedule.items()],
            )

        # Mark, toque and price level: section heading plus a value heading.
        single_values = (
            ('Mark', 'public_mark'),
            ('Toque', 'toque_number'),
            ('Price level', 'price_level'),
        )
        for title, key in single_values:
            value = obj.get(key)
            if value:
                self._add_section_heading(title)
                self.add_heading(name=value, font_style=self._VALUE_FONT,
                                 level=2)
                self.add_empty_line()

        # Services: bullet list.
        services = obj.get('services')
        if services:
            self._add_bullet_section('Services', services)

        # Metadata: every mapping contributes one bullet section per key.
        metadata = obj.get('metadata')
        if metadata:
            for group in metadata:
                for section, tags in group.items():
                    self._add_bullet_section(section.capitalize(), tags)

        self.add_page_break()

    def template(self, data: list):
        """Render every establishment of ``data`` into the document.

        Args:
            data: Iterable of mappings (or key/value pair iterables) that
                contain at least the ``name`` and ``city_name`` keys.
        """
        for entry in data:
            self._add_establishment(dict(entry))


class SendExportBase:
    """Base class of export and sending data.

    Subclasses provide the headers, rows, file name and recipients; this
    class writes the export file and e-mails it as an attachment.

    NOTE(review): the methods are marked with ``abc.abstractmethod`` but the
    class does not use ``abc.ABCMeta``, so the markers are not enforced.
    """

    def __init__(self):
        file_name = self.get_file_name()
        self.success = False
        self.email_from = settings.EMAIL_HOST_USER
        self.email_subject = f'Export panel: {file_name}'
        self.email_body = 'Exported panel data'
        # The file lives in the system temp dir. The previous
        # ``os.path.join(STATIC_ROOT, 'email', gettempdir(), name)`` produced
        # this very path because joining an absolute component discards
        # everything before it.
        self.file_path = os.path.join(tempfile.gettempdir(), file_name)
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xls": self.make_xls_file
        }

    @abc.abstractmethod
    def get_headers(self):
        """Return the header row; set ``self.success`` to ``True`` on success."""
        pass

    @abc.abstractmethod
    def get_emails_to(self):
        """Return the recipients every export is sent to (none by default)."""
        return []

    @abc.abstractmethod
    def get_data(self):
        """Return the rows to export."""
        pass

    @abc.abstractmethod
    def get_file_name(self):
        """Return the export file name, including its extension."""
        return ''

    def make_csv_file(self):
        """Write headers and rows to ``self.file_path`` as CSV.

        Writes nothing when ``get_headers`` did not set ``self.success``.
        """
        file_header = self.get_headers()
        if not self.success:
            return
        # ``newline=''`` is required by the csv module so that it controls
        # line endings itself.
        with open(self.file_path, 'w', newline='', encoding='utf-8') as f:
            file_writer = csv.writer(f, quotechar='"',
                                     quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)
            file_writer.writerows(self.get_data())

    def make_xls_file(self):
        """Write headers (bold) and stringified rows to a workbook.

        Writes nothing when ``get_headers`` did not set ``self.success``.

        NOTE(review): XlsxWriter produces the xlsx format while the ``xls``
        file type yields an ``.xls`` file name - confirm this is intended.
        """
        headings = self.get_headers()
        if not self.success:
            return
        # The context manager closes the workbook; no explicit close needed.
        with xlsxwriter.Workbook(self.file_path) as workbook:
            worksheet = workbook.add_worksheet()
            bold = workbook.add_format({'bold': True})
            worksheet.write_row('A1', headings, bold)
            # Data rows start on spreadsheet row 2, right below the header.
            for row_number, row in enumerate(self.get_data(), start=2):
                worksheet.write_row(f'A{row_number}',
                                    [str(cell) for cell in row])

    def make_xml_file(self):
        """Hook for XML export; no-op in the base class."""
        pass

    def make_doc_file(self):
        """Hook for document export; no-op in the base class."""
        pass

    def send_email(self):
        """Send the export by e-mail.

        The file is attached when the export succeeded; otherwise the body
        is replaced by an error notice. SMTP errors are logged, not raised.
        """
        msg = EmailMultiAlternatives(
            subject=self.email_subject,
            body=self.email_body,
            from_email=self.email_from,
            to=self.get_emails_to()
        )
        if self.file_path and self.success:
            msg.attach_file(self.file_path)
        else:
            msg.body = 'An error occurred while executing the request.'
        try:
            msg.send()
            logger.debug("COMMUTATOR:Email successfully sent")
        except SMTPException as e:
            logger.error("COMMUTATOR:Email connector: %s", e)

    @abc.abstractmethod
    def send(self):
        """Create the export file and send it."""
        pass


class SendExport(SendExportBase):
    """Export of a panel, sent to the requesting user."""

    def __init__(self, panel, user, file_type='csv', **kwargs):
        # These attributes must exist before the base constructor runs: it
        # calls ``get_file_name()``, which reads ``panel`` and ``file_type``.
        self.panel = panel
        self.user = user
        self.file_type = file_type
        super().__init__()
        # Raises ``KeyError`` for an unsupported ``file_type``.
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<panel name>.<type>``, lowercased, spaces as ``_``."""
        name = self.panel.name.replace(' ', '_')
        return f'export_{name.lower()}.{self.file_type}'

    def get_data(self):
        """Return the panel rows."""
        return self.panel.get_data()

    def get_headers(self):
        """Return the panel headers, or ``None`` when they cannot be built.

        ``self.success`` reflects the outcome so that ``send_email`` can
        report the failure instead of attaching a file.
        """
        try:
            header = self.panel.get_headers()
        except Exception as err:
            # Best effort on purpose: the failure is reported by e-mail.
            self.success = False
            logger.exception('HEADER:%s', err)
            return None
        self.success = True
        return header

    def get_file(self):
        """Return the export file opened for binary reading, or ``None``.

        The caller is responsible for closing the returned file object.
        """
        if not os.path.isfile(self.file_path):
            logger.info('COMMUTATOR:image file not found dir: %s',
                        self.file_path)
            return None
        return open(self.file_path, 'rb')

    def send(self):
        """Create the export file and e-mail it."""
        self.get_file_method()
        logger.info('ok: %s', self.file_path)
        self.send_email()


class SendGuideExport(SendExportBase):
    """Send guide export."""

    # Columns removed from establishment (artisan/restaurant) guide rows.
    _ESTABLISHMENT_GUIDE_EXCLUDED = (
        'product_name',
        'product_review',
        'product_type',
        'product_subtypes',
        'product_address',
        'product_city',
        'product_metadata',
        'wine_color_section_name',
    )
    # Columns removed from wine guide rows.
    _WINE_GUIDE_EXCLUDED = (
        'public_mark',
        'toque_number',
        'schedule',
        'address',
        'phones',
        'establishment_type',
        'establishment_subtypes',
        'review',
        'price_level',
        'metadata',
        'city_name',
    )

    def __init__(self, data, guide, user, file_type='csv', **kwargs):
        # Set before the base constructor, which calls ``get_file_name()``.
        self.guide = guide
        self.data = data
        self.user = user
        self.file_type = file_type
        super().__init__()
        # Replace the base csv/xls mapping only after the base constructor,
        # which would otherwise overwrite it.
        self.type_mapper = {
            "csv": self.make_csv_file,
            "xml": self.make_xml_file,
            "doc": self.make_doc_file
        }
        # Raises ``KeyError`` for an unsupported ``file_type``.
        self.get_file_method = self.type_mapper[file_type]

    def get_emails_to(self):
        """Return the user's e-mail followed by the base recipients."""
        return [self.user.email] + super().get_emails_to()

    def get_file_name(self):
        """Return ``export_<guide slug>.<type>``."""
        name = self.guide.slug
        return f'export_{name}.{self.file_type}'

    def get_headers(self):
        """Return the keys of the first row without ``node_name``.

        Returns an empty list and leaves ``self.success`` false when there
        are no rows.
        """
        rows = self.get_data()
        if not rows:
            self.success = False
            logger.warning('HEADER:guide export has no rows')
            return []
        headers = [key for key in rows[0] if key != 'node_name']
        self.success = True
        return headers

    def _excluded_columns(self):
        """Return the set of columns to drop for the current guide type."""
        excluded = {'guide'}
        guide_type = self.guide.guide_type
        if guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            excluded.update(self._ESTABLISHMENT_GUIDE_EXCLUDED)
        elif guide_type == self.guide.WINE:
            excluded.update(self._WINE_GUIDE_EXCLUDED)
        return excluded

    def get_data(self):
        """Return filtered copies of the rows of ``self.data``.

        Copies are returned so that the export builders, which pop and add
        keys while iterating, never modify the caller's data and can run
        more than once.
        """
        excluded = self._excluded_columns()
        return [
            {key: value for key, value in row.items() if key not in excluded}
            for row in self.data
        ]

    @staticmethod
    def _carry_advertorial(row, advertorial, last_pages):
        """Carry advertorial page numbers forward through the rows.

        A row with an advertorial updates the remembered pair; a row
        without one receives the remembered pair as ``ad_number_of_pages``
        and ``ad_right_pages``.

        Returns:
            The ``(number_of_pages, right_pages)`` pair to remember.
        """
        if advertorial:
            return (advertorial.get('number_of_pages'),
                    advertorial.get('right_pages'))
        row['ad_number_of_pages'], row['ad_right_pages'] = last_pages
        return last_pages

    @staticmethod
    def _set_attributes(element, row):
        """Set every truthy value of ``row`` as a string XML attribute."""
        for key, value in row.items():
            if value:
                element.set(key, str(value))

    def get_doc_data(self):
        """Return the ``items()`` of every ``EstablishmentNode`` row.

        City name, section name and advertorial pages are carried forward
        from the most recent row that defined them.
        """
        objects = []
        city_name = None
        section_name = None
        ad_pages = (None, None)
        for row in self.get_data():
            ad_pages = self._carry_advertorial(
                row, row.pop('advertorial', None), ad_pages)

            row_city = row.get('city_name')
            if row_city:
                city_name = row_city
            else:
                row['city_name'] = city_name

            node_name = row.pop('node_name', None)
            if (node_name or '').endswith('SectionNode'):
                section_name = node_name
            else:
                row['section_name'] = section_name

            if node_name == 'EstablishmentNode':
                objects.append(row.items())
        return objects

    def send(self):
        """Create the export file and e-mail it."""
        self.get_file_method()
        logger.info('ok: %s', self.file_path)
        self.send_email()

    def make_doc_file(self):
        """Render the establishments into a document and save it."""
        document = DocTemplate()
        document.template(self.get_doc_data())
        document.document.save(self.file_path)
        self.success = True

    def make_csv_file(self):
        """Write the header and every ``EstablishmentNode`` row as CSV.

        Writes nothing when ``get_headers`` did not set ``self.success``.

        NOTE(review): rows are written by position (``row.values()``) and
        may gain ``ad_*`` columns that are absent from the header - confirm
        the expected column layout.
        """
        file_header = self.get_headers()
        if not self.success:
            return
        # ``newline=''`` is required by the csv module so that it controls
        # line endings itself.
        with open(self.file_path, 'w', newline='', encoding='utf-8') as f:
            file_writer = csv.writer(f, quotechar='"',
                                     quoting=csv.QUOTE_MINIMAL)
            file_writer.writerow(file_header)

            city = None
            ad_pages = (None, None)
            for row in self.get_data():
                row_city = row.get('city_name')
                if row_city:
                    city = row_city
                else:
                    row['city_name'] = city

                # The advertorial itself stays in the row here (get, not pop).
                ad_pages = self._carry_advertorial(
                    row, row.get('advertorial'), ad_pages)

                if row.pop('node_name', None) == 'EstablishmentNode':
                    file_writer.writerow(row.values())

    def _build_products_xml(self):
        """Return ``<data><products>`` with one ``<product>`` per WineNode."""
        city = None
        wine_color = None
        establishment_name = None
        establishment_address = None
        ad_pages = (None, None)

        data = ET.Element('data')
        products = ET.SubElement(data, 'products')
        for row in self.get_data():
            ad_pages = self._carry_advertorial(
                row, row.pop('advertorial', None), ad_pages)

            row_address = row.get('address')
            if row_address:
                establishment_address = row_address
            else:
                # Stored under its own key; it used to overwrite
                # ``establishment_name`` by mistake.
                row['establishment_address'] = establishment_address

            row_establishment = row.pop('name', None)
            if row_establishment:
                establishment_name = row_establishment
            else:
                row['establishment_name'] = establishment_name

            row_city = row.get('product_city')
            if row_city:
                city = row_city
            else:
                row['city_name'] = city

            row_wine_color = row.pop('wine_color_section_name', None)
            if row_wine_color:
                wine_color = row_wine_color
            else:
                row['wine_color'] = wine_color

            if row.pop('node_name', None) == 'WineNode':
                self._set_attributes(ET.SubElement(products, 'product'), row)
        return data

    def _build_cities_xml(self):
        """Return ``<data><cities>`` with establishments grouped by city."""
        # City name -> establishment rows, in first-seen order.
        establishments_by_city = {}
        city = None
        ad_pages = (None, None)
        for row in self.get_data():
            ad_pages = self._carry_advertorial(
                row, row.pop('advertorial', None), ad_pages)

            row_city = row.pop('city_name', None)
            if row_city:
                city = row_city
                # Register the city even if no establishment follows.
                establishments_by_city.setdefault(city, [])

            if row.pop('node_name', None) == 'EstablishmentNode':
                establishments_by_city.setdefault(city, []).append(row)

        data = ET.Element('data')
        cities_element = ET.SubElement(data, 'cities')
        for city_name, establishments in establishments_by_city.items():
            city_element = ET.SubElement(
                cities_element, 'city',
                attrib={'name': '' if city_name is None else str(city_name)})
            establishments_element = ET.SubElement(city_element,
                                                   'establishments')
            for establishment in establishments:
                self._set_attributes(
                    ET.SubElement(establishments_element, 'establishment'),
                    establishment)
        return data

    def make_xml_file(self):
        """Write the guide as XML and set ``self.success``.

        Wine guides produce a list of products; artisan and restaurant
        guides produce establishments grouped by city. Other guide types
        write nothing and leave ``self.success`` unchanged.
        """
        guide_type = self.guide.guide_type
        if guide_type == self.guide.WINE:
            root = self._build_products_xml()
        elif guide_type in (self.guide.ARTISAN, self.guide.RESTAURANT):
            root = self._build_cities_xml()
        else:
            logger.warning('COMMUTATOR:no XML export for guide type: %s',
                           guide_type)
            return

        with open(self.file_path, 'bw') as f:
            ET.ElementTree(root).write(f)
        self.success = True