apache-spark - 如何计算 Spark Structured Streaming 中的滞后差异?
问题描述
我正在编写一个 Spark Structured Streaming 程序。我需要创建一个具有滞后差异的附加列。
为了重现我的问题,我提供了代码片段。此代码data.json
使用存储在data
文件夹中的文件:
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
代码:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.appName("Test") \
.master("local[2]") \
.getOrCreate()
schema = StructType([
StructField("id", IntegerType()),
StructField("type", StringType()),
StructField("timestamp", LongType())
])
ds = spark \
.readStream \
.format("json") \
.schema(schema) \
.load("data/")
diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))
query = ds \
.writeStream \
.format('console') \
.start()
query.awaitTermination()
我收到此错误:
pyspark.sql.utils.AnalysisException: u'流数据帧/数据集不支持非基于时间的窗口;\nWindow [lag(timestamp#71L, 1, null) windowspecdefinition(host_id#68, timestamp#71L ASC NULLS首先,前 1 行和前 1 行之间的行)作为 prev_timestamp#129L]
解决方案
pyspark.sql.utils.AnalysisException: u'流数据帧/数据集不支持非基于时间的窗口
这意味着您的窗口应该基于timestamp
列。因此,如果您每秒都有一个数据点,并且您创建一个30s
带有 astride
的窗口10s
,那么您的结果窗口将创建一个新window
列,其中start
和end
列将包含差异为 的时间戳30s
。
您应该以这种方式使用窗口:
words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))
w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
.withWatermark('date_format', '1 minutes') \
.groupBy(w).agg(F.mean('value'))
推荐阅读
- c# - “x : {0}”到底是什么意思?
- ruby-on-rails - Rails 验证多态关联模型属性
- python - “AnonymousUser”对象没有属性“_meta”
- java - 通过 Hibernate 获取数据而不传递主键
- javascript - Javascript字符串到数组深度对象
- python - 获取字典中所有值的扁平列表
- cassandra - nodetool 退役行为很奇怪
- google-drive-api - 使用网络应用程序共享存在于谷歌驱动器中的文件
- regex - 如何在记事本++中为每一行添加右大括号
- c# - 有什么方法可以检测外部进程是否在独占全屏模式下运行?