Loading data from Spark Structured Streaming into an ArrayList

Problem description

I need to send data from Kafka to Kinesis Firehose. I am processing the Kafka data with Spark Structured Streaming. What I am not sure about is how to collect the streaming query's Dataset into an ArrayList variable, say recordList, of about 100 records (the size could be anything else), and then call the Firehose API putRecordBatch(recordList) to put the records into Firehose.

Tags: apache-spark apache-kafka

Solution


Depending on your Spark version, I think you want to look at Foreach and ForeachBatch. foreachBatch was introduced in v2.4.0; for versions before v2.4.0, foreach is available. If there is no streaming sink implementation available for Kinesis Firehose, you should implement a ForeachWriter yourself. Databricks has some good examples of creating custom writers with foreach.
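For Spark 2.4.0 and later, the foreachBatch route might look roughly like the sketch below. This is a sketch only: `streamingSelectDF` is the question's streaming DataFrame, and `sendBatchToFirehose` is a hypothetical helper wrapping the Firehose putRecordBatch call.

```scala
import org.apache.spark.sql.DataFrame

// Sketch, assuming Spark >= 2.4.0.
// `streamingSelectDF` and `sendBatchToFirehose` are hypothetical placeholders.
val query = streamingSelectDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Collect each micro-batch on the driver, then hand it to Firehose
    // in chunks of at most 500 records (the PutRecordBatch limit).
    batchDF.collect().grouped(500).foreach { chunk =>
      sendBatchToFirehose(chunk) // hypothetical helper around putRecordBatch
    }
  }
  .start()
```

Note that collect() pulls the whole micro-batch onto the driver, which is only reasonable for modest batch sizes; otherwise do the Firehose calls inside a foreachPartition on the executors.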

I have never used Kinesis, but here is an example of what your custom sink might look like.

case class MyConfigInfo(info1: String, info2: String)

class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
  // must be a var so open() can initialize it
  var kinesisProducer: KinesisProducer = _

  def open(partitionId: Long, version: Long): Boolean = {
    kinesisProducer = // set up the Kinesis producer using configInfo
    true
  }

  def process(value: (String, String)): Unit = {
    // ask kinesisProducer to send data
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the Kinesis producer
  }
}

If you are using the AWS Kinesis Firehose API, you might do something like this:

case class MyConfigInfo(info1: String, info2: String)

class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
  var firehoseClient: AmazonKinesisFirehose = _
  val req = new PutRecordBatchRequest()
  var records = 0
  val recordLimit = 500 // PutRecordBatch accepts at most 500 records per call

  def open(partitionId: Long, version: Long): Boolean = {
    firehoseClient = // set up the Firehose client using configInfo
    true
  }

  def process(value: (String, String)): Unit = {
    // batch up the records and send them once the batch is full
    val record: Record = // create a Record out of value
    req.withRecords(record) // withRecords appends to the request's record list
    records += 1
    if (records >= recordLimit) {
      firehoseClient.putRecordBatch(req)
      req.getRecords.clear()
      records = 0
    }
  }

  def close(errorOrNull: Throwable): Unit = {
    // flush any remaining partial batch, then close the Firehose client
    // (whether putting the final batch here is good practice is debatable)
  }
}

Then you would use it like this:

import org.apache.spark.sql.streaming.Trigger

val writer = new KinesisSink(configuration)
val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("25 seconds"))
    .start()
