scala - 如何在结构化流中创建列的所有值的列表?
问题描述
我有一个 spark 结构化流作业,它从 Kafka 获取记录(10,000 作为 maxOffsetsPerTrigger)。我通过 spark 的 readStream 方法获得所有这些记录。此数据框有一个名为“key”的列。
我需要 string(set(all values in that column 'key')) 在对 ElasticSearch 的查询中使用此字符串。
我已经尝试过df.select("key").collect().distinct()
,但它抛出异常:
collect() is not supported with structured streaming.
谢谢。
编辑:数据框:
+-------+-------------------+----------+
| key| ex|new column|
+-------+-------------------+----------+
| fruits| [mango, apple]| |
|animals| [cat, dog, horse]| |
| human|[ram, shyam, karun]| |
+-------+-------------------+----------+
架构:
root
|-- key: string (nullable = true)
|-- ex: array (nullable = true)
| |-- element: string (containsNull = true)
|-- new column: string (nullable = true)
我需要的字符串:
'["fruits", "animals", "human"]'
解决方案
对于这种用例,我建议使用foreachBatch
运算符:
foreachBatch(函数: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]
设置要使用提供的函数处理的流查询的输出。这仅在微批处理执行模式中支持(即,当触发器不连续时)。
在每个微批次中,将在每个微批次中调用所提供的函数,其中 (i) 输出行作为数据集和 (ii) 批次标识符。
batchId 可用于去重和事务性地将输出(即提供的数据集)写入外部系统。对于相同的 batchId,输出数据集保证完全相同(假设所有操作在查询中都是确定性的)。
引用官方文档(有一些修改):
该
foreachBatch
操作允许您在流式查询的输出上应用任意操作和编写逻辑。
foreachBatch
允许对每个微批次的输出进行任意操作和自定义逻辑。
在同一个官方文档中,您可以找到一个示例代码,表明您可以相当轻松地完成您的用例。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.select("key").collect().distinct()
}
推荐阅读
- java - 具有自定义路径和 API 名称的 Google Cloud Endpoints
- r - 生成随机字符串并在 R 中进行模拟
- vb.net - 填充类属性的首选方法?
- javascript - HighChart - 如何在 HighChart Print 中应用覆盖字幕样式 css
- node.js - 为什么 array.push() 在我的 Mongoose 模型中不起作用?
- angular - *ngIf 到底在做什么?
- c++ - 如何从基类动态创建和使用派生类?
- python-3.x - 使用 python 和 boto 按实例列出所有 T2 实例类型和 cpucredits
- java - 带有 Google Play 服务登录的 Libgdx
- html - 只有我导航栏中的最后一个链接有效,其余链接甚至在悬停时都不会改变