From 561b2490c42376a48fc4be6588f6957b912ee43a Mon Sep 17 00:00:00 2001 From: Rushabh Mehta Date: Mon, 1 Aug 2016 19:40:28 +0530 Subject: [PATCH] [minor] email queue system more optimized --- frappe/email/doctype/newsletter/newsletter.py | 2 +- frappe/email/queue.py | 34 ++++++++++++------- frappe/utils/redis_wrapper.py | 6 ++++ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/frappe/email/doctype/newsletter/newsletter.py b/frappe/email/doctype/newsletter/newsletter.py index 9dcc5f88d2..f8973aeb48 100755 --- a/frappe/email/doctype/newsletter/newsletter.py +++ b/frappe/email/doctype/newsletter/newsletter.py @@ -37,7 +37,7 @@ class Newsletter(Document): self.validate_send() # using default queue with a longer timeout as this isn't a scheduled task - enqueue(send_newsletter, queue='default', timeout=1500, event='send_newsletter', newsletter=self.name) + enqueue(send_newsletter, queue='default', timeout=3000, event='send_newsletter', newsletter=self.name) else: self.queue_all() diff --git a/frappe/email/queue.py b/frappe/email/queue.py index e4ca335098..96f4416424 100755 --- a/frappe/email/queue.py +++ b/frappe/email/queue.py @@ -135,7 +135,7 @@ def add(email, sender, subject, formatted, text_content=None, e.reference_name = reference_name e.communication = communication e.send_after = send_after - e.insert(ignore_permissions=True) + e.db_insert() def check_email_limit(recipients): # if using settings from site_config.json, check email limit @@ -151,6 +151,9 @@ def check_email_limit(recipients): monthly_email_limit = frappe.conf.get('limits', {}).get('emails') or 500 + if frappe.flags.in_test: + monthly_email_limit = 500 + if (this_month + len(recipients)) > monthly_email_limit: throw(_("Cannot send this email. You have crossed the sending limit of {0} emails for this month.").format(monthly_email_limit), EmailLimitCrossedError) @@ -256,16 +259,12 @@ def flush(from_test=False): smtpserver = SMTPServer() + make_cache_queue() + for i in xrange(500): - # don't use for update here, as it leads deadlocks - email = frappe.db.sql('''select * from `tabEmail Queue` - where status='Not Sent' and (send_after is null or send_after < %(now)s) - order by priority desc, creation asc - limit 1''', { 'now': now_datetime() }, as_dict=True) - - if email: - email = email[0] - else: + email = frappe.cache().lpop('cache_email_queue') + + if not email: break send_one(email, smtpserver, auto_commit) @@ -273,12 +272,23 @@ def flush(from_test=False): # NOTE: removing commit here because we pass auto_commit # finally: # frappe.db.commit() +def make_cache_queue(): + '''cache values in queue before sendign''' + cache = frappe.cache() + cache.delete_value('cache_email_queue') + + for l in frappe.db.sql('''select name from `tabEmail Queue` + where status='Not Sent' and (send_after is null or send_after < %(now)s) + order by priority desc, creation asc + limit 500''', { 'now': now_datetime() }): + cache.lpush('cache_email_queue', l[0]) def send_one(email, smtpserver=None, auto_commit=True, now=False): '''Send Email Queue with given smtpserver''' - status = frappe.db.sql('''select status from `tabEmail Queue` where name=%s for update''', email.name)[0][0] - if status != 'Not Sent': + email = frappe.db.sql('''select name, status, communication, message, sender, recipient + from `tabEmail Queue` where name=%s for update''', email, as_dict=True)[0] + if email.status != 'Not Sent': # rollback to release lock and return frappe.db.rollback() return diff --git a/frappe/utils/redis_wrapper.py b/frappe/utils/redis_wrapper.py index fadf3f70d5..1a7e54c43f 100644 --- a/frappe/utils/redis_wrapper.py +++ b/frappe/utils/redis_wrapper.py @@ -117,6 +117,12 @@ class RedisWrapper(redis.Redis): if key in frappe.local.cache: del frappe.local.cache[key] + def lpush(self, key, value): + super(redis.Redis, self).lpush(self.make_key(key), value) + + def lpop(self, key): + return super(redis.Redis, self).lpop(self.make_key(key)) + def hset(self, name, key, value): if not name in frappe.local.cache: frappe.local.cache[name] = {}