# Society Self-Service
# Copyright © 2018-2023 Yingtong Li (RunasSudo)
# Copyright © 2023 MUMUS Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import boto3
from botocore.exceptions import ClientError
import premailer

from django.conf import settings
from django.template import loader
from markupsafe import Markup

from selfserv.mdx_urlize import UrlizeExtension

import logging
import markdown

# Debugging
import subprocess
import tempfile
import time


class Emailer:
    """Render templated emails and send them through Amazon SES.

    The SES client is built from ``settings.AWS_KEY_ID``,
    ``settings.AWS_SECRET`` and ``settings.AWS_REGION``. Rendered message
    bodies are wrapped in the ``ssmain/email/rendered.html`` template.
    """

    def __init__(self):
        self.client = boto3.client(
            'ses',
            aws_access_key_id=settings.AWS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET,
            region_name=settings.AWS_REGION,
        )
        # Outer HTML wrapper into which every rendered message is embedded
        self.template = loader.get_template('ssmain/email/rendered.html')

    def markdown(self, x):
        """Convert the Markdown string *x* to HTML.

        Uses the project's ``UrlizeExtension`` and the ``fenced_code``
        extension.
        """
        return markdown.markdown(x, extensions=[UrlizeExtension(), 'fenced_code'])

    def boto3_send(self, *args, **kwargs):
        """Call ``client.send_email``, retrying when SES throttles the request.

        All arguments are passed through unchanged to ``send_email``. Up to
        10 attempts are made; between throttled attempts the method sleeps
        with an exponential back-off starting at 10 ms and capped at 5000 ms.

        Raises:
            ClientError: for any SES error other than the
                "Maximum sending rate exceeded." throttling error.
            Exception: if every attempt was throttled.
        """
        for attempt in range(10):
            try:
                self.client.send_email(*args, **kwargs)
                return
            except ClientError as e:
                # The error payload lives on ``e.response``; the exception
                # object itself is not subscriptable.
                error = e.response.get('Error', {})
                if error.get('Code') == 'Throttling' and error.get('Message') == 'Maximum sending rate exceeded.':
                    # Exponential back-off in milliseconds, capped at 5 s
                    wait_time = min(10 * (2 ** attempt), 5000)
                    time.sleep(wait_time / 1000)
                else:
                    # Bare raise keeps the original traceback intact
                    raise

        raise Exception('Reached maximum number of retries')

    def send_raw_mail(self, recipients, subject, content_txt, content_html):
        """Send an already-rendered message to *recipients*.

        Args:
            recipients: list of destination addresses.
            subject: subject line.
            content_txt: plain-text body.
            content_html: HTML body.

        When ``settings.EMAIL_DEBUG`` is truthy nothing is sent; instead the
        message is written to a temporary ``.mbox`` file which is opened
        with ``evolution``. Otherwise the message is sent via
        :meth:`boto3_send` from ``settings.ORG_NAME`` /
        ``settings.AWS_SENDER_EMAIL``.
        """
        if settings.EMAIL_DEBUG:
            with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', suffix='.mbox') as f:
                print(
                    'From: sender@example.com\nTo: ' + ','.join(recipients)
                    + '\nSubject: ' + subject
                    + '\nContent-Type: multipart/alternative; boundary=boundary\nMessage-ID: <0@example.com>\n\n--boundary\nContent-Type: text/html; charset=utf-8\n\n' + content_html
                    + '\n--boundary\nContent-Type: text/plain; charset=utf-8\n\n' + content_txt
                    + '\n--boundary',
                    file=f,
                )
                # Make sure the buffered text is on disk before another
                # process opens the file by name.
                f.flush()
                subprocess.run(['evolution', f.name])
                # NOTE(review): presumably gives the mail client time to read
                # the file before the context manager deletes it - confirm.
                time.sleep(5)
        else:
            self.boto3_send(
                Destination={
                    'ToAddresses': recipients,
                },
                Message={
                    'Body': {
                        'Html': {
                            'Charset': 'utf-8',
                            'Data': content_html,
                        },
                        'Text': {
                            'Charset': 'utf-8',
                            'Data': content_txt,
                        },
                    },
                    'Subject': {
                        'Charset': 'utf-8',
                        'Data': subject,
                    },
                },
                Source='{} <{}>'.format(settings.ORG_NAME, settings.AWS_SENDER_EMAIL),
            )

    def render_mail(self, template_loc, params=None):
        """Render the template at *template_loc* to plain text and HTML.

        The template is rendered twice: once with ``format`` set to
        ``'txt'`` and once with ``format`` set to ``'markdown'``. A
        ``baseurl`` entry built from ``settings.ALLOWED_HOSTS[0]`` is also
        supplied to the template.

        Args:
            template_loc: template name passed to ``loader.get_template``.
            params: optional mapping of template variables. It is copied,
                so the caller's mapping is not modified.

        Returns:
            A ``(content_txt, content_html)`` tuple.
        """
        # Work on a private copy: avoids both a shared mutable default and
        # leaking 'baseurl'/'format' into the caller's dict.
        params = {} if params is None else dict(params)
        params['baseurl'] = 'https://' + settings.ALLOWED_HOSTS[0]

        template = loader.get_template(template_loc)

        # Plain-text variant; backslash-escaped asterisks are un-escaped
        params['format'] = 'txt'
        content_txt = template.render(params).strip().replace('\\*', '*')

        # HTML variant: render as Markdown source, convert to HTML, then
        # embed it in the wrapper template.
        params['format'] = 'markdown'
        content_markdown = self.markdown(template.render(params))

        # Markup presumably stops the wrapper template from escaping the
        # generated HTML - TODO confirm against the template engine in use.
        content_html = self.template.render({'email_content': Markup(content_markdown)})
        content_html = premailer.Premailer(
            content_html,
            cssutils_logging_level=logging.ERROR,
            strip_important=False,
        ).transform()

        return content_txt, content_html

    def send_mail(self, recipients, subject, template_loc, params):
        """Render *template_loc* with *params* and send it to *recipients*."""
        content_txt, content_html = self.render_mail(template_loc, params)
        self.send_raw_mail(recipients, subject, content_txt, content_html)