Bläddra i källkod

enqueue_after_commit option added to frappe.enqueue (#4167)

* enqueue_after_commit option added to frappe.enqueue

* Fixes

Codacy https://www.codacy.com/app/frappe/frappe/pullRequest?prid=934733
clear frappe.flags.enqueue_after_commit after executing enqueued jobs

* sync_global_search() uses enqueue_after_commit=True

* Update database.py

* webhooks enqueue after commit

* Update __init__.py
version-14
Revant Nandgaonkar 7 år sedan
committed by Rushabh Mehta
förälder
incheckning
4df6135c3c
4 ändrade filer med 47 tillägg och 26 borttagningar
  1. +10
    -15
      frappe/database.py
  2. +2
    -1
      frappe/integrations/doctype/webhook/__init__.py
  3. +22
    -10
      frappe/utils/background_jobs.py
  4. +13
    -0
      frappe/utils/global_search.py

+ 10
- 15
frappe/database.py Visa fil

@@ -14,15 +14,13 @@ import frappe
import frappe.defaults import frappe.defaults
import frappe.async import frappe.async
import re import re
import redis
import frappe.model.meta import frappe.model.meta
from frappe.utils import now, get_datetime, cstr from frappe.utils import now, get_datetime, cstr
from frappe import _ from frappe import _
from six import text_type, binary_type, string_types, integer_types from six import text_type, binary_type, string_types, integer_types
from frappe.utils.global_search import sync_global_search
from frappe.model.utils.link_count import flush_local_link_count from frappe.model.utils.link_count import flush_local_link_count
from six import iteritems, text_type from six import iteritems, text_type
from frappe.utils.background_jobs import execute_job, get_queue


class Database: class Database:
""" """
@@ -740,20 +738,9 @@ class Database:
self.sql("commit") self.sql("commit")
frappe.local.rollback_observers = [] frappe.local.rollback_observers = []
self.flush_realtime_log() self.flush_realtime_log()
self.enqueue_global_search()
enqueue_jobs_after_commit()
flush_local_link_count() flush_local_link_count()


def enqueue_global_search(self):
if frappe.flags.update_global_search:
try:
frappe.enqueue('frappe.utils.global_search.sync_global_search',
now=frappe.flags.in_test or frappe.flags.in_install or frappe.flags.in_migrate,
flags=frappe.flags.update_global_search)
except redis.exceptions.ConnectionError:
sync_global_search()

frappe.flags.update_global_search = []

def flush_realtime_log(self): def flush_realtime_log(self):
for args in frappe.local.realtime_log: for args in frappe.local.realtime_log:
frappe.async.emit_via_redis(*args) frappe.async.emit_via_redis(*args)
@@ -895,3 +882,11 @@ class Database:
s = s.replace("%", "%%") s = s.replace("%", "%%")


return s return s

def enqueue_jobs_after_commit():
if frappe.flags.enqueue_after_commit and len(frappe.flags.enqueue_after_commit) > 0:
for job in frappe.flags.enqueue_after_commit:
q = get_queue(job.get("queue"), async=job.get("async"))
q.enqueue_call(execute_job, timeout=job.get("timeout"),
kwargs=job.get("queue_args"))
frappe.flags.enqueue_after_commit = []

+ 2
- 1
frappe/integrations/doctype/webhook/__init__.py Visa fil

@@ -37,7 +37,8 @@ def run_webhooks(doc, method):


def _webhook_request(webhook): def _webhook_request(webhook):
if not webhook.name in frappe.flags.webhooks_executed.get(doc.name, []): if not webhook.name in frappe.flags.webhooks_executed.get(doc.name, []):
frappe.enqueue("frappe.integrations.doctype.webhook.webhook.enqueue_webhook", doc=doc, webhook=webhook)
frappe.enqueue("frappe.integrations.doctype.webhook.webhook.enqueue_webhook",
enqueue_after_commit=True, doc=doc, webhook=webhook)


# keep list of webhooks executed for this doc in this request # keep list of webhooks executed for this doc in this request
# so that we don't run the same webhook for the same document multiple times # so that we don't run the same webhook for the same document multiple times


+ 22
- 10
frappe/utils/background_jobs.py Visa fil

@@ -18,7 +18,7 @@ queue_timeout = {
} }


def enqueue(method, queue='default', timeout=300, event=None, def enqueue(method, queue='default', timeout=300, event=None,
async=True, job_name=None, now=False, **kwargs):
async=True, job_name=None, now=False, enqueue_after_commit=False, **kwargs):
''' '''
Enqueue method to be executed using a background worker Enqueue method to be executed using a background worker


@@ -37,17 +37,29 @@ def enqueue(method, queue='default', timeout=300, event=None,
q = get_queue(queue, async=async) q = get_queue(queue, async=async)
if not timeout: if not timeout:
timeout = queue_timeout.get(queue) or 300 timeout = queue_timeout.get(queue) or 300

return q.enqueue_call(execute_job, timeout=timeout,
kwargs={
"site": frappe.local.site,
"user": frappe.session.user,
"method": method,
"event": event,
"job_name": job_name or cstr(method),
queue_args = {
"site": frappe.local.site,
"user": frappe.session.user,
"method": method,
"event": event,
"job_name": job_name or cstr(method),
"async": async,
"kwargs": kwargs
}
if enqueue_after_commit:
if not frappe.flags.enqueue_after_commit:
frappe.flags.enqueue_after_commit = []

frappe.flags.enqueue_after_commit.append({
"queue": queue,
"async": async, "async": async,
"kwargs": kwargs
"timeout": timeout,
"queue_args":queue_args
}) })
return frappe.flags.enqueue_after_commit
else:
return q.enqueue_call(execute_job, timeout=timeout,
kwargs=queue_args)


def enqueue_doc(doctype, name=None, method=None, queue='default', timeout=300, def enqueue_doc(doctype, name=None, method=None, queue='default', timeout=300,
now=False, **kwargs): now=False, **kwargs):


+ 13
- 0
frappe/utils/global_search.py Visa fil

@@ -5,6 +5,7 @@ from __future__ import unicode_literals


import frappe import frappe
import re import re
import redis
from frappe.utils import cint, strip_html_tags from frappe.utils import cint, strip_html_tags
from frappe.model.base_document import get_controller from frappe.model.base_document import get_controller
from six import text_type from six import text_type
@@ -232,6 +233,18 @@ def update_global_search(doc):
frappe.flags.update_global_search.append( frappe.flags.update_global_search.append(
dict(doctype=doc.doctype, name=doc.name, content=' ||| '.join(content or ''), dict(doctype=doc.doctype, name=doc.name, content=' ||| '.join(content or ''),
published=published, title=doc.get_title(), route=doc.get('route'))) published=published, title=doc.get_title(), route=doc.get('route')))
enqueue_global_search()

def enqueue_global_search():
if frappe.flags.update_global_search:
try:
frappe.enqueue('frappe.utils.global_search.sync_global_search',
now=frappe.flags.in_test or frappe.flags.in_install or frappe.flags.in_migrate,
flags=frappe.flags.update_global_search, enqueue_after_commit=True)
except redis.exceptions.ConnectionError:
sync_global_search()

frappe.flags.update_global_search = []


def get_formatted_value(value, field): def get_formatted_value(value, field):
'''Prepare field from raw data''' '''Prepare field from raw data'''


Laddar…
Avbryt
Spara