首页 > 解决方案 > A 列和 B 列之间的流差由 C 列和 D 列汇总

问题描述

如何将以下内容流式传输到表格中:

由 C 列和 D 列汇总的 A 列和 B 列之间的差异。

+-------------+-------------------+--+-
| Column_A|Column_B |Column_C|Column_D|
+-------------+-------------------+--+-
|52       |67       |boy     |car     |
|44       |25       |girl    |bike    |
|98       |85       |boy     |car     |
|52       |41       |girl    |car     |
+-------------+-------------------+--+-

这是我的尝试,但它不起作用:

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

我收到此错误:“GroupedData”对象没有属性“writeStream”

标签: pythonapache-sparkpysparkspark-streaming

解决方案


取决于你想如何聚合分组数据 - 你可以做例如

先决条件(如果您尚未设置它们):

from pyspark.sql import functions as F 
from pyspark.sql.functions import *

对于sum

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))

对于max

difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))

接着:

differenceStream = difference.writeStream\
  .queryName("diff_aggr")\
  .format("memory").outputMode("append")\
  .start()

关键是 - 如果你这样做,groupBy你还需要通过聚合来减少。如果您想将值排序在一起,请尝试df.sort(...)


推荐阅读