首页 > 解决方案 > 如何通过 Spark Job 向 HBase 发送 DELETE 查询

问题描述

我有一个自动化 SparkSQL 作业的用例,我想在其中执行此操作:

  1. 使用 Spark 从 Phoenix 读取一个表(我们称之为 table1)并在一个 DataFrame(我们称之为 df1)中收集找到的所有负值

  2. 然后我想从另一个表(table2)中删除记录,其中列的值在 df1 中(考虑过 JOIN 查询,但我想知道这是否可以使用 DataFrame,以及是否有使用 HBase 的 API 和火花数据帧)

  3. AFAIK Phoenix 不直接通过 Spark 支持 DELETE 操作(如果我错了,如果有什么方法我很想听听,请纠正我),这就是为什么我更倾向于使用 HBase Spark API


这是一个更直观地解释的模式:

图式


这是一些代码。

在 DataFrame 中收集负值:

// Collect negative values
val negativeValues = spark
  .sqlContext
  .phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
  .select('COLUMN1)
  .where('COLUMN2.lt(0))

// Send the query
[...]

从表 2 中删除 COLUMN1 在负值中的值,因此在 SQL 中是这样的(如果可以将 IN 直接应用于 DF):

DELETE FROM table2 WHERE COLUMN1 IN negativeValues

我的预期结果是:

table1

column1 |   column2
        |
123456  |   123
234567  |   456
345678  |   -789
456789  |   012
567891  |   -123



table2

column1 |   column2
        |
123456  |   321
234567  |   654
345678  |   945 <---- same column1 as table1's, so delete
456789  |   987
567891  |   675 <---- same column1 as table1's, so delete

所以最终,我想知道是否有一种方法可以通过 Spark 将该 DELETE 请求发送到 HBase 而不会大惊小怪。

谢谢你。

标签: apache-sparkapache-spark-sqlhbasephoenix

解决方案


如果需要通过 Phoenix(sql 引擎)从 spark 运行“DELETE”查询到 Hbase,则必须创建自定义 API。

可以使用以下方法,

  1. 从源数据框中获取 table2 行键列以进行删除(在 table2 上)。
  2. 构造代码以对源数据帧的每个分区进行操作并构建“DELETE”查询。说查询是“DELETE FROM table2 WHERE column1 = ?”,准备它并以你看到的正确批量大小的批量执行它。由于我们在数据帧的每个分区上并行执行它,源数据帧中的分区数驱动并行度。因此您可以尝试使用正确的大小重新分区以查看正确的性能数据。

如果选项是跳过 sql 引擎,您也可以使用 spark-hbase 直接 API。这是一个这样的例子 - https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala


推荐阅读