首页 > 解决方案 > 如何在 Apache Storm 中以编程方式重新启动拓扑的工作者

问题描述

我在 Apache Storm 中遇到问题

问题场景

  1. 当 Small 数据被发送到 Storm 时,数据会被拓扑正确处理(只有 1 个工作线程),并进一步保留在 MongoDB 中。
  2. 但是当数据很大时,它会处理数据并保存在数据库中,但以后无论数据大小都不会接受任何其他数据。

当前解决方法

我们从 Storm UI 重启 worker。

问题

我们可以以编程方式重新启动拓扑工作者吗?

标签: javaapache-storm

解决方案


重新启动工作人员从来都不是一个好的解决方案,您可能会丢失一些元组。正如 Rahim 所回答的那样,最佳实践是利用 Storm 消息可靠性功能。

但是,除了消息可靠性风暴具有内部背压机制外,这意味着当 Spout 注入的数据超过 Bolts 能够处理的数据时,Spout 将自动减速。

如 Rahim 所说,要启用此功能,您首先需要启用 acking。这意味着如果您的拓扑很简单:

喷口 -> 螺栓

喷口会做:

public void nextTuple(){
  ...
  _collector.emit(new Values(tuple), tupleId);
}

@Override
publci void ack(Object msgId) { super.ack(msgId); }

其中 tupleId 只是一个增量计数器count++。通过这种方式,您向 Storm 声明等待确认的新元组。

同时,在连续的螺栓中,以及拓扑中的所有连续螺栓中,或者至少在导致瓶颈的那个之前,您将编写:

public void execute(Tuple tuple){
  ...
  _collector.emit(tuple, new Values(newTuple));
  _collector.ack(tuple);
}

通过这种方式,您会注意到 Storm 元组已被完全处理。

最不重要的是,在您声明拓扑构建器的 main 方法中,您必须定义 Spout 将等待的最大元组数:

Config conf = new Config();
conf.setMaxSpoutPending(100);

这样,Spout 将开始生成新的元组等待它们被确认,如果(在这种情况下)未决元组的数量超过 100,则 spout 将停止调用方法 nextTuple,等待它们被确认然后生成新的.

注意:值 100 只是一个示例,您可能需要稍微调整一下以根据您的情况对其进行优化。

Rahim 共享的链接应该足以理解机制,无论如何,如果您想深入了解实现,我添加此链接:


推荐阅读