apache-spark - Apache Beam Metrics Counter 使用 SparkRunner 给出不正确的计数
问题描述
我有 1000 万条记录和 250 列的源和目标 csv 文件。我正在运行一个 apache 光束管道,它连接来自源文件和目标文件的所有列。当我在 spark 集群上运行它时,管道无异常地正确执行,但是,当使用以下 spark 属性时,连接束度量计数器返回双倍计数。-- executor-memory "2g" 但是,当我将 excutor-memory 增加到 11g 时,它会返回正确的计数。
我试过下面的例子,
Pipeline pipeline = Pipeline.create(options);
final TupleTag<String> eventInfoTag = new TupleTag<>();
final TupleTag<String> countryInfoTag = new TupleTag<>();
PCollection<KV<String, String>> eventInfo =
eventsTable.apply(ParDo.of(new ExtractEventDataFn()));
PCollection<KV<String, String>> countryInfo =
countryCodes.apply(ParDo.of(new ExtractCountryInfoFn()));
PCollection<KV<String, CoGbkResult>> kvpCollection =
KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
.and(countryInfoTag, countryInfo)
.apply(CoGroupByKey.create());
PCollection<KV<String, String>> finalResultCollection =
kvpCollection.apply(
"Process",
ParDo.of(
new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
String countryCode = e.getKey();
String countryName = "none";
countryName = e.getValue().getOnly(countryInfoTag);
for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
Metrics.counter("count", "errorcount").inc();
c.output(
KV.of(
countryCode,
"Country name: " + countryName + ", Event info: " + eventInfo));
}
}
}));
final PipelineResult result = pipeline.run();
MetricQueryResults metrics =
result
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.inNamespace("count"))
.build());
Iterable<MetricResult<Long>> counters = metrics.getCounters();
for (MetricResult<Long> counter : counters) {
System.out.println("Hi >> "+counter.getName().getName() + " : " + counter.getAttempted() + " " + counter.getCommittedOrNull());
}
我需要这方面的帮助。谢谢
解决方案
在您的代码中,当Metrics.counter("count", "errorcount")
您定义计数器时。但它是在一个循环中定义的,该循环也是一种循环(processElement)。您应该将您的计数器定义为 DoFn 中的一个字段。不用担心 DoFn 会被重用于处理捆绑包。例如:private final Counter counter = Metrics.counter(MyClass.class, COUNTER_NAME);
您也只显示了部分代码,但我没有看到done
布尔值设置为 true。但这只是出于好奇。
最后但并非最不重要的一点是,您应该在 Beam 的主分支上尝试 spark runner,因为昨天合并了一个关于指标的修复(在同一 JVM 中运行多个管道时指标不会重置)。我不知道它是否符合您的用例,但值得一试。
推荐阅读
- kubernetes - GET_HOSTS_FROM 变量的值是多少?
- c - UndefinedBehaviorSanitizer 因为空指针
- spring-boot - 尝试从部署的任何进程启动实例时出错
- python - 当我想在单元测试中使用反向函数时,会发生 django urls.exceptions.NoReverseMatch 错误?
- java - 字符串替换方法是否使用正则表达式?
- python - Pandas - 使用循环操作和组合多个数据帧
- python - 带有正则表达式的 Python For 循环
- sql - PostgreSQL:有没有办法将我的数据一块一块地插入到空表中以减少加载时间?
- c# - 在列表中获取响应。将 sql 更改为 Linq 查询以聚合数据
- google-apps-script - 嘿,如何在电子邮件中将值 [1] 设为粗体,如何更改文本的颜色?