首页 > 解决方案 > 批量加载时间戳敏感数据 HBase

问题描述

我们有很多历史数据需要迁移到 HBase。我们的 HBase 的设置是(时间戳)版本控制是相关的,并且使用我们知道不同列何时可用的一些领域知识。数据量很大,所以我想知道做这个批量加载的好方法是什么。Scala 或 Python 都可以,最好使用 Spark。

标签: pythonscalaapache-sparkhbase

解决方案


我已经发布了一个要点,可以让你大部分时间到达那里。我将在这里重现最相关的方法:

def write[TK, TF, TQ, TV](
  tableName: String,
  ds: Dataset[(TK, Map[TF, Map[TQ, TV]])],
  batch: Int = 1000
)(implicit
  fk: TK => HBaseData,
  ff: TF => HBaseData,
  fq: TQ => HBaseData,
  fv: TV => HBaseData
): Unit = {
  ds.foreachPartition(p => {
    val hbase = HBase.getHBase
    val table = hbase.getTable(TableName.valueOf(tableName))
    val puts = ArrayBuffer[Put]()

    p.foreach(r => {
      val put = new Put(r._1)
      r._2.foreach( f => {
        f._2.foreach( q => {
          put.addColumn(f._1, q._1, q._2)
        })
      })

      puts += put
      if (puts.length >= batch) {
        table.put(puts.asJava)
        puts.clear()
      }
    })
    if (puts.nonEmpty) {
      table.put(puts.asJava)
      puts.clear()
    }
    table.close()
  })
}

需要注意的是,此方法仅在其默认行为中使用 HBase 时间戳,因此必须对其进行扩展以包括提供您自己的时间戳。本质上,只需将TV类型变为 a Map[Long, TV],并添加适当的附加嵌套循环。

HBaseData类型case class具有多种隐式方法,可将最常见的类型转换Array[Byte]为高效的 HBase 存储。

getHbase方法确保每个分区仅与 HBase 建立一个连接,以避免每条记录都连接/断开连接。

希望这一切都是明智的,因为我是作为泛型初学者实现的。


推荐阅读