hive - Speeding up Storm Topology
Problem description
We have an application that moves CSV files from HDFS to Hive, using a Storm topology for the process.
We are using 8 machines, each with 22 cores and 512 GB of RAM. However, our code runs really slowly: it takes 10 minutes to transfer 6 million records.
Sixty files of 10 MB each are transferred to HDFS in one second. We are trying to optimize our code, but it is obvious that we are doing something very wrong.
The Hive table has 64 buckets.
Our topology has 1 spout and 2 bolts. The spout reads the CSV files and emits lines to the first bolt, which parses the data and emits to the second bolt, which is responsible for the HDFS process.
HDFS Spout:
HdfsSpout hdfsSpout = new HdfsSpout()
    .withOutputFields(TextFileReader.defaultFields)
    .setReaderType("text")
    .setHdfsUri(hdfsUri)
    .setSourceDir("/data/in")
    .setArchiveDir("/data/done")
    .setBadFilesDir("/data/bad")
    .setClocksInSync(true) // NTP installed on all hosts
    // do not begin reading file until it is completely copied to HDFS
    .setIgnoreSuffix("_COPYING_")
    .setMaxOutstanding(50_000);
Mapper:
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(TTDPIRecord.fieldsList))
    .withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));
Hive Options:
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withAutoCreatePartitions(true)
    .withHeartBeatInterval(3)
    .withCallTimeout(10_000) // default = 10_000
    .withTxnsPerBatch(2)
    .withBatchSize(50_000)
    // set because it is most likely affecting the Storm metrics
    .withTickTupleInterval(1);
Config:
Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
Topology Builder:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");
We are not sure about the following parameters:
In the HDFS spout: .setMaxOutstanding(50_000);
In the Hive options: .withTxnsPerBatch(2) .withBatchSize(50_000) .withTickTupleInterval(1);
In the config: .setNumWorkers(6); .setNumAckers(6);
Parallelism of the spout and bolts: we gave 8 to each.
What should the values of these parameters be? Thanks in advance.
Edit: Here are our test results for 100 CSV files of 10 MB each:
hdfsSpout Executors: 8 Complete Latency: 1834.209 ms
recordParserBolt Executors: 8 Complete Latency: 0.019 ms
hiveBolt Executors: 8 Complete Latency: 1092.624 ms
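As a rough sanity check on the figures above, the reported load can be turned into per-second rates. The sketch below does only that arithmetic. It assumes records are spread evenly across the 8 hiveBolt executors and that each executor collects its own batch, neither of which is confirmed in the question; the class and method names are made up for illustration.

```java
// Back-of-the-envelope throughput check; names are illustrative only.
final class ThroughputEstimate {

    /** Average records per second for a run of the given size and duration. */
    static double recordsPerSecond(long records, long seconds) {
        return (double) records / seconds;
    }

    /** Share of the overall rate handled by one executor, assuming an even spread. */
    static double perExecutor(double overallRate, int executors) {
        return overallRate / executors;
    }

    /** Seconds one executor needs to collect a full batch at the given rate. */
    static double secondsToFillBatch(int batchSize, double executorRate) {
        return batchSize / executorRate;
    }

    public static void main(String[] args) {
        double overall = recordsPerSecond(6_000_000L, 10 * 60); // 6 million records in 10 minutes
        double perBolt = perExecutor(overall, 8);               // 8 hiveBolt executors
        double fill = secondsToFillBatch(50_000, perBolt);      // withBatchSize(50_000)
        System.out.printf("overall=%.0f rec/s, per executor=%.0f rec/s, batch fill=%.0f s%n",
                overall, perBolt, fill);
    }
}
```

Under those assumptions the run averages 10,000 records per second, or 1,250 per hiveBolt executor, and a 50,000-record batch would take about 40 seconds to fill. If the 1-second tick tuple triggers a flush, the tick interval rather than the batch size would likely decide when batches are written.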
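Solution
You are setting conf.setNumWorkers(6);
This means the topology runs in only 6 worker processes, so it uses at most 6 of your 8 machines. You can set it to 8 to make use of all the hardware you have.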
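A minimal sketch of that change, assuming Storm 1.x or later (the org.apache.storm package) and that each of the 8 machines runs a supervisor with at least one free worker slot. The wrapper class name is hypothetical; only the worker count differs from the question's config.

```java
import org.apache.storm.Config;

// Sketch only: requires storm-core on the classpath.
final class WorkerConfigSketch {

    static Config build() {
        Config conf = new Config();
        conf.setNumWorkers(8); // was 6; assumption: one worker process per machine
        conf.setNumAckers(6);  // unchanged from the question
        return conf;
    }
}
```

Storm decides where worker processes are placed, so 8 workers allow all 8 machines to be used but do not by themselves guarantee one worker per machine.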
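Another parameter you can change is the parallelism hint of the bolts, which is the initial number of executors (threads) of a component. You have set it to only 8; you can increase it to 100 or 200 and see how the performance changes.
It is also worth reading up on how parallelism works in Storm.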
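To get a feel for how far 8 executors per component is from saturating the cluster, the executor count can be compared with the number of cores. This is a rough sketch: the executors-per-core ratio is only a reasoning aid, not a Storm rule, and the proposed hint of 100 for each bolt is just the lower figure suggested above. Names are illustrative.

```java
// Compares executor counts against available cores; a heuristic, not a tuning rule.
final class ParallelismBudget {

    /** Total cores in the cluster. */
    static int totalCores(int machines, int coresPerMachine) {
        return machines * coresPerMachine;
    }

    /** Sum of the parallelism hints (one executor per hint unit). */
    static int totalExecutors(int... hints) {
        int sum = 0;
        for (int hint : hints) {
            sum += hint;
        }
        return sum;
    }

    /** Average number of executors scheduled per core. */
    static double executorsPerCore(int executors, int cores) {
        return (double) executors / cores;
    }

    public static void main(String[] args) {
        int cores = totalCores(8, 22);                 // 8 machines with 22 cores each
        int current = totalExecutors(8, 8, 8, 6);      // spout, parser bolt, hive bolt, ackers
        int proposed = totalExecutors(8, 100, 100, 6); // bolts raised to 100 executors each
        System.out.printf("cores=%d current=%d (%.2f per core) proposed=%d (%.2f per core)%n",
                cores, current, executorsPerCore(current, cores),
                proposed, executorsPerCore(proposed, cores));
    }
}
```

With the question's settings there are 30 executors for 176 cores, so most cores have no topology thread to run. Raising the bolt hints changes that, and the actual effect still has to be measured.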
Can you tell us what your max-spout-pending configuration is?
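The pending limit matters because a cap on un-acked tuples bounds throughput: by Little's law, a spout task cannot sustain more than (pending limit / complete latency) tuples per second. The sketch below computes that bound. As an assumption, it plugs in the 50,000 from setMaxOutstanding and the 1834.209 ms complete latency reported in the question; whether topology.max.spout.pending (set with Config.setMaxSpoutPending) is configured at all is not known. The class and method names are illustrative.

```java
// Upper bound on spout throughput implied by a cap on in-flight tuples (Little's law).
final class PendingLimitBound {

    /**
     * Maximum tuples per second one spout task can sustain when at most
     * pendingLimit tuples may be un-acked and each takes completeLatencyMs to complete.
     */
    static double maxTuplesPerSecond(int pendingLimit, double completeLatencyMs) {
        if (pendingLimit <= 0 || completeLatencyMs <= 0) {
            throw new IllegalArgumentException("limit and latency must be positive");
        }
        return pendingLimit / (completeLatencyMs / 1000.0);
    }

    public static void main(String[] args) {
        // Assumed inputs: figures taken from the question, used here for illustration.
        double bound = maxTuplesPerSecond(50_000, 1834.209);
        System.out.printf("upper bound per spout task: %.0f tuples/s%n", bound);
    }
}
```

With those inputs the bound is roughly 27,000 tuples per second per spout task. A lower pending limit or a higher complete latency tightens it proportionally.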