Quellcode durchsuchen

feat: MySQL TIMESTAMP functionality for QB

version-14
Ankush Menat vor 3 Jahren
committed by Ankush Menat
Ursprung
Commit
1a0fb21645
2 geänderte Dateien mit 66 neuen und 3 gelöschten Zeilen
  1. +20
    -2
      frappe/query_builder/functions.py
  2. +46
    -1
      frappe/tests/test_query_builder.py

+ 20
- 2
frappe/query_builder/functions.py Datei anzeigen

@@ -1,5 +1,5 @@
from pypika.functions import *
from pypika.terms import Function
from pypika.terms import Function, CustomFunction, ArithmeticExpression, Arithmetic
from frappe.query_builder.utils import ImportMapper, db_type_is
from frappe.query_builder.custom import GROUP_CONCAT, STRING_AGG, MATCH, TO_TSVECTOR
from frappe.database.query import Query
@@ -25,6 +25,24 @@ Match = ImportMapper(
}
)

class _PostgresTimestamp(ArithmeticExpression):
def __init__(self, datepart, timepart, alias=None):
if isinstance(datepart, str):
datepart = Cast(datepart, "date")
if isinstance(timepart, str):
timepart = Cast(timepart, "time")

super().__init__(operator=Arithmetic.add,
left=datepart, right=timepart, alias=alias)


CombineDatetime = ImportMapper(
{
db_type_is.MARIADB: CustomFunction("TIMESTAMP", ["date", "time"]),
db_type_is.POSTGRES: _PostgresTimestamp,
}
)


def _aggregate(function, dt, fieldname, filters, **kwargs):
return (
@@ -46,4 +64,4 @@ def _avg(dt, fieldname, filters=None, **kwargs):
return _aggregate(Avg, dt, fieldname, filters, **kwargs)

def _sum(dt, fieldname, filters=None, **kwargs):
return _aggregate(Sum, dt, fieldname, filters, **kwargs)
return _aggregate(Sum, dt, fieldname, filters, **kwargs)

+ 46
- 1
frappe/tests/test_query_builder.py Datei anzeigen

@@ -3,7 +3,7 @@ from typing import Callable

import frappe
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import Coalesce, GroupConcat, Match
from frappe.query_builder.functions import Coalesce, GroupConcat, Match, CombineDatetime
from frappe.query_builder.utils import db_type_is
from frappe.query_builder import Case

@@ -32,6 +32,27 @@ class TestCustomFunctionsMariaDB(unittest.TestCase):
query.get_sql(), "SELECT `name`,'John' `User` FROM `tabDocType`"
)

def test_timestamp(self):
note = frappe.qb.DocType("Note")
self.assertEqual("TIMESTAMP(posting_date,posting_time)", CombineDatetime(note.posting_date, note.posting_time).get_sql())
self.assertEqual("TIMESTAMP('2021-01-01','00:00:21')", CombineDatetime("2021-01-01", "00:00:21").get_sql())

todo = frappe.qb.DocType("ToDo")
select_query = (frappe.qb
.from_(note)
.join(todo).on(todo.refernce_name == note.name)
.select(CombineDatetime(note.posting_date, note.posting_time)))
self.assertIn("select timestamp(`tabnote`.`posting_date`,`tabnote`.`posting_time`)", str(select_query).lower())

select_query = select_query.orderby(CombineDatetime(note.posting_date, note.posting_time))
self.assertIn("order by timestamp(`tabnote`.`posting_date`,`tabnote`.`posting_time`)", str(select_query).lower())

select_query = select_query.where(CombineDatetime(note.posting_date, note.posting_time) >= CombineDatetime("2021-01-01", "00:00:01"))
self.assertIn("timestamp(`tabnote`.`posting_date`,`tabnote`.`posting_time`)>=timestamp('2021-01-01','00:00:01')", str(select_query).lower())

select_query = select_query.select(CombineDatetime(note.posting_date, note.posting_time, alias="timestamp"))
self.assertIn("timestamp(`tabnote`.`posting_date`,`tabnote`.`posting_time`) `timestamp`", str(select_query).lower())


@run_only_if(db_type_is.POSTGRES)
class TestCustomFunctionsPostgres(unittest.TestCase):
@@ -52,6 +73,30 @@ class TestCustomFunctionsPostgres(unittest.TestCase):
query.get_sql(), 'SELECT "name",\'John\' "User" FROM "tabDocType"'
)

def test_timestamp(self):
note = frappe.qb.DocType("Note")
self.assertEqual("posting_date+posting_time", CombineDatetime(note.posting_date, note.posting_time).get_sql())
self.assertEqual("CAST('2021-01-01' AS DATE)+CAST('00:00:21' AS TIME)", CombineDatetime("2021-01-01", "00:00:21").get_sql())

todo = frappe.qb.DocType("ToDo")
select_query = (frappe.qb
.from_(note)
.join(todo).on(todo.refernce_name == note.name)
.select(CombineDatetime(note.posting_date, note.posting_time)))
self.assertIn('select "tabnote"."posting_date"+"tabnote"."posting_time"', str(select_query).lower())

select_query = select_query.orderby(CombineDatetime(note.posting_date, note.posting_time))
self.assertIn('order by "tabnote"."posting_date"+"tabnote"."posting_time"', str(select_query).lower())

select_query = select_query.where(
CombineDatetime(note.posting_date, note.posting_time) >= CombineDatetime('2021-01-01', '00:00:01')
)
self.assertIn("""where "tabnote"."posting_date"+"tabnote"."posting_time">=cast('2021-01-01' as date)+cast('00:00:01' as time)""",
str(select_query).lower())

select_query = select_query.select(CombineDatetime(note.posting_date, note.posting_time, alias="timestamp"))
self.assertIn('"tabnote"."posting_date"+"tabnote"."posting_time" "timestamp"', str(select_query).lower())


class TestBuilderBase(object):
def test_adding_tabs(self):


Laden…
Abbrechen
Speichern