apache-beam - 检查 pcollection 是否为空的 Java 代码
问题描述
我正在尝试根据 pubsub 消息编写一个管道来插入/更新/删除 mysql 表。在插入特定表时,我必须检查数据是否存在于其他表中,并且仅在数据可用时才进行插入在另一张桌子上
当另一个表(PCollection)中没有数据时,我将不得不停止插入过程。
PCollection recordCount= windowedMatchedCollection.apply(Combine.globally(new CountElements()).withoutDefaults());
这条线似乎没有帮助。请对此有任何意见
解决方案
有点不清楚您到底要做什么,但这应该可以通过计算元素来实现。例如,假设您有
# A PCollection of (table, new_row) KVs.
new_data = ...
# A PCollection of (table, old_row) KVs.
old_data = ...
然后你可以做
rows_per_old_table = old_data | beam.CombinePerKey(
beam.combiners. CountCombineFn())
并使用它通过侧面输入过滤掉您的数据。
def maybe_filter_row(table_and_row, old_table_count):
# table_and_row comes from the PCollection new_data
# old_table_count is the side input as a Map
table = table_and_row[0]
if old_table_count.get(table) > 0:
yield table_and_row
new_data_to_update = new_data | beam.FlatMap(
maybe_filter_row,
old_table_count=beam.pvalue.AsMap(rows_per_old_table))
现在,您new_data_to_update
将只包含 old_data 中行数非零的表的数据。
如果您尝试以流式方式执行此操作,则必须对所有内容进行窗口化,包括old_data
,并且它只会过滤掉在同一窗口中具有数据的那些内容。你可以改为做类似的事情
# Create a PCollection containing the set of tables in new_data, per window.
tables_to_consider = (
new_data
| beam.GroupByKey()
| beam.MapTuple(lambda table, rows: table))
rows_per_old_table = tables_to_consider | beam.ParDo(
SomeDoFnLookingUpCurrentSizeOfEachTable())
# Continue as before.
推荐阅读
- c++ - 我对节日示例脚本的声音输出有疑问
- php - 无法包含文件,但代码确实有效
- vim - 有没有更快的方法将 vim-snippet 中的片段转换为 UltiSnips 格式?
- javascript - 在 node.js 服务器中,我们如何将错误(如果发生在传输之间)添加到包含图像/视频等文件数据流的响应中
- postgresql - Hibernate/Postgres revınfo 不存在
- python - 如何从 FITS 文件中进行圆形剪切并保留 WCS
- python - 如何从 web url 获取 Href Url 链接
- vue.js - Nuxt 和 Vuetify,如何在 v-toolbar 等组件中使用颜色
- facebook - Facebook 服务器端转换像素 API 是否需要域验证?
- c++ - 为什么我们可以在 C++ 中将类定义声明为结构对象?