首页 > 解决方案 > 使用 pyspark 将数据从 pyspark 数据帧插入到另一个 cassandra 表

问题描述

我有一个cassandra表 -测试

+----+---------+---------+
| id | country | counter |
+====+=========+=========+
|  A |      RU |       1 |
+----+---------+---------+
|  B |      EN |       2 |
+----+---------+---------+
|  C |      IQ |       1 |
+----+---------+---------+
|  D |      RU |       3 |
+----+---------+---------+

此外,我在同一空间中有一个表,其中包含“country_main”和“main_id”列。在 main_id 列中,我有与测试表中相同的 id,而且我有一些唯一的 id。country_main 具有空值,与测试中的相同。例如:

+---------+--------------+---------+
| main_id | country_main |      ...|
+=========+==============+=========+
|  A      |              |      ...|
+---------+--------------+---------+
|  B      |      EN      |      ...|
+---------+--------------+---------+
|  Y      |      IQ      |      ...|
+---------+--------------+---------+
|  Z      |      RU      |      ...|
+---------+--------------+---------+

如何使用pyspark将数据从测试表插入主表以根据ID填充country_main中的空值?

标签: apache-sparkpysparkcassandraspark-cassandra-connector

解决方案


具有以下架构和数据:

create table test.ct1 (
  id text primary key,
  country text,
  cnt int);

insert into test.ct1(id, country, cnt) values('A', 'RU', 1);
insert into test.ct1(id, country, cnt) values('B', 'EN', 2);
insert into test.ct1(id, country, cnt) values('C', 'IQ', 1);
insert into test.ct1(id, country, cnt) values('D', 'RU', 3);


create table test.ct2 (
  main_id text primary key,
  country_main text,
  cnt int);

insert into test.ct2(main_id, cnt) values('A', 1);
insert into test.ct2(main_id, country_main, cnt) values('B', 'EN', 2);
insert into test.ct2(main_id, country_main, cnt) values('C', 'IQ', 1);
insert into test.ct2(main_id, country_main, cnt) values('D', 'RU', 3);

它应该是这样的:

from pyspark.sql.functions import *

ct1 = spark.read.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct1").option("keyspace", "test").load()

ct2 = spark.read.format("org.apache.spark.sql.cassandra")\
  .option("table", "ct2").option("keyspace", "test").load()\
  .where(col("country_main").isNull())

res = ct1.join(ct2, ct1.id == ct2.main_id).select(col("main_id"), 
  col("country").alias("country_main"))
res.write.format("org.apache.spark.sql.cassandra")\
   .option("table", "ct2").option("keyspace", "test")\
   .mode("append").save()

什么代码:

  1. ct2(对应于您的main表)中选择所有行 where country_mainis null;
  2. 执行与ct1(对应于您的test表)的连接以从中获取国家/地区的值(优化可能是从两个表中仅选择必要的列)。另外,请注意连接是由 Spark 完成的,而不是在 Cassandra 级别上 - Cassandra 级别的连接将仅在即将发布的 Spark Cassandra 连接器版本中支持(3.0,但 alpha 版本已经发布);
  3. 重命名列以匹配ct2表结构;
  4. 写回数据。

结果:

cqlsh> select * from test.ct2;

 main_id | cnt | country_main
---------+-----+--------------
       C |   1 |           IQ
       B |   2 |           EN
       A |   1 |           RU
       D |   3 |           RU

对于源数据:

cqlsh> select * from test.ct2;
main_id | cnt | country_main
---------+-----+--------------                                       
       C |   1 |           IQ                                  
       B |   2 |           EN                                                                                         
       A |   1 |         null                                      
       D |   3 |           RU

推荐阅读