python - 批量加载时间戳敏感数据 HBase
问题描述
我们有很多历史数据需要迁移到 HBase。我们的 HBase 的设置是(时间戳)版本控制是相关的,并且使用我们知道不同列何时可用的一些领域知识。数据量很大,所以我想知道做这个批量加载的好方法是什么。Scala 或 Python 都可以,最好使用 Spark。
解决方案
我已经发布了一个要点,可以让你大部分时间到达那里。我将在这里重现最相关的方法:
def write[TK, TF, TQ, TV](
tableName: String,
ds: Dataset[(TK, Map[TF, Map[TQ, TV]])],
batch: Int = 1000
)(implicit
fk: TK => HBaseData,
ff: TF => HBaseData,
fq: TQ => HBaseData,
fv: TV => HBaseData
): Unit = {
ds.foreachPartition(p => {
val hbase = HBase.getHBase
val table = hbase.getTable(TableName.valueOf(tableName))
val puts = ArrayBuffer[Put]()
p.foreach(r => {
val put = new Put(r._1)
r._2.foreach( f => {
f._2.foreach( q => {
put.addColumn(f._1, q._1, q._2)
})
})
puts += put
if (puts.length >= batch) {
table.put(puts.asJava)
puts.clear()
}
})
if (puts.nonEmpty) {
table.put(puts.asJava)
puts.clear()
}
table.close()
})
}
需要注意的是,此方法仅在其默认行为中使用 HBase 时间戳,因此必须对其进行扩展以包括提供您自己的时间戳。本质上,只需将TV
类型变为 a Map[Long, TV]
,并添加适当的附加嵌套循环。
该HBaseData
类型case class
具有多种隐式方法,可将最常见的类型转换Array[Byte]
为高效的 HBase 存储。
该getHbase
方法确保每个分区仅与 HBase 建立一个连接,以避免每条记录都连接/断开连接。
希望这一切都是明智的,因为我是作为泛型初学者实现的。
推荐阅读
- python - 用于通过自定义代码访问 Smart Lock 系统的现有 API 或文档?(例如耶鲁,八月)
- python - Python 构建总是重新安装 setuptools
- python - 'NoneType' 对象不可下标。使用 django 管理员时
- docker - Telegraf 无法连接到 Docker sock
- angular - 根据另一个 mat-select 的值显示不同的 mat-options
- mongodb - 无法从我的 Ubuntu VPS 连接到 MongoDB
- angular - Angular ngneat 旁观者服务测试
- redux - React Query / RTK Query 与 Redux 的集成?
- database - 使用 NoSQL DB 跟踪用户时间戳数据的 DB 最佳实践(使用 firebase)
- php - 查询返回速度很好,但是获取很慢