scala - 使用 spark 将 cosmos 数据同步到雪花并得到 `java.lang.NullPointerException: Null value appeared in non-nullable field` 错误
问题描述
我使用 spark 将 cosmos 数据同步到雪花,代码如下:
// --- Read from Cosmos DB, write to Snowflake -------------------------------
// Root cause of the reported NPE: the azure-cosmosdb-spark connector infers
// the schema from sampled documents and may mark some columns nullable=false;
// any document missing such a field then fails during row encoding with
// "java.lang.NullPointerException: Null value appeared in non-nullable field".
// Fix: rebuild the DataFrame with every field forced to nullable = true
// before handing it to the Snowflake writer.
import org.apache.spark.sql.types.StructType

val cosmosQuery = "SELECT * FROM c" // val, not var: never reassigned
val readConfig = Config(Map(
  "Endpoint" -> endpoint,
  "Masterkey" -> cosmosKey,
  "Database" -> cosmosSourceDB,
  "Collection" -> cosmosSourceCollection,
  "ReadChangeFeed" -> "false",
  "query_custom" -> cosmosQuery // Optional
))
// Connect via azure-cosmosdb-spark to create a Spark DataFrame.
val df = ss.read.cosmosDB(readConfig)

// Force all columns to be nullable so sparse Cosmos documents (fields absent
// in some documents) encode cleanly instead of throwing the NPE above.
val nullableDf = ss.createDataFrame(
  df.rdd,
  StructType(df.schema.fields.map(_.copy(nullable = true)))
)

// Snowflake connection options (key-pair auth via pem_private_key).
val sfOptions = Map(
  "sfURL" -> "***.snowflakecomputing.com",
  "sfUser" -> sfUser,
  "sfRole" -> sfRole,
  "pem_private_key" -> pem_private_key,
  "sfDatabase" -> sfDatabase,
  "sfSchema" -> sfSchema,
  "sfWarehouse" -> sfWarehouse
)

nullableDf.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  // NOTE(review): this writes to a table literally named
  // "cosmosSourceCollection"; if the Scala variable of the same name was
  // intended (as in readConfig above), drop the quotes — confirm.
  .option("dbtable", "cosmosSourceCollection")
  .mode(SaveMode.Overwrite)
  .save()
但是当我运行它时,df.write.format 中有一个错误,错误是:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at net.snowflake.spark.snowflake.io.CloudStorage$class.uploadRDD(CloudStorageOperations.scala:771)
at net.snowflake.spark.snowflake.io.InternalAzureStorage.uploadRDD(CloudStorageOperations.scala:901)
at net.snowflake.spark.snowflake.io.CloudStorage$class.upload(CloudStorageOperations.scala:528)
at net.snowflake.spark.snowflake.io.InternalAzureStorage.upload(CloudStorageOperations.scala:901)
at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:209)
at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:50)
at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:74)
at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:144)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.fti.cosmos.AppendToSnowflake$.main(AppendToSnowflake.scala:140)
at com.fti.cosmos.AppendToSnowflake.main(AppendToSnowflake.scala)
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
我添加了读取选项,例如:
// Attempted workaround: schema-inference read options.
// NOTE(review): the "spark.cosmos.read.*" option keys belong to the newer
// Cosmos DB Spark 3 (OLTP) connector; the Config-based azure-cosmosdb-spark
// reader used here presumably ignores them, which would explain why the
// error is unchanged — confirm against the connector version in use.
// NOTE(review): even where supported, forceNullableProperties = false makes
// inferred fields NON-nullable — the opposite of what is needed to avoid the
// "Null value appeared in non-nullable field" error.
val df = ss.read
.option("spark.cosmos.read.inferSchema.enabled", true)
.option("spark.cosmos.read.inferSchema.samplingSize", 10000)
.option("spark.cosmos.read.inferSchema.forceNullableProperties", false).cosmosDB(readConfig)
它仍然得到同样的错误,那么我该如何解决这个错误?是否可以添加任何选项来避免此错误?我参考了官方文档cosmos-spark-guide并在其中找不到合适的选项。
解决方案
出错原因:连接器根据采样文档推断出的 schema 将部分字段标记为不可为空(nullable = false),而集合中存在缺少这些字段的文档,写入时行编码失败。可在写入 Snowflake 之前将 schema 的所有字段强制改为可空,例如:`val nullableDf = ss.createDataFrame(df.rdd, StructType(df.schema.fields.map(_.copy(nullable = true))))`,再用 `nullableDf` 执行写入。另外需要注意:`spark.cosmos.read.*` 系列选项属于较新的 Cosmos DB Spark 3 连接器,对基于 `Config` 的 azure-cosmosdb-spark 读取方式不生效;且 `forceNullableProperties` 设为 `false` 会使推断出的字段变为不可空,与此处的需求正好相反。
推荐阅读
- iis-7.5 - IIS AppPool 没有获得适当的权限
- c# - Visual Studio 不会构建它创建的 ASP.NET Core 项目
- ios - ios 应用因付费应用协议第 3.8(b) 节而被拒绝
- javascript - 我想通过文件传输“状态”
- thingsboard - 如何从小部件更新键的值?
- java - 在spring boot中实现自定义错误
- python - Chart.add_series() 无法以编程方式工作
- c# - WindowsCryptographicException:从 .net 标准 2 项目中的字节数组创建 X509Certificate2 时“找不到请求的对象”
- sql - 在不创建新表或 string_split() 的情况下拆分字符串
- python - 计算出现在另一个列表中的数量