sql-server - 使用 Spark 在 SQL SERVER 数据库中使用 HIVE 中的数据更新表
问题描述
我在 SQL Server 中有我的主表,我想根据我的主表(在 SQL Server DB 中)和目标表(在 HIVE 中)之间 3 列匹配的条件更新表中的几列。两个表都有多个列,但我只对 6 列感兴趣,如下所示:
我要在主表中更新的 3 列是
"INSPECTED_BY", "INSPECTION_COMMENTS" and "SIGNED_BY"
我想用作匹配条件的列是
"SERVICE_NUMBER", "PART_ID" and "LOTID"
我尝试了下面的代码,但它给了我一个 NullPointerException 错误
val df = spark.table("location_of_my_table_in_hive")
df.show(false)
df.foreachPartition(partition =>
{
val Connection = DriverManager.getConnection(SQLjdbcURL, SQLusername, SQLPassword)
val batch_size = 100
var psmt: PreparedStatement = null
partition.grouped(batch_size).foreach(batch =>
{
batch.foreach{row =>
{
val inspctbyIndex = row.fieldIndex("INSPECTED_BY")
val inspctby = row.getString(inspctbyIndex)
val inspcomIndex = row.fieldIndex("INSPECT_COMMENTS")
val inspcom = row.getString(inspcomIndex)
val signIndex = row.fieldIndex("SIGNED_BY")
val signby = row.getString(signIndex)
val sqlquery = "MERGE INTO SERVICE_LOG_TABLE as LOG" +
"USING (VALUES(?, ?, ?))" +
"AS ROW(inspctby, inspcom, signby)" +
"ON LOG.SERVICE_NUMBER = ROW.SERVICE_NUMBER and LOG.PART_ID = ROW.PART_ID and LOG.LOTID = ROW.LOTID" +
"WHEN MATCHED THEN UPDATE SET INSPECTED_BY = 'SMITH', INSPECT_COMMENTS = 'STANDARD_MET', SIGNED_BY = 'WILL'" +
"WHEN NOT MATCHED THEN INSERT VALUES(ROW.INSPECTED_BY, ROW.INSPECT_COMMENTS, ROW.SIGNED_BY)"
var psmt: PreparedStatement = Connection.prepareStatement(sqlquery)
psmt.setString(1, inspctby)
psmt.setString(2, inspcom)
psmt.setString(3, signby)
psmt.addBatch()
}
}
psmt.executeBatch()
Connection.commit()
psmt.close()
})
Connection.close()
})
这是错误
ERROR scheduler.TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4
times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, lwtxa0gzpappr.corp.bankofamerica.com,
executor 4): java.lang.NullPointerException
at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:101)
at $anonfun$1$$anonfun$apply$1.apply(/location/service_log.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at $anonfun$1.apply(/location/service_log.scala:74)
at $anonfun$1.apply(/location/service_log.scala:68)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我搜索了互联网,找不到错误出现的原因。任何帮助,将不胜感激
解决方案
如果你在 spark 集群上运行它,我想你可能需要广播一些对象。执行者无法获取对象的值所以空指针异常。
推荐阅读
- php - 对 DataTable 的 DateTime 字段进行排序
- vba - 如何使用 VBA 保存 Excel 加载项
- java - Gradle 使用 2 个不同的 JDK
- sql-server - Sql server 索引使用取决于列值
- css - 如何更改禁用光标
- apache-camel - 骆驼豆绑定:从变量中设置参数
- javascript - 防止客户端编辑功能的最佳实践?
- android - 在 ArFragment 中隐藏加载屏幕
- google-chrome - 清除 Chrome 以使 Google Tag Assistant 重新加载?
- html - 带有 Action 的 Angular 5 简单表单不起作用