首页 > 解决方案 > SQL Alchemy 查询 Oracle 日期时间过滤器

问题描述

我正在用 SQLAlchemy 构建一个从 Oracle 数据库读取数据的查询,并尝试按通过 API 传入的日期进行过滤。

通过 API url 查询示例:

?fromDate=2020-06-01T00:00:00Z&toDate=2020-06-30T00:00:00Z

这些日期被传递给我的数据库查询函数

# Query EventModel rows, optionally narrowed by environment, date range and
# active flag, and return them together with a count of events that have at
# least one inactive generic event.
# NOTE(review): the function body below has lost its indentation in this
# listing; it presumably belongs inside get_events -- confirm against the
# original source.
def get_events(env=None, from_date=None, to_date=None, is_active="Y"):
query = db.session.query(EventModel)
# Counted on the still-unfiltered query, so none of the arguments affect it.
inactive_count = query.filter(EventModel.generic_event.any(GenericEventModel.ACTIVE_YN == "N")).count()

# Each supplied argument adds its own, separate .any() condition on the
# generic_event relationship.
# NOTE(review): because the conditions are independent, a parent event can
# presumably match when *different* related rows satisfy different conditions
# (one row in the date range, another one active, ...). That would explain
# results whose dates fall outside the requested range -- confirm.
if from_date:
    query = query.filter(
        EventModel.generic_event.any(
            GenericEventModel.START_DT >= datetime.datetime.strptime(from_date, DATE_TIME_FORMAT)
        )
    )
if to_date:
    query = query.filter(
        EventModel.generic_event.any(
            GenericEventModel.END_DT <= datetime.datetime.strptime(to_date, DATE_TIME_FORMAT)
        )
    )
if is_active:
    query = query.filter(EventModel.generic_event.any(GenericEventModel.ACTIVE_YN == is_active))
if env:
    query = query.filter(EventModel.generic_event.any(GenericEventModel.ENV == env))

# Only reached when every argument is falsy; since is_active defaults to "Y",
# that requires the caller to pass a falsy is_active explicitly. In this case
# 'events' holds the result of .all() instead of a query object.
if not env and not from_date and not to_date and not is_active:
    query = query.all()

return {
    'events': query,
    'inactive_count': inactive_count
}

一切似乎都正常工作,直到我开始看到不在范围内的日期。

使用这些日期,出于某种奇怪的原因,我得到了日期为 3 月的事件。

它使用的 DATE_TIME_FORMAT 也是一个常量

# strptime pattern for the API's date parameters. "T" and "Z" are matched as
# literal characters (there is no %z directive), so parsed values are naive
# datetimes without timezone information.
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

这是它返回的日期之一(已格式化为我的前端所期望的日期时间字符串):

2020-03-03T05:00:00Z

我不知道为什么会这样:查看返回的这些事件,它们关联的日期都不在我的搜索范围内。我做错了什么吗?

这是表模型

事件模型

# Column and relationship declarations for the model mapped to table RC_CAL
# (the enclosing class statement is not part of this listing).
__tablename__ = 'RC_CAL'

# Primary key; CALENDAR_EVENT.RC_ID is declared as a foreign key to it.
RC_ID = db.Column(db.Integer, primary_key=True)
PRODUCT_ID = db.Column(db.Integer)
RELEASE_ID = db.Column(db.Integer)
RCPM_ID = db.Column(db.Integer)
# Declared as a 25-character string column, not as a DateTime.
FIT_DTTM = db.Column(db.String(25))
DATACTR_ID = db.Column(db.String(10))
COMMENT_TXT = db.Column(db.String(500))
DESCR_TXT = db.Column(db.String(300))
# Single-character flag columns.
EVENTCMPLT_FLG = db.Column(db.String(1))
EVENTCMPLTONTM_FLG = db.Column(db.String(1))
URLS = db.Column(db.String(4000))
CREATE_DT = db.Column(db.DateTime)
MODIFIED_DT = db.Column(db.DateTime)
# Related GenericEventModel objects. lazy='joined' requests eager loading via
# a JOIN in the parent's SELECT; the cascade setting propagates operations,
# including deletes and orphan removal, to the related objects.
generic_event = db.relationship(
    "GenericEventModel",
    back_populates='rc_event',
    lazy='joined',
    cascade="all, delete, delete-orphan"
)
# Related HistoryModel objects, configured the same way as generic_event.
history_list = db.relationship(
    "HistoryModel",
    back_populates='rc_event',
    lazy='joined',
    cascade="all, delete, delete-orphan"
)

通用事件模型

# Column and relationship declarations for the model mapped to table
# CALENDAR_EVENT (the enclosing class statement is not part of this listing).
__tablename__ = 'CALENDAR_EVENT'

EVENT_ID = db.Column(db.Integer, primary_key=True)
NAME = db.Column(db.String(300))
# START_DT / END_DT are the DateTime columns the get_events date filters
# compare against.
START_DT = db.Column(db.DateTime)
ENV = db.Column(db.String(12))
END_DT = db.Column(db.DateTime)
PARENT_ID = db.Column(db.Integer)
EVENT_TYPE_ID = db.Column(db.Integer)
TYPE = db.Column(db.String(20))
# Single-character flag; get_events compares it against "N" and against its
# is_active argument (default "Y").
ACTIVE_YN = db.Column(db.String(1))
MODIFIED_DT = db.Column(db.DateTime)
# Foreign key to the owning RC_CAL row.
RC_ID = db.Column(db.Integer, db.ForeignKey('RC_CAL.RC_ID'))
# Owning EventModel.
# NOTE(review): EventModel.generic_event names back_populates='rc_event', but
# this side does not name back_populates='generic_event' -- confirm whether
# the pairing is meant to be declared on both sides.
rc_event = db.relationship('EventModel')

谢谢您的帮助。

编辑:改用连接(join)查询后结果正确,但速度较慢:

def get_events(env=None, from_date=None, to_date=None, is_active="Y"):
    """Return events matching the given criteria plus an inactive-event count.

    Args:
        env: Compared for equality with GenericEventModel.ENV; ignored when
            falsy.
        from_date: Timestamp string. Only the part before the first "T" is
            parsed (as %Y-%m-%d); it is the lower bound on START_DT.
        to_date: Timestamp string, reduced to its date part like from_date;
            it is the upper bound on START_DT.
        is_active: Compared for equality with GenericEventModel.ACTIVE_YN;
            ignored when falsy.

    Returns:
        A dict with two keys: 'events' is the filtered joined query, or the
        result of .all() on the plain query when no criterion was supplied;
        'inactive_count' is the number of events having at least one generic
        event whose ACTIVE_YN is "N".
    """
    base_query = db.session.query(EventModel)
    with_generic_events = base_query.join(GenericEventModel)

    # Counted on the plain query, so the criteria below never influence it.
    has_inactive_child = EventModel.generic_event.any(
        GenericEventModel.ACTIVE_YN == "N"
    )
    inactive_count = base_query.filter(has_inactive_child).count()

    # One entry per supplied argument; the order here is the order of the
    # conditions handed to filter().
    criteria = []
    if from_date:
        start_day = datetime.datetime.strptime(from_date.split("T", 1)[0], "%Y-%m-%d")
        criteria.append(GenericEventModel.START_DT >= start_day)
    if to_date:
        end_day = datetime.datetime.strptime(to_date.split("T", 1)[0], "%Y-%m-%d")
        criteria.append(GenericEventModel.START_DT <= end_day)
    if is_active:
        criteria.append(GenericEventModel.ACTIVE_YN == is_active)
    if env:
        criteria.append(GenericEventModel.ENV == env)

    # Every truthy argument contributes exactly one criterion, so an empty
    # list means that no argument was supplied at all.
    if criteria:
        events = with_generic_events.filter(*criteria)
    else:
        events = base_query.all()

    return {'events': events, 'inactive_count': inactive_count}

这产生的SQL:

SELECT
 "RC_CAL"."RC_ID" AS "RC_CAL_RC_ID",
 "RC_CAL"."PRODUCT_ID" AS "RC_CAL_PRODUCT_ID",
 "RC_CAL"."RELEASE_ID" AS "RC_CAL_RELEASE_ID",
 "RC_CAL"."RCPM_ID" AS "RC_CAL_RCPM_ID",
 "RC_CAL"."FIT_DTTM" AS "RC_CAL_FIT_DTTM",
 "RC_CAL"."DATACTR_ID" AS "RC_CAL_DATACTR_ID",
 "RC_CAL"."COMMENT_TXT" AS "RC_CAL_COMMENT_TXT",
 "RC_CAL"."DESCR_TXT" AS "RC_CAL_DESCR_TXT",
 "RC_CAL"."EVENTCMPLT_FLG" AS "RC_CAL_EVENTCMPLT_FLG",
 "RC_CAL"."EVENTCMPLTONTM_FLG" AS "RC_CAL_EVENTCMPLTONTM_FL_1",
 "RC_CAL"."URLS" AS "RC_CAL_URLS",
 "RC_CAL"."CREATE_DT" AS "RC_CAL_CREATE_DT",
 "RC_CAL"."MODIFIED_DT" AS "RC_CAL_MODIFIED_DT",
 "CALENDAR_EVENT_1"."EVENT_ID" AS "CALENDAR_EVENT_1_EVENT_I_2",
 "CALENDAR_EVENT_1"."NAME" AS "CALENDAR_EVENT_1_NAME",
 "CALENDAR_EVENT_1"."START_DT" AS "CALENDAR_EVENT_1_START_D_3",
 "CALENDAR_EVENT_1"."ENV" AS "CALENDAR_EVENT_1_ENV",
 "CALENDAR_EVENT_1"."END_DT" AS "CALENDAR_EVENT_1_END_DT",
 "CALENDAR_EVENT_1"."PARENT_ID" AS "CALENDAR_EVENT_1_PARENT__4",
 "CALENDAR_EVENT_1"."EVENT_TYPE_ID" AS "CALENDAR_EVENT_1_EVENT_T_5",
 "CALENDAR_EVENT_1"."TYPE" AS "CALENDAR_EVENT_1_TYPE",
 "CALENDAR_EVENT_1"."ACTIVE_YN" AS "CALENDAR_EVENT_1_ACTIVE__6",
 "CALENDAR_EVENT_1"."MODIFIED_DT" AS "CALENDAR_EVENT_1_MODIFIE_7",
 "CALENDAR_EVENT_1"."RC_ID" AS "CALENDAR_EVENT_1_RC_ID",
 "RC_HISTORY_1"."RC_HISTORY_ID" AS "RC_HISTORY_1_RC_HISTORY__8",
 "RC_HISTORY_1"."RC_HISTORY_DATA" AS "RC_HISTORY_1_RC_HISTORY__9",
 "RC_HISTORY_1"."RC_HISTORY_TYPE" AS "RC_HISTORY_1_RC_HISTORY__a",
 "RC_HISTORY_1"."CREATE_DT" AS "RC_HISTORY_1_CREATE_DT",
 "RC_HISTORY_1"."RC_ID" AS "RC_HISTORY_1_RC_ID" 
FROM
 "RC_CAL" 
JOIN
  "CALENDAR_EVENT" 
  ON "RC_CAL"."RC_ID" = "CALENDAR_EVENT"."RC_ID" 
LEFT OUTER JOIN
  "CALENDAR_EVENT" "CALENDAR_EVENT_1" 
  ON "RC_CAL"."RC_ID" = "CALENDAR_EVENT_1"."RC_ID" 
LEFT OUTER JOIN
  "RC_HISTORY" "RC_HISTORY_1" 
  ON "RC_CAL"."RC_ID" = "RC_HISTORY_1"."RC_ID" 
WHERE
 "CALENDAR_EVENT"."START_DT" >= :START_DT_1 
 AND "CALENDAR_EVENT"."START_DT" <= :START_DT_2 
 AND "CALENDAR_EVENT"."ACTIVE_YN" = :ACTIVE_YN_1

标签: python, python-3.x, oracle, sqlalchemy, flask-sqlalchemy

解决方案


推荐阅读