apache-spark - 使用 pyspark 将数据从 pyspark 数据帧插入到另一个 cassandra 表
问题描述
我有一个cassandra表 -测试:
+----+---------+---------+
| id | country | counter |
+====+=========+=========+
| A | RU | 1 |
+----+---------+---------+
| B | EN | 2 |
+----+---------+---------+
| C | IQ | 1 |
+----+---------+---------+
| D | RU | 3 |
+----+---------+---------+
此外,我在同一空间中有一个主表,其中包含“country_main”和“main_id”列。在 main_id 列中,我有与测试表中相同的 id,而且我有一些唯一的 id。country_main 具有空值,与测试中的相同。例如:
+---------+--------------+---------+
| main_id | country_main | ...|
+=========+==============+=========+
| A | | ...|
+---------+--------------+---------+
| B | EN | ...|
+---------+--------------+---------+
| Y | IQ | ...|
+---------+--------------+---------+
| Z | RU | ...|
+---------+--------------+---------+
如何使用pyspark将数据从测试表插入主表以根据ID填充country_main中的空值?
解决方案
具有以下架构和数据:
create table test.ct1 (
id text primary key,
country text,
cnt int);
insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);
create table test.ct2 (
main_id text primary key,
country_main text,
cnt int);
insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);
它应该是这样的:
from pyspark.sql.functions import *
ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct1").option("keyspace", "test").load()
ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test").load()\
.where(col("country_main").isNull())
res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"),
col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
.option("table", "ct2").option("keyspace", "test")\
.mode("append").save()
什么代码:
- 从
ct2
(对应于您的main
表)中选择所有行 wherecountry_main
isnull
; - 执行与
ct1
(对应于您的test
表)的连接以从中获取国家/地区的值(优化可能是从两个表中仅选择必要的列)。另外,请注意连接是由 Spark 完成的,而不是在 Cassandra 级别上 - Cassandra 级别的连接将仅在即将发布的 Spark Cassandra 连接器版本中支持(3.0,但 alpha 版本已经发布); - 重命名列以匹配
ct2
表结构; - 写回数据。
结果:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | RU
D | 3 | RU
对于源数据:
cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------
C | 1 | IQ
B | 2 | EN
A | 1 | null
D | 3 | RU
推荐阅读
- python - 为我的 for 循环的每次迭代创建唯一的 CSV
- c - C中的Sigaction处理程序
- bash - 从 bash 脚本中将带引号的字符串参数传递给 awk
- android - 如何为 RecyclerView 适配器设置监听器?
- angular6 - 修复 Internet Explorer 11 + Angular 6+ 项目中的语法错误
- javascript - 未捕获的类型错误:无法读取未定义的属性“ReactDebugCurrentFrame”
- docker - Docker swarm leave --force - 超出上下文截止日期
- kubernetes - Prometheus 警报规则:将表达式中的指标与标签中的值进行比较
- repository - 带有 CQRS 的 DDD 中的通用存储库模式,有意义吗?
- javascript - Flatlist 不会显示来自 json 的图像(React Native)