java - 如何在 Apache Storm 中以编程方式重新启动拓扑的工作者
问题描述
我在 Apache Storm 中遇到问题
问题场景:
- 当 Small 数据被发送到 Storm 时,数据会被拓扑正确处理(只有 1 个工作线程),并进一步保留在 MongoDB 中。
- 但是当数据很大时,它会处理数据并保存在数据库中,但以后无论数据大小都不会接受任何其他数据。
当前解决方法:
我们从 Storm UI 重启 worker。
问题:
我们可以以编程方式重新启动拓扑工作者吗?
解决方案
重新启动工作人员从来都不是一个好的解决方案,您可能会丢失一些元组。正如 Rahim 所回答的那样,最佳实践是利用 Storm 消息可靠性功能。
但是,除了消息可靠性风暴具有内部背压机制外,这意味着当 Spout 注入的数据超过 Bolts 能够处理的数据时,Spout 将自动减速。
如 Rahim 所说,要启用此功能,您首先需要启用 acking。这意味着如果您的拓扑很简单:
喷口 -> 螺栓
喷口会做:
public void nextTuple(){
...
_collector.emit(new Values(tuple), tupleId);
}
@Override
publci void ack(Object msgId) { super.ack(msgId); }
其中 tupleId 只是一个增量计数器count++
。通过这种方式,您向 Storm 声明等待确认的新元组。
同时,在连续的螺栓中,以及拓扑中的所有连续螺栓中,或者至少在导致瓶颈的那个之前,您将编写:
public void execute(Tuple tuple){
...
_collector.emit(tuple, new Values(newTuple));
_collector.ack(tuple);
}
通过这种方式,您会注意到 Storm 元组已被完全处理。
最不重要的是,在您声明拓扑构建器的 main 方法中,您必须定义 Spout 将等待的最大元组数:
Config conf = new Config();
conf.setMaxSpoutPending(100);
这样,Spout 将开始生成新的元组等待它们被确认,如果(在这种情况下)未决元组的数量超过 100,则 spout 将停止调用方法 nextTuple,等待它们被确认然后生成新的.
注意:值 100 只是一个示例,您可能需要稍微调整一下以根据您的情况对其进行优化。
Rahim 共享的链接应该足以理解机制,无论如何,如果您想深入了解实现,我添加此链接:
推荐阅读
- azure - 如何为 Azure Pipelines Docker 任务指定当前目录?
- python-3.5 - 这是我正在尝试对嵌套列表进行切片,但我得到以下输出。解释?
- in-app-purchase - Costco 如何避免使用 Apple In App Purchase
- laravel - Laravel 运行一组带有特定守卫的中间件
- c# - UWP Xaml 旋转复选框修剪标签
- php - HTML 表单提交按钮显示代码而不是执行它
- python - 具有两个输出的 Keras MSE 损失
- reactjs - 如何在 React 组件中更新 Signalr 数据
- ios - NSURLSession GET 请求仅通过 HTTPS 以 403 失败 - 代理时除外
- python - 在重组文本文件、Windows 10 上运行 make docs 时出现回车问题