首页 > 解决方案 > 如何查看正在运行的 Storm 拓扑的当前输出?

问题描述

目前正在学习如何使用 Storm(2.1.0 版),我对这个数据流处理 (DSP) 引擎的特定方面有点困惑:如何处理输出数据?教程对系统设置和运行我们的第一个应用程序提供了很好的解释。不幸的是,我没有找到提供拓扑生成结果详细信息的页面。

对于 DSP 应用程序,没有最终输出,因为输入数据是连续传入的数据流(或者我们可以说当应用程序停止时有最终输出)。我想要的是能够看到正在运行的拓扑的当前输出状态(当前生成的实际输出数据)。

我能够运行WordCountTopology。我了解此拓扑的输出是由以下代码片段生成的:

public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

我的误解是关于<"word":string, "count":int>输出的位置。它只是在内存中,写在某处的数据库中,写在文件中吗?

进一步探讨这个问题:存储正在进行的输出数据的现有可能性是什么?处理此类数据的“好方法”是什么?

我希望我的问题不要太天真。并感谢 StackOverflow 社区始终提供良好的帮助。

标签: apache-storm

解决方案


自从我发布这个问题以来已经过去了几天。我回来与您分享我的尝试。虽然我无法判断这是否是正确的做法,但以下两个命题回答了我的问题。

简单 System.out.println()

我尝试的第一件事是System.out.println("Hello World!")直接在prepare()我的BaseBasicBolt方法中创建一个。此方法仅在每个 Bolt 线程执行开始时调用一次。

public void prepare(Map topoConf, TopologyContext context) {
  System.out.println("Hello World!");   
}

最大的挑战是弄清楚日志的写入位置。默认情况下,它写在请求的工作程序/插槽的端口所在的位置<storm installation folder>/logs/workers-artifacts/<topology name>/<worker-port>/worker.log<worker-port>

例如,conf.setNumWorkers(3)拓扑请求访问 3 个工作人员(3 个插槽)。因此, 的值<worker-port>将是 6700、6701 和 6702。这些值是 3 个插槽的端口号(在storm.yaml下定义supervisor.slots.ports)。

注意:您将拥有与BaseBasicBolt平行大小一样多的“ Hello World! ” 。当split bolt 用 实例化时,它会产生 8 个并行线程,每个线程都写入自己的日志。builder.setBolt("split", new SplitSentence(), 8)

写入文件

出于研究目的,我必须以特定格式分析我需要的大量日志。我找到的解决方案是将日志附加到每个螺栓管理的特定文件中。

以下是我自己为计数螺栓实现的文件日志记录解决方案。

public static class WordCount extends BaseBasicBolt {
    private String workerName;
    private FileWriter fw;
    private BufferedWriter bw;
    private PrintWriter out;
    private String logFile = "/var/log/storm/count.log";
    private Map<String, Integer> counts = new HashMap<String, Integer>();

    public void prepare(Map topoConf, TopologyContext context) {
        this.workerName = this.toString();
        try {
            this.fw = new FileWriter(logFile, true);
            this.bw = new BufferedWriter(fw);
            this.out = new PrintWriter(bw);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));

        out.println(this.workerName + ": Hello World!");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

在这段代码中,我的日志文件位于该文件的末尾/var/log/storm/count.log并调用out.println(text)附加。text由于我不确定它是否是线程安全的,所有并行线程同时写入同一个文件可能会导致数据丢失。

注意:如果你的 bolts 分布在多台机器上,每台机器都有自己的日志文件。在我的测试过程中,我配置了一个带有 1 台机器的简单集群(运行 Nimbus + Supervisor + UI),因此我只有 1 个日志文件。

结论

有多种方法可以处理输出数据,更普遍的是使用 Storm 记录任何内容。我没有找到任何官方的方式来做这件事,并且关于这个主题的文档非常简单。

虽然我们中的一些人会对简单sysout.println()的 . 使用 Java 可以做的任何事情都可以使用 Storm,因为它是简单的 Java 编程。

任何完成此答案的建议和其他评论将不胜感激。


推荐阅读