Kaynağa Gözat

fix: RQ job name when func object is passed (#18112) (#18113)

[skip ci]

(cherry picked from commit 4120d5a8e3)

Co-authored-by: Ankush Menat <ankush@frappe.io>
version-14
mergify[bot] 2 yıl önce
committed by GitHub
ebeveyn
işleme
5074afaafa
Veri tabanında bu imza için bilinen anahtar bulunamadı GPG Anahtar Kimliği: 4AEE18F83AFDEB23
2 değiştirilmiş dosya ile 13 ekleme ve 3 silme
  1. +8
    -1
      frappe/core/doctype/rq_job/rq_job.py
  2. +5
    -2
      frappe/core/doctype/rq_job/test_rq_job.py

+ 8
- 1
frappe/core/doctype/rq_job/rq_job.py Dosyayı Görüntüle

@@ -2,6 +2,7 @@
# For license information, please see license.txt

import functools
import re

from rq.command import send_stop_job_command
from rq.job import Job
@@ -113,12 +114,18 @@ class RQJob(Document):

def serialize_job(job: Job) -> frappe._dict:
modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at
job_name = job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name"))

# function objects have this repr: '<function functionname at 0xmemory_address >'
# This regex just removes unnecessary things around it.
if matches := re.match(r"<function (?P<func_name>.*) at 0x.*>", job_name):
job_name = matches.group("func_name")

return frappe._dict(
name=job.id,
job_id=job.id,
queue=job.origin.rsplit(":", 1)[1],
job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")),
job_name=job_name,
status=job.get_status(),
started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "",
ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "",


+ 5
- 2
frappe/core/doctype/rq_job/test_rq_job.py Dosyayı Görüntüle

@@ -29,11 +29,9 @@ class TestRQJob(FrappeTestCase):
def test_serialization(self):

job = frappe.enqueue(method=self.BG_JOB, queue="short")

rq_job = frappe.get_doc("RQ Job", job.id)

self.assertEqual(job, rq_job.job)

self.assertDocumentEqual(
{
"name": job.id,
@@ -46,6 +44,11 @@ class TestRQJob(FrappeTestCase):
)
self.check_status(job, "finished")

def test_func_obj_serialization(self):
job = frappe.enqueue(method=test_func, queue="short")
rq_job = frappe.get_doc("RQ Job", job.id)
self.assertEqual(rq_job.job_name, "test_func")

def test_get_list_filtering(self):

# Check failed job clearning and filtering


Yükleniyor…
İptal
Kaydet