apache-spark - 将数据从 Spark 结构化流加载到 ArrayList
问题描述
我需要将数据从 Kafka 发送到 Kinesis Firehose。我正在使用 Spark 结构化流处理 Kafka 数据。我不确定如何将流式查询的数据集处理成一个ArrayList
变量 - 例如recordList
- 例如 100 条记录(可以是任何其他值),然后调用 Firehose APIputRecordBatch(recordList)
将记录放入 Firehose。
解决方案
我认为您想根据您的 Spark 版本检查Foreach 和 ForeachBatch 。ForeachBatch 出现在 V2.4.0 中,foreach 可用 < V2.4.0。如果 Kinesis Firehouse 没有可用的流接收器实现,那么您应该自己实现ForeachWriter。Databricks 有一些使用 foreach 创建自定义编写器的好例子。
我从未使用过 Kinesis,但这里有一个您的自定义接收器可能是什么样子的示例。
case class MyConfigInfo(info1: String, info2: String)
class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
val kinesisProducer = _
def open(partitionId: Long,version: Long): Boolean = {
kinesisProducer = //set up the kinesis producer using MyConfigInfo
true
}
def process(value: (String, String)): Unit = {
//ask kinesisProducer to send data
}
def close(errorOrNull: Throwable): Unit = {
//close the kinesis producer
}
}
如果您使用的是 AWS kinesisfirehose API,您可能会做这样的事情
case class MyConfigInfo(info1: String, info2: String)
class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
val firehoseClient = _
val req = putRecordBatchRequest = new PutRecordBatchRequest()
val records = 0
val recordLimit = //maybe you need to set this?
def open(partitionId: Long,version: Long): Boolean = {
firehoseClient = //set up the firehose client using MyConfigInfo
true
}
def process(value: (String, String)): Unit = {
//ask fireHose client to send data or batch the request
val record: Record = //create Record out of value
req.setRecords(record)
records = records + 1
if(records >= recordLimit) {
firehoseClient.putRecordBatch(req)
records = 0
}
}
def close(errorOrNull: Throwable): Unit = {
//close the firehose client
//or instead you could put the batch request to the firehose client here but i'm not sure if that's good practice
}
}
然后你会这样使用它
val writer = new KinesisSink(configuration)
val query =
streamingSelectDF
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("25 seconds"))
.start()
推荐阅读
- ios - CloudKit 订阅通知未发送
- c++ - 启用 c++17 时 __cplusplus 的奇怪输出
- javascript - 计算字符串中字符的出现次数,然后根据计数逻辑用 ( 或 ) 替换它们
- python - 如果目标变量不包含在二进制分类任务的测试数据中,我应该如何预测它
- validation - Struts 2 checkboxlist标签不保存
- python - 为什么我的导数不等于理论值?
- ios - 以编程方式设置文本时未调用 UITextfield 委托方法
- laravel - 如何修复 Laravel 队列错误:“Aws\Sqs\SqsClient 的实例无法序列化”?
- javascript - 使用 AJAX 在 DataTable 中传递排序列的名称
- python - 如何使流线型下拉菜单具有交互性?