apache-spark - 结构化流写入多个流
问题描述
我的情景
- 从流中获取数据并调用返回 json 字符串的 UDF。JSON 字符串中的属性之一是 UniqueId,UDF 生成为 guid.newGuid() (C#)。
- UDF 的 DataFrame 输出基于某些拟合器写入多个流/接收器。
问题:
- 每个接收器都会为 UDF 生成的 UniqueId 获取一个新值。如何为所有接收器维护相同的 UniqueId。
- 如果每个接收器的 UniqueId 值不同,这是否意味着每个接收器都会多次调用我的 UDF?
- 如果 UDF 被调用两次,那么有什么选项可以让它调用一次然后将相同的数据写入不同的接收器
inData = spark.readstream().format("eventhub")
udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)
filter1 = udfdata.filter("column =='filter1'")
filter 2 = udfdata.filter("column =='filter2'")
# write filter1 to two differnt sinks
filter1.writestream().format(delta).start(table1)
filter1.writestream().format(eventhub).start()
# write filter2 to two differnt sinks
filter2.writestream().format(delta).start(table2)
filter2.writestream().format(eventhub).start()
解决方案
每次您调用时,.writestream()....start()
您都在创建一个新的独立流式查询。
这意味着对于您定义的每个输出接收器,Spark 将再次从输入源读取并处理数据帧。
如果您只想读取和处理一次然后输出到多个接收器,您可以使用foreachBatch 接收器作为一种解决方法:
inData = spark.readstream().format("eventhub")
udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)
udfdata.writeStream().foreachBatch(filter_and_output).start()
def filter_and_output(udfdata, batchId):
# At this point udfdata is a batch dataframe, no more a streaming dataframe
udfdata.cache()
filter1 = udfdata.filter("column =='filter1'")
filter2 = udfdata.filter("column =='filter2'")
# write filter1
filter1.write().format(delta).save(table1)
filter1.write().format(eventhub).save()
# write filter2
filter2.write().format(delta).save(table2)
filter2.write().format(eventhub).save()
udfdata.unpersist()
您可以在Spark Structured Streaming 文档中了解有关 foreachBatch 的更多信息。
回答您的问题
- 如果您使用 foreachBatch,您的数据将只处理一次,并且所有接收器都将拥有相同的 UniqueId
- 是的
- 使用 foreachBatch 将解决问题
推荐阅读
- javascript - 使用多个搜索框过滤单个表
- android - 单击时,密码会在 Leanback 引导式步骤操作中显示
- java - Java 8:对返回 Function 对象的方法进行单元测试
- javascript - 如何调整大小
放置元素
- c++ - C ++捕获分叉子cout
- debugging - 需要用于调试模板的 Visual Studio 代码配置
- sql - SQL - SAP HANA - 控制台内容长 - 将一些代码传递给过程?
- php - 调用未定义的函数 sodium_randombytes_buf()
- javascript - 如何在画布中获取图像的实际大小?
- typescript - `string & any[]` 不应该导致 `never` 吗?