首页 > 解决方案 > 如何在结构化流中创建列的所有值的列表?

问题描述

我有一个 spark 结构化流作业,它从 Kafka 获取记录(10,000 作为 maxOffsetsPerTrigger)。我通过 spark 的 readStream 方法获得所有这些记录。此数据框有一个名为“key”的列。

我需要 string(set(all values in that column 'key')) 在对 ElasticSearch 的查询中使用此字符串。

我已经尝试过df.select("key").collect().distinct(),但它抛出异常:

 collect() is not supported with structured streaming.

谢谢。

编辑:数据框:

+-------+-------------------+----------+
|    key|                 ex|new column|
+-------+-------------------+----------+
| fruits|     [mango, apple]|          |
|animals|  [cat, dog, horse]|          |
|  human|[ram, shyam, karun]|          |
+-------+-------------------+----------+

架构:

root
 |-- key: string (nullable = true)
 |-- ex: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- new column: string (nullable = true)

我需要的字符串:

'["fruits", "animals", "human"]'

标签: scalaapache-sparkelasticsearchspark-structured-streaming

解决方案


对于这种用例,我建议使用foreachBatch运算符:

foreachBatch(函数: (Dataset[T], Long) ⇒ Unit): DataStreamWriter[T]

设置要使用提供的函数处理的流查询的输出。这仅在微批处理执行模式中支持(即,当触发器不连续时)。

在每个微批次中,将在每个微批次中调用所提供的函数,其中 (i) 输出行作为数据集和 (ii) 批次标识符。

batchId 可用于去重和事务性地将输出(即提供的数据集)写入外部系统。对于相同的 batchId,输出数据集保证完全相同(假设所有操作在查询中都是确定性的)。

引用官方文档(有一些修改):

foreachBatch操作允许您在流式查询的输出上应用任意操作和编写逻辑。

foreachBatch允许对每个微批次的输出进行任意操作和自定义逻辑。

在同一个官方文档中,您可以找到一个示例代码,表明您可以相当轻松地完成您的用例。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.select("key").collect().distinct()
}

推荐阅读