multithreading - What threading problem in Scala prevents S3 files from loading efficiently?
Problem description
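My `loadRaw()` function reads data through an `S3Stream` and inserts it into the `orderItems` table. To test the method, I downloaded an S3 file containing one day of raw data. The data does load, but very inefficiently, and `waiting for child threads` keeps being logged in between. What am I doing wrong in this code?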
```scala
def loadRaw(file: String, fileHost: String, batchSize: Int, bucket: String): Unit = {
  // s3Client, route, memsqlBatchSize, connection and orderItemsInsert are
  // presumably members of the enclosing class (not shown).
  val dataStream = fileHost match {
    case "s3"    => S3Stream(s3Client, bucket, file)
    case "local" => LazyFileStream(file)
  }
  route match {
    case 1 => {
      dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
        // One element per valid order event: that event's item rows, cut into groups of batchSize.
        val dbRecords = recordSet
          .filter(f => OrderEvent.isValidCreateOrder(f))
          .map(OrderEvent.orderItemRowsFromCreateOrder(_).flatten.grouped(batchSize))
        try {
          dbRecords.foreach { batchedList =>
            batchedList.foreach { record =>
              try {
                OrderEvent.setColumnValues(orderItemsInsert, record.toMap)
                orderItemsInsert.addBatch()
              } catch {
                case e: Exception =>
                  logger.error(s"Error parsing order items. \n - Exception: ${e.getMessage}\n Event: ${eventToString(record.toMap)}")
                  ArgosLogger.sendError(ErrorType.PARSING_ERROR, Some(record.toMap), Some(e))
              }
            }
            orderItemsInsert.executeBatch()
            orderItemsInsert.clearBatch()
          }
          connection.commit()
        } catch {
          case e: Exception =>
            Try(connection.rollback())
            throw e
        }
        /* finally {
          connection.commit
          orderItemsInsert.clearBatch
          logger.debug(s"committed: ${dbRecords.length.toString}")
        }*/
      }
    }
    case _ => {}
  }
}
```
Here, `orderItemsInsert` is a prepared SQL statement; `setColumnValues()` fills in its parameters before each `addBatch()` call.
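One detail worth checking in `loadRaw` as written: `executeBatch()` and `clearBatch()` run once per element of `dbRecords`, i.e. once per valid order event in the record set, so each database round trip probably carries only that one order's item rows rather than up to `batchSize` rows. The sketch below flattens the item rows of all events in a record set first and only then cuts them into batches, committing once per record set as `loadRaw` does. It assumes each event yields a `Seq[Map[String, Any]]` of rows; `BatchSink`, `CountingSink` and `BatchedInsert` are hypothetical names, and the sink is a simplified stand-in for the JDBC `PreparedStatement`/`Connection` pair so the logic runs without a database.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for the prepared statement + connection used in loadRaw.
trait BatchSink {
  def addBatch(row: Map[String, Any]): Unit
  def executeBatch(): Int // number of rows sent in this round trip
  def commit(): Unit
}

// Records the size of every executed batch so the batching behaviour can be inspected.
final class CountingSink extends BatchSink {
  private val pending = ArrayBuffer.empty[Map[String, Any]]
  val roundTrips = ArrayBuffer.empty[Int]
  var commits = 0

  def addBatch(row: Map[String, Any]): Unit = pending += row

  def executeBatch(): Int = {
    val n = pending.size
    roundTrips += n
    pending.clear()
    n
  }

  def commit(): Unit = commits += 1
}

object BatchedInsert {
  // Flatten every event's rows into one stream, then group, so each executeBatch()
  // carries up to batchSize rows no matter how many items a single order has.
  def insertAll(events: Seq[Seq[Map[String, Any]]], sink: BatchSink, batchSize: Int): Int = {
    var written = 0
    events.iterator.flatMap(_.iterator).grouped(batchSize).foreach { batch =>
      batch.foreach(sink.addBatch)
      written += sink.executeBatch()
    }
    sink.commit() // one commit per record set, mirroring loadRaw
    written
  }
}
```

With three orders of 2, 1 and 2 items and `batchSize = 2`, this sends three batches of 2, 2 and 1 rows, where the per-event grouping in `loadRaw` would issue one `executeBatch()` per order.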
The files are loaded in parallel by a fixed-size thread pool:

```scala
case "s3load" =>
  // Take the file list from the command line, or from a manifest file if one is given.
  val files =
    if (Cli.s3loader.loadManifest.isEmpty) Cli.s3loader.loadFiles.split(",").toList
    else scala.io.Source.fromFile(Cli.s3loader.loadManifest).getLines.toList
  // Shared queue from which the loader tasks take file names.
  val fileQueue = new java.util.concurrent.ConcurrentLinkedQueue[String](files)
  val pool = Executors.newFixedThreadPool(Cli.s3loader.parallelism)
  // Submit one MemSQLS3Loader per pool thread; all of them share fileQueue.
  1 to Cli.s3loader.parallelism map { index =>
    pool.submit(MemSQLS3Loader(
      Cli.s3loader.memsqlHost,
      Cli.s3loader.targetTable,
      Cli.s3loader.memsqlUserName,
      Cli.s3loader.memsqlPassword,
      Cli.s3loader.memsqlBatchSize,
      Cli.s3loader.bucket,
      Cli.s3loader.route,
      fileQueue)
    )
  }
  // Poll every 5 seconds until every file name has been taken off the queue.
  while (!fileQueue.isEmpty) {
    logger.info("waiting for child threads")
    Thread.sleep(5000L)
  }
  logger.info("starting pool termination")
  pool.shutdown()
  try {
    pool.awaitTermination(60L, TimeUnit.SECONDS)
    logger.info("pool terminated")
  } catch {
    case e: Throwable =>
      throw (e)
  }
```
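The `waiting for child threads` line is just the main thread's polling loop, which logs every 5 seconds while `fileQueue` still has entries. An empty queue only means every file has been *taken*, not finished: the last files can still be loading when `shutdown()` is called, and the `Boolean` returned by `awaitTermination(60L, TimeUnit.SECONDS)` is ignored, so `pool terminated` is logged even if the wait timed out. A minimal sketch of an alternative keeps the `Future` returned by each `submit` and blocks on `get()`. It assumes each worker (like `MemSQLS3Loader`, whose source isn't shown) drains the shared queue until it is empty; `ParallelFileLoad`, `worker` and `loadFile` are hypothetical names, with `loadFile` standing in for the per-file work such as `loadRaw`.

```scala
import java.util.concurrent.{Callable, ConcurrentLinkedQueue, Executors, TimeUnit}

object ParallelFileLoad {
  // A worker keeps polling the shared queue until it is empty and returns how many rows it loaded.
  // loadFile is a hypothetical stand-in for the real per-file loader.
  private def worker(queue: ConcurrentLinkedQueue[String], loadFile: String => Int): Callable[Int] =
    new Callable[Int] {
      def call(): Int = {
        var rows = 0
        var file = queue.poll()
        while (file != null) {
          rows += loadFile(file)
          file = queue.poll()
        }
        rows
      }
    }

  def run(files: Seq[String], parallelism: Int, loadFile: String => Int): Int = {
    val queue = new ConcurrentLinkedQueue[String]()
    files.foreach(queue.add)
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
      val futures = (1 to parallelism).map(_ => pool.submit(worker(queue, loadFile)))
      // get() blocks until that worker has finished its last file, and rethrows
      // any exception the worker threw (wrapped in ExecutionException).
      futures.map(_.get()).sum
    } finally {
      pool.shutdown()
      pool.awaitTermination(60L, TimeUnit.SECONDS)
    }
  }
}
```

Reading the futures also matters for errors: `loadRaw` rethrows after a rollback, but an exception thrown inside a task passed to `submit` is stored in that task's `Future`, and the original code discards those futures, so such failures go unreported unless `MemSQLS3Loader` logs them itself.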