首页 > 解决方案 > spark setCassandraConf 没有按预期工作

问题描述

我正在使用 .setCassandraConf(c_options_conf) 设置 sparkSession 以连接 cassandra 集群,如下所示。

工作正常:

 val spark = SparkSession
      .builder()
      .appName("DatabaseMigrationUtility")
      .config("spark.master",devProps.getString("deploymentMaster"))
      .getOrCreate()
                .setCassandraConf(c_options_conf)

如果我使用下面的数据帧写入器对象保存表,它指向配置的集群并完美保存在 Cassandra 中,如下所示

 writeDfToCassandra(o_vals_df, key_space , "model_vals"); //working fine using o_vals_df.

但是,如果说如下,它指向 localhost 而不是 cassandra 集群并且无法保存。

不工作:

import spark.implicits._
val sc = spark.sparkContext

val audit_df = sc.parallelize(Seq(LogCaseClass(columnFamilyName, status,
      error_msg,currentDate,currentTimeStamp, updated_user))).saveToCassandra(keyspace, columnFamilyName);

它在尝试连接本地主机时抛出错误。

错误:

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All
host(s) tried for query failed (tried: localhost/127.0.0.1:9042
(com.datastax.driver.core.exceptions.TransportException:
[localhost/127.0.0.1:9042] Cannot connect))
            at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)

这里有什么问题?为什么即使 sparkSession 设置为 cassandra 集群和早期方法工作正常,它仍指向默认本地主机。

标签: apache-sparkdatabricksdatastax-enterprisecassandra-3.0spark-cassandra-connector

解决方案


我们需要使用两种设置方法来设置配置SparkSession,即 .config(conf).setCassandraConf(c_options_conf)具有相同的值,如下所示

  val spark = SparkSession
        .builder()
        .appName("DatabaseMigrationUtility")
        .config("spark.master",devProps.getString("deploymentMaster"))
        .config("spark.dynamicAllocation.enabled",devProps.getString("spark.dynamicAllocation.enabled"))
        .config("spark.executor.memory",devProps.getString("spark.executor.memory"))
        .config("spark.executor.cores",devProps.getString("spark.executor.cores"))
        .config("spark.executor.instances",devProps.getString("spark.executor.instances"))
        .config(conf)

        .getOrCreate()
        .setCassandraConf(c_options_conf)

然后我会为 cassandra latest api 以及 RDD/DF Api 工作。


推荐阅读