java - Apache Flink 减少了许多值而不是一个值
问题描述
我正在尝试在 WindowedStream 上实现减少,如下所示:
.keyBy(t -> t.key)
.timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
.reduce(new ReduceFunction<TwitterSentiments>() {
@Override
public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
t2.positive += t1.positive;
t2.neutral += t1.neutral;
t2.negative += t1.negative;
return t2;
}
});
我遇到的问题是,当我调用 stream.print() 时,我得到了许多值(看起来像每个 TwitterSentiments 对象一个,而不是单个聚合对象。
我也尝试过使用这样的 AggregationFunction ,但存在同样的问题:
.aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
@Override
public Tuple3<Long, Long, Long> createAccumulator() {
return new Tuple3<Long, Long, Long>(0L,0L,0L);
}
@Override
public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
return new Tuple3<Long, Long, Long>(
accumulator.f0 + ts.positive.longValue(),
accumulator.f1 + ts.neutral.longValue(),
accumulator.f2 + ts.negative.longValue()
);
}
@Override
public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
return accumulator;
}
@Override
public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
return new Tuple3<Long, Long, Long>(
accumulator1.f0 + accumulator2.f0,
accumulator1.f1 + accumulator2.f1,
accumulator1.f2 + accumulator2.f1);
}
});
stream.print() 在这些聚合之后仍然会输出许多记录的原因是什么?
解决方案
如果您不需要每个键的结果,则可以使用 timeWindowAll 生成单个结果。但是,timeWindowAll 不会并行运行。如果您想以更具可扩展性的方式计算结果,您可以这样做:
.keyBy(t -> t.key)
.timeWindow(<time specification>)
.reduce(<reduce function>)
.timeWindowAll(<same time specification>)
.reduce(<same reduce function>)
您可能希望 Flink 的运行时足够智能,可以为您执行此并行预聚合(假设您使用的是 ReduceFunction 或 AggregateFunction),但事实并非如此。
推荐阅读
- android - 在 Buildozer 中使用 CrystaX 构建 APK 时出错
- java - Weka CSVloader - 错误(错误的值数。读取)
- reactjs - 使用 React Router 的 Docker 反向代理到 React App
- android - 然后比较不同时间并显示错误消息android
- codenameone - 生产应用程序上的 Google 控制台崩溃报告
- blockchain - 在本地运行以太坊钱包软件
- visual-studio-code - 更改图标状态栏的颜色
- amazon-web-services - AWS CodeBuild 无法同步到 S3 存储桶 ListObject 被拒绝权限
- sql - 获取增量模式
- python-3.x - 使用 Python 绘制 Alpha Vantage API