apache-spark - 根据使用 Spark 在另一个 Cassandra 表中的不可用性从 Cassandra 表中删除记录
问题描述
我有两个 Cassandra 表
表格1[tb1_id, tb2_id, first_req_time, other_fields, Primary Key(tb1_id)]
表 2[tb2_id, other_fields, Primary Key(tb2_id)]
这两个表都很大(每个大约 300 GB),表 1 有大约 25 亿条记录,表 2 有 2.2 亿条记录。
现在,我想根据这些标准定期从表 1 中删除一些记录(以保持大小):-
- 如果 first_req_time 超过 6 个月。
- 对于table-1中的一条记录[tb1_id, tb2_id],table-2中没有对应的记录。
import java.util.Calendar
import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.querybuilder.{Delete, QueryBuilder, Select}
import com.datastax.spark.connector.cql.CassandraConnector
import java.util.ArrayList
import scala.util.control.Breaks._
import java.util.UUID.randomUUID
def isRecordActive(row: com.datastax.spark.connector.CassandraRow, refTime: Long): Boolean = {
val first_req_time = row.get[Option[Long]]("first_req_time").getOrElse(0L)
first_req_time >= refTime
}
val cal = Calendar.getInstance()
cal.add(Calendar.MONTH, -6)
val limit = 3000
val connector = CassandraConnector(sc.getConf)
val noOfRowsDeleted = sc.accumulator(0)
val table1 = sc.cassandraTable("db", "table1")
table1.filter(row => !isRecordActive(row, cal.getTime.getTime)).foreachPartition(partition => {
val session = connector.openSession
val listOfIds = new ArrayList[java.util.UUID]()
val mapOfRecords = collection.mutable.Map[java.util.UUID, com.datastax.spark.connector.CassandraRow]()
// collect limited number of records in mapOfRecords
breakable {
partition.foreach { elem =>
val tb2_id = elem.get[Option[java.util.UUID]]("tb2_id").getOrElse(null)
listOfIds.add(tb2_id)
mapOfRecords(tb2_id) = elem
if(listOfIds.size > limit) break
}
}
// filter records found in table2
val select: Select = QueryBuilder.select.from("db", "table2")
select.where(QueryBuilder.in("tb2_id", listOfIds))
select.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
val resultSet = session.execute(select)
if(!resultSet.isExhausted()){
val resultIter = resultSet.iterator()
while(resultIter.hasNext){
val cur = resultIter.next
val tb2_id = cur.getUUID("tb2_id")
mapOfRecords.remove(tb2_id)
}
}
// delete remaining records in mapOfRecords
for ((tb2_id, tb1_record) <- mapOfRecords) {
val delete: Delete = QueryBuilder.delete.from("db", "table1")
delete.where(QueryBuilder.eq("tb1_id", tb1_record.get[java.util.UUID]("tb1_id")))
delete.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
session.execute(delete)
noOfRowsDeleted += 1
}
session.close()
})
此解决方案的问题:
- noOfRowsDeleted 并不多,单次运行大约 100K。假设我一个接一个地进行后续运行,noOfRowsDeleted 在每次迭代中减少。这种减少的原因是我每次都参考分区(本地化查找)的前 3000 个元素并试图在 table2 中找到它们,并且可用于删除的批次的大小正在减少。
- 如果我尝试从 3000 增加限制,我会得到一个内存溢出异常。我怀疑这是因为我正在创建的内部对象(listOfIds、mapOfRecords),尽管我不确定。
分区数为 2000。
我意识到我在做一些愚蠢的事情,必须有更好的方法来实现我想要实现的目标。
尝试实现:与当前的 100K 相比,一次性删除 200 万次。
解决方案
我建议不要使用 RDD API,而是至少使用 Spark SQL 来准备数据——与 RDD 相比,使用它更容易。
基本上你需要做的是:
- 准备数据 - 您只需读取两个表并为旧数据过滤第一个表,然后与第二个表进行左反连接,因此您只剩下不匹配的记录
- 使用RDD API中的deleteFromCassandra 函数进行删除。
像这样的东西(没有测试实际的删除,只有数据准备,但应该可以):
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
val df1 = spark.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "table1", "keyspace" -> "ks"))
.load().select("tb1_id", "tb2_id", "first_req_time")
val df2 = spark.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "table2", "keyspace" -> "ks"))
.load().select("tb2_id")
// Find old data (current_date - 180 days)
val df1_old = df1.filter(to_date($"first_req_time") < date_sub(current_date, 180))
// find records that aren't in the second table
val joined = df1_old.join(df2, Seq("tb2_id"), "left_anti")
.select("tb1_id", "tb2_id")
// perform deletion
joined.rdd.deleteFromCassandra("ks", "table1",
keyColumns = SomeColumns("tb1_id", "tb2_id")
推荐阅读
- c# - 每个并行运行复杂吗?
- java - 使用语言环境将字符串转换为日期
- android - 在默认 Web 浏览器中打开 webview 超链接
- html - div需要居中
- java - java:将 HTML 转换为 PDF 处理失败,并且有 itext
- ios - 设置 UICollectionView 的 NumberOfItemsInSection
- java - 如何使用在我的 Windows 机器上运行的 java 代码写入 linux 中的文件?
- javascript - 获取学生id数组
- .net - .net 应用程序崩溃而没有错误
- javascript - Ace Editor 自动调整同一页面上的 2 个编辑器窗口,一个可以完美运行