@@ -72,6 +72,26 @@ class Communication(Document): | |||
["email_id", "always_use_account_email_id_as_sender"], as_dict=True) or frappe._dict() | |||
def notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | |||
"""Calls a delayed celery task 'sendmail' that enqueus email in Bulk Email queue | |||
:param print_html: Send given value as HTML attachment | |||
:param print_format: Attach print format of parent document | |||
:param attachments: A list of filenames that should be attached when sending this email | |||
:param recipients: Email recipients | |||
:param except_recipient: True when pulling email, the notification shouldn't go to the main recipient | |||
""" | |||
if frappe.flags.in_test: | |||
# for test cases, run synchronously | |||
self._notify(print_html=print_html, print_format=print_format, attachments=attachments, | |||
recipients=recipients, except_recipient=except_recipient) | |||
else: | |||
from frappe.tasks import sendmail | |||
sendmail.delay(frappe.local.site, self.name, | |||
print_html=print_html, print_format=print_format, attachments=attachments, | |||
recipients=recipients, except_recipient=except_recipient) | |||
def _notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | |||
self.prepare_to_notify(print_html, print_format, attachments) | |||
if not recipients: | |||
recipients = self.get_recipients(except_recipient=except_recipient) | |||
@@ -256,6 +276,10 @@ def make(doctype=None, name=None, content=None, subject=None, sent_or_received = | |||
}) | |||
comm.insert(ignore_permissions=True) | |||
# needed for communication.notify which uses celery delay | |||
# if not committed, delayed task doesn't find the communication | |||
frappe.db.commit() | |||
recipients = None | |||
if send_email: | |||
comm.send_me_a_copy = send_me_a_copy | |||
@@ -124,7 +124,7 @@ class EmailAccount(Document): | |||
exceptions = [] | |||
for raw in incoming_mails: | |||
try: | |||
self.insert_communication(raw) | |||
communication = self.insert_communication(raw) | |||
except Exception: | |||
frappe.db.rollback() | |||
@@ -132,6 +132,7 @@ class EmailAccount(Document): | |||
else: | |||
frappe.db.commit() | |||
communication.notify(attachments=communication._attachments, except_recipient=True) | |||
if exceptions: | |||
raise Exception, frappe.as_json(exceptions) | |||
@@ -156,7 +157,7 @@ class EmailAccount(Document): | |||
communication.insert(ignore_permissions = 1) | |||
# save attachments | |||
email.save_attachments_in_doc(communication) | |||
communication._attachments = email.save_attachments_in_doc(communication) | |||
if self.enable_auto_reply and getattr(communication, "is_first", False): | |||
self.send_auto_reply(communication, email) | |||
@@ -164,7 +165,8 @@ class EmailAccount(Document): | |||
# notify all participants of this thread | |||
# convert content to HTML - by default text parts of replies are used. | |||
communication.content = markdown2.markdown(communication.content) | |||
communication.notify(attachments=email.attachments, except_recipient = True) | |||
return communication | |||
def set_thread(self, communication, email): | |||
"""Appends communication to parent based on thread ID. Will extract | |||
@@ -296,10 +296,13 @@ class Email: | |||
def save_attachments_in_doc(self, doc): | |||
"""Save email attachments in given document.""" | |||
from frappe.utils.file_manager import save_file, MaxFileSizeReachedError | |||
saved_attachments = [] | |||
for attachment in self.attachments: | |||
try: | |||
save_file(attachment['fname'], attachment['fcontent'], | |||
file_data = save_file(attachment['fname'], attachment['fcontent'], | |||
doc.doctype, doc.name) | |||
saved_attachments.append(file_data.file_name) | |||
except MaxFileSizeReachedError: | |||
# WARNING: bypass max file size exception | |||
pass | |||
@@ -307,6 +310,8 @@ class Email: | |||
# same file attached twice?? | |||
pass | |||
return saved_attachments | |||
def get_thread_id(self): | |||
"""Extract thread ID from `[]`""" | |||
import re | |||
@@ -7,6 +7,8 @@ from frappe.utils.scheduler import enqueue_events | |||
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX | |||
from frappe.utils import get_sites | |||
from frappe.utils.file_lock import create_lock, delete_lock | |||
import time | |||
import MySQLdb | |||
@celery_task() | |||
def sync_queues(): | |||
@@ -122,3 +124,33 @@ def pull_from_email_account(site, email_account): | |||
frappe.db.commit() | |||
finally: | |||
frappe.destroy() | |||
@celery_task() | |||
def sendmail(site, communication_name, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | |||
try: | |||
frappe.connect(site=site) | |||
# upto 3 retries | |||
for i in xrange(3): | |||
try: | |||
communication = frappe.get_doc("Communication", communication_name) | |||
communication._notify(print_html=print_html, print_format=print_format, attachments=attachments, recipients=recipients, except_recipient=except_recipient) | |||
except MySQLdb.OperationalError, e: | |||
# deadlock, try again | |||
if e.args[0]==1213: | |||
frappe.db.rollback() | |||
time.sleep(1) | |||
continue | |||
else: | |||
raise | |||
else: | |||
break | |||
except: | |||
frappe.db.rollback() | |||
else: | |||
frappe.db.commit() | |||
finally: | |||
frappe.destroy() |