首页 > 解决方案 > 与 foreachRDD Spark Streaming 的数据库连接

问题描述

我在流式传输数据时创建并传递与数据库的连接。每次从文件中读取数据并创建 Neo4j 会话都会增加性能开销。如何更改现有代码以提高应用程序的性能?我应该将 foreachRDD 更改为 foreachPartition 以便为连接创建单独的对象吗?

这是我的流式传输代码:

val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))

wordsArrays.foreachRDD(rdd => {

  rdd.flatMap(
  data => {
    val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
    val calendarTime = Calendar.getInstance.getTime
    val recommendationsMap = convertDataToMap(recommendations, calendarTime)
    recommendationsMap

  }).saveToEs("rdd-timed/output")
 }
)

标签: scalaapache-sparkneo4jspark-streaming

解决方案


foreachPartiotion 使您能够为每个分区而不是每个映射迭代创建一个对象,当您需要为每个分区创建单个连接时,它很有用。

但在您的情况下,您创建的所有对象似乎都取决于地图的输入值或当前时间。所以我看不出它会对你有什么帮助。

除非您在每次运行 execNeo4jSearchQuery 时创建一个连接,否则我看不出它对您有何帮助,但如果您确实为不依赖数据的函数的每次调用创建一个连接,那么它将有助于改进代码。(但很有可能瓶颈不存在,所以你不会看到很大的改进)。


推荐阅读