society-self-service/ssmain/email.py

106 lines
3.8 KiB
Python

# Society Self-Service
# Copyright © 2018-2023 Yingtong Li (RunasSudo)
# Copyright © 2023 MUMUS Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import boto3
from botocore.exceptions import ClientError
import premailer
from django.conf import settings
from django.template import loader
from markupsafe import Markup
from selfserv.mdx_urlize import UrlizeExtension
import logging
import markdown
# Debugging
import subprocess
import tempfile
import time
class Emailer:
def __init__(self):
self.client = boto3.client('ses', aws_access_key_id=settings.AWS_KEY_ID, aws_secret_access_key=settings.AWS_SECRET, region_name=settings.AWS_REGION)
self.template = loader.get_template('ssmain/email/rendered.html')
def markdown(self, x):
return markdown.markdown(x, extensions=[UrlizeExtension(), 'fenced_code'])
def boto3_send(self, *args, **kwargs):
for i in range(0, 10):
try:
self.client.send_email(*args, **kwargs)
return
except ClientError as e:
if e['Error']['Code'] == 'Throttling' and e['Error']['Message'] == 'Maximum sending rate exceeded.':
wait_time = max(10*(2**i), 5000)
#self.stdout.write(self.style.NOTICE('Reached maximum sending rate, waiting {} ms'.format(wait_time)))
time.sleep(wait_time/1000)
else:
raise e
raise Exception('Reached maximum number of retries')
def send_raw_mail(self, recipients, subject, content_txt, content_html):
if settings.EMAIL_DEBUG:
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', suffix='.mbox') as f:
print('From: sender@example.com\nTo: ' + ','.join(recipients) + '\nSubject: ' + subject + '\nContent-Type: multipart/alternative; boundary=boundary\nMessage-ID: <0@example.com>\n\n--boundary\nContent-Type: text/html; charset=utf-8\n\n' + content_html + '\n--boundary\nContent-Type: text/plain; charset=utf-8\n\n' + content_txt + '\n--boundary', file=f)
subprocess.run(['evolution', f.name])
time.sleep(5)
else:
self.boto3_send(
Destination={
'ToAddresses': recipients,
},
Message={
'Body': {
'Html': {
'Charset': 'utf-8',
'Data': content_html,
},
'Text': {
'Charset': 'utf-8',
'Data': content_txt,
},
},
'Subject': {
'Charset': 'utf-8',
'Data': subject,
},
},
Source='{} <{}>'.format(settings.ORG_NAME, settings.AWS_SENDER_EMAIL),
)
def render_mail(self, template_loc, params={}):
params['baseurl'] = 'https://' + settings.ALLOWED_HOSTS[0]
params['format'] = 'txt'
template = loader.get_template(template_loc)
content_txt = template.render(params).strip().replace('\\*', '*')
params['format'] = 'markdown'
content_markdown = self.markdown(template.render(params))
content_html = self.template.render({'email_content': Markup(content_markdown)})
content_html = premailer.Premailer(content_html, cssutils_logging_level=logging.ERROR, strip_important=False).transform()
return content_txt, content_html
def send_mail(self, recipients, subject, template_loc, params):
content_txt, content_html = self.render_mail(template_loc, params)
self.send_raw_mail(recipients, subject, content_txt, content_html)