* Sending mail works independently * You can send a mail by calling Queue_doc.send() * Used context manager to track exceptions while sending mailsversion-14
@@ -1620,6 +1620,12 @@ def enqueue(*args, **kwargs): | |||
import frappe.utils.background_jobs | |||
return frappe.utils.background_jobs.enqueue(*args, **kwargs) | |||
def task(**task_kwargs): | |||
def decorator_task(f): | |||
f.enqueue = lambda **fun_kwargs: enqueue(f, **task_kwargs, **fun_kwargs) | |||
return f | |||
return decorator_task | |||
def enqueue_doc(*args, **kwargs): | |||
''' | |||
Enqueue method to be executed using a background worker | |||
@@ -35,9 +35,6 @@ OUTGOING_EMAIL_ACCOUNT_MISSING = _("Please setup default Email Account from Setu | |||
class SentEmailInInbox(Exception): | |||
pass | |||
class InvalidEmailCredentials(frappe.ValidationError): | |||
pass | |||
def cache_email_account(cache_name): | |||
def decorator_cache_email_account(func): | |||
@functools.wraps(func) | |||
@@ -100,9 +97,8 @@ class EmailAccount(Document): | |||
self.get_incoming_server() | |||
self.no_failed = 0 | |||
if self.enable_outgoing: | |||
self.check_smtp() | |||
self.validate_smtp_conn() | |||
else: | |||
if self.enable_incoming or (self.enable_outgoing and not self.no_smtp_authentication): | |||
frappe.throw(_("Password is required or select Awaiting Password")) | |||
@@ -118,6 +114,13 @@ class EmailAccount(Document): | |||
if self.append_to not in valid_doctypes: | |||
frappe.throw(_("Append To can be one of {0}").format(comma_or(valid_doctypes))) | |||
def validate_smtp_conn(self): | |||
if not self.smtp_server: | |||
frappe.throw(_("SMTP Server is required")) | |||
server = self.get_smtp_server() | |||
return server.session | |||
def before_save(self): | |||
messages = [] | |||
as_list = 1 | |||
@@ -179,24 +182,6 @@ class EmailAccount(Document): | |||
except Exception: | |||
pass | |||
def check_smtp(self): | |||
"""Checks SMTP settings.""" | |||
if self.enable_outgoing: | |||
if not self.smtp_server: | |||
frappe.throw(_("{0} is required").format("SMTP Server")) | |||
server = SMTPServer( | |||
login = getattr(self, "login_id", None) or self.email_id, | |||
server=self.smtp_server, | |||
port=cint(self.smtp_port), | |||
use_tls=cint(self.use_tls), | |||
use_ssl=cint(self.use_ssl_for_outgoing) | |||
) | |||
if self.password and not self.no_smtp_authentication: | |||
server.password = self.get_password() | |||
server.sess | |||
def get_incoming_server(self, in_receive=False, email_sync_rule="UNSEEN"): | |||
"""Returns logged in POP3/IMAP connection object.""" | |||
if frappe.cache().get_value("workers:no-internet") == True: | |||
@@ -259,7 +244,7 @@ class EmailAccount(Document): | |||
return None | |||
elif not in_receive and any(map(lambda t: t in message, auth_error_codes)): | |||
self.throw_invalid_credentials_exception() | |||
SMTPServer.throw_invalid_credentials_exception() | |||
else: | |||
frappe.throw(cstr(e)) | |||
@@ -279,20 +264,18 @@ class EmailAccount(Document): | |||
@property | |||
def _password(self): | |||
raise_exception = not self.no_smtp_authentication | |||
raise_exception = not (self.no_smtp_authentication or frappe.flags.in_test) | |||
return self.get_password(raise_exception=raise_exception) | |||
@property | |||
def default_sender(self): | |||
return email.utils.formataddr((self.name, self.get("email_id"))) | |||
@classmethod | |||
def throw_invalid_credentials_exception(cls): | |||
frappe.throw( | |||
_("Incorrect email or password. Please check your login credentials."), | |||
exc=InvalidEmailCredentials, | |||
title=_("Invalid Credentials") | |||
) | |||
def is_exists_in_db(self): | |||
"""Some of the Email Accounts we create from configs and those doesn't exists in DB. | |||
This is is to check the specific email account exists in DB or not. | |||
""" | |||
return self.find_one_by_filters(name=self.name) | |||
@classmethod | |||
def from_record(cls, record): | |||
@@ -402,6 +385,20 @@ class EmailAccount(Document): | |||
account_details[doc_field_name] = (value and value[0]) or default | |||
return account_details | |||
def sendmail_config(self): | |||
return { | |||
'server': self.smtp_server, | |||
'port': cint(self.smtp_port), | |||
'login': getattr(self, "login_id", None) or self.email_id, | |||
'password': self._password, | |||
'use_ssl': cint(self.use_ssl_for_outgoing), | |||
'use_tls': cint(self.use_tls) | |||
} | |||
def get_smtp_server(self): | |||
config = self.sendmail_config() | |||
return SMTPServer(**config) | |||
def handle_incoming_connect_error(self, description): | |||
if test_internet(): | |||
if self.get_failed_attempts_count() > 2: | |||
@@ -2,15 +2,26 @@ | |||
# Copyright (c) 2015, Frappe Technologies and contributors | |||
# For license information, please see license.txt | |||
from __future__ import unicode_literals | |||
import traceback | |||
import json | |||
from rq.timeouts import JobTimeoutException | |||
import smtplib | |||
import quopri | |||
from email.parser import Parser | |||
import frappe | |||
from frappe import _ | |||
from frappe import _, safe_encode, task | |||
from frappe.model.document import Document | |||
from frappe.email.queue import send_one | |||
from frappe.utils import now_datetime | |||
from frappe.email.queue import get_unsubcribed_url | |||
from frappe.email.email_body import add_attachment | |||
from frappe.utils import cint | |||
from email.policy import SMTPUTF8 | |||
MAX_RETRY_COUNT = 3 | |||
class EmailQueue(Document): | |||
DOCTYPE = 'Email Queue' | |||
def set_recipients(self, recipients): | |||
self.set("recipients", []) | |||
for r in recipients: | |||
@@ -30,6 +41,241 @@ class EmailQueue(Document): | |||
duplicate.set_recipients(recipients) | |||
return duplicate | |||
@classmethod | |||
def find(cls, name): | |||
return frappe.get_doc(cls.DOCTYPE, name) | |||
def update_db(self, commit=False, **kwargs): | |||
frappe.db.set_value(self.DOCTYPE, self.name, kwargs) | |||
if commit: | |||
frappe.db.commit() | |||
def update_status(self, status, commit=False, **kwargs): | |||
self.update_db(status = status, commit = commit, **kwargs) | |||
if self.communication: | |||
communication_doc = frappe.get_doc('Communication', self.communication) | |||
communication_doc.set_delivery_status(commit=commit) | |||
@property | |||
def cc(self): | |||
return (self.show_as_cc and self.show_as_cc.split(",")) or [] | |||
@property | |||
def to(self): | |||
return [r.recipient for r in self.recipients if r.recipient not in self.cc] | |||
@property | |||
def attachments_list(self): | |||
return json.loads(self.attachments) if self.attachments else [] | |||
def get_email_account(self): | |||
from frappe.email.doctype.email_account.email_account import EmailAccount | |||
if self.email_account: | |||
return frappe.get_doc('Email Account', self.email_account) | |||
return EmailAccount.find_outgoing( | |||
match_by_email = self.sender, match_by_doctype = self.reference_doctype) | |||
def is_to_be_sent(self): | |||
return self.status in ['Not Sent','Partially Sent'] | |||
def can_send_now(self): | |||
hold_queue = (cint(frappe.defaults.get_defaults().get("hold_queue"))==1) | |||
if frappe.are_emails_muted() or not self.is_to_be_sent() or hold_queue: | |||
return False | |||
return True | |||
def send(self, is_background_task=False): | |||
""" Send emails to recipients. | |||
""" | |||
if not self.can_send_now(): | |||
frappe.db.rollback() | |||
return | |||
with SendMailContext(self, is_background_task) as ctx: | |||
message = None | |||
for recipient in self.recipients: | |||
if not recipient.is_mail_to_be_sent(): | |||
continue | |||
message = ctx.build_message(recipient.recipient) | |||
if not frappe.flags.in_test: | |||
ctx.smtp_session.sendmail(recipient.recipient, self.sender, message) | |||
ctx.add_to_sent_list(recipient) | |||
if frappe.flags.in_test: | |||
frappe.flags.sent_mail = message | |||
return | |||
if ctx.email_account_doc.append_emails_to_sent_folder and ctx.sent_to: | |||
ctx.email_account_doc.append_email_to_sent_folder(message) | |||
@task(queue = 'short') | |||
def send_mail(email_queue_name, is_background_task=False): | |||
"""This is equalent to EmqilQueue.send. | |||
This provides a way to make sending mail as a background job. | |||
""" | |||
record = EmailQueue.find(email_queue_name) | |||
record.send(is_background_task=is_background_task) | |||
class SendMailContext: | |||
def __init__(self, queue_doc: Document, is_background_task: bool = False): | |||
self.queue_doc = queue_doc | |||
self.is_background_task = is_background_task | |||
self.email_account_doc = queue_doc.get_email_account() | |||
self.smtp_server = self.email_account_doc.get_smtp_server() | |||
self.sent_to = [rec.recipient for rec in self.queue_doc.recipients if rec.is_main_sent()] | |||
def __enter__(self): | |||
self.queue_doc.update_status(status='Sending', commit=True) | |||
return self | |||
def __exit__(self, exc_type, exc_val, exc_tb): | |||
exceptions = [ | |||
smtplib.SMTPServerDisconnected, | |||
smtplib.SMTPAuthenticationError, | |||
smtplib.SMTPRecipientsRefused, | |||
smtplib.SMTPConnectError, | |||
smtplib.SMTPHeloError, | |||
JobTimeoutException | |||
] | |||
self.smtp_server.quit() | |||
self.log_exception(exc_type, exc_val, exc_tb) | |||
if exc_type in exceptions: | |||
email_status = (self.sent_to and 'Partially Sent') or 'Not Sent' | |||
self.queue_doc.update_status(status = email_status, commit = True) | |||
elif exc_type: | |||
if self.queue_doc.retry < MAX_RETRY_COUNT: | |||
update_fields = {'status': 'Not Sent', 'retry': self.queue_doc.retry + 1} | |||
else: | |||
update_fields = {'status': (self.sent_to and 'Partially Errored') or 'Error'} | |||
self.queue_doc.update_status(**update_fields, commit = True) | |||
else: | |||
email_status = self.is_mail_sent_to_all() and 'Sent' | |||
email_status = email_status or (self.sent_to and 'Partially Sent') or 'Not Sent' | |||
self.queue_doc.update_status(status = email_status, commit = True) | |||
def log_exception(self, exc_type, exc_val, exc_tb): | |||
if exc_type: | |||
traceback_string = "".join(traceback.format_tb(exc_tb)) | |||
traceback_string += f"\n Queue Name: {self.queue_doc.name}" | |||
if self.is_background_task: | |||
frappe.log_error(title = 'frappe.email.queue.flush', message = traceback_string) | |||
else: | |||
frappe.log_error(message = traceback_string) | |||
@property | |||
def smtp_session(self): | |||
if frappe.flags.in_test: | |||
return | |||
return self.smtp_server.session | |||
def add_to_sent_list(self, recipient): | |||
# Update recipient status | |||
recipient.update_db(status='Sent', commit=True) | |||
self.sent_to.append(recipient.recipient) | |||
def is_mail_sent_to_all(self): | |||
return sorted(self.sent_to) == sorted([rec.recipient for rec in self.queue_doc.recipients]) | |||
def get_message_object(self, message): | |||
return Parser(policy=SMTPUTF8).parsestr(message) | |||
def message_placeholder(self, placeholder_key): | |||
map = { | |||
'tracker': '<!--email open check-->', | |||
'unsubscribe_url': '<!--unsubscribe url-->', | |||
'cc': '<!--cc message-->', | |||
'recipient': '<!--recipient-->', | |||
} | |||
return map.get(placeholder_key) | |||
def build_message(self, recipient_email): | |||
"""Build message specific to the recipient. | |||
""" | |||
message = self.queue_doc.message | |||
if not message: | |||
return "" | |||
message = message.replace(self.message_placeholder('tracker'), self.get_tracker_str()) | |||
message = message.replace(self.message_placeholder('unsubscribe_url'), | |||
self.get_unsubscribe_str(recipient_email)) | |||
message = message.replace(self.message_placeholder('cc'), self.get_receivers_str()) | |||
message = message.replace(self.message_placeholder('recipient'), | |||
self.get_receipient_str(recipient_email)) | |||
message = self.include_attachments(message) | |||
return message | |||
def get_tracker_str(self): | |||
tracker_url_html = \ | |||
'<img src="https://{}/api/method/frappe.core.doctype.communication.email.mark_email_as_seen?name={}"/>' | |||
message = '' | |||
if frappe.conf.use_ssl and self.queue_doc.track_email_status: | |||
message = quopri.encodestring( | |||
tracker_url_html.format(frappe.local.site, self.queue_doc.communication).encode() | |||
).decode() | |||
return message | |||
def get_unsubscribe_str(self, recipient_email): | |||
unsubscribe_url = '' | |||
if self.queue_doc.add_unsubscribe_link and self.queue_doc.reference_doctype: | |||
doctype, doc_name = self.queue_doc.reference_doctype, self.queue_doc.reference_name | |||
unsubscribe_url = get_unsubcribed_url(doctype, doc_name, recipient_email, | |||
self.queue_doc.unsubscribe_method, self.queue_doc.unsubscribe_param) | |||
return quopri.encodestring(unsubscribe_url.encode()).decode() | |||
def get_receivers_str(self): | |||
message = '' | |||
if self.queue_doc.expose_recipients == "footer": | |||
to_str = ', '.join(self.queue_doc.to) | |||
cc_str = ', '.join(self.queue_doc.cc) | |||
message = f"This email was sent to {to_str}" | |||
message = message + f" and copied to {cc_str}" if cc_str else message | |||
return message | |||
def get_receipient_str(self, recipient_email): | |||
message = '' | |||
if self.queue_doc.expose_recipients != "header": | |||
message = recipient_email | |||
return message | |||
def include_attachments(self, message): | |||
message_obj = self.get_message_object(message) | |||
attachments = self.queue_doc.attachments_list | |||
for attachment in attachments: | |||
if attachment.get('fcontent'): | |||
continue | |||
fid = attachment.get("fid") | |||
if fid: | |||
_file = frappe.get_doc("File", fid) | |||
fcontent = _file.get_content() | |||
attachment.update({ | |||
'fname': _file.file_name, | |||
'fcontent': fcontent, | |||
'parent': message_obj | |||
}) | |||
attachment.pop("fid", None) | |||
add_attachment(**attachment) | |||
elif attachment.get("print_format_attachment") == 1: | |||
attachment.pop("print_format_attachment", None) | |||
print_format_file = frappe.attach_print(**attachment) | |||
print_format_file.update({"parent": message_obj}) | |||
add_attachment(**print_format_file) | |||
return safe_encode(message_obj.as_string()) | |||
@frappe.whitelist() | |||
def retry_sending(name): | |||
doc = frappe.get_doc("Email Queue", name) | |||
@@ -42,7 +288,9 @@ def retry_sending(name): | |||
@frappe.whitelist() | |||
def send_now(name): | |||
send_one(name, now=True) | |||
record = EmailQueue.find(name) | |||
if record: | |||
record.send() | |||
def on_doctype_update(): | |||
"""Add index in `tabCommunication` for `(reference_doctype, reference_name)`""" | |||
@@ -7,4 +7,16 @@ import frappe | |||
from frappe.model.document import Document | |||
class EmailQueueRecipient(Document): | |||
pass | |||
DOCTYPE = 'Email Queue Recipient' | |||
def is_mail_to_be_sent(self): | |||
return self.status == 'Not Sent' | |||
def is_main_sent(self): | |||
return self.status == 'Sent' | |||
def update_db(self, commit=False, **kwargs): | |||
frappe.db.set_value(self.DOCTYPE, self.name, kwargs) | |||
if commit: | |||
frappe.db.commit() | |||
@@ -173,19 +173,19 @@ def add(recipients, sender, subject, **kwargs): | |||
if not email_queue: | |||
email_queue = get_email_queue([r], sender, subject, **kwargs) | |||
if kwargs.get('now'): | |||
send_one(email_queue.name, now=True) | |||
email_queue.send() | |||
else: | |||
duplicate = email_queue.get_duplicate([r]) | |||
duplicate.insert(ignore_permissions=True) | |||
if kwargs.get('now'): | |||
send_one(duplicate.name, now=True) | |||
duplicate.send() | |||
frappe.db.commit() | |||
else: | |||
email_queue = get_email_queue(recipients, sender, subject, **kwargs) | |||
if kwargs.get('now'): | |||
send_one(email_queue.name, now=True) | |||
email_queue.send() | |||
def get_email_queue(recipients, sender, subject, **kwargs): | |||
'''Make Email Queue object''' | |||
@@ -237,6 +237,9 @@ def get_email_queue(recipients, sender, subject, **kwargs): | |||
', '.join(mail.recipients), traceback.format_exc()), 'Email Not Sent') | |||
recipients = list(set(recipients + kwargs.get('cc', []) + kwargs.get('bcc', []))) | |||
email_account = kwargs.get('email_account') | |||
email_account_name = email_account and email_account.is_exists_in_db() and email_account.name | |||
e.set_recipients(recipients) | |||
e.reference_doctype = kwargs.get('reference_doctype') | |||
e.reference_name = kwargs.get('reference_name') | |||
@@ -248,8 +251,8 @@ def get_email_queue(recipients, sender, subject, **kwargs): | |||
e.send_after = kwargs.get('send_after') | |||
e.show_as_cc = ",".join(kwargs.get('cc', [])) | |||
e.show_as_bcc = ",".join(kwargs.get('bcc', [])) | |||
e.email_account = email_account_name or None | |||
e.insert(ignore_permissions=True) | |||
return e | |||
def get_emails_sent_this_month(): | |||
@@ -331,44 +334,25 @@ def return_unsubscribed_page(email, doctype, name): | |||
indicator_color='green') | |||
def flush(from_test=False): | |||
"""flush email queue, every time: called from scheduler""" | |||
# additional check | |||
auto_commit = not from_test | |||
"""flush email queue, every time: called from scheduler | |||
""" | |||
from frappe.email.doctype.email_queue.email_queue import send_mail | |||
# To avoid running jobs inside unit tests | |||
if frappe.are_emails_muted(): | |||
msgprint(_("Emails are muted")) | |||
from_test = True | |||
smtpserver_dict = frappe._dict() | |||
for email in get_queue(): | |||
if cint(frappe.defaults.get_defaults().get("hold_queue"))==1: | |||
break | |||
if email.name: | |||
smtpserver = smtpserver_dict.get(email.sender) | |||
if not smtpserver: | |||
smtpserver = SMTPServer() | |||
smtpserver_dict[email.sender] = smtpserver | |||
if cint(frappe.defaults.get_defaults().get("hold_queue"))==1: | |||
return | |||
if from_test: | |||
send_one(email.name, smtpserver, auto_commit) | |||
else: | |||
send_one_args = { | |||
'email': email.name, | |||
'smtpserver': smtpserver, | |||
'auto_commit': auto_commit, | |||
} | |||
enqueue( | |||
method = 'frappe.email.queue.send_one', | |||
queue = 'short', | |||
**send_one_args | |||
) | |||
for row in get_queue(): | |||
try: | |||
func = send_mail if from_test else send_mail.enqueue | |||
is_background_task = not from_test | |||
func(email_queue_name = row.name, is_background_task = is_background_task) | |||
except Exception: | |||
frappe.log_error() | |||
# NOTE: removing commit here because we pass auto_commit | |||
# finally: | |||
# frappe.db.commit() | |||
def get_queue(): | |||
return frappe.db.sql('''select | |||
name, sender | |||
@@ -381,213 +365,6 @@ def get_queue(): | |||
by priority desc, creation asc | |||
limit 500''', { 'now': now_datetime() }, as_dict=True) | |||
def send_one(email, smtpserver=None, auto_commit=True, now=False): | |||
'''Send Email Queue with given smtpserver''' | |||
email = frappe.db.sql('''select | |||
name, status, communication, message, sender, reference_doctype, | |||
reference_name, unsubscribe_param, unsubscribe_method, expose_recipients, | |||
show_as_cc, add_unsubscribe_link, attachments, retry | |||
from | |||
`tabEmail Queue` | |||
where | |||
name=%s | |||
for update''', email, as_dict=True) | |||
if len(email): | |||
email = email[0] | |||
else: | |||
return | |||
recipients_list = frappe.db.sql('''select name, recipient, status from | |||
`tabEmail Queue Recipient` where parent=%s''', email.name, as_dict=1) | |||
if frappe.are_emails_muted(): | |||
frappe.msgprint(_("Emails are muted")) | |||
return | |||
if cint(frappe.defaults.get_defaults().get("hold_queue"))==1 : | |||
return | |||
if email.status not in ('Not Sent','Partially Sent') : | |||
# rollback to release lock and return | |||
frappe.db.rollback() | |||
return | |||
frappe.db.sql("""update `tabEmail Queue` set status='Sending', modified=%s where name=%s""", | |||
(now_datetime(), email.name), auto_commit=auto_commit) | |||
if email.communication: | |||
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit) | |||
email_sent_to_any_recipient = None | |||
try: | |||
message = None | |||
if not frappe.flags.in_test: | |||
if not smtpserver: | |||
smtpserver = SMTPServer() | |||
# to avoid always using default email account for outgoing | |||
if getattr(frappe.local, "outgoing_email_account", None): | |||
frappe.local.outgoing_email_account = {} | |||
smtpserver.setup_email_account(email.reference_doctype, sender=email.sender) | |||
for recipient in recipients_list: | |||
if recipient.status != "Not Sent": | |||
continue | |||
message = prepare_message(email, recipient.recipient, recipients_list) | |||
if not frappe.flags.in_test: | |||
smtpserver.sess.sendmail(email.sender, recipient.recipient, message) | |||
recipient.status = "Sent" | |||
frappe.db.sql("""update `tabEmail Queue Recipient` set status='Sent', modified=%s where name=%s""", | |||
(now_datetime(), recipient.name), auto_commit=auto_commit) | |||
email_sent_to_any_recipient = any("Sent" == s.status for s in recipients_list) | |||
#if all are sent set status | |||
if email_sent_to_any_recipient: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Sent', modified=%s where name=%s""", | |||
(now_datetime(), email.name), auto_commit=auto_commit) | |||
else: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Error', error=%s | |||
where name=%s""", ("No recipients to send to", email.name), auto_commit=auto_commit) | |||
if frappe.flags.in_test: | |||
frappe.flags.sent_mail = message | |||
return | |||
if email.communication: | |||
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit) | |||
if smtpserver.append_emails_to_sent_folder and email_sent_to_any_recipient: | |||
smtpserver.email_account.append_email_to_sent_folder(message) | |||
except (smtplib.SMTPServerDisconnected, | |||
smtplib.SMTPConnectError, | |||
smtplib.SMTPHeloError, | |||
smtplib.SMTPAuthenticationError, | |||
smtplib.SMTPRecipientsRefused, | |||
JobTimeoutException): | |||
# bad connection/timeout, retry later | |||
if email_sent_to_any_recipient: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Partially Sent', modified=%s where name=%s""", | |||
(now_datetime(), email.name), auto_commit=auto_commit) | |||
else: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Not Sent', modified=%s where name=%s""", | |||
(now_datetime(), email.name), auto_commit=auto_commit) | |||
if email.communication: | |||
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit) | |||
# no need to attempt further | |||
return | |||
except Exception as e: | |||
frappe.db.rollback() | |||
if email.retry < 3: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Not Sent', modified=%s, retry=retry+1 where name=%s""", | |||
(now_datetime(), email.name), auto_commit=auto_commit) | |||
else: | |||
if email_sent_to_any_recipient: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Partially Errored', error=%s where name=%s""", | |||
(text_type(e), email.name), auto_commit=auto_commit) | |||
else: | |||
frappe.db.sql("""update `tabEmail Queue` set status='Error', error=%s | |||
where name=%s""", (text_type(e), email.name), auto_commit=auto_commit) | |||
if email.communication: | |||
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit) | |||
if now: | |||
print(frappe.get_traceback()) | |||
raise e | |||
else: | |||
# log to Error Log | |||
frappe.log_error('frappe.email.queue.flush') | |||
def prepare_message(email, recipient, recipients_list): | |||
message = email.message | |||
if not message: | |||
return "" | |||
# Parse "Email Account" from "Email Sender" | |||
email_account = EmailAccount.find_outgoing(match_by_email=email.sender) | |||
if frappe.conf.use_ssl and email_account.track_email_status: | |||
# Using SSL => Publically available domain => Email Read Reciept Possible | |||
message = message.replace("<!--email open check-->", quopri.encodestring('<img src="https://{}/api/method/frappe.core.doctype.communication.email.mark_email_as_seen?name={}"/>'.format(frappe.local.site, email.communication).encode()).decode()) | |||
else: | |||
# No SSL => No Email Read Reciept | |||
message = message.replace("<!--email open check-->", quopri.encodestring("".encode()).decode()) | |||
if email.add_unsubscribe_link and email.reference_doctype: # is missing the check for unsubscribe message but will not add as there will be no unsubscribe url | |||
unsubscribe_url = get_unsubcribed_url(email.reference_doctype, email.reference_name, recipient, | |||
email.unsubscribe_method, email.unsubscribe_params) | |||
message = message.replace("<!--unsubscribe url-->", quopri.encodestring(unsubscribe_url.encode()).decode()) | |||
if email.expose_recipients == "header": | |||
pass | |||
else: | |||
if email.expose_recipients == "footer": | |||
if isinstance(email.show_as_cc, string_types): | |||
email.show_as_cc = email.show_as_cc.split(",") | |||
email_sent_to = [r.recipient for r in recipients_list] | |||
email_sent_cc = ", ".join([e for e in email_sent_to if e in email.show_as_cc]) | |||
email_sent_to = ", ".join([e for e in email_sent_to if e not in email.show_as_cc]) | |||
if email_sent_cc: | |||
email_sent_message = _("This email was sent to {0} and copied to {1}").format(email_sent_to,email_sent_cc) | |||
else: | |||
email_sent_message = _("This email was sent to {0}").format(email_sent_to) | |||
message = message.replace("<!--cc message-->", quopri.encodestring(email_sent_message.encode()).decode()) | |||
message = message.replace("<!--recipient-->", recipient) | |||
message = (message and message.encode('utf8')) or '' | |||
message = safe_decode(message) | |||
if PY3: | |||
from email.policy import SMTPUTF8 | |||
message = Parser(policy=SMTPUTF8).parsestr(message) | |||
else: | |||
message = Parser().parsestr(message) | |||
if email.attachments: | |||
# On-demand attachments | |||
attachments = json.loads(email.attachments) | |||
for attachment in attachments: | |||
if attachment.get('fcontent'): | |||
continue | |||
fid = attachment.get("fid") | |||
if fid: | |||
_file = frappe.get_doc("File", fid) | |||
fcontent = _file.get_content() | |||
attachment.update({ | |||
'fname': _file.file_name, | |||
'fcontent': fcontent, | |||
'parent': message | |||
}) | |||
attachment.pop("fid", None) | |||
add_attachment(**attachment) | |||
elif attachment.get("print_format_attachment") == 1: | |||
attachment.pop("print_format_attachment", None) | |||
print_format_file = frappe.attach_print(**attachment) | |||
print_format_file.update({"parent": message}) | |||
add_attachment(**print_format_file) | |||
return safe_encode(message.as_string()) | |||
def clear_outbox(days=None): | |||
"""Remove low priority older than 31 days in Outbox or configured in Log Settings. | |||
Note: Used separate query to avoid deadlock | |||
@@ -9,11 +9,24 @@ import _socket, sys | |||
from frappe import _ | |||
from frappe.utils import cint, cstr, parse_addr | |||
CONNECTION_FAILED = _('Could not connect to outgoing email server') | |||
AUTH_ERROR_TITLE = _("Invalid Credentials") | |||
AUTH_ERROR = _("Incorrect email or password. Please check your login credentials.") | |||
SOCKET_ERROR_TITLE = _("Incorrect Configuration") | |||
SOCKET_ERROR = _("Invalid Outgoing Mail Server or Port") | |||
SEND_MAIL_FAILED = _("Unable to send emails at this time") | |||
EMAIL_ACCOUNT_MISSING = _('Email Account not setup. Please create a new Email Account from Setup > Email > Email Account') | |||
class InvalidEmailCredentials(frappe.ValidationError): | |||
pass | |||
def send(email, append_to=None, retry=1): | |||
"""Deprecated: Send the message or add it to Outbox Email""" | |||
def _send(retry): | |||
from frappe.email.doctype.email_account.email_account import EmailAccount | |||
try: | |||
smtpserver = SMTPServer(append_to=append_to) | |||
email_account = EmailAccount.find_outgoing(match_by_doctype=append_to) | |||
smtpserver = email_account.get_smtp_server() | |||
# validate is called in as_string | |||
email_body = email.as_string() | |||
@@ -34,102 +47,80 @@ def send(email, append_to=None, retry=1): | |||
_send(retry) | |||
class SMTPServer: | |||
def __init__(self, login=None, password=None, server=None, port=None, use_tls=None, use_ssl=None, append_to=None): | |||
# get defaults from mail settings | |||
self._sess = None | |||
self.email_account = None | |||
self.server = None | |||
self.append_emails_to_sent_folder = None | |||
if server: | |||
self.server = server | |||
self.port = port | |||
self.use_tls = cint(use_tls) | |||
self.use_ssl = cint(use_ssl) | |||
self.login = login | |||
self.password = password | |||
else: | |||
self.setup_email_account(append_to) | |||
def setup_email_account(self, append_to=None, sender=None): | |||
from frappe.email.doctype.email_account.email_account import EmailAccount | |||
self.email_account = EmailAccount.find_outgoing(match_by_doctype=append_to, match_by_email=sender) | |||
if self.email_account: | |||
self.server = self.email_account.smtp_server | |||
self.login = (getattr(self.email_account, "login_id", None) or self.email_account.email_id) | |||
if self.email_account.no_smtp_authentication or frappe.local.flags.in_test: | |||
self.password = None | |||
else: | |||
self.password = self.email_account._password | |||
self.port = self.email_account.smtp_port | |||
self.use_tls = self.email_account.use_tls | |||
self.sender = self.email_account.email_id | |||
self.use_ssl = self.email_account.use_ssl_for_outgoing | |||
self.append_emails_to_sent_folder = self.email_account.append_emails_to_sent_folder | |||
self.always_use_account_email_id_as_sender = cint(self.email_account.get("always_use_account_email_id_as_sender")) | |||
self.always_use_account_name_as_sender_name = cint(self.email_account.get("always_use_account_name_as_sender_name")) | |||
def __init__(self, server, login=None, password=None, port=None, use_tls=None, use_ssl=None): | |||
self.login = login | |||
self.password = password | |||
self._server = server | |||
self._port = port | |||
self.use_tls = use_tls | |||
self.use_ssl = use_ssl | |||
self._session = None | |||
if not self.server: | |||
frappe.msgprint(EMAIL_ACCOUNT_MISSING, raise_exception=frappe.OutgoingEmailError) | |||
@property | |||
def sess(self): | |||
"""get session""" | |||
if self._sess: | |||
return self._sess | |||
def port(self): | |||
port = self._port or (self.use_ssl and 465) or (self.use_tls and 587) | |||
return cint(port) | |||
# check if email server specified | |||
if not getattr(self, 'server'): | |||
err_msg = _('Email Account not setup. Please create a new Email Account from Setup > Email > Email Account') | |||
frappe.msgprint(err_msg) | |||
raise frappe.OutgoingEmailError(err_msg) | |||
try: | |||
if self.use_ssl: | |||
if not self.port: | |||
self.port = 465 | |||
@property | |||
def server(self): | |||
return cstr(self._server or "") | |||
self._sess = smtplib.SMTP_SSL((self.server or ""), cint(self.port)) | |||
else: | |||
if self.use_tls and not self.port: | |||
self.port = 587 | |||
def secure_session(self, conn): | |||
"""Secure the connection incase of TLS. | |||
""" | |||
if self.use_tls: | |||
conn.ehlo() | |||
conn.starttls() | |||
conn.ehlo() | |||
self._sess = smtplib.SMTP(cstr(self.server or ""), | |||
cint(self.port) or None) | |||
@property | |||
def session(self): | |||
if self.is_session_active(): | |||
return self._session | |||
if not self._sess: | |||
err_msg = _('Could not connect to outgoing email server') | |||
frappe.msgprint(err_msg) | |||
raise frappe.OutgoingEmailError(err_msg) | |||
SMTP = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP | |||
if self.use_tls: | |||
self._sess.ehlo() | |||
self._sess.starttls() | |||
self._sess.ehlo() | |||
try: | |||
self._session = SMTP(self.server, self.port) | |||
if not self._session: | |||
frappe.msgprint(CONNECTION_FAILED, raise_exception=frappe.OutgoingEmailError) | |||
self.secure_session(self._session) | |||
if self.login and self.password: | |||
ret = self._sess.login(str(self.login or ""), str(self.password or "")) | |||
res = self._session.login(str(self.login or ""), str(self.password or "")) | |||
# check if logged correctly | |||
if ret[0]!=235: | |||
frappe.msgprint(ret[1]) | |||
raise frappe.OutgoingEmailError(ret[1]) | |||
if res[0]!=235: | |||
frappe.msgprint(res[1], raise_exception=frappe.OutgoingEmailError) | |||
return self._sess | |||
return self._session | |||
except smtplib.SMTPAuthenticationError as e: | |||
from frappe.email.doctype.email_account.email_account import EmailAccount | |||
EmailAccount.throw_invalid_credentials_exception() | |||
self.throw_invalid_credentials_exception() | |||
except _socket.error as e: | |||
# Invalid mail server -- due to refusing connection | |||
frappe.throw( | |||
_("Invalid Outgoing Mail Server or Port"), | |||
exc=frappe.ValidationError, | |||
title=_("Incorrect Configuration") | |||
) | |||
frappe.throw(SOCKET_ERROR, title=SOCKET_ERROR_TITLE) | |||
except smtplib.SMTPException: | |||
frappe.msgprint(_('Unable to send emails at this time')) | |||
frappe.msgprint(SEND_MAIL_FAILED) | |||
raise | |||
def is_session_active(self): | |||
if self._session: | |||
try: | |||
return self._session.noop()[0] == 250 | |||
except Exception: | |||
return False | |||
def quit(self): | |||
if self.is_session_active(): | |||
self._session.quit() | |||
@classmethod | |||
def throw_invalid_credentials_exception(cls): | |||
frappe.throw(AUTH_ERROR, title=AUTH_ERROR_TITLE, exc=InvalidEmailCredentials) |
@@ -7,10 +7,10 @@ from frappe import safe_decode | |||
from frappe.email.receive import Email | |||
from frappe.email.email_body import (replace_filename_with_cid, | |||
get_email, inline_style_in_html, get_header) | |||
from frappe.email.queue import prepare_message, get_email_queue | |||
from frappe.email.queue import get_email_queue | |||
from frappe.email.doctype.email_queue.email_queue import SendMailContext | |||
from six import PY3 | |||
class TestEmailBody(unittest.TestCase): | |||
def setUp(self): | |||
email_html = ''' | |||
@@ -57,7 +57,8 @@ This is the text version of this email | |||
content='<h1>' + uni_chr1 + 'abcd' + uni_chr2 + '</h1>', | |||
formatted='<h1>' + uni_chr1 + 'abcd' + uni_chr2 + '</h1>', | |||
text_content='whatever') | |||
result = prepare_message(email=email, recipient='test@test.com', recipients_list=[]) | |||
mail_ctx = SendMailContext(queue_doc = email) | |||
result = mail_ctx.build_message(recipient_email = 'test@test.com') | |||
self.assertTrue(b"<h1>=EA=80=80abcd=DE=B4</h1>" in result) | |||
def test_prepare_message_returns_cr_lf(self): | |||
@@ -68,8 +69,10 @@ This is the text version of this email | |||
content='<h1>\n this is a test of newlines\n' + '</h1>', | |||
formatted='<h1>\n this is a test of newlines\n' + '</h1>', | |||
text_content='whatever') | |||
result = safe_decode(prepare_message(email=email, | |||
recipient='test@test.com', recipients_list=[])) | |||
mail_ctx = SendMailContext(queue_doc = email) | |||
result = safe_decode(mail_ctx.build_message(recipient_email='test@test.com')) | |||
if PY3: | |||
self.assertTrue(result.count('\n') == result.count("\r")) | |||
else: | |||
@@ -75,4 +75,4 @@ def make_server(port, ssl, tls): | |||
use_tls = tls | |||
) | |||
server.sess | |||
server.session |