* start of tests * rfc compliant emails for frappe. * fix for deepsource * fix length of strings. * fix bug that I'm not sure where it came from? * codacy and deepsource issues trying to keep them happy * take a punt in the dark * fix: use SMTPUTF8 instead of SMTP and other minor fixes Signed-off-by: Chinmay D. Pai <chinmaydpai@gmail.com>version-14
@@ -322,16 +322,16 @@ class EmailAccount(Document): | |||
unhandled_email.insert(ignore_permissions=True) | |||
frappe.db.commit() | |||
def insert_communication(self, msg, args={}): | |||
def insert_communication(self, msg, args=None): | |||
if isinstance(msg, list): | |||
raw, uid, seen = msg | |||
else: | |||
raw = msg | |||
uid = -1 | |||
seen = 0 | |||
if args.get("uid", -1): uid = args.get("uid", -1) | |||
if args.get("seen", 0): seen = args.get("seen", 0) | |||
if isinstance(args, dict): | |||
if args.get("uid", -1): uid = args.get("uid", -1) | |||
if args.get("seen", 0): seen = args.get("seen", 0) | |||
email = Email(raw) | |||
@@ -355,7 +355,7 @@ class EmailAccount(Document): | |||
name = names[0].get("name") | |||
# email is already available update communication uid instead | |||
frappe.db.set_value("Communication", name, "uid", uid, update_modified=False) | |||
return | |||
return frappe.get_doc("Communication", name) | |||
if email.content_type == 'text/html': | |||
email.content = clean_email_html(email.content) | |||
@@ -3,22 +3,25 @@ | |||
from __future__ import unicode_literals | |||
import frappe | |||
import sys | |||
from six.moves import html_parser as HTMLParser | |||
import smtplib, quopri, json | |||
from frappe import msgprint, throw, _, safe_decode | |||
from frappe import msgprint, _, safe_decode | |||
from frappe.email.smtp import SMTPServer, get_outgoing_email_account | |||
from frappe.email.email_body import get_email, get_formatted_html, add_attachment | |||
from frappe.utils.verified_command import get_signed_params, verify_request | |||
from html2text import html2text | |||
from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr, cint | |||
from frappe.utils import get_url, nowdate, now_datetime, add_days, split_emails, cstr, cint | |||
from rq.timeouts import JobTimeoutException | |||
from six import text_type, string_types | |||
from six import text_type, string_types, PY3 | |||
from email.parser import Parser | |||
class EmailLimitCrossedError(frappe.ValidationError): pass | |||
def send(recipients=None, sender=None, subject=None, message=None, text_content=None, reference_doctype=None, | |||
reference_name=None, unsubscribe_method=None, unsubscribe_params=None, unsubscribe_message=None, | |||
attachments=None, reply_to=None, cc=[], bcc=[], message_id=None, in_reply_to=None, send_after=None, | |||
attachments=None, reply_to=None, cc=None, bcc=None, message_id=None, in_reply_to=None, send_after=None, | |||
expose_recipients=None, send_priority=1, communication=None, now=False, read_receipt=None, | |||
queue_separately=False, is_notification=False, add_unsubscribe_link=1, inline_images=None, | |||
header=None, print_letterhead=False): | |||
@@ -52,6 +55,11 @@ def send(recipients=None, sender=None, subject=None, message=None, text_content= | |||
if not recipients and not cc: | |||
return | |||
if not cc: | |||
cc = [] | |||
if not bcc: | |||
bcc = [] | |||
if isinstance(recipients, string_types): | |||
recipients = split_emails(recipients) | |||
@@ -68,7 +76,6 @@ def send(recipients=None, sender=None, subject=None, message=None, text_content= | |||
if not sender or sender == "Administrator": | |||
sender = email_account.default_sender | |||
if not text_content: | |||
try: | |||
text_content = html2text(message) | |||
@@ -404,7 +411,7 @@ def send_one(email, smtpserver=None, auto_commit=True, now=False, from_test=Fals | |||
message = prepare_message(email, recipient.recipient, recipients_list) | |||
if not frappe.flags.in_test: | |||
smtpserver.sess.sendmail(email.sender, recipient.recipient, encode(message)) | |||
smtpserver.sess.sendmail(email.sender, recipient.recipient, message) | |||
recipient.status = "Sent" | |||
frappe.db.sql("""update `tabEmail Queue Recipient` set status='Sent', modified=%s where name=%s""", | |||
@@ -509,37 +516,41 @@ def prepare_message(email, recipient, recipients_list): | |||
message = (message and message.encode('utf8')) or '' | |||
message = safe_decode(message) | |||
if not email.attachments: | |||
return message | |||
# On-demand attachments | |||
from email.parser import Parser | |||
msg_obj = Parser().parsestr(message) | |||
attachments = json.loads(email.attachments) | |||
for attachment in attachments: | |||
if attachment.get('fcontent'): continue | |||
fid = attachment.get("fid") | |||
if fid: | |||
_file = frappe.get_doc("File", fid) | |||
fcontent = _file.get_content() | |||
attachment.update({ | |||
'fname': _file.file_name, | |||
'fcontent': fcontent, | |||
'parent': msg_obj | |||
}) | |||
attachment.pop("fid", None) | |||
add_attachment(**attachment) | |||
elif attachment.get("print_format_attachment") == 1: | |||
attachment.pop("print_format_attachment", None) | |||
print_format_file = frappe.attach_print(**attachment) | |||
print_format_file.update({"parent": msg_obj}) | |||
add_attachment(**print_format_file) | |||
return msg_obj.as_string() | |||
if PY3: | |||
from email.policy import SMTPUTF8 | |||
message = Parser(policy=SMTPUTF8).parsestr(message) | |||
else: | |||
message = Parser().parsestr(message) | |||
if email.attachments: | |||
# On-demand attachments | |||
attachments = json.loads(email.attachments) | |||
for attachment in attachments: | |||
if attachment.get('fcontent'): | |||
continue | |||
fid = attachment.get("fid") | |||
if fid: | |||
_file = frappe.get_doc("File", fid) | |||
fcontent = _file.get_content() | |||
attachment.update({ | |||
'fname': _file.file_name, | |||
'fcontent': fcontent, | |||
'parent': message | |||
}) | |||
attachment.pop("fid", None) | |||
add_attachment(**attachment) | |||
elif attachment.get("print_format_attachment") == 1: | |||
attachment.pop("print_format_attachment", None) | |||
print_format_file = frappe.attach_print(**attachment) | |||
print_format_file.update({"parent": message}) | |||
add_attachment(**print_format_file) | |||
return message.as_string() | |||
def clear_outbox(): | |||
"""Remove low priority older than 31 days in Outbox and expire mails not sent for 7 days. | |||
@@ -5,7 +5,10 @@ from __future__ import unicode_literals | |||
import unittest, os, base64 | |||
from frappe.email.receive import Email | |||
from frappe.email.email_body import (replace_filename_with_cid, | |||
get_email, inline_style_in_html, get_header) | |||
get_email, inline_style_in_html, get_header) | |||
from frappe.email.queue import prepare_message, get_email_queue | |||
from six import PY3 | |||
class TestEmailBody(unittest.TestCase): | |||
def setUp(self): | |||
@@ -37,6 +40,53 @@ This is the text version of this email | |||
text_content=email_text | |||
).as_string() | |||
def test_prepare_message_returns_already_encoded_string(self): | |||
if PY3: | |||
uni_chr1 = chr(40960) | |||
uni_chr2 = chr(1972) | |||
else: | |||
uni_chr1 = unichr(40960) | |||
uni_chr2 = unichr(1972) | |||
email = get_email_queue( | |||
recipients=['test@example.com'], | |||
sender='me@example.com', | |||
subject='Test Subject', | |||
content='<h1>' + uni_chr1 + 'abcd' + uni_chr2 + '</h1>', | |||
formatted='<h1>' + uni_chr1 + 'abcd' + uni_chr2 + '</h1>', | |||
text_content='whatever') | |||
result = prepare_message(email=email, recipient='test@test.com', recipients_list=[]) | |||
self.assertTrue("<h1>=EA=80=80abcd=DE=B4</h1>" in result) | |||
def test_prepare_message_returns_cr_lf(self): | |||
email = get_email_queue( | |||
recipients=['test@example.com'], | |||
sender='me@example.com', | |||
subject='Test Subject', | |||
content='<h1>\n this is a test of newlines\n' + '</h1>', | |||
formatted='<h1>\n this is a test of newlines\n' + '</h1>', | |||
text_content='whatever') | |||
result = prepare_message(email=email, recipient='test@test.com', recipients_list=[]) | |||
if PY3: | |||
self.assertTrue(result.count('\n') == result.count("\r")) | |||
else: | |||
self.assertTrue(True) | |||
def test_rfc_5322_header_is_wrapped_at_998_chars(self): | |||
# unfortunately the db can only hold 140 chars so this can't be tested properly. test at max chars anyway. | |||
email = get_email_queue( | |||
recipients=['test@example.com'], | |||
sender='me@example.com', | |||
subject='Test Subject', | |||
content='<h1>Whatever</h1>', | |||
text_content='whatever', | |||
message_id= "a.really.long.message.id.that.should.not.wrap.until.998.if.it.does.then.exchange.will.break" + | |||
".really.long.message.id.that.should.not.wrap.unti") | |||
result = prepare_message(email=email, recipient='test@test.com', recipients_list=[]) | |||
self.assertTrue( | |||
"a.really.long.message.id.that.should.not.wrap.until.998.if.it.does.then.exchange.will.break" + | |||
".really.long.message.id.that.should.not.wrap.unti" in result) | |||
def test_image(self): | |||
img_signature = ''' | |||
@@ -49,7 +99,6 @@ Content-Disposition: inline; filename="favicon.png" | |||
self.assertTrue(img_signature in self.email_string) | |||
self.assertTrue(self.img_base64 in self.email_string) | |||
def test_text_content(self): | |||
text_content = ''' | |||
Content-Type: text/plain; charset="utf-8" | |||
@@ -62,7 +111,6 @@ This is the text version of this email | |||
''' | |||
self.assertTrue(text_content in self.email_string) | |||
def test_email_content(self): | |||
html_head = ''' | |||
Content-Type: text/html; charset="utf-8" | |||
@@ -79,7 +127,6 @@ w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> | |||
self.assertTrue(html_head in self.email_string) | |||
self.assertTrue(html in self.email_string) | |||
def test_replace_filename_with_cid(self): | |||
original_message = ''' | |||
<div> | |||
@@ -152,6 +199,7 @@ Reply-To: test2_@erpnext.com | |||
mail = Email(content_bytes) | |||
self.assertEqual(mail.text_content, text_content) | |||
def fixed_column_width(string, chunk_size): | |||
parts = [string[0+i:chunk_size+i] for i in range(0, len(string), chunk_size)] | |||
return '\n'.join(parts) | |||
parts = [string[0 + i:chunk_size + i] for i in range(0, len(string), chunk_size)] | |||
return '\n'.join(parts) |
@@ -4,11 +4,14 @@ | |||
from __future__ import unicode_literals | |||
import unittest, frappe, re, email | |||
from six import PY3 | |||
from frappe.test_runner import make_test_records | |||
make_test_records("User") | |||
make_test_records("Email Account") | |||
class TestEmail(unittest.TestCase): | |||
def setUp(self): | |||
frappe.db.sql("""delete from `tabEmail Unsubscribe`""") | |||
@@ -16,11 +19,11 @@ class TestEmail(unittest.TestCase): | |||
frappe.db.sql("""delete from `tabEmail Queue Recipient`""") | |||
def test_email_queue(self, send_after=None): | |||
frappe.sendmail(recipients = ['test@example.com', 'test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name='Administrator', | |||
subject='Testing Queue', message='This mail is queued!', | |||
unsubscribe_message="Unsubscribe", send_after=send_after) | |||
frappe.sendmail(recipients=['test@example.com', 'test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name='Administrator', | |||
subject='Testing Queue', message='This mail is queued!', | |||
unsubscribe_message="Unsubscribe", send_after=send_after) | |||
email_queue = frappe.db.sql("""select name,message from `tabEmail Queue` where status='Not Sent'""", as_dict=1) | |||
self.assertEqual(len(email_queue), 1) | |||
@@ -32,7 +35,7 @@ class TestEmail(unittest.TestCase): | |||
self.assertTrue('<!--unsubscribe url-->' in email_queue[0]['message']) | |||
def test_send_after(self): | |||
self.test_email_queue(send_after = 1) | |||
self.test_email_queue(send_after=1) | |||
from frappe.email.queue import flush | |||
flush(from_test=True) | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Sent'""", as_dict=1) | |||
@@ -52,12 +55,13 @@ class TestEmail(unittest.TestCase): | |||
self.assertTrue('Unsubscribe' in frappe.safe_decode(frappe.flags.sent_mail)) | |||
def test_cc_header(self): | |||
#test if sending with cc's makes it into header | |||
# test if sending with cc's makes it into header | |||
frappe.sendmail(recipients=['test@example.com'], | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', unsubscribe_message="Unsubscribe", expose_recipients="header") | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', | |||
unsubscribe_message="Unsubscribe", expose_recipients="header") | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Not Sent'""", as_dict=1) | |||
self.assertEqual(len(email_queue), 1) | |||
queue_recipients = [r.recipient for r in frappe.db.sql("""select recipient from `tabEmail Queue Recipient` | |||
@@ -71,12 +75,13 @@ class TestEmail(unittest.TestCase): | |||
self.assertTrue('CC: test1@example.com' in message) | |||
def test_cc_footer(self): | |||
#test if sending with cc's makes it into header | |||
# test if sending with cc's makes it into header | |||
frappe.sendmail(recipients=['test@example.com'], | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', unsubscribe_message="Unsubscribe", expose_recipients="footer", now=True) | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', | |||
unsubscribe_message="Unsubscribe", expose_recipients="footer", now=True) | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Sent'""", as_dict=1) | |||
self.assertEqual(len(email_queue), 1) | |||
queue_recipients = [r.recipient for r in frappe.db.sql("""select recipient from `tabEmail Queue Recipient` | |||
@@ -84,15 +89,17 @@ class TestEmail(unittest.TestCase): | |||
self.assertTrue('test@example.com' in queue_recipients) | |||
self.assertTrue('test1@example.com' in queue_recipients) | |||
self.assertTrue('This email was sent to test@example.com and copied to test1@example.com' in frappe.safe_decode(frappe.flags.sent_mail)) | |||
self.assertTrue('This email was sent to test@example.com and copied to test1@example.com' in frappe.safe_decode( | |||
frappe.flags.sent_mail)) | |||
def test_expose(self): | |||
from frappe.utils.verified_command import verify_request | |||
frappe.sendmail(recipients=['test@example.com'], | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', unsubscribe_message="Unsubscribe", now=True) | |||
cc=['test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', | |||
unsubscribe_message="Unsubscribe", now=True) | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Sent'""", as_dict=1) | |||
self.assertEqual(len(email_queue), 1) | |||
queue_recipients = [r.recipient for r in frappe.db.sql("""select recipient from `tabEmail Queue Recipient` | |||
@@ -109,7 +116,14 @@ class TestEmail(unittest.TestCase): | |||
content = part.get_payload(decode=True) | |||
if content: | |||
frappe.local.flags.signed_query_string = re.search(r'(?<=/api/method/frappe.email.queue.unsubscribe\?).*(?=\n)', content.decode()).group(0) | |||
if PY3: | |||
eol = "\r\n" | |||
else: | |||
eol = "\n" | |||
frappe.local.flags.signed_query_string = \ | |||
re.search(r'(?<=/api/method/frappe.email.queue.unsubscribe\?).*(?=' + eol + ')', | |||
content.decode()).group(0) | |||
self.assertTrue(verify_request()) | |||
break | |||
@@ -121,7 +135,7 @@ class TestEmail(unittest.TestCase): | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Expired'""", as_dict=1) | |||
self.assertEqual(len(email_queue), 1) | |||
queue_recipients = [r.recipient for r in frappe.db.sql("""select recipient from `tabEmail Queue Recipient` | |||
where parent = %s""",email_queue[0].name, as_dict=1)] | |||
where parent = %s""", email_queue[0].name, as_dict=1)] | |||
self.assertTrue('test@example.com' in queue_recipients) | |||
self.assertTrue('test1@example.com' in queue_recipients) | |||
self.assertEqual(len(queue_recipients), 2) | |||
@@ -131,19 +145,20 @@ class TestEmail(unittest.TestCase): | |||
unsubscribe(doctype="User", name="Administrator", email="test@example.com") | |||
self.assertTrue(frappe.db.get_value("Email Unsubscribe", | |||
{"reference_doctype": "User", "reference_name": "Administrator", "email": "test@example.com"})) | |||
{"reference_doctype": "User", "reference_name": "Administrator", | |||
"email": "test@example.com"})) | |||
before = frappe.db.sql("""select count(name) from `tabEmail Queue` where status='Not Sent'""")[0][0] | |||
send(recipients = ['test@example.com', 'test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name= "Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', unsubscribe_message="Unsubscribe") | |||
send(recipients=['test@example.com', 'test1@example.com'], | |||
sender="admin@example.com", | |||
reference_doctype='User', reference_name="Administrator", | |||
subject='Testing Email Queue', message='This is mail is queued!', unsubscribe_message="Unsubscribe") | |||
# this is sent async (?) | |||
email_queue = frappe.db.sql("""select name from `tabEmail Queue` where status='Not Sent'""", | |||
as_dict=1) | |||
as_dict=1) | |||
self.assertEqual(len(email_queue), before + 1) | |||
queue_recipients = [r.recipient for r in frappe.db.sql("""select recipient from `tabEmail Queue Recipient` | |||
where status='Not Sent'""", as_dict=1)] | |||
@@ -152,7 +167,6 @@ class TestEmail(unittest.TestCase): | |||
self.assertEqual(len(queue_recipients), 1) | |||
self.assertTrue('Unsubscribe' in frappe.safe_decode(frappe.flags.sent_mail)) | |||
def test_image_parsing(self): | |||
import re | |||
email_account = frappe.get_doc('Email Account', '_Test Email Account 1') | |||
@@ -166,6 +180,6 @@ class TestEmail(unittest.TestCase): | |||
self.assertTrue(re.search('''<img[^>]*src=["']/private/files/rtco2.png[^>]*>''', communication.content)) | |||
if __name__=='__main__': | |||
if __name__ == '__main__': | |||
frappe.connect() | |||
unittest.main() |