首页 > 解决方案 > 时间序列数据的 sqlalchemy 查询,格式为 (step, next_step) 时间相邻样本对

问题描述

我有一些时间序列数据,其中有时间序列集,Timeseries其中每个实例与实例具有一对多的关系Point。下面是数据的简化表示。

表.py:

class Timeseries(Base):
    __tablename__ = "timeseries"

    id = Column("id", Integer, primary_key=True)
    points = relationship("Point", back_populates="ts")


class Point(Base):
    __tablename__ = "point"

    id = Column("id", Integer, primary_key=True)
    t = Column("t", Float)
    v = Column("v", Float)
    ts_id = Column(Integer, ForeignKey("timeseries.id"))
    ts = relationship("Timeseries", back_populates="points")

问题:我正在尝试使用以下类型的列进行查询:“timeseries_id”、“id”、“t”、“v”、“id_next”、“t_next”、“v_next”。也就是说,我希望能够按时间顺序查看时间序列中每个点的数据以及时间序列中的下一个点数据,但是我一直在努力获取一个不包含隐式连接的元素的表?(编辑:重要的一点是我希望能够在 sqlalchemy 中使用 100% 的查询和子查询对象来获取这个列表,因为我需要在进一步的连接、过滤器等中使用这个查询的表。)这是什么的基本开始我得到了,(请注意,我没有运行此代码,因为这是我实际数据库的简化版本,但这是相同的想法):

# The point data actually in the database.
sq = (session.query(
    Timeseries.id.label("timeseries_id"),
    Point.id,
    Point.t,
    Point.v)
.select_from(
    join(Timeseries, Point, Timeseries.id==Point.ts_id))
.group_by('timeseries_id')
.subquery())

# first point manually added to each list in query
sq_first = (session.query(
    Timeseries.id.label("timeseries_id"),
    sa.literal_column("-1", Integer).label("id"), # Some unused Point.id value
    sa.literal_column(-math.inf, Float).label("t"),
    sa.literal_column(-math.inf, Float).label("v"))
.select_from(
    join(Timeseries, Point, Timeseries.id==Point.ts_id))
.subquery())

# last point manually added to each list in query.
sq_last = (session.query(
    Timeseries.id.label("timeseries_id"),
    sa.literal_column("-2", Integer).label("id"), # Another unused Point.id value
    sa.literal_column(math.inf, Float).label("t"),
    sa.literal_column(math.inf, Float).label("v"))
.select_from(
    join(Timeseries, Point, Timeseries.id==Point.ts_id))
.subquery())

# Append each timeseries in `sq` table with last point
sq_points_curr = session.query(sa.union_all(sq_first, sq)).subquery()
sq_points_next = session.query(sa.union_all(sq, sq_last)).subquery()

假设我到目前为止所做的事情是有用的,这就是我卡住的部分:

#I guess rename the columns in `sq_points_next` to append them by "_next"....
sq_points_next = (session.query(
    sq_points_curr.c.timeseries_id
    sq_points_curr.c.id.label("id_next"),
    sq_points_curr.c.t.label("t_next"),
    sq_points_curr.c.v.label("v_next"))
.subquery())

# ... and then perform a join along "timeseries_id" somehow to get the table I originally wanted...
sq_point_pairs = (session.query(
    Timeseries.id.label("timeseries_id")
    "id",
    "t",
    "v",
    "id_next",
    "t_next",
    "v_next"
).select_from(
    sq_points, sq_points_next, sq_points.timeseries_id==sq_points_next.timeseries_id)
)

我什至不确定最后一个是否会在此时编译,因为它再次从真实代码改编/简化,但它不会产生相邻时间点的表等。

编辑(2019 年 8 月 10 日)

下面来自 Nathan 的简化查询肯定是接近工作的正确方法,但会引发 sqlite 错误。

sq = session.query(
        Timeseries.id.label("timeseries_id"),
        Point.t.label("point_t"),
        func.lead(Point.t).over().label('point_after_t')
    ).select_from(
        join(Timeseries, Point, Timeseries.id == Point.ts_id)
    ).order_by(Timeseries.id)

print(sq.all())

标签: pythonsqlsqlalchemytime-series

解决方案


假设您可以获得足够新版本的 sqlite3 python 模块(例如,通过使用 Anaconda),您可以使用LEAD窗口函数来实现您的目标。为了LEAD在进一步的查询中使用该函数的结果,您还需要使用 CTE。以下方法适用于您提供的架构:

sq = session.query(
        Timeseries.id.label("timeseries_id"),
        Point.id.label("point_id"),
        Point.t.label("point_t"),
        Point.v.label("point_v"),
        func.lead(Point.id).over().label('point_after_id'),
        func.lead(Point.v).over().label('point_after_v'),
        func.lead(Point.t).over().label('point_after_t')).select_from(
            join(Timeseries, Point, Timeseries.id == Point.ts_id)).order_by(Timeseries.id)

with_after = sq.cte()
session.execute(with_after.select().where(
        with_after.c.point_v < with_after.c.point_after_v)).fetchall()

推荐阅读