Syncing Cosmos DB data to Snowflake with Spark fails with `java.lang.NullPointerException: Null value appeared in non-nullable field`

Problem Description

I'm using Spark to sync Cosmos DB data to Snowflake with the following code:

// Assumed imports for the azure-cosmosdb-spark and spark-snowflake connectors
import com.microsoft.azure.cosmosdb.spark.config.Config
import com.microsoft.azure.cosmosdb.spark.schema._
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
import org.apache.spark.sql.SaveMode

val cosmosQuery = "SELECT * FROM c"
val readConfig = Config(Map(
  "Endpoint" -> endpoint,
  "Masterkey" -> cosmosKey,
  "Database" -> cosmosSourceDB,
  "Collection" -> cosmosSourceCollection,
  "ReadChangeFeed" -> "false",
  "query_custom" -> cosmosQuery // Optional
))

// Connect via azure-cosmosdb-spark to create a Spark DataFrame
val df = ss.read.cosmosDB(readConfig)

// Snowflake connection options
val sfOptions = Map(
  "sfURL" -> "***.snowflakecomputing.com",
  "sfUser" -> sfUser,
  "sfRole" -> sfRole,
  "pem_private_key" -> pem_private_key,
  "sfDatabase" -> sfDatabase,
  "sfSchema" -> sfSchema,
  "sfWarehouse" -> sfWarehouse
)

// Write the DataFrame to Snowflake
df.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("dbtable", "cosmosSourceCollection")
  .mode(SaveMode.Overwrite)
  .save()

But when I run it, the df.write.format(...).save() call fails with the following error:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at net.snowflake.spark.snowflake.io.CloudStorage$class.uploadRDD(CloudStorageOperations.scala:771)
    at net.snowflake.spark.snowflake.io.InternalAzureStorage.uploadRDD(CloudStorageOperations.scala:901)
    at net.snowflake.spark.snowflake.io.CloudStorage$class.upload(CloudStorageOperations.scala:528)
    at net.snowflake.spark.snowflake.io.InternalAzureStorage.upload(CloudStorageOperations.scala:901)
    at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:209)
    at net.snowflake.spark.snowflake.io.package$.writeRDD(package.scala:50)
    at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:74)
    at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:144)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.fti.cosmos.AppendToSnowflake$.main(AppendToSnowflake.scala:140)
    at com.fti.cosmos.AppendToSnowflake.main(AppendToSnowflake.scala)
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
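The message indicates that some field in the inferred schema is marked non-nullable, yet at least one Cosmos document is missing it. A quick diagnostic sketch (assuming the df read above) to list which fields were inferred as non-nullable:

// List the fields Spark inferred as non-nullable; a null in any of
// these triggers the encoding error above during the Snowflake write.
df.schema.fields
  .filter(!_.nullable)
  .foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))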

I added read options such as:

val df = ss.read
  .option("spark.cosmos.read.inferSchema.enabled", true)
  .option("spark.cosmos.read.inferSchema.samplingSize", 10000)
  .option("spark.cosmos.read.inferSchema.forceNullableProperties", false)
  .cosmosDB(readConfig)

It still fails with the same error. How can I resolve it? Is there an option I can set to avoid it? I went through the official cosmos-spark-guide documentation and couldn't find a suitable option.

Tags: scala, apache-spark, azure-cosmosdb, snowflake-cloud-data-platform

Solution
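
The root cause: the schema inferred from Cosmos DB marks some fields non-nullable, but not every document contains them, so the Spark encoder hits a null while staging rows for the Snowflake write.

A common workaround is to relax the schema to all-nullable before writing. The sketch below reuses the ss, df, sfOptions, and SNOWFLAKE_SOURCE_NAME values from the question and only relaxes top-level fields; treat it as one possible fix under those assumptions, not the definitive one:

import org.apache.spark.sql.types.StructType

// Rebuild the DataFrame on the same rows, with every top-level field
// forced to nullable so a missing Cosmos property no longer violates
// a non-nullable field during encoding.
val nullableSchema = StructType(df.schema.map(_.copy(nullable = true)))
val nullableDf = ss.createDataFrame(df.rdd, nullableSchema)

nullableDf.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("dbtable", "cosmosSourceCollection")
  .mode(SaveMode.Overwrite)
  .save()

Note also that the spark.cosmos.read.inferSchema.* options tried above are recognized by the newer azure-cosmos-spark (Spark 3 / OLTP) connector, not by the Config-based azure-cosmosdb-spark connector used here, which is likely why setting them had no effect.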

