python - A 列和 B 列之间的流差由 C 列和 D 列汇总
问题描述
如何将以下内容流式传输到表格中:
由 C 列和 D 列汇总的 A 列和 B 列之间的差异。
+-------------+-------------------+--+-
| Column_A|Column_B |Column_C|Column_D|
+-------------+-------------------+--+-
|52 |67 |boy |car |
|44 |25 |girl |bike |
|98 |85 |boy |car |
|52 |41 |girl |car |
+-------------+-------------------+--+-
这是我的尝试,但它不起作用:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C")
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
我收到此错误:“GroupedData”对象没有属性“writeStream”
解决方案
取决于你想如何聚合分组数据 - 你可以做例如
先决条件(如果您尚未设置它们):
from pyspark.sql import functions as F
from pyspark.sql.functions import *
对于sum
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.sum(F.col("Difference")).alias("Difference"))
对于max
:
difference = streamingDataF.withColumn("Difference", expr("Column_A - Column_B")).drop("Column_A").drop("Column_B").groupBy("Column_C").agg(F.max(F.col("Difference")).alias("Difference"))
接着:
differenceStream = difference.writeStream\
.queryName("diff_aggr")\
.format("memory").outputMode("append")\
.start()
关键是 - 如果你这样做,groupBy
你还需要通过聚合来减少。如果您想将值排序在一起,请尝试df.sort(...)
推荐阅读
- python - 我正在用python编写单元测试。导入模块时出现此错误
- typescript - 从多种类型中创建一个对象类型
- c - sigaction未初始化gcc 7
- reactjs - 如何获取格式化文本draft-js
- python - how to inherit __init__ attributes from parent class to __init__ in the child class?
- angularjs - DevExpress - 如果另一个单元格具有值,如何禁用一个单元格的编辑
- android - 错误代码 413:您的客户端发出的请求在 WEB VIEW 中太大
- substrate - 在 Polkadot 应用程序开发人员选项卡中,您如何编码 Rust 元组?
- python-3.x - 使用给定 url、代理和用户凭据的 python 下载文件的代码
- scala - 测试中*的含义是什么?