Delay sending of notification and provide upto 3 attempts in case of deadlockversion-14
@@ -103,7 +103,7 @@ def _is_scheduler_enabled(): | |||||
enable_scheduler = False | enable_scheduler = False | ||||
try: | try: | ||||
frappe.connect() | frappe.connect() | ||||
enable_scheduler = cint(frappe.db.get_default("enable_scheduler")) | |||||
enable_scheduler = cint(frappe.db.get_single_value("System Settings", "enable_scheduler")) and True or False | |||||
except: | except: | ||||
pass | pass | ||||
finally: | finally: | ||||
@@ -72,6 +72,26 @@ class Communication(Document): | |||||
["email_id", "always_use_account_email_id_as_sender"], as_dict=True) or frappe._dict() | ["email_id", "always_use_account_email_id_as_sender"], as_dict=True) or frappe._dict() | ||||
def notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | def notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | ||||
"""Calls a delayed celery task 'sendmail' that enqueus email in Bulk Email queue | |||||
:param print_html: Send given value as HTML attachment | |||||
:param print_format: Attach print format of parent document | |||||
:param attachments: A list of filenames that should be attached when sending this email | |||||
:param recipients: Email recipients | |||||
:param except_recipient: True when pulling email, the notification shouldn't go to the main recipient | |||||
""" | |||||
if frappe.flags.in_test: | |||||
# for test cases, run synchronously | |||||
self._notify(print_html=print_html, print_format=print_format, attachments=attachments, | |||||
recipients=recipients, except_recipient=except_recipient) | |||||
else: | |||||
from frappe.tasks import sendmail | |||||
sendmail.delay(frappe.local.site, self.name, | |||||
print_html=print_html, print_format=print_format, attachments=attachments, | |||||
recipients=recipients, except_recipient=except_recipient) | |||||
def _notify(self, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | |||||
self.prepare_to_notify(print_html, print_format, attachments) | self.prepare_to_notify(print_html, print_format, attachments) | ||||
if not recipients: | if not recipients: | ||||
recipients = self.get_recipients(except_recipient=except_recipient) | recipients = self.get_recipients(except_recipient=except_recipient) | ||||
@@ -256,6 +276,10 @@ def make(doctype=None, name=None, content=None, subject=None, sent_or_received = | |||||
}) | }) | ||||
comm.insert(ignore_permissions=True) | comm.insert(ignore_permissions=True) | ||||
# needed for communication.notify which uses celery delay | |||||
# if not committed, delayed task doesn't find the communication | |||||
frappe.db.commit() | |||||
recipients = None | recipients = None | ||||
if send_email: | if send_email: | ||||
comm.send_me_a_copy = send_me_a_copy | comm.send_me_a_copy = send_me_a_copy | ||||
@@ -124,7 +124,7 @@ class EmailAccount(Document): | |||||
exceptions = [] | exceptions = [] | ||||
for raw in incoming_mails: | for raw in incoming_mails: | ||||
try: | try: | ||||
self.insert_communication(raw) | |||||
communication = self.insert_communication(raw) | |||||
except Exception: | except Exception: | ||||
frappe.db.rollback() | frappe.db.rollback() | ||||
@@ -132,6 +132,7 @@ class EmailAccount(Document): | |||||
else: | else: | ||||
frappe.db.commit() | frappe.db.commit() | ||||
communication.notify(attachments=communication._attachments, except_recipient=True) | |||||
if exceptions: | if exceptions: | ||||
raise Exception, frappe.as_json(exceptions) | raise Exception, frappe.as_json(exceptions) | ||||
@@ -156,7 +157,7 @@ class EmailAccount(Document): | |||||
communication.insert(ignore_permissions = 1) | communication.insert(ignore_permissions = 1) | ||||
# save attachments | # save attachments | ||||
email.save_attachments_in_doc(communication) | |||||
communication._attachments = email.save_attachments_in_doc(communication) | |||||
if self.enable_auto_reply and getattr(communication, "is_first", False): | if self.enable_auto_reply and getattr(communication, "is_first", False): | ||||
self.send_auto_reply(communication, email) | self.send_auto_reply(communication, email) | ||||
@@ -164,7 +165,8 @@ class EmailAccount(Document): | |||||
# notify all participants of this thread | # notify all participants of this thread | ||||
# convert content to HTML - by default text parts of replies are used. | # convert content to HTML - by default text parts of replies are used. | ||||
communication.content = markdown2.markdown(communication.content) | communication.content = markdown2.markdown(communication.content) | ||||
communication.notify(attachments=email.attachments, except_recipient = True) | |||||
return communication | |||||
def set_thread(self, communication, email): | def set_thread(self, communication, email): | ||||
"""Appends communication to parent based on thread ID. Will extract | """Appends communication to parent based on thread ID. Will extract | ||||
@@ -296,10 +296,13 @@ class Email: | |||||
def save_attachments_in_doc(self, doc): | def save_attachments_in_doc(self, doc): | ||||
"""Save email attachments in given document.""" | """Save email attachments in given document.""" | ||||
from frappe.utils.file_manager import save_file, MaxFileSizeReachedError | from frappe.utils.file_manager import save_file, MaxFileSizeReachedError | ||||
saved_attachments = [] | |||||
for attachment in self.attachments: | for attachment in self.attachments: | ||||
try: | try: | ||||
save_file(attachment['fname'], attachment['fcontent'], | |||||
file_data = save_file(attachment['fname'], attachment['fcontent'], | |||||
doc.doctype, doc.name) | doc.doctype, doc.name) | ||||
saved_attachments.append(file_data.file_name) | |||||
except MaxFileSizeReachedError: | except MaxFileSizeReachedError: | ||||
# WARNING: bypass max file size exception | # WARNING: bypass max file size exception | ||||
pass | pass | ||||
@@ -307,6 +310,8 @@ class Email: | |||||
# same file attached twice?? | # same file attached twice?? | ||||
pass | pass | ||||
return saved_attachments | |||||
def get_thread_id(self): | def get_thread_id(self): | ||||
"""Extract thread ID from `[]`""" | """Extract thread ID from `[]`""" | ||||
import re | import re | ||||
@@ -7,6 +7,8 @@ from frappe.utils.scheduler import enqueue_events | |||||
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX | from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX | ||||
from frappe.utils import get_sites | from frappe.utils import get_sites | ||||
from frappe.utils.file_lock import create_lock, delete_lock | from frappe.utils.file_lock import create_lock, delete_lock | ||||
import time | |||||
import MySQLdb | |||||
@celery_task() | @celery_task() | ||||
def sync_queues(): | def sync_queues(): | ||||
@@ -122,3 +124,33 @@ def pull_from_email_account(site, email_account): | |||||
frappe.db.commit() | frappe.db.commit() | ||||
finally: | finally: | ||||
frappe.destroy() | frappe.destroy() | ||||
@celery_task() | |||||
def sendmail(site, communication_name, print_html=None, print_format=None, attachments=None, recipients=None, except_recipient=False): | |||||
try: | |||||
frappe.connect(site=site) | |||||
# upto 3 retries | |||||
for i in xrange(3): | |||||
try: | |||||
communication = frappe.get_doc("Communication", communication_name) | |||||
communication._notify(print_html=print_html, print_format=print_format, attachments=attachments, recipients=recipients, except_recipient=except_recipient) | |||||
except MySQLdb.OperationalError, e: | |||||
# deadlock, try again | |||||
if e.args[0]==1213: | |||||
frappe.db.rollback() | |||||
time.sleep(1) | |||||
continue | |||||
else: | |||||
raise | |||||
else: | |||||
break | |||||
except: | |||||
frappe.db.rollback() | |||||
else: | |||||
frappe.db.commit() | |||||
finally: | |||||
frappe.destroy() |