google-cloud-dataflow - Cannot call "ApproximateDistinct.ApproximateDistinctFn" from Apache Beam SQL
Problem description
I am trying to use ApproximateDistinct.ApproximateDistinctFn as an aggregate function (UDAF) in Apache Beam SQL, but it fails.
My SQL:
SELECT
ApproximateDistinct(user_id) as distinct_count,
profile,
country_code
FROM PCOLLECTION
GROUP BY profile, country_code
The code I use to run this query in Apache Beam:
events.apply(
    SqlTransform.query(query)
        .registerUdaf(
            "ApproximateDistinct",
            ApproximateDistinct.ApproximateDistinctFn.create(StringUtf8Coder.of())
                .withSparseRepresentation(16)));
This throws the following exception:
Caused by: java.lang.ClassCastException: class sun.reflect.generics.reflectiveObjects.TypeVariableImpl cannot be cast to class java.lang.Class (sun.reflect.generics.reflectiveObjects.TypeVariableImpl and java.lang.Class are in module java.base of loader 'bootstrap')
at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.sqlTypeWithAutoCast(CalciteUtils.java:210)
at org.apache.beam.sdk.extensions.sql.impl.UdafImpl$1.getType(UdafImpl.java:69)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:313)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:300)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.CalciteCatalogReader.lambda$lookupOperatorOverloads$3(CalciteCatalogReader.java:269)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEachOrdered(ReferencePipeline.java:502)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.CalciteCatalogReader.lookupOperatorOverloads(CalciteCatalogReader.java:270)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:72)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1162)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1147)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1176)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1147)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:921)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:632)
at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:188)
Solution
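I found a way around this myself. There are two problems. First, the generic type of the input coder (the InputT type parameter of ApproximateDistinctFn) cannot be resolved to a concrete class. Second, the output of the combine function (the HyperLogLogPlus sketch itself) is not a valid SQL type. So I copied the class, removed the generics (fixing the input type to String), and changed the output type to Long. This is what I ended up with: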
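import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;

// A non-generic copy of ApproximateDistinct.ApproximateDistinctFn:
// input fixed to String, output changed from HyperLogLogPlus to Long.
public class ApproximateDistinctCardinalityFn
    extends Combine.CombineFn<String, HyperLogLogPlus, Long> {
  private final int p;   // precision of the normal representation
  private final int sp;  // precision of the sparse representation
  private final Coder<String> inputCoder;

  // ... copy the rest of the code from ApproximateDistinct.ApproximateDistinctFn here
}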
And the extractOutput method:
@Override
public Long extractOutput(HyperLogLogPlus accumulator) {
  // Return the estimated distinct count as a Long, which Beam SQL maps to BIGINT.
  return accumulator.cardinality();
}
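To see why removing the generics helps, here is a small stdlib-only sketch that reproduces the same kind of ClassCastException. It assumes, based on the stack trace (CalciteUtils.sqlTypeWithAutoCast called from UdafImpl$1.getType), that Beam reads the CombineFn's type arguments reflectively and treats them as java.lang.Class. CombineFn, Sketch, GenericDistinctFn and StringDistinctFn below are hypothetical stand-ins, not Beam classes: GenericDistinctFn is shaped like ApproximateDistinctFn<InputT>, and StringDistinctFn is shaped like the copied ApproximateDistinctCardinalityFn.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeArgDemo {
    // Hypothetical stand-in for Beam's Combine.CombineFn<InputT, AccumT, OutputT>.
    abstract static class CombineFn<InputT, AccumT, OutputT> {}

    // Hypothetical stand-in for the HyperLogLogPlus sketch class.
    static class Sketch {}

    // Shaped like ApproximateDistinctFn: the input type is still a type variable.
    static class GenericDistinctFn<InputT> extends CombineFn<InputT, Sketch, Sketch> {}

    // Shaped like the copied fn: concrete input type and a Long output.
    static class StringDistinctFn extends CombineFn<String, Sketch, Long> {}

    // Reads the index-th type argument that fnClass passes to CombineFn.
    static Type typeArg(Class<?> fnClass, int index) {
        ParameterizedType superType = (ParameterizedType) fnClass.getGenericSuperclass();
        return superType.getActualTypeArguments()[index];
    }

    // Mimics the failing step in the stack trace: treating the input Type as a Class.
    static String inputTypeName(Class<?> fnClass) {
        Type input = typeArg(fnClass, 0);
        try {
            return ((Class<?>) input).getSimpleName();
        } catch (ClassCastException e) {
            // A type variable such as InputT is a TypeVariableImpl, not a Class.
            return "ClassCastException: " + input.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(inputTypeName(GenericDistinctFn.class)); // ClassCastException: TypeVariableImpl
        System.out.println(inputTypeName(StringDistinctFn.class));  // String
        System.out.println(typeArg(StringDistinctFn.class, 2).getTypeName()); // java.lang.Long
    }
}
```

Only a concrete subclass records real classes in its generic superclass, which is why fixing the input type to String (and the output to Long) gives the SQL layer something it can map to a column type.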