首页 > 解决方案 > 从 spark foreach 分区插入 mysql 时记录丢失

问题描述

我有一份工作,我从 avro 文件中读取数据并对数据执行转换。之后,我使用 foreachpartitions 将这些数据帧记录插入到我的 mysql 数据库中。

但!!我已经看到我的 5% 记录没有插入 mysql,我也没有收到任何错误。

可能的原因是什么?请帮我!!

这是快照

df.repartition(1).foreachPartition { partition =>

  val connectionProperties = brConnect.value

  val jdbcurl = connectionProperties.getProperty("jdbcurl")
  val user = connectionProperties.getProperty("user")
  val pwd = connectionProperties.getProperty("password")

  Class.forName(jdbcdriver)

  val dbc: Connection = DriverManager.getConnection(jdbcurl, user, pwd)
  var pstmt: PreparedStatement = null
  dbc.setAutoCommit(false)
  val db_batchsize = 1000
  

  
  partition.grouped(db_batchsize).foreach { batch =>

  val sqlString =s"""INSERT INTO test.table1 (alarm_id,alarm_type)select ?,? from dual where not exists (select alarm_id from test.table2) LIMIT 1"""

    pstmt = dbc.prepareStatement(sqlString)

    batch.foreach { row =>

      var AlarmidIndex = row.fieldIndex("alarm_id")
      var alarmid = row.getString(AlarmidIndex)

      var AlarmtypeIndex = row.fieldIndex("alarm_type")
      var alarmtype = row.getString(AlarmtypeIndex)

      pstmt.setString(1, alarmid)
      pstmt.setString(2, alarmtype)
      pstmt.addBatch()
    }

  pstmt.executeBatch()
  dbc.commit()
  pstmt.close()

  }
  dbc.close()

}

标签: mysqlapache-spark

解决方案


推荐阅读