apache-spark - 从 Spark 中删除 Cassandra 中的特定列
问题描述
我能够使用 RDD API 删除特定列 -
sc.cassandraTable("books_ks", "books")
.deleteFromCassandra("books_ks", "books",SomeColumns("book_price"))
我正在努力使用 Dataframe API 做到这一点。
有人可以分享一个例子吗?
解决方案
您不能通过 DF API 删除,并且通过 RDD api 是不自然的。RDDs 和 DFs 是不可变的,这意味着没有修改。您可以过滤它们以减少它们,但这会生成一个新的 RDD / DF。
话虽如此,您可以做的是过滤掉您希望删除的行,然后构建一个 C* 客户端来执行该删除:
// Spark 和 C* 连接的导入 import org.apache.spark.sql.cassandra._ import com.datastax.spark.connector.cql.CassandraConnectorConf
spark.setCassandraConf("Test Cluster", CassandraConnectorConf.ConnectionHostParam.option("localhost"))
val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "books_ks", "table" -> "books")).load()
val dfToDelete = df.filter($"price" < 3).select($"price");
dfToDelete.show();
// import for C* client
import com.datastax.driver.core._
// build a C* client (part of the dependency of the scala driver)
val clusterBuilder = Cluster.builder().addContactPoints("127.0.0.1");
val cluster = clusterBuilder.build();
val session = cluster.connect();
// loop over everything that you filtered in the DF and delete specified row.
for(price <- dfToDelete.collect())
session.execute("DELETE FROM books_ks.books WHERE price=" + price.get(0).toString);
很少有警告如果您尝试删除大部分行,这将无法正常工作。在这里使用 collect 意味着这项工作将在 Spark 的驱动程序中完成,即 SPOF 和瓶颈。
更好的方法是去 a) 定义一个 DF UDF 来执行删除,好处是你可以获得并行化。选项 b) 到 RDD 级别,只删除如上所示。
故事的寓意,仅仅因为它可以做到,并不意味着它应该做到。
推荐阅读
- qt - Qml StackView 推送转换不起作用
- vba - DOMDocument 未加载
- python - 如何根据上述行的值添加新列
- localization - 我们可以使用 moment.js 支持数字本地化吗
- asp.net-core - 使用 UseStatusCodePagesWithReExecute 并显示消息未按预期工作
- c# - 将 XAML 窗口转换为位图
- mysql - mysql 根据输入在列中查找最“真”的行
- appium-android - 无法使用 bat 文件启动 appium 1.9.0
- java - Java http代理SocketException:来自服务器连接重置的文件意外结束
- cassandra - 如何在 cassandra 流式传输时增加每个主机的连接