java - Spark SQL Java:将数据插入 RDBMS
问题描述
披露:不是经验丰富的 Spark Dev。因此,也欢迎一般的代码反馈。
我们正在尝试从日志中提取数据并将提取的信息存储到 MySQL 数据库中。如果 ID 不存在,我们需要插入,如果已经存在,我们需要更新。我在本地工作,但是当部署在 EMR 上时,它不会写入数据库表。
这是代码,
public static Properties properties = loadProperties();
...
private static void save(SQLContext sqlContext, JavaRDD<Row> rows) {
String queryString = "INSERT INTO "+ properties.getProperty("db.name")+".metrics " +
"(id, start, end, success, sid) VALUES (?,?,?,?,?) " +
"on DUPLICATE KEY " +
"UPDATE end=?,success=?";
Dataset<Row> rowDataset = sqlContext.createDataFrame(rows, getDBDataType());
rowDataset.createOrReplaceTempView("temptable");
int batchsize = (rowDataset.count()/10)>0 ? (int)(rowDataset.count()/10):1;
rowDataset.coalesce(batchsize).foreachPartition(partition -> {
try {
Connection connection = DriverManager.getConnection(properties.getProperty("db.url")+"/"+properties.getProperty("db.dbName")+properties.getProperty("db.ConnectionParam")
, properties.getProperty("db.username"), properties.getProperty("db.password"));
PreparedStatement finalPreparedStatement = connection.prepareStatement(queryString);
partition.forEachRemaining(row -> {
try {
if(finalPreparedStatement!=null){
finalPreparedStatement.setDouble(1, row.getDouble(0));
finalPreparedStatement.setTimestamp(2, row.getTimestamp(1));
finalPreparedStatement.setTimestamp(3, row.getTimestamp(2));
finalPreparedStatement.setBoolean(6, row.getBoolean(5));
finalPreparedStatement.setString(7, row.getString(6));
finalPreparedStatement.addBatch();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
});
finalPreparedStatement.executeBatch();
finalPreparedStatement.close();
connection.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
});
}
我做错了什么?我怎样才能解决这个问题?甚至调试这个?
同样,当我在本地运行此作业时,此方法有效。但是在部署到 EMR 时不会将数据写入数据库。可以从 Spark 集群访问数据库,因为数据库和 Spark 集群位于同一个虚拟私有云上。
解决方案
推荐阅读
- python - ValueError:无法将字符串转换为浮点数:'Y'
- php - 使用“to”而不是打印数组中的每个数字
- python - 我如何阅读不和谐消息并将其用作变量?Python
- sql - 在谷歌表格中分组数据?
- c++ - 无法在完美转发中将“int&&”类型的右值引用绑定到“int”类型的左值
- javascript - JavaScript - 检查字符串的子字符串并将子字符串大写
- c# - Azure 人脸检测 API - 发送请求时出错
- sql - 在 Flask 的列中显示一定数量的字符
- redux - ngrx 中的参数化选择器的类型未知
- reactjs - React Ant 设计文件仅上传 xls 文件验证