apache-spark - 如何通过 Spark Job 向 HBase 发送 DELETE 查询
问题描述
我有一个自动化 SparkSQL 作业的用例,我想在其中执行此操作:
使用 Spark 从 Phoenix 读取一个表(我们称之为 table1)并在一个 DataFrame(我们称之为 df1)中收集找到的所有负值
然后我想从另一个表(table2)中删除记录,其中列的值在 df1 中(考虑过 JOIN 查询,但我想知道这是否可以使用 DataFrame,以及是否有使用 HBase 的 API 和火花数据帧)
AFAIK Phoenix 不直接通过 Spark 支持 DELETE 操作(如果我错了,如果有什么方法我很想听听,请纠正我),这就是为什么我更倾向于使用 HBase Spark API
这是一个更直观地解释的模式:
这是一些代码。
在 DataFrame 中收集负值:
// Collect negative values
val negativeValues = spark
.sqlContext
.phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
.select('COLUMN1)
.where('COLUMN2.lt(0))
// Send the query
[...]
从表 2 中删除 COLUMN1 在负值中的值,因此在 SQL 中是这样的(如果可以将 IN 直接应用于 DF):
DELETE FROM table2 WHERE COLUMN1 IN negativeValues
我的预期结果是:
table1
column1 | column2
|
123456 | 123
234567 | 456
345678 | -789
456789 | 012
567891 | -123
table2
column1 | column2
|
123456 | 321
234567 | 654
345678 | 945 <---- same column1 as table1's, so delete
456789 | 987
567891 | 675 <---- same column1 as table1's, so delete
所以最终,我想知道是否有一种方法可以通过 Spark 将该 DELETE 请求发送到 HBase 而不会大惊小怪。
谢谢你。
解决方案
如果需要通过 Phoenix(sql 引擎)从 spark 运行“DELETE”查询到 Hbase,则必须创建自定义 API。
可以使用以下方法,
- 从源数据框中获取 table2 行键列以进行删除(在 table2 上)。
- 构造代码以对源数据帧的每个分区进行操作并构建“DELETE”查询。说查询是“DELETE FROM table2 WHERE column1 = ?”,准备它并以你看到的正确批量大小的批量执行它。由于我们在数据帧的每个分区上并行执行它,源数据帧中的分区数驱动并行度。因此您可以尝试使用正确的大小重新分区以查看正确的性能数据。
如果选项是跳过 sql 引擎,您也可以使用 spark-hbase 直接 API。这是一个这样的例子 - https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala
推荐阅读
- python - 如何创建一个机器人在 python 中为我玩游戏?
- python - AttributeError:'str'对象没有属性'seek'与python
- c# - SSRS Reportviewer 导出 Excel 无效文件
- jquery - 如何在单击按钮时使用 jquery 将输入字段的值设置为会话变量的值
- sql - Group By 的基本 SQL 问题(在 Netezza 中)
- r - 使用自己的 crs 进行投影时,projectRaster 会产生不同的输出
- c# - 多线程问题,说它在任务没有完成之前完成
- java - CameraX 前置预览很暗
- oracle - SQL 计划更改原因
- python - 从 python 中的 post 请求中解析数据