mysql - 从 spark foreach 分区插入 mysql 时记录丢失
问题描述
我有一份工作,我从 avro 文件中读取数据并对数据执行转换。之后,我使用 foreachpartitions 将这些数据帧记录插入到我的 mysql 数据库中。
但!!我已经看到我的 5% 记录没有插入 mysql,我也没有收到任何错误。
可能的原因是什么?请帮我!!
这是快照
df.repartition(1).foreachPartition { partition =>
val connectionProperties = brConnect.value
val jdbcurl = connectionProperties.getProperty("jdbcurl")
val user = connectionProperties.getProperty("user")
val pwd = connectionProperties.getProperty("password")
Class.forName(jdbcdriver)
val dbc: Connection = DriverManager.getConnection(jdbcurl, user, pwd)
var pstmt: PreparedStatement = null
dbc.setAutoCommit(false)
val db_batchsize = 1000
partition.grouped(db_batchsize).foreach { batch =>
val sqlString =s"""INSERT INTO test.table1 (alarm_id,alarm_type)select ?,? from dual where not exists (select alarm_id from test.table2) LIMIT 1"""
pstmt = dbc.prepareStatement(sqlString)
batch.foreach { row =>
var AlarmidIndex = row.fieldIndex("alarm_id")
var alarmid = row.getString(AlarmidIndex)
var AlarmtypeIndex = row.fieldIndex("alarm_type")
var alarmtype = row.getString(AlarmtypeIndex)
pstmt.setString(1, alarmid)
pstmt.setString(2, alarmtype)
pstmt.addBatch()
}
pstmt.executeBatch()
dbc.commit()
pstmt.close()
}
dbc.close()
}
解决方案
推荐阅读
- php - Unlink/Delete files which starts with a dot
- user-interface - Flutter Gesture detector onTap problems
- python - Python: remove elements of a list satisfying a condition
- node.js - 是否可以将文件发布到 MQTT 服务器?
- javascript - 更改内容时自动刷新仅显示多条记录中的一条
- html - 如何在使用 CSS Grid 自动调整属性时完美地居中网格项?
- php - Natsort 数组并设置键值
- rust - 将结构作为参数的通用函数?
- c++ - 尽管循环了一半的迭代,但循环速度不快?
- java - VS 服务接口