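python - Performing a batch join in pyspark on timestamp and id using watermarks
Problem description
I have two dataframes, one with on-equipment readings and one with readings from the equipment's surroundings. A row of the on-equipment dataframe has the following schema: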
on_equipment_df.head()
# Output of notebook:
Row(
on_eq_equipid='1',
on_eq_timestamp=datetime.datetime(2020, 10, 7, 15, 27, 42, 866098),
on_eq_charge=3.917107463423725, on_eq_electric_consumption=102.02754516792204, on_eq_force=10.551710736897613, on_eq_humidity=22.663245200558457, on_eq_pressure=10.813417893943944, on_eq_temperature=29.80448721128125, on_eq_vibration=4.376662536641158,
measurement_status='Bad')
A row from the equipment-surrounding dataframe looks like this:
equipment_surrounding_df.head()
# Output of notebook
Row(
eq_surrounding_equipid='1',
eq_surrounding_timestamp=datetime.datetime(2020, 10, 7, 15, 27, 42, 903198),
eq_surrounding_dust=24.0885168316774, eq_surrounding_humidity=16.949569353381793, eq_surrounding_noise=12.256649392702574, eq_surrounding_temperature=8.141877435145844,
measurement_status='Good')
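Note that both tables have an ID that identifies the equipment and a timestamp recording when the measurement was taken.
Problem: I want to join these two dataframes on equipment ID and timestamp. The timestamps are recorded with very high (microsecond) precision, which makes an exact join on timestamp impossible (unless I round the timestamps, which I would like to keep as a last resort). On top of that, the on-equipment readings and the equipment-surrounding readings are recorded at different frequencies. So I want to join on equipment ID only, but restricted to a window of timestamp values. This is similar to what watermarks do in Structured Streaming.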
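To pin down the intended semantics, here is a minimal plain-Python reference (not Spark code): every on-equipment reading is paired with every surrounding reading that has the same equipment ID and a timestamp within a tolerance on either side. The dict keys `equipid` and `timestamp`, the 200 ms default and the last two surrounding rows are hypothetical; only the first two timestamps come from the sample rows above. The nested loop is O(n * m) and is only meant as a specification.

```python
from datetime import datetime, timedelta


def window_join(on_equipment, surroundings, tolerance=timedelta(milliseconds=200)):
    """Pair readings with the same equipment ID whose timestamps differ by at
    most `tolerance` in either direction. Rows are dicts with the hypothetical
    keys "equipid" and "timestamp"."""
    pairs = []
    for on_row in on_equipment:
        for surr_row in surroundings:
            same_equipment = on_row["equipid"] == surr_row["equipid"]
            close_in_time = abs(on_row["timestamp"] - surr_row["timestamp"]) <= tolerance
            if same_equipment and close_in_time:
                pairs.append((on_row, surr_row))
    return pairs


# First timestamps taken from the sample rows; the other rows are made up.
on_equipment = [{"equipid": "1", "timestamp": datetime(2020, 10, 7, 15, 27, 42, 866098)}]
surroundings = [
    {"equipid": "1", "timestamp": datetime(2020, 10, 7, 15, 27, 42, 903198)},  # 37.1 ms later
    {"equipid": "1", "timestamp": datetime(2020, 10, 7, 15, 27, 43, 500000)},  # ~634 ms later
    {"equipid": "2", "timestamp": datetime(2020, 10, 7, 15, 27, 42, 866098)},  # other equipment
]
matches = window_join(on_equipment, surroundings)
print(len(matches))  # 1
```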
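To do this, I tried the equivalent Structured Streaming operation mentioned above, watermarking. Here is the code:
from pyspark.sql.functions import expr

# Add watermark defined by the timestamp column of each df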
watermarked_equipment_surr = equipment_surrounding_df.withWatermark("eq_surr_timestamp", "0.2 seconds")
watermarked_on_equipment = on_equipment_df.withWatermark("on_eq_timestamp", "0.2 seconds")
# Define new equipment dataframe based on the watermarking conditions described
equipment_df = watermarked_on_equipment.join(
watermarked_equipment_surr,
expr("""
on_eq_equipid = eq_surr_equipid AND
on_eq_timestamp >= eq_surrounding_timestamp AND
on_eq_timestamp <= eq_surrounding_timestamp + interval 0.2 seconds
"""))
# Perform a count (error appears here)
print("Equipment size:", equipment_df.count())
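Running this raises an error. Based on this, I have two questions:
- Is this the right approach for this kind of use case/problem?
- If so, why does my code raise an error?
Thanks in advance.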
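Two observations, offered tentatively because the actual error message is not shown. First, `withWatermark` is a Structured Streaming feature that tells Spark how long to keep join or aggregation state for late data; on a batch (non-streaming) DataFrame Spark ignores it, so in a batch job only the join condition decides which rows match. Second, the code refers to `eq_surr_timestamp` and `eq_surr_equipid`, while the sample row shows `eq_surrounding_timestamp` and `eq_surrounding_equipid`; if those shorter names do not exist, Spark cannot resolve them, which may well be the error. The window itself is also worth checking: the condition only matches surrounding readings taken up to 0.2 s *before* the on-equipment reading. The plain-Python check below uses the timestamps of the two sample rows and shows that this one-sided window rejects that pair, while a symmetric window accepts it.

```python
from datetime import datetime, timedelta

# Timestamps copied from the two sample rows in the question.
on_eq_timestamp = datetime(2020, 10, 7, 15, 27, 42, 866098)
eq_surrounding_timestamp = datetime(2020, 10, 7, 15, 27, 42, 903198)
window = timedelta(milliseconds=200)

# One-sided window from the question's join condition:
# on_eq_timestamp must fall in [eq_surrounding_timestamp, eq_surrounding_timestamp + 0.2 s].
one_sided = eq_surrounding_timestamp <= on_eq_timestamp <= eq_surrounding_timestamp + window

# Symmetric window: the readings are at most 0.2 s apart in either direction.
symmetric = abs(on_eq_timestamp - eq_surrounding_timestamp) <= window

print(eq_surrounding_timestamp - on_eq_timestamp)  # 0:00:00.037100
print(one_sided, symmetric)  # False True
```

If the surrounding sensor can report either before or after the on-equipment sensor, the join condition probably needs a lower and an upper bound on both sides of `on_eq_timestamp`.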
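Update:
So I believe I have found half of the solution, inspired by:
Joining two spark dataframes on time (TimestampType) in python
Essentially, that solution creates two columns in one of the dataframes holding the lower and upper bounds of the timestamp, computed with UDFs. Code: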
from datetime import timedelta

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType


def lower_range_func(x, offset_milli=250):
    """
    From a timestamp and offset, get the timestamp obtained by subtracting the offset.
    """
    return x - timedelta(milliseconds=offset_milli)


def upper_range_func(x, offset_milli=250):
    """
    From a timestamp and offset, get the timestamp obtained by adding the offset.
    """
    return x + timedelta(milliseconds=offset_milli)


# Wrap both functions as UDFs that return timestamps
lower_range = udf(lower_range_func, TimestampType())
upper_range = udf(upper_range_func, TimestampType())
# Add these columns to the on_equipment dataframe
on_equipment_df = on_equipment_df\
.withColumn('lower_on_eq_timestamp', lower_range(on_equipment_df["on_eq_timestamp"]))\
.withColumn('upper_on_eq_timestamp', upper_range(on_equipment_df["on_eq_timestamp"]))
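The bounds do not necessarily need Python UDFs: Spark SQL can add and subtract interval literals on timestamp columns natively, which avoids shipping every row to a Python worker and keeps the expression visible to the optimizer. The sketch below is a hypothetical helper, `range_join_condition`, that only builds the condition string; it assumes the column names shown in the sample rows and a ±250 ms window matching the UDFs' `offset_milli` default.

```python
def range_join_condition(
    left_id: str,
    right_id: str,
    left_ts: str,
    right_ts: str,
    offset_ms: int = 250,
) -> str:
    """Build a Spark SQL join condition: equal IDs, and right_ts within
    +/- offset_ms of left_ts. Column names are inserted verbatim."""
    lower = f"{left_ts} - INTERVAL {offset_ms} MILLISECONDS"
    upper = f"{left_ts} + INTERVAL {offset_ms} MILLISECONDS"
    return f"{left_id} = {right_id} AND {right_ts} BETWEEN {lower} AND {upper}"


condition = range_join_condition(
    "on_eq_equipid",
    "eq_surrounding_equipid",
    "on_eq_timestamp",
    "eq_surrounding_timestamp",
)
print(condition)
```

The resulting string would be passed as `on_equipment_df.join(equipment_surrounding_df, expr(condition))`. Note that `BETWEEN` is inclusive, unlike the strict `>`/`<` filters below. Both dataframes also have a `measurement_status` column, so renaming one of them before the join avoids an ambiguous reference later.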
Once we have these columns, we can perform a filtered join using them.
# Join dataframes based on a filtered join
equipment_df = on_equipment_df.join(equipment_surrounding_df)\
.filter(on_equipment_df.on_eq_timestamp > equipment_surrounding_df.lower_eq_surr_timestamp)\
.filter(on_equipment_df.on_eq_timestamp < equipment_surrounding_df.upper_eq_surr_timestamp)
The problem is that as soon as I also try to join on the equipment ID, like this:
# Join dataframes based on a filtered join
equipment_df = on_equipment_df.join(
equipment_surrounding_df, on_equipment_df.on_eq_equipid == equipment_surrounding_df.eq_surr_equipid)\
.filter(on_equipment_df.on_eq_timestamp > equipment_surrounding_df.lower_eq_surr_timestamp)\
.filter(on_equipment_df.on_eq_timestamp < equipment_surrounding_df.upper_eq_surr_timestamp)
I get an error. Any ideas about this approach?
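A possible cause, hedged because the error text is not shown: the bound columns are added to `on_equipment_df` as `lower_on_eq_timestamp` and `upper_on_eq_timestamp`, but the filters read `lower_eq_surr_timestamp` and `upper_eq_surr_timestamp` from `equipment_surrounding_df`, and the equality uses `eq_surr_equipid` while the sample row shows `eq_surrounding_equipid`. Unless those columns are created in code not shown here, that mismatch alone would fail. Putting the ID equality and both timestamp bounds into one join condition is the usual shape of such an interval join. To illustrate why the ID equality matters, the plain-Python sketch below implements the same ±250 ms window join by grouping one side by ID, sorting each group by timestamp and using `bisect` to find the readings inside the window, instead of comparing every pair. Readings are simplified to `(equipid, timestamp)` tuples; this illustrates the idea and is not how Spark executes the join internally.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from datetime import datetime, timedelta


def window_join_sorted(on_equipment, surroundings, tolerance=timedelta(milliseconds=250)):
    """Same-ID, +/- tolerance window join using per-ID sorted timestamp lists.
    Inputs are lists of (equipid, timestamp) tuples."""
    # Group surrounding timestamps by equipment ID and sort each group.
    by_id = defaultdict(list)
    for equipid, ts in surroundings:
        by_id[equipid].append(ts)
    for timestamps in by_id.values():
        timestamps.sort()

    pairs = []
    for equipid, ts in on_equipment:
        candidates = by_id.get(equipid, [])
        # Binary search for the slice [ts - tolerance, ts + tolerance].
        lo = bisect_left(candidates, ts - tolerance)
        hi = bisect_right(candidates, ts + tolerance)
        pairs.extend((equipid, ts, surr_ts) for surr_ts in candidates[lo:hi])
    return pairs


base = datetime(2020, 10, 7, 15, 27, 42)
on_equipment = [("1", base + timedelta(microseconds=866098)), ("2", base)]
surroundings = [
    ("1", base + timedelta(microseconds=903198)),   # inside the window
    ("1", base + timedelta(seconds=1.5)),           # too late
    ("2", base - timedelta(milliseconds=100)),      # inside the window
    ("3", base),                                    # no matching ID
]
pairs = window_join_sorted(on_equipment, surroundings)
print(len(pairs))  # 2
```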