multithreading - 使用 2 个 scala 期货在 foreachPartition 内进行并行 JDBC 插入是否安全?
问题描述
我目前有一个插入 MySQL 数据库的 Spark 流应用程序。我遵循的模式
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
可以在spark 流文档中找到。问题是我想这样修改它,
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val f1 = Future {
//insert data to database 1
val connection = createNewConnection(db1_config)
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
val f2 = Future {
//insert data to database 2
val connection = createNewConnection(db2_config)
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
//await for both f1 and f2 to finish
}
实际上,我发现这行得通,但我想确保我不会在脚上开枪。我在我的应用程序中使用1 个执行器和 1 个核心对此进行了测试,发现当我使用全局执行上下文时,两个期货并行工作。所以我假设全局执行上下文提供了 2 个线程来处理。但是这两个线程是从哪里来的呢?当我将 Spark 作业提交给 YARN 时,我指定了 1 个执行器和 1 个核心。当我在那台机器上执行 LSCPU 时,我发现每个核心有 2 个线程。那么我是在该内核上使用这两个线程还是全局执行上下文为我提供了来自另一个内核的可用线程?我只想知道这是否是一种安全模式,而不是从其他应用程序中“窃取”线程。
解决方案
推荐阅读
- python - Kivy Action按钮无法通过python更新图标
- c++ - 为什么在我输入任何值之前打印此代码?
- java - 显示一年中最低的平均价格,以及该价格的周数,以及它发生的月份的名称
- python - 基于元数据从对象存储中检索对象
- perl - 哪个作业在哪个节点上运行?
- react-admin - 如何使用 ArrayInput 解析和格式化值转换道具
- python - 错误“utf-8”编解码器无法解码位置 268 中的字节 0x96:读取文件 csv 时的起始字节无效
- java - Android Studio View 的显示方式与我的手机(三星 Galaxy S5)不同
- ansible - 在模板 jinja2 方面需要帮助
- c++ - 索引数组时我应该总是使用 size_t 吗?