scala - 与 foreachRDD Spark Streaming 的数据库连接
问题描述
我在流式传输数据时创建并传递与数据库的连接。每次从文件中读取数据并创建 Neo4j 会话都会增加性能开销。如何更改现有代码以提高应用程序的性能?我应该将 foreachRDD 更改为 foreachPartition 以便为连接创建单独的对象吗?
这是我的流式传输代码:
val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))
wordsArrays.foreachRDD(rdd => {
rdd.flatMap(
data => {
val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
val calendarTime = Calendar.getInstance.getTime
val recommendationsMap = convertDataToMap(recommendations, calendarTime)
recommendationsMap
}).saveToEs("rdd-timed/output")
}
)
解决方案
foreachPartiotion 使您能够为每个分区而不是每个映射迭代创建一个对象,当您需要为每个分区创建单个连接时,它很有用。
但在您的情况下,您创建的所有对象似乎都取决于地图的输入值或当前时间。所以我看不出它会对你有什么帮助。
除非您在每次运行 execNeo4jSearchQuery 时创建一个连接,否则我看不出它对您有何帮助,但如果您确实为不依赖数据的函数的每次调用创建一个连接,那么它将有助于改进代码。(但很有可能瓶颈不存在,所以你不会看到很大的改进)。
推荐阅读
- docker - 错误:无法启动 nginx,因为网络无法在 alpine docker 映像上启动
- flutter - Flutter 删除重复的 ListView
- html - 如何以html下载保存在数据库中的图像
- mysql - 如何导入包含大量数据并具有包含一些空值的列的csv文件
- javascript - 未找到模块:无法解析“D:\gmail-react\src”中的“firebase”
- swift - SwiftUI Core Data NSManagedObjectContext“无法识别的选择器发送到实例”错误
- mysql - 如何查询在另一个表中有许多关联行的记录被视为一行
- c++ - C++ 中对 const Callable 的引用和对 Callable 的引用之间的区别
- postfix-notation - 下面的表达式中缀到后缀的转换是什么?
- phpmyadmin - phpmyadmin 不显示登录表单,只显示徽标