scala - 使用结构化 Spark 流在 HBase 中批量插入数据
问题描述
我正在使用结构化 Spark Streaming 读取来自 Kafka(每秒 100.000 行)的数据,并且我正在尝试将所有数据插入 HBase。
我在 Cloudera Hadoop 2.6 中,我正在使用 Spark 2.3
我尝试了类似我在这里看到的东西。
eventhubs.writeStream
.foreach(new MyHBaseWriter[Row])
.option("checkpointLocation", checkpointDir)
.start()
.awaitTermination()
MyHBaseWriter 看起来像这样:
class AtomeHBaseWriter[RECORD] extends HBaseForeachWriter[Row] {
override def toPut(record: Row): Put = {
override val tableName: String = "hbase-table-name"
override def toPut(record: Row): Put = {
// Get Json
val data = JSON.parseFull(record.getString(0)).asInstanceOf[Some[Map[String, Object]]]
val key = data.getOrElse(Map())("key")+ ""
val val = data.getOrElse(Map())("val")+ ""
val p = new Put(Bytes.toBytes(key))
//Add columns ...
p.addColumn(Bytes.toBytes(columnFamaliyName),Bytes.toBytes(columnName), Bytes.toBytes(val))
p
}
}
HBaseForeachWriter 类看起来像这样:
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
val tableName: String
def pool: Option[ExecutorService] = None
def user: Option[User] = None
private var hTable: Table = _
private var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
connection = createConnection()
hTable = getHTable(connection)
true
}
def createConnection(): Connection = {
// I create HBase Connection Here
}
def getHTable(connection: Connection): Table = {
connection.getTable(TableName.valueOf(Variables.getTableName()))
}
override def process(record: RECORD): Unit = {
val put = toPut(record)
hTable.put(put)
}
override def close(errorOrNull: Throwable): Unit = {
hTable.close()
connection.close()
}
def toPut(record: RECORD): Put
}
所以在这里我逐行进行放置,即使我允许 20 个执行器和每个执行器 4 个核心,我也没有立即将数据插入 HBase。所以我需要做的是批量加载,因为我在互联网上找到的所有东西都是通过 RDD 和 Map/Reduce 来实现的。
解决方案
我的理解是记录摄取到 hbase 的速度很慢。我有几个建议给你。
1) hbase.client.write.buffe r。
以下属性可能会对您有所帮助。
hbase.client.write.buffer
说明BufferedMutator 写入缓冲区的默认大小(以字节为单位)。更大的缓冲区会占用更多的内存——在客户端和服务器端,因为服务器会实例化传递的写入缓冲区来处理它——但是更大的缓冲区大小会减少生成的 RPC 的数量。对于服务器端内存使用的估计,评估 hbase.client.write.buffer * hbase.regionserver.handler.count
默认 2097152(大约 2 mb)
我更喜欢foreachBatch
看火花文档(它在火花核心中的 foreachPartition 类型)而不是foreach
同样在你的 hbase writer 中扩展ForeachWriter
open
方法初始化放入的数组列表将放入process
的放入数组列表中close
table.put(listofputs);
,然后在更新表后重置数组列表...
它的作用基本上是上面提到的缓冲区大小填充了 2 mb,然后它将刷新到 hbase 表中。在那之前,记录不会进入 hbase 表。
您可以将其增加到 10mb 等等....这样 RPC 的数量将减少。并且大量数据将被刷新并位于 hbase 表中。
当写入缓冲区被填满并flushCommits
触发到 hbase 表时。
示例代码:在我的回答中
2)关闭 WAL 您可以关闭 WAL(预写日志 - 危险是无法恢复)但它会加快写入速度......如果不想恢复数据。
注意:如果您在 hbase 表上使用 solr 或 cloudera 搜索,则不应将其关闭,因为 Solr 将在 WAL 上工作。如果您将其关闭,Solr 索引将无法正常工作。这是我们许多人常犯的一个错误。
如何关闭: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean)
进一步研究的 基本架构和链接:
正如我提到的puts列表是好方法......这是在结构化流示例如下之前做的旧方法(foreachPartition和puts列表)......其中foreachPartition
对每个分区而不是每一行进行操作。
def writeHbase(mydataframe: DataFrame) = {
val columnFamilyName: String = "c"
mydataframe.foreachPartition(rows => {
val puts = new util.ArrayList[ Put ]
rows.foreach(row => {
val key = row.getAs[ String ]("rowKey")
val p = new Put(Bytes.toBytes(key))
val columnV = row.getAs[ Double ]("x")
val columnT = row.getAs[ Long ]("y")
p.addColumn(
Bytes.toBytes(columnFamilyName),
Bytes.toBytes("x"),
Bytes.toBytes(columnX)
)
p.addColumn(
Bytes.toBytes(columnFamilyName),
Bytes.toBytes("y"),
Bytes.toBytes(columnY)
)
puts.add(p)
})
HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
})
}
总结一下 :
我觉得我们需要了解 spark 和 hbase 的心理学才能做出有效的配对。
推荐阅读
- css - 如何在部分中显示固定的视频背景?
- javascript - 如何检查字符串是 xml 还是不使用 Node.js
- php - 如何从 AWS ECS 容器访问 Laravel 原生日志文件(storage/logs/)?
- oracle - regexp_replace 从包含 - 的字符串中提取
- java - 如何在不知道 id 的情况下选择 SQLite DB 的最后一个条目?
- r - 在 ggplot 中使用辅助轴时图例不可见
- django - 使用 django annotate 计算两个日期时间字段之间的时间差(减去时间范围 10PM 到 9AM)?
- asp.net - 401 错误后使用 C# 调用 Dynamics NAV Web 服务成功
- react-testing-library - RTL - 处理复杂和大型集成组件超时的“正确”方式
- python - 从 Transfermarkt 抓取数据 - 如何获取全名和姓氏