java - 读取缓慢的更改查找和丰富流式输入集合的最佳方法是什么?
问题描述
我正在使用 Apache Beam,流式集合为 1.5GB。我的查找表是一个 JDBCio mysql 响应。
当我在没有侧面输入的情况下运行管道时,我的工作将在大约 2 分钟内完成。当我使用侧面输入运行我的工作时,我的工作永远不会完成,卡住和死亡。
这是我用来存储查找的代码(~1M 记录)
PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://ip")
.withUsername("username")
.withPassword("password"))
.withQuery("select a_number from cell")
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(1));
}
})).apply(View.asMap());
这是我的流媒体集合的代码
pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
这是我的 parDo 的代码,用于迭代每个事件行(10M 记录)
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String,Integer> i = c.element();
String sideInputData = c.sideInput(sideData).get(i.getKey());
if (sideInputData == null) {
c.output(i);
}
}
}).withSideInputs(sideData));
我正在使用 flink 集群,但使用直接运行器输出相同。
簇:
2 cpu 6 核 24gb 内存
我究竟做错了什么? 我已经关注了这个
解决方案
解决方案是创建一个“缓存”MAP。
sideInput 仅触发一次,然后我将其缓存到地图等效结构中。
所以,我避免为每个 processElement 做一个 sideInput。
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (isFirstTime) {
myList = c.sideInput(sideData);
}
isFirstTime = false;
boolean result = myList.containsKey(c.element().getKey());
if (result == false) {
c.output(i);
}
}
}).withSideInputs(sideData));
推荐阅读
- webrtc - TURN 服务器不响应 TCP 中继候选
- php - Google Cloud Vision 能否返回更多语言的标签?
- python - 成功安装 PIL,同样 ImportError: cannot import name 'imread' from 'scipy.misc'
- python - 芹菜工人从哪个目录开始
- c# - Entity Framework Core 2.2.4 中 System.Data.Entity 的等价物是什么?
- c - C - 在结构数组中打印二维数组的元素
- java - Android可绘制链接失败
- r - ggplot:值的范围作为离散线范围图
- c# - 无法将字符串隐式转换为 int
- javascript - 如何在 javascript 事件中的前 2 个字符之后删除表单文本框中的第一个零?