Which threading problem in Scala is keeping S3 files from loading efficiently?

Problem Description

My loadRaw() function reads data from an S3Stream and inserts it into the orderItems table. To test the method, I downloaded an S3 file containing one day of raw data. The data does load, but very inefficiently, with long stretches of "waiting for child threads" in between. What am I doing wrong in this code?

def loadRaw(file: String, fileHost: String, batchSize: Int, bucket: String): Unit = {
  val dataStream = fileHost match {
    case "s3"    => S3Stream(s3Client, bucket, file)
    case "local" => LazyFileStream(file)
  }
  route match {
    case 1 =>
      dataStream.map(line => readJsonFromString(line)).grouped(memsqlBatchSize).foreach { recordSet =>
        val dbRecords = recordSet
          .filter(f => OrderEvent.isValidCreateOrder(f))
          .map(OrderEvent.orderItemRowsFromCreateOrder(_).flatten.grouped(batchSize))
        try {
          dbRecords.foreach { batchedList =>
            batchedList.foreach { record =>
              try {
                OrderEvent.setColumnValues(orderItemsInsert, record.toMap)
                orderItemsInsert.addBatch()
              } catch {
                case e: Exception =>
                  logger.error(s"Error parsing order items.\n - Exception: ${e.getMessage}\n Event: ${eventToString(record.toMap)}")
                  ArgosLogger.sendError(ErrorType.PARSING_ERROR, Some(record.toMap), Some(e))
              }
            }
            orderItemsInsert.executeBatch()
            orderItemsInsert.clearBatch()
          }
          connection.commit()
        } catch {
          case e: Exception =>
            Try(connection.rollback())
            throw e
        }
        /* finally {
          connection.commit()
          orderItemsInsert.clearBatch()
          logger.debug(s"committed: ${dbRecords.length.toString}")
        } */
      }
    case _ => ()
  }
}

Here, orderItemsInsert is a pre-prepared SQL statement whose column values are filled in with setColumnValues().
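For readers unfamiliar with that pattern, here is a minimal, self-contained sketch of prepared-statement batching, assuming orderItemsInsert is an ordinary JDBC java.sql.PreparedStatement; the connection URL, table, and columns below are invented for illustration, and a matching JDBC driver must be on the classpath:

import java.sql.{Connection, DriverManager, PreparedStatement}

object BatchInsertSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical connection; the question's code obtains `connection` elsewhere.
    val connection: Connection =
      DriverManager.getConnection("jdbc:mysql://localhost:3306/orders", "user", "password")
    connection.setAutoCommit(false) // manual commit()/rollback(), as in the question

    // Stand-in for the question's prepared `orderItemsInsert` statement.
    val orderItemsInsert: PreparedStatement =
      connection.prepareStatement("INSERT INTO orderItems (order_id, sku, qty) VALUES (?, ?, ?)")

    // Stand-in for OrderEvent.setColumnValues: copy one record onto the parameters.
    def setColumnValues(stmt: PreparedStatement, r: (String, String, Int)): Unit = {
      stmt.setString(1, r._1)
      stmt.setString(2, r._2)
      stmt.setInt(3, r._3)
    }

    val records = Seq(("o-1", "sku-a", 2), ("o-2", "sku-b", 1))
    try {
      records.foreach { r =>
        setColumnValues(orderItemsInsert, r)
        orderItemsInsert.addBatch() // queue the row client-side
      }
      orderItemsInsert.executeBatch() // one round trip for the whole batch
      connection.commit()
    } catch {
      case e: Exception => connection.rollback(); throw e
    } finally {
      orderItemsInsert.close()
      connection.close()
    }
  }
}

The point of addBatch()/executeBatch() is to amortize network round trips; per-row execute() calls are usually the first thing to rule out when inserts are slow.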

  case "s3load" =>
    val files = if (Cli.s3loader.loadManifest.isEmpty) Cli.s3loader.loadFiles.split(",").toList
    else scala.io.Source.fromFile(Cli.s3loader.loadManifest).getLines.toList
    val fileQueue = new java.util.concurrent.ConcurrentLinkedQueue[String](files)

    val pool = Executors.newFixedThreadPool(Cli.s3loader.parallelism)
    1 to Cli.s3loader.parallelism map {
      index =>
        pool.submit(MemSQLS3Loader(
          Cli.s3loader.memsqlHost,
          Cli.s3loader.targetTable,
          Cli.s3loader.memsqlUserName,
          Cli.s3loader.memsqlPassword,
          Cli.s3loader.memsqlBatchSize,
          Cli.s3loader.bucket,
          Cli.s3loader.route,
          fileQueue)
        )
    }
    while (!fileQueue.isEmpty) {
      logger.info("waiting for child threads")
      Thread.sleep(5000L)
    }
    logger.info("starting pool termination")
    pool.shutdown()
    try {
      pool.awaitTermination(60L, TimeUnit.SECONDS)
      logger.info("pool terminated")
    } catch {
      case e: Throwable =>
        throw (e)
    }

Console output, for clarity: (screenshot omitted)

Tags: multithreading, scala, amazon-s3, functional-programming, child-process

Solution
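One structural issue stands out in the driver: the main thread infers completion from fileQueue.isEmpty, but the queue empties as soon as the last file is claimed, while workers may still be loading it; awaitTermination(60L, TimeUnit.SECONDS) then puts a hard 60-second cap on whatever is still in flight, and the result of each pool.submit is discarded, so a worker that dies with an exception is never noticed. A sketch of a more direct shape follows, assuming MemSQLS3Loader is (or can be wrapped as) a java.util.concurrent.Callable and Scala 2.13 for the collection converters; Worker here is a hypothetical stand-in for it:

import java.util.concurrent.{Callable, ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

object LoaderSketch {
  // Hypothetical stand-in for MemSQLS3Loader: drains files from the shared
  // queue until it is empty.
  final class Worker(queue: ConcurrentLinkedQueue[String]) extends Callable[Unit] {
    def call(): Unit = {
      var file = queue.poll()
      while (file != null) {
        println(s"loading $file") // placeholder for the real S3 -> MemSQL load
        file = queue.poll()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    val queue = new ConcurrentLinkedQueue[String](List("f1", "f2", "f3").asJava)
    val pool = Executors.newFixedThreadPool(parallelism)

    // Keep the Futures so completion is observed directly; no sleep/poll loop.
    val futures = (1 to parallelism).map(_ => pool.submit(new Worker(queue)))
    futures.foreach(_.get()) // blocks until each worker finishes

    pool.shutdown()
    pool.awaitTermination(1L, TimeUnit.MINUTES)
  }
}

Blocking on the returned Futures means shutdown begins only after every file is fully loaded, and any failure inside a worker is rethrown on the main thread (wrapped in an ExecutionException) instead of disappearing.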

