首页 > 解决方案 > Spark SQL Java:将数据插入 RDBMS

问题描述

披露:不是经验丰富的 Spark Dev。因此,也欢迎一般的代码反馈。

我们正在尝试从日志中提取数据并将提取的信息存储到 MySQL 数据库中。如果 ID 不存在,我们需要插入,如果已经存在,我们需要更新。我在本地工作,但是当部署在 EMR 上时,它不会写入数据库表。

这是代码,

public static Properties properties = loadProperties();

...

private static void save(SQLContext sqlContext, JavaRDD<Row> rows) {
    String queryString = "INSERT INTO "+ properties.getProperty("db.name")+".metrics " +
            "(id, start, end, success, sid) VALUES (?,?,?,?,?) " +
            "on DUPLICATE KEY " +
            "UPDATE end=?,success=?";

    Dataset<Row> rowDataset = sqlContext.createDataFrame(rows, getDBDataType());
    rowDataset.createOrReplaceTempView("temptable");
    int batchsize = (rowDataset.count()/10)>0 ? (int)(rowDataset.count()/10):1;
    rowDataset.coalesce(batchsize).foreachPartition(partition -> {
        try {
            Connection connection = DriverManager.getConnection(properties.getProperty("db.url")+"/"+properties.getProperty("db.dbName")+properties.getProperty("db.ConnectionParam")
                    , properties.getProperty("db.username"), properties.getProperty("db.password"));

            PreparedStatement finalPreparedStatement = connection.prepareStatement(queryString);
            partition.forEachRemaining(row -> {
                try {
                    if(finalPreparedStatement!=null){
                        finalPreparedStatement.setDouble(1, row.getDouble(0));
                        finalPreparedStatement.setTimestamp(2, row.getTimestamp(1));
                        finalPreparedStatement.setTimestamp(3, row.getTimestamp(2));
                        finalPreparedStatement.setBoolean(6, row.getBoolean(5));
                        finalPreparedStatement.setString(7, row.getString(6));
                        
                        finalPreparedStatement.addBatch();
                    }
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            });
            finalPreparedStatement.executeBatch();
            finalPreparedStatement.close();
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    });
}

我做错了什么?我怎样才能解决这个问题?甚至调试这个?

同样,当我在本地运行此作业时,此方法有效。但是在部署到 EMR 时不会将数据写入数据库。可以从 Spark 集群访问数据库,因为数据库和 Spark 集群位于同一个虚拟私有云上。

标签: javaapache-sparkapache-spark-sql

解决方案


推荐阅读