ソースを参照

Merge pull request #1846 from anandpdoshi/hotfix

[fix] index and for-update fix in bulk flush, and use only shortjob workers to monitor celery default queue
version-14
Anand Doshi 9年前
committed by GitHub
コミット
0e4bde0d34
4個のファイルの変更338行の追加311行の削除
  1. +55
    -34
      frappe/email/bulk.py
  2. +276
    -276
      frappe/email/doctype/bulk_email/bulk_email.json
  3. +4
    -0
      frappe/email/doctype/bulk_email/bulk_email.py
  4. +3
    -1
      frappe/tasks.py

+ 55
- 34
frappe/email/bulk.py ファイルの表示

@@ -11,6 +11,7 @@ from frappe.email.email_body import get_email, get_formatted_html
from frappe.utils.verified_command import get_signed_params, verify_request
from html2text import html2text
from frappe.utils import get_url, nowdate, encode, now_datetime, add_days, split_emails, cstr
from frappe.utils.scheduler import log

class BulkLimitCrossedError(frappe.ValidationError): pass

@@ -256,56 +257,76 @@ def flush(from_test=False):
where datediff(curdate(), creation) > 3 and status='Not Sent'""", auto_commit=auto_commit)

for i in xrange(500):
email = frappe.db.sql("""select * from `tabBulk Email` where
status='Not Sent' and ifnull(send_after, "2000-01-01 00:00:00") < %s
order by priority desc, creation asc limit 1 for update""", now_datetime(), as_dict=1)
# don't use for update here, as it leads deadlocks
email = frappe.db.sql('''select * from `tabBulk Email`
where status='Not Sent' and (send_after is null or send_after < %(now)s)
order by priority desc, creation asc
limit 1''', { 'now': now_datetime() }, as_dict=True)

if email:
email = email[0]
else:
break

frappe.db.sql("""update `tabBulk Email` set status='Sending' where name=%s""",
(email["name"],), auto_commit=auto_commit)
send_one(email, smtpserver, auto_commit)

if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)
# NOTE: removing commit here because we pass auto_commit
# finally:
# frappe.db.commit()

def send_one(email, smtpserver=None, auto_commit=True, now=False):
status = frappe.db.sql('''select status from `tabBulk Email` where name=%s for update''', email.name)[0][0]
if status != 'Not Sent':
# rollback to release lock and return
frappe.db.rollback()
return

frappe.db.sql("""update `tabBulk Email` set status='Sending', modified=%s where name=%s""",
(now_datetime(), email["name"]), auto_commit=auto_commit)

if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)

try:
if auto_commit:
if not smtpserver: smtpserver = SMTPServer()
smtpserver.setup_email_account(email.reference_doctype)
smtpserver.sess.sendmail(email["sender"], email["recipient"], encode(email["message"]))

try:
if not from_test:
smtpserver.setup_email_account(email.reference_doctype)
smtpserver.sess.sendmail(email["sender"], email["recipient"], encode(email["message"]))
frappe.db.sql("""update `tabBulk Email` set status='Sent', modified=%s where name=%s""",
(now_datetime(), email["name"]), auto_commit=auto_commit)

frappe.db.sql("""update `tabBulk Email` set status='Sent' where name=%s""",
(email["name"],), auto_commit=auto_commit)
if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)

if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)
except (smtplib.SMTPServerDisconnected,
smtplib.SMTPConnectError,
smtplib.SMTPHeloError,
smtplib.SMTPAuthenticationError):

except (smtplib.SMTPServerDisconnected,
smtplib.SMTPConnectError,
smtplib.SMTPHeloError,
smtplib.SMTPAuthenticationError):
# bad connection, retry later
frappe.db.sql("""update `tabBulk Email` set status='Not Sent', modified=%s where name=%s""",
(now_datetime(), email["name"]), auto_commit=auto_commit)

# bad connection, retry later
frappe.db.sql("""update `tabBulk Email` set status='Not Sent' where name=%s""",
(email["name"],), auto_commit=auto_commit)
if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)

if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)
# no need to attempt further
return

# no need to attempt further
return
except Exception, e:
frappe.db.sql("""update `tabBulk Email` set status='Error', error=%s
where name=%s""", (unicode(e), email["name"]), auto_commit=auto_commit)

except Exception, e:
frappe.db.sql("""update `tabBulk Email` set status='Error', error=%s
where name=%s""", (unicode(e), email["name"]), auto_commit=auto_commit)
if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)

if email.communication:
frappe.get_doc('Communication', email.communication).set_delivery_status(commit=auto_commit)
if now:
raise

# NOTE: removing commit here because we pass auto_commit
# finally:
# frappe.db.commit()
else:
# log to scheduler log
log('frappe.email.bulk.flush', unicode(e))

def clear_outbox():
"""Remove mails older than 31 days in Outbox. Called daily via scheduler."""


+ 276
- 276
frappe/email/doctype/bulk_email/bulk_email.json ファイルの表示

@@ -1,304 +1,304 @@
{
"allow_copy": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "hash",
"creation": "2012-08-02 15:17:28",
"custom": 0,
"description": "Bulk Email records.",
"docstatus": 0,
"doctype": "DocType",
"document_type": "System",
"allow_copy": 0,
"allow_import": 0,
"allow_rename": 0,
"autoname": "hash",
"creation": "2012-08-02 15:17:28",
"custom": 0,
"description": "Bulk Email records.",
"docstatus": 0,
"doctype": "DocType",
"document_type": "System",
"fields": [
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "sender",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Sender",
"length": 0,
"no_copy": 0,
"options": "Email",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "sender",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Sender",
"length": 0,
"no_copy": 0,
"options": "Email",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "recipient",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Recipient",
"length": 0,
"no_copy": 0,
"options": "Email",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "recipient",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Recipient",
"length": 0,
"no_copy": 0,
"options": "Email",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "message",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Message",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "message",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Message",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"default": "Not Sent",
"fieldname": "status",
"fieldtype": "Select",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Status",
"length": 0,
"no_copy": 0,
"options": "\nNot Sent\nSending\nSent\nError\nExpired",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"default": "Not Sent",
"fieldname": "status",
"fieldtype": "Select",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 1,
"label": "Status",
"length": 0,
"no_copy": 0,
"options": "\nNot Sent\nSending\nSent\nError\nExpired",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "error",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Error",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "error",
"fieldtype": "Code",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Error",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "reference_doctype",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Reference DocType",
"length": 0,
"no_copy": 0,
"options": "DocType",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "reference_doctype",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Reference DocType",
"length": 0,
"no_copy": 0,
"options": "DocType",
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "reference_name",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Reference DocName",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "reference_name",
"fieldtype": "Data",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Reference DocName",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "communication",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Communication",
"length": 0,
"no_copy": 0,
"options": "Communication",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "communication",
"fieldtype": "Link",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Communication",
"length": 0,
"no_copy": 0,
"options": "Communication",
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 0,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "send_after",
"fieldtype": "Datetime",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Send After",
"length": 0,
"no_copy": 1,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"fieldname": "send_after",
"fieldtype": "Datetime",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Send After",
"length": 0,
"no_copy": 1,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
},
},
{
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"default": "1",
"fieldname": "priority",
"fieldtype": "Int",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Priority",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"allow_on_submit": 0,
"bold": 0,
"collapsible": 0,
"default": "1",
"fieldname": "priority",
"fieldtype": "Int",
"hidden": 0,
"ignore_user_permissions": 0,
"ignore_xss_filter": 0,
"in_filter": 0,
"in_list_view": 0,
"label": "Priority",
"length": 0,
"no_copy": 0,
"permlevel": 0,
"precision": "",
"print_hide": 0,
"print_hide_if_no_value": 0,
"read_only": 1,
"report_hide": 0,
"reqd": 0,
"search_index": 0,
"set_only_once": 0,
"unique": 0
}
],
"hide_heading": 0,
"hide_toolbar": 0,
"icon": "icon-envelope",
"idx": 1,
"in_create": 1,
"in_dialog": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"max_attachments": 0,
"modified": "2016-04-18 05:13:07.741981",
"modified_by": "Administrator",
"module": "Email",
"name": "Bulk Email",
"owner": "Administrator",
],
"hide_heading": 0,
"hide_toolbar": 0,
"icon": "icon-envelope",
"idx": 1,
"in_create": 1,
"in_dialog": 0,
"is_submittable": 0,
"issingle": 0,
"istable": 0,
"max_attachments": 0,
"modified": "2016-04-18 05:13:07.741982",
"modified_by": "Administrator",
"module": "Email",
"name": "Bulk Email",
"owner": "Administrator",
"permissions": [
{
"amend": 0,
"apply_user_permissions": 0,
"cancel": 0,
"create": 0,
"delete": 1,
"email": 1,
"export": 0,
"if_owner": 0,
"import": 0,
"permlevel": 0,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"set_user_permissions": 0,
"share": 0,
"submit": 0,
"amend": 0,
"apply_user_permissions": 0,
"cancel": 0,
"create": 0,
"delete": 1,
"email": 1,
"export": 0,
"if_owner": 0,
"import": 0,
"permlevel": 0,
"print": 1,
"read": 1,
"report": 1,
"role": "System Manager",
"set_user_permissions": 0,
"share": 0,
"submit": 0,
"write": 0
}
],
"read_only": 0,
"read_only_onload": 0,
],
"read_only": 0,
"read_only_onload": 0,
"sort_order": "DESC"
}
}

+ 4
- 0
frappe/email/doctype/bulk_email/bulk_email.py ファイルの表示

@@ -15,3 +15,7 @@ def retry_sending(name):
if doc and doc.status == "Error":
doc.status = "Not Sent"
doc.save(ignore_permissions=True)

def on_doctype_update():
"""Add index in `tabCommunication` for `(reference_doctype, reference_name)`"""
frappe.db.add_index('Bulk Email', ('status', 'send_after', 'priority', 'creation'), 'index_bulk_flush')

+ 3
- 1
frappe/tasks.py ファイルの表示

@@ -81,7 +81,9 @@ def get_required_queues(app, prefix=''):
ret = []
for site in get_sites():
ret.append('{}{}'.format(prefix, site))
ret.append(app.conf['CELERY_DEFAULT_QUEUE'])
if not prefix:
# default queue only for shortjob workers
ret.append(app.conf['CELERY_DEFAULT_QUEUE'])
return ret

def is_site_in_maintenance_mode(queue, prefix):


読み込み中…
キャンセル
保存