Perform a batch join in PySpark on timestamps and ids using watermarking

Problem description

I have two dataframes containing readings from a piece of equipment and from its surroundings. A row of the on-equipment dataframe has the following schema:

on_equipment_df.head()

# Output of notebook:
Row(
    on_eq_equipid='1',
    on_eq_timestamp=datetime.datetime(2020, 10, 7, 15, 27, 42, 866098),
    on_eq_charge=3.917107463423725,
    on_eq_electric_consumption=102.02754516792204,
    on_eq_force=10.551710736897613,
    on_eq_humidity=22.663245200558457,
    on_eq_pressure=10.813417893943944,
    on_eq_temperature=29.80448721128125,
    on_eq_vibration=4.376662536641158,
    measurement_status='Bad')

And a row of the equipment-surroundings dataframe looks like this:

equipment_surrounding_df.head()

# Output of notebook
Row(
    eq_surrounding_equipid='1',
    eq_surrounding_timestamp=datetime.datetime(2020, 10, 7, 15, 27, 42, 903198),
    eq_surrounding_dust=24.0885168316774,
    eq_surrounding_humidity=16.949569353381793,
    eq_surrounding_noise=12.256649392702574,
    eq_surrounding_temperature=8.141877435145844,
    measurement_status='Good')

Note that both tables have an id identifying the equipment and a timestamp associated with the time of the measurement.

Problem: I want to join these two dataframes on the equipment id and the timestamp. The problem is that the timestamps are recorded with very high precision, which makes an exact join on the timestamp impossible (unless I round the timestamps, which I want to keep as a last resort). The on-equipment and surroundings readings are also recorded at different frequencies. I would therefore like to join on the equipment id alone, but restricted to a window between certain timestamp values, similar to what watermarking does in Structured Streaming.
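For reference, a minimal sketch of the rounding fallback mentioned above, assuming the schemas shown earlier (the helper column names on_eq_ts_sec and eq_surr_ts_sec are made up for illustration): date_trunc collapses both timestamps to whole seconds so that an exact equi-join becomes possible, at the cost of sub-second precision.

from pyspark.sql import functions as F

# Truncate both timestamps to whole seconds, then join exactly.
# This loses sub-second precision, which is why it is only a last resort.
on_eq_rounded = on_equipment_df.withColumn(
    "on_eq_ts_sec", F.date_trunc("second", F.col("on_eq_timestamp")))
eq_surr_rounded = equipment_surrounding_df.withColumn(
    "eq_surr_ts_sec", F.date_trunc("second", F.col("eq_surrounding_timestamp")))

rounded_df = on_eq_rounded.join(
    eq_surr_rounded,
    (F.col("on_eq_equipid") == F.col("eq_surrounding_equipid"))
    & (F.col("on_eq_ts_sec") == F.col("eq_surr_ts_sec")))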

To achieve this window join, I tried the equivalent operation used in Structured Streaming mentioned above, called watermarking. Here is the code:

from pyspark.sql.functions import expr

# Add a watermark defined by the timestamp column of each df
watermarked_equipment_surr = equipment_surrounding_df.withWatermark("eq_surrounding_timestamp", "0.2 seconds")
watermarked_on_equipment = on_equipment_df.withWatermark("on_eq_timestamp", "0.2 seconds")

# Define the new equipment dataframe based on the watermarking conditions described
equipment_df = watermarked_on_equipment.join(
    watermarked_equipment_surr,
    expr("""
        on_eq_equipid = eq_surrounding_equipid AND
        on_eq_timestamp >= eq_surrounding_timestamp AND
        on_eq_timestamp <= eq_surrounding_timestamp + interval 0.2 seconds
        """))

# Perform a count (the error appears here)
print("Equipment size:", equipment_df.count())

I get an error when I run this. Based on that, I have two questions:

  1. Is this the right way to approach this kind of use case/problem?
  2. If so, why does my code raise an error?

Thanks in advance.

Update:

I believe I have found half of the solution, inspired by:

Joining two spark dataframes on time (TimestampType) in python

Essentially, that solution creates two columns in one of the dataframes, representing the lower and upper bounds of the timestamp, using UDFs. The code:


from datetime import timedelta

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

def lower_range_func(x, offset_milli=250):
    """
    From a timestamp and an offset, get the timestamp obtained by subtracting the offset.
    """
    return x - timedelta(seconds=offset_milli/1000)

def upper_range_func(x, offset_milli=250):
    """
    From a timestamp and an offset, get the timestamp obtained by adding the offset.
    """
    return x + timedelta(seconds=offset_milli/1000)

# Wrap the two functions as UDFs
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())

# Add these columns to the on_equipment dataframe
on_equipment_df = on_equipment_df\
    .withColumn('lower_on_eq_timestamp', lower_range(on_equipment_df["on_eq_timestamp"]))\
    .withColumn('upper_on_eq_timestamp', upper_range(on_equipment_df["on_eq_timestamp"]))
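As an aside, the same bound columns can be computed without Python UDFs, which avoids serialization overhead. A minimal sketch using built-in interval arithmetic, under the same column-name assumptions:

from pyspark.sql import functions as F

# Same bounds via built-in expressions instead of Python UDFs
on_equipment_df = on_equipment_df\
    .withColumn('lower_on_eq_timestamp',
                F.col('on_eq_timestamp') - F.expr('INTERVAL 250 MILLISECONDS'))\
    .withColumn('upper_on_eq_timestamp',
                F.col('on_eq_timestamp') + F.expr('INTERVAL 250 MILLISECONDS'))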

Once we have these columns, we can perform a filtered join using them:

# Join the dataframes with a plain (cross) join, then filter on the bound columns
equipment_df = on_equipment_df.join(equipment_surrounding_df)\
    .filter(equipment_surrounding_df.eq_surrounding_timestamp > on_equipment_df.lower_on_eq_timestamp)\
    .filter(equipment_surrounding_df.eq_surrounding_timestamp < on_equipment_df.upper_on_eq_timestamp)

The problem is that as soon as I also try to join on the equipment id, like this:

# Join the dataframes on the equipment id, then filter on the bound columns
equipment_df = on_equipment_df.join(
    equipment_surrounding_df,
    on_equipment_df.on_eq_equipid == equipment_surrounding_df.eq_surrounding_equipid)\
    .filter(equipment_surrounding_df.eq_surrounding_timestamp > on_equipment_df.lower_on_eq_timestamp)\
    .filter(equipment_surrounding_df.eq_surrounding_timestamp < on_equipment_df.upper_on_eq_timestamp)

I get an error. Any thoughts on this approach?

Tags: python, pyspark, apache-spark-sql

Solution
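A minimal sketch of one possible approach, assuming the schemas shown in the question: watermarks only take effect in Structured Streaming queries, so for a batch job the ±250 ms tolerance can be expressed directly as a range condition in the join, without withWatermark and without UDFs.

from pyspark.sql.functions import expr

# Express the +/- 250 ms tolerance directly in the join condition;
# column names follow the schemas shown in the question.
equipment_df = on_equipment_df.join(
    equipment_surrounding_df,
    expr("""
        on_eq_equipid = eq_surrounding_equipid AND
        eq_surrounding_timestamp >= on_eq_timestamp - INTERVAL 250 MILLISECONDS AND
        eq_surrounding_timestamp <= on_eq_timestamp + INTERVAL 250 MILLISECONDS
    """))

print("Equipment size:", equipment_df.count())

Spark evaluates this as an equi-join on the id with an additional range predicate, so no rounding and no helper columns are needed. Note that one on-equipment row can still match several surroundings readings inside the window, so deduplication (for example, keeping the closest timestamp per equipid) may be required downstream.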

