diff --git a/frappe/database.py b/frappe/database.py index 33c3aa8a44..1092636461 100644 --- a/frappe/database.py +++ b/frappe/database.py @@ -14,15 +14,13 @@ import frappe import frappe.defaults import frappe.async import re -import redis import frappe.model.meta from frappe.utils import now, get_datetime, cstr from frappe import _ from six import text_type, binary_type, string_types, integer_types -from frappe.utils.global_search import sync_global_search from frappe.model.utils.link_count import flush_local_link_count from six import iteritems, text_type - +from frappe.utils.background_jobs import execute_job, get_queue class Database: """ @@ -740,20 +738,9 @@ class Database: self.sql("commit") frappe.local.rollback_observers = [] self.flush_realtime_log() - self.enqueue_global_search() + enqueue_jobs_after_commit() flush_local_link_count() - def enqueue_global_search(self): - if frappe.flags.update_global_search: - try: - frappe.enqueue('frappe.utils.global_search.sync_global_search', - now=frappe.flags.in_test or frappe.flags.in_install or frappe.flags.in_migrate, - flags=frappe.flags.update_global_search) - except redis.exceptions.ConnectionError: - sync_global_search() - - frappe.flags.update_global_search = [] - def flush_realtime_log(self): for args in frappe.local.realtime_log: frappe.async.emit_via_redis(*args) @@ -895,3 +882,11 @@ class Database: s = s.replace("%", "%%") return s + +def enqueue_jobs_after_commit(): + if frappe.flags.enqueue_after_commit and len(frappe.flags.enqueue_after_commit) > 0: + for job in frappe.flags.enqueue_after_commit: + q = get_queue(job.get("queue"), async=job.get("async")) + q.enqueue_call(execute_job, timeout=job.get("timeout"), + kwargs=job.get("queue_args")) + frappe.flags.enqueue_after_commit = [] diff --git a/frappe/integrations/doctype/webhook/__init__.py b/frappe/integrations/doctype/webhook/__init__.py index 3c8bf1e2af..b72acc0578 100644 --- a/frappe/integrations/doctype/webhook/__init__.py +++ b/frappe/integrations/doctype/webhook/__init__.py @@ -37,7 +37,8 @@ def run_webhooks(doc, method): def _webhook_request(webhook): if not webhook.name in frappe.flags.webhooks_executed.get(doc.name, []): - frappe.enqueue("frappe.integrations.doctype.webhook.webhook.enqueue_webhook", doc=doc, webhook=webhook) + frappe.enqueue("frappe.integrations.doctype.webhook.webhook.enqueue_webhook", + enqueue_after_commit=True, doc=doc, webhook=webhook) # keep list of webhooks executed for this doc in this request # so that we don't run the same webhook for the same document multiple times diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 02a841336e..c540b5bb52 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -18,7 +18,7 @@ queue_timeout = { } def enqueue(method, queue='default', timeout=300, event=None, - async=True, job_name=None, now=False, **kwargs): + async=True, job_name=None, now=False, enqueue_after_commit=False, **kwargs): ''' Enqueue method to be executed using a background worker @@ -37,17 +37,29 @@ def enqueue(method, queue='default', timeout=300, event=None, q = get_queue(queue, async=async) if not timeout: timeout = queue_timeout.get(queue) or 300 - - return q.enqueue_call(execute_job, timeout=timeout, - kwargs={ - "site": frappe.local.site, - "user": frappe.session.user, - "method": method, - "event": event, - "job_name": job_name or cstr(method), + queue_args = { + "site": frappe.local.site, + "user": frappe.session.user, + "method": method, + "event": event, + "job_name": job_name or cstr(method), + "async": async, + "kwargs": kwargs + } + if enqueue_after_commit: + if not frappe.flags.enqueue_after_commit: + frappe.flags.enqueue_after_commit = [] + + frappe.flags.enqueue_after_commit.append({ + "queue": queue, "async": async, - "kwargs": kwargs + "timeout": timeout, + "queue_args":queue_args }) + return frappe.flags.enqueue_after_commit + else: + return q.enqueue_call(execute_job, timeout=timeout, + kwargs=queue_args) def enqueue_doc(doctype, name=None, method=None, queue='default', timeout=300, now=False, **kwargs): diff --git a/frappe/utils/global_search.py b/frappe/utils/global_search.py index f9a07e3212..84c3308e96 100644 --- a/frappe/utils/global_search.py +++ b/frappe/utils/global_search.py @@ -5,6 +5,7 @@ from __future__ import unicode_literals import frappe import re +import redis from frappe.utils import cint, strip_html_tags from frappe.model.base_document import get_controller from six import text_type @@ -232,6 +233,18 @@ def update_global_search(doc): frappe.flags.update_global_search.append( dict(doctype=doc.doctype, name=doc.name, content=' ||| '.join(content or ''), published=published, title=doc.get_title(), route=doc.get('route'))) + enqueue_global_search() + +def enqueue_global_search(): + if frappe.flags.update_global_search: + try: + frappe.enqueue('frappe.utils.global_search.sync_global_search', + now=frappe.flags.in_test or frappe.flags.in_install or frappe.flags.in_migrate, + flags=frappe.flags.update_global_search, enqueue_after_commit=True) + except redis.exceptions.ConnectionError: + sync_global_search() + + frappe.flags.update_global_search = [] def get_formatted_value(value, field): '''Prepare field from raw data'''