java - Apache Beam MapElements 编码器问题
问题描述
我正在尝试使用 Beam 中的 MapElements。我正在从 Sql Server 加载一些行,并为每一行生成一个 KV,其中 K 是大陆名称,V 是行。后来我想按大陆分组,然后对人口求和。我为此尝试了各种方法。一种是使用 MapElements 生成 KV,然后使用 sum 进行分组。这种方法更多地用于理解基本的变换用法。但是,我遇到了与 Coder 相关的问题。请注意,此代码几个月前可以与旧版本的 Beam 一起使用。可能一些错误修复或新错误现在正在破坏它。代码如下。
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
JdbcIO.DataSourceConfiguration dsConfig = JdbcIO.DataSourceConfiguration
.create("com.microsoft.sqlserver.jdbc.SQLServerDriver",
"jdbc:sqlserver://PHANTOMLAP;databaseName=SSISTestDB;")
.withUsername("**")
.withPassword("**");
Schema schemaOut = Schema.builder()
.addStringField("CountryName")
.addInt32Field("CountryId")
.addStringField("Continent")
.addInt64Field("LatestRecordedPopulation")
.build();
// Calculate total population of a continent based on country populations
PCollection<Row> rawData = pipeline.apply(JdbcIO.<Row>readRows()
.withDataSourceConfiguration(dsConfig)
.withQuery("select CountryName, CountryID, Continent, [LatestRecordedPopulation] from Countries where countryid < 10")
);
PCollection<KV<String, Row>> kvData = rawData
.apply(MapElements
.via(new RowExtractFn(schemaOut))).setRowSchema(schemaOut);
RowExtractFn 是:
// @DefaultCoder(KvCoder.class)
static class RowExtractFn extends InferableFunction<Row, KV<String, Row>> {
Schema schema;
protected RowExtractFn(Schema schema) {
super();
this.schema = schema;
}
@Override
public KV<String, Row> apply(Row input) throws Exception {
org.joda.time.Instant ts = org.joda.time.Instant.now();
List<Object> values = Collections.synchronizedList(new ArrayList<>());
for(Object val : input.getValues()) {
values.add(val);
}
Row input2 = Row.withSchema(schema).addValues(input.getValues()).build();
KV<String, Row> op = KV.of(input.getString("Continent"), input2);
return op;
}
我收到以下错误:
java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to org.apache.beam.sdk.values.Row
不知道这发生在哪里。我尝试过使用 ProcessFunction(以前可以工作)、SerializableFunction、SimpleFunction 等。没有任何效果。我还尝试在 RowExtractFn 上设置默认编码器,但不起作用。这里似乎有什么问题?
问候
解决方案
推荐阅读
- python - Jaden Casing String:如何返回带有大写单词的句子字符串?
- kubernetes - 如何修复 NetworkUnavailable:kubernetes 节点中的 True 错误
- python-3.x - 试图抓取“hellopeter.com”无法深入了解实际评论
- python - 检测在哪里捕获了异常
- angular - Angular:测试拦截器正确的请求顺序
- node.js - 资金无法发送到位于我们的账户,因为它在您的平台区域之外
- c# - Blazor .Net Core 3.0 Preview 9 - AuthenticationStateProvider 实现问题
- electron - 如何将 Electron 的 crashReporter 的 minidump 文件上传到远程服务器并存储在本地?
- ios - CloudKit CKError 扩展在 Objective-C 中不可用?
- c# - 如何使用 Newtonsoft 解析这个 Json