首页 > 解决方案 > Apache Beam Metrics Counter 使用 SparkRunner 给出不正确的计数

问题描述

我有 1000 万条记录和 250 列的源和目标 csv 文件。我正在运行一个 apache 光束管道,它连接来自源文件和目标文件的所有列。当我在 spark 集群上运行它时,管道无异常地正确执行,但是,当使用以下 spark 属性时,连接束度量计数器返回双倍计数。-- executor-memory "2g" 但是,当我将 excutor-memory 增加到 11g 时,它会返回正确的计数。

我试过下面的例子,

    Pipeline pipeline = Pipeline.create(options);
    final TupleTag<String> eventInfoTag = new TupleTag<>();
    final TupleTag<String> countryInfoTag = new TupleTag<>();



    PCollection<KV<String, String>> eventInfo =
    eventsTable.apply(ParDo.of(new ExtractEventDataFn()));
    PCollection<KV<String, String>> countryInfo =
    countryCodes.apply(ParDo.of(new ExtractCountryInfoFn()));



    PCollection<KV<String, CoGbkResult>> kvpCollection =
    KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
        .and(countryInfoTag, countryInfo)
        .apply(CoGroupByKey.create());

    PCollection<KV<String, String>> finalResultCollection =
    kvpCollection.apply(
        "Process",
        ParDo.of(
            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KV<String, CoGbkResult> e = c.element();
                String countryCode = e.getKey();
                String countryName = "none";
                countryName = e.getValue().getOnly(countryInfoTag);
                for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
                    Metrics.counter("count", "errorcount").inc();
                  c.output(
                      KV.of(
                          countryCode,
                          "Country name: " + countryName + ", Event info: " + eventInfo));
                }
              }
            }));

    final PipelineResult result = pipeline.run();
    MetricQueryResults metrics =
        result
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.inNamespace("count"))
                    .build());
    Iterable<MetricResult<Long>> counters = metrics.getCounters();
    for (MetricResult<Long> counter : counters) {
        System.out.println("Hi  >> "+counter.getName().getName() + " : " + counter.getAttempted() + " " + counter.getCommittedOrNull());

    }

我需要这方面的帮助。谢谢

标签: apache-sparkapache-beam

解决方案


在您的代码中,当Metrics.counter("count", "errorcount")您定义计数器时。但它是在一个循环中定义的,该循环也是一种循环(processElement)。您应该将您的计数器定义为 DoFn 中的一个字段。不用担心 DoFn 会被重用于处理捆绑包。例如:private final Counter counter = Metrics.counter(MyClass.class, COUNTER_NAME); 您也只显示了部分代码,但我没有看到done布尔值设置为 true。但这只是出于好奇。

最后但并非最不重要的一点是,您应该在 Beam 的主分支上尝试 spark runner,因为昨天合并了一个关于指标的修复(在同一 JVM 中运行多个管道时指标不会重置)。我不知道它是否符合您的用例,但值得一试。


推荐阅读