Updating a table in SQL Server with data from Hive using Spark

Problem description

I have my master table in SQL Server, and I want to update a few of its columns based on a condition that three columns match between the master table (in the SQL Server DB) and a target table (in Hive). Both tables have many columns, but I am only interested in the six listed below.

The 3 columns I want to update in the master table are

"INSPECTED_BY", "INSPECTION_COMMENTS" and "SIGNED_BY"

The columns I want to use as the matching condition are

"SERVICE_NUMBER", "PART_ID" and "LOTID"

I tried the code below, but it gives me a NullPointerException:

val df = spark.table("location_of_my_table_in_hive")
df.show(false)
df.foreachPartition(partition => 
{
    val Connection = DriverManager.getConnection(SQLjdbcURL, SQLusername, SQLPassword)
    val batch_size = 100
    var psmt: PreparedStatement = null 

    partition.grouped(batch_size).foreach(batch => 
    {
        batch.foreach{row => 
            {
                val inspctbyIndex = row.fieldIndex("INSPECTED_BY")
                val inspctby = row.getString(inspctbyIndex)
        
                val inspcomIndex = row.fieldIndex("INSPECT_COMMENTS")
                val inspcom = row.getString(inspcomIndex)
        
                val signIndex = row.fieldIndex("SIGNED_BY")
                val signby = row.getString(signIndex)
        
                val sqlquery = "MERGE INTO SERVICE_LOG_TABLE as LOG" +
                    "USING (VALUES(?, ?, ?))" +
                    "AS ROW(inspctby, inspcom, signby)" +
                    "ON LOG.SERVICE_NUMBER = ROW.SERVICE_NUMBER and LOG.PART_ID = ROW.PART_ID and LOG.LOTID = ROW.LOTID" +
                    "WHEN MATCHED THEN UPDATE SET INSPECTED_BY = 'SMITH', INSPECT_COMMENTS = 'STANDARD_MET', SIGNED_BY = 'WILL'" +
                    "WHEN NOT MATCHED THEN INSERT VALUES(ROW.INSPECTED_BY, ROW.INSPECT_COMMENTS, ROW.SIGNED_BY)"
                var psmt: PreparedStatement = Connection.prepareStatement(sqlquery)
        
                psmt.setString(1, inspctby)
                psmt.setString(2, inspcom)
                psmt.setString(3, signby)
                psmt.addBatch()
            }   
        }
        psmt.executeBatch()
        Connection.commit()
        psmt.close()
    })
    Connection.close()
})

Here is the error:

ERROR scheduler.TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 
times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, lwtxa0gzpappr.corp.bankofamerica.com, 
executor 4): java.lang.NullPointerException
    at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:101)
    at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:74)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at $anonfun$1.apply(/location/service_log.scala:74)
    at $anonfun$1.apply(/location/service_log.scala:68)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have searched the internet but could not find the cause of this error. Any help would be appreciated.

Tags: sql-server, scala, apache-spark, apache-spark-sql

Solution


If you are running this on a Spark cluster, I think you may need to broadcast some of the objects. The executors cannot get the values of those objects, which is why you see the NullPointerException.
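Below is a minimal sketch of that suggestion, reusing spark and df from the question and assuming SQLjdbcURL, SQLusername and SQLPassword are plain String values defined on the driver: the parameters are wrapped in broadcast variables and read back with .value inside foreachPartition. The MERGE is reworked to bind all six columns, since the original statement's ON clause references ROW.SERVICE_NUMBER while its VALUES row carries only three values, and the concatenated SQL fragments were missing separating spaces. The statement is also prepared and executed as the same object; in the original snippet, executeBatch() is called on the outer psmt, which is never assigned and is itself a likely source of the NullPointerException.

import java.sql.{DriverManager, PreparedStatement}
import org.apache.spark.sql.Row

// Broadcast the connection parameters so every executor receives its
// own serialized copy (variable names mirror the question's code).
val urlBc  = spark.sparkContext.broadcast(SQLjdbcURL)
val userBc = spark.sparkContext.broadcast(SQLusername)
val passBc = spark.sparkContext.broadcast(SQLPassword)

// MERGE reworked to bind all six columns, with whitespace between clauses.
val sqlquery =
  """MERGE INTO SERVICE_LOG_TABLE AS LOG
    |USING (VALUES (?, ?, ?, ?, ?, ?))
    |  AS SRC(SERVICE_NUMBER, PART_ID, LOTID, INSPECTED_BY, INSPECT_COMMENTS, SIGNED_BY)
    |ON LOG.SERVICE_NUMBER = SRC.SERVICE_NUMBER
    |   AND LOG.PART_ID = SRC.PART_ID AND LOG.LOTID = SRC.LOTID
    |WHEN MATCHED THEN UPDATE SET
    |   INSPECTED_BY = SRC.INSPECTED_BY,
    |   INSPECT_COMMENTS = SRC.INSPECT_COMMENTS,
    |   SIGNED_BY = SRC.SIGNED_BY
    |WHEN NOT MATCHED THEN INSERT
    |   (SERVICE_NUMBER, PART_ID, LOTID, INSPECTED_BY, INSPECT_COMMENTS, SIGNED_BY)
    |   VALUES (SRC.SERVICE_NUMBER, SRC.PART_ID, SRC.LOTID,
    |           SRC.INSPECTED_BY, SRC.INSPECT_COMMENTS, SRC.SIGNED_BY);""".stripMargin

df.foreachPartition { partition: Iterator[Row] =>
  // Read the broadcast values on the executor side.
  val connection = DriverManager.getConnection(urlBc.value, userBc.value, passBc.value)
  connection.setAutoCommit(false) // required for the explicit commit() below
  try {
    partition.grouped(100).foreach { batch =>
      // Prepare once per batch and execute the SAME statement object.
      val psmt: PreparedStatement = connection.prepareStatement(sqlquery)
      batch.foreach { row =>
        psmt.setString(1, row.getString(row.fieldIndex("SERVICE_NUMBER")))
        psmt.setString(2, row.getString(row.fieldIndex("PART_ID")))
        psmt.setString(3, row.getString(row.fieldIndex("LOTID")))
        psmt.setString(4, row.getString(row.fieldIndex("INSPECTED_BY")))
        psmt.setString(5, row.getString(row.fieldIndex("INSPECT_COMMENTS")))
        psmt.setString(6, row.getString(row.fieldIndex("SIGNED_BY")))
        psmt.addBatch()
      }
      psmt.executeBatch()
      connection.commit()
      psmt.close()
    }
  } finally {
    connection.close()
  }
}

Broadcasting only helps with serializable values; the JDBC Connection itself must still be opened inside foreachPartition, since connections cannot be shipped from the driver. Also note that row.getString returns null for a null column, which is safe to pass to setString, but a fieldIndex name that does not exist in the Hive table (the code uses INSPECT_COMMENTS while the question mentions INSPECTION_COMMENTS) would fail with an IllegalArgumentException rather than a NullPointerException.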

