首页 > 解决方案 > 在 Spark 流 Scala 中将 RDD 写入弹性搜索需要时间

问题描述

我开发了火花流(接收器方法),它从 kafka 读取数据并处理数据并写入弹性搜索。

相同的代码是在 java 中开发的(现在我们在 spark scala 中编写相同的代码),当我们与 java 性能进行比较时,spark 表现不佳。

我观察到的是,当我们写信给 ES 时,它需要时间。

这是我的高级代码:

val kafkaStreams: util.List[DStream[String]] = new util.ArrayList[DStream[String]]

for(i <- 0 until topic_threads){
      var data = KafkaUtils.createStream(ssc,kafkaConf,topic).map(line => line._2)
      kafkaStreams.add(data)
    }

//根据 spark 1.6.2 文档,下面的 union 提高了性能

val unifiedStream = ssc.union(kafkaStreams)


unifiedStream.persist(StorageLevel.MEMORY_ONLY)
if(flagY){
   val dataES = unifiedStream.map(rdd => processData(rdd))
   dataES.foreachRDD(rdd => {
     ElasticUtils.saveToEs(rdd, index_Name, index_Type)
})

在 processData 方法中,我只是解析我们从 kafka 获得的红色数据。

任何人都可以让我知道您的经验或建议,以提高火花蒸汽(scala)接收器的性能。

由于这种低性能,批次正在堆积,并且批次调度的延迟越来越大。

标签: scalaapache-sparkelasticsearchspark-streaming

解决方案


推荐阅读