首页 > 解决方案 > 如何在 Apache Beam Java SDK 中的多个列上使用 aggregateField()?

问题描述

在 Apache Beam Python SDK 中,可以执行以下操作:

input
| GroupBy(account=lambda s: s["account"])
.aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead')

我们如何在 Java SDK 中执行类似的操作?奇怪的是,编程指南中只有 Python中用于此转换的示例。

这是我在 Java 中生成等效项的尝试:

input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));

标签: javaapache-beam

解决方案


https://beam.apache.org/documentation/programming-guide/#using-schemas中有一些 Java 示例。(请注意,您可能必须选择java同时具有 Java 和 Python 的选择器上的选项卡才能看到它们。)

在 Java 中,我不认为 aggregateField 的第一个参数可以采用任意表达式;它必须是字段名称。您可以使用为所需表达式添加新字段的投影来继续分组操作。例如

input
    .apply(SqlTransform.query(
        "SELECT *, wordsAddup - wordsSubtract AS wordsDiff from PCOLLECTION")
    .apply(Group.byFieldNames("account")
        .aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));

推荐阅读