Bläddra i källkod

test: Execute enabled webhook trigger

version-14
Hussain Nagaria 4 år sedan
förälder
incheckning
2133777c44
2 ändrade filer med 66 tillägg och 1 borttagningar
  1. +1
    -1
      frappe/integrations/doctype/webhook/__init__.py
  2. +65
    -0
      frappe/integrations/doctype/webhook/test_webhook.py

+ 1
- 1
frappe/integrations/doctype/webhook/__init__.py Visa fil

@@ -22,7 +22,7 @@ def run_webhooks(doc, method):
# query webhooks
webhooks_list = frappe.get_all('Webhook',
fields=["name", "`condition`", "webhook_docevent", "webhook_doctype"],
filters={"enabled": "1"}
filters={"enabled": True}
)

# make webhooks map for cache


+ 65
- 0
frappe/integrations/doctype/webhook/test_webhook.py Visa fil

@@ -10,6 +10,44 @@ from frappe.integrations.doctype.webhook.webhook import get_webhook_headers, get


class TestWebhook(unittest.TestCase):
@classmethod
def setUpClass(cls):
# delete any existing webhooks
frappe.db.sql("DELETE FROM tabWebhook")
# create test webhooks
cls.create_sample_webhooks()

@classmethod
def create_sample_webhooks(cls):
samples_webhooks_data = [
{
"webhook_doctype": "User",
"webhook_docevent": "after_insert",
"request_url": "https://httpbin.org/post",
"condition": "doc.email",
"enabled": True
},
{
"webhook_doctype": "User",
"webhook_docevent": "after_insert",
"request_url": "https://httpbin.org/post",
"condition": "doc.first_name",
"enabled": False
}
]

cls.sample_webhooks = []
for wh_fields in samples_webhooks_data:
wh = frappe.new_doc("Webhook")
wh.update(wh_fields)
wh.insert()
cls.sample_webhooks.append(wh)

@classmethod
def tearDownClass(cls):
# delete any existing webhooks
frappe.db.sql("DELETE FROM tabWebhook")

def setUp(self):
# retrieve or create a User webhook for `after_insert`
webhook_fields = {
@@ -30,10 +68,37 @@ class TestWebhook(unittest.TestCase):
self.user.email = frappe.mock("email")
self.user.save()

# Create another test user specific to this test
self.test_user = frappe.new_doc("User")
self.test_user.email = "user1@integration.webhooks.test.com"
self.test_user.first_name = "user1"

def tearDown(self) -> None:
self.user.delete()
self.test_user.delete()
super().tearDown()

def test_webhook_trigger_with_enabled_webhooks(self):
"""Test webhook trigger for enabled webhooks"""

frappe.cache().delete_value('webhooks')
frappe.flags.webhooks = None

# Insert the user to db
self.test_user.insert()
self.assertTrue("User" in frappe.flags.webhooks)
# only 1 hook (enabled) must be queued
self.assertEqual(
len(frappe.flags.webhooks.get("User")),
1
)
self.assertTrue(self.test_user.email in frappe.flags.webhooks_executed)
self.assertEqual(
frappe.flags.webhooks_executed.get(self.test_user.email)[0],
self.sample_webhooks[0].name
)

def test_validate_doc_events(self):
"Test creating a submit-related webhook for a non-submittable DocType"



Laddar…
Avbryt
Spara