首页 > 解决方案 > Apache Beam MapElements 编码器问题

问题描述

我正在尝试使用 Beam 中的 MapElements。我正在从 Sql Server 加载一些行,并为每一行生成一个 KV,其中 K 是大陆名称,V 是行。后来我想按大陆分组,然后对人口求和。我为此尝试了各种方法。一种是使用 MapElements 生成 KV,然后使用 sum 进行分组。这种方法更多地用于理解基本的变换用法。但是,我遇到了与 Coder 相关的问题。请注意,此代码几个月前可以与旧版本的 Beam 一起使用。可能一些错误修复或新错误现在正在破坏它。代码如下。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
    
JdbcIO.DataSourceConfiguration dsConfig = JdbcIO.DataSourceConfiguration
        .create("com.microsoft.sqlserver.jdbc.SQLServerDriver",
                "jdbc:sqlserver://PHANTOMLAP;databaseName=SSISTestDB;")
        .withUsername("**")
        .withPassword("**");

Schema schemaOut = Schema.builder()
        .addStringField("CountryName")
        .addInt32Field("CountryId")
        .addStringField("Continent")
        .addInt64Field("LatestRecordedPopulation")
        .build();

// Calculate total population of a continent based on country populations
PCollection<Row> rawData = pipeline.apply(JdbcIO.<Row>readRows()
        .withDataSourceConfiguration(dsConfig)
        .withQuery("select CountryName, CountryID, Continent, [LatestRecordedPopulation] from Countries where countryid < 10")
);

PCollection<KV<String, Row>> kvData = rawData
                .apply(MapElements
                        .via(new RowExtractFn(schemaOut))).setRowSchema(schemaOut);

RowExtractFn 是:

//    @DefaultCoder(KvCoder.class)
    static class RowExtractFn extends InferableFunction<Row, KV<String, Row>> {
        Schema schema;
        protected RowExtractFn(Schema schema) {
            super();
            this.schema = schema;
        }

        @Override
        public KV<String, Row> apply(Row input) throws Exception {
            org.joda.time.Instant ts = org.joda.time.Instant.now();
            List<Object> values = Collections.synchronizedList(new ArrayList<>());
            for(Object val : input.getValues()) {
                values.add(val);
            }

            Row input2 = Row.withSchema(schema).addValues(input.getValues()).build();
            KV<String, Row> op = KV.of(input.getString("Continent"), input2);
            return op;
        }

我收到以下错误:

java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to org.apache.beam.sdk.values.Row

不知道这发生在哪里。我尝试过使用 ProcessFunction(以前可以工作)、SerializableFunction、SimpleFunction 等。没有任何效果。我还尝试在 RowExtractFn 上设置默认编码器,但不起作用。这里似乎有什么问题?

问候

标签: javaapache-beam

解决方案


推荐阅读