python - 如何在数据帧中使用 sparkSession 使用 spark-cassandra-connector 在 pyspark 中写入
问题描述
我将pyspark和spark-cassandra-connector_2.11-2.3.0.jar与 cassandra DB 一起使用。我正在从一个键空间读取数据帧并写入另一个不同的键空间。这两个键空间有不同的用户名和密码。
我使用以下方法创建了 sparkSession:
spark_session = None
def set_up_spark(sparkconf,config):
"""
sets up spark configuration and create a session
:return: None
"""
try:
logger.info("spark conf set up Started")
global spark_session
spark_conf = SparkConf()
for key, val in sparkconf.items():
spark_conf.set(key, val)
spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
logger.info("spark conf set up Completed")
except Exception as e:
raise e
我使用此 sparkSession 将数据作为数据帧读取为:
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.load()
我可以使用上述会话读取数据。spark_session 附加到上述查询。
现在我需要创建另一个会话,因为写入表的凭据不同。我的写查询为:
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table=table_name, keyspace=keyspace_name) \
.mode("append") \
.save()
我找不到如何在 cassandra 中为上述写入操作附加新的 sparkSession。
如何使用 spark-cassandra-connector 在 pyspark 中附加新的 SparkSession 以进行写入操作?
解决方案
您可以简单地将这些信息作为选项传递给特定read
或write
操作,这包括spark.cassandra.connection.host
:
请注意,您需要将这些选项放入字典中,并传递此字典而不是直接传递,如文档中所述。
read_options = { "table": "..", "keyspace": "..",
"spark.cassandra.connection.host": "IP1",
"spark.cassandra.auth.username": "username1",
"spark.cassandra.auth.password":"password1"}
table_df = spark_session.read \
.format("org.apache.spark.sql.cassandra") \
.options(**read_options) \
.load()
write_options = { "table": "..", "keyspace": "..",
"spark.cassandra.connection.host": "IP2",
"spark.cassandra.auth.username": "username2",
"spark.cassandra.auth.password":"password1"}
table_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(**write_options) \
.mode("append") \
.save()
推荐阅读
- java - 如何使用 Eclipse JDT AST 向 MethodInvocation 添加类型转换?
- python - Django 模型 - 当用户选择选项时如何添加子类型选项?
- perl - Perl Blowfish/CBC 加密和解密函数
- r - 在一个组内检查最后一行的值是否大于它之前的值
- javascript - Angular 2 路由器 ngIf 动态路由
- typescript - Angular material 6 flex table 使用调整大小符号单独调整列大小
- javascript - 如何使用 Axios 捕获 PHP 响应错误
- angular - 如何摆脱 Angular 6 服务中的冗余请求?
- c# - 如何从发送到谷歌群组邮件地址的电子邮件中下载附件
- java - Android在url中发送数组作为参数