首页 > 解决方案 > 可以在每个微批次中查找 Cassandra

问题描述

我们正在使用结构化流并尝试对源数据进行一些重复数据删除。如果 id col 在 20 天内重复,我们需要更新最早的事件时间。20 天可能有 10-150 亿行。我们不想使用 dropDuplicates 因为状态可能很大。我们正在考虑使用 Cassandra 表来存储状态(比如到目前为止的 id 和 min time)。每次微批次触发时,我们都会在微批次中查找带有 id 的 Cassandra 表存储状态。20天的ids也在10-150亿级别,或者说Cassandra中的状态表在10-150亿级别。那么在每个微批次中查找或加入这个 Cassandra 表是否可行?

标签: apache-sparkjoincassandraspark-structured-streamingspark-cassandra-connector

解决方案


Spark Cassandra 连接器在 RDD API 中有 2 个对应的函数joinWithCassandra:允许 leftJoinWithCassandra通过主键在 Cassandra 中执行有效的数据查找,如下所示:

val joinWithRDD = someRDD.joinWithCassandraTable("test","table")

连接器的开源版本中的 DataFrame/DataSet API 不支持与 Cassandra 功能的连接,但作为 DSE Analytics 一部分的连接器(所谓的DSE Direct Join)支持连接器。但是您可以将数据转换为 RDD 并通过现有 API 执行连接。


推荐阅读