apache-beam - Beam SQL - SqlValidatorException:找不到对象'PCOLLECTION'
问题描述
我正在用 Beam SQL 做一些实验。我PCollection<Row>
从转换中得到 aSampleSource
并将其输出传递给 a SqlTransform
。
String sql1 = "select c1, c2, c3 from PCOLLECTION where c1 > 1";
下面的代码运行没有任何错误。
POutput it = p.apply(new SampleSource()).apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();
但是,当我尝试以下代码行时,出现运行时错误。
POutput it = p.apply(new SampleSource());
it.getPipeline().apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();
错误详细信息是
Caused by: org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorException: Object 'PCOLLECTION' not found
请提供一些指示。
解决方案
它不起作用,因为您将 aSqlTransform
应用于管道,而不是PCollection
.
您可能希望按照以下方式更改它:
// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());
// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));
...
从高层次的角度来看,Beam 管道是如何工作的:
- 创建管道;
- 应用
PTransform
从某个源读取的 IO 并生成从源PColelction
读取的某些元素; - 链式应用更多
PTransforms
来自PCollection
上一步的数据来处理数据(从概念上讲,PCollections
每一步都会产生不同的结果); - 重复;
SqlTransform
是一个 normal PTransform
,它应该被应用到一个PCollection
元素上并PCollection
作为结果输出另一个。您在其中指定的查询将SqlTransform.create()
应用于PCollection
. 它期望数据来自一个神奇的PCOLLECTION
表,代表PCollection
你应用的那个SqlTransform
。
您在示例中所做的是不同的:
- 创建管道;
- 应用
PTransform
产生 aPOutput
不一定 a的源PCollection
; - 如果您的源代码,则忽略输出,而是采用原始管道并将 a
SqlTransform
直接应用于它;
所以发生的情况是,SqlTransform
在这种情况下,它被应用于管道的“根”,而不是PCollection
来自源的那个。而不是一个接一个地应用链,PTransforms
您现在有两个PTransforms
彼此独立地应用于根。
另一个需要注意的是,SqlTransform
期望输入元素是Rows
,因为 SQL 作为一种语言仅适用于表示为行的数据。有两种方法可以实现这一点:
- 通过在源和之间
Rows
应用另一个元素,手动将源生成的元素转换为;ParDo
SqlTransform
- 使用 Beam 的
Schema
框架(例如 check outPCollection.setSchema()
方法),它允许 Beam SQL 自动将输入元素转换为Rows
;
推荐阅读
- node.js - JS 应用程序 REACTDOM.render() 崩溃
- angular - 跳过 TypeScript 函数中的参数
- memory - 释放 SAS 内存
- python - 如何将扩展的 html 重定向到 Flask 中的登录表单
- confluence - 尝试使用 REST API 创建页面时出现 500 错误
- python - 使用 Python 获取默认 Internet 浏览器
- c# - nuget 包与其源之间的联系是什么?
- kubernetes-helm - 让 Helm 在图表中包含文件但不解析它们并能够使用 -f 引用它们
- c++ - vtable 中未对齐的地址
- java - 我的代码确实应该打印它不起作用的素数之间的差距。“没有错误”