首页 > 解决方案 > 在 Spark Streaming（Scala）中将 List[List[String]] 转换为 Spark DataFrame

问题描述

在我的应用程序中，我从流中获取一个 Array[String]，并将其（作为查询参数）传递给一个方法，该方法再用它运行 Neo4j 查询。我想把查询结果（一个 List[List[String]]）转换为 Spark DataFrame。在运行 Spark Streaming 时该如何做到这一点？

这是我执行 Neo4j 查询的方法:

  /** Runs the Neo4j search query on the given session and converts each returned record with `toRow`.
    *
    * @param neo4jSession session the query is run on
    * @param data         tokens of one incoming message. NOTE(review): not referenced in this
    *                     body — the query runs with `paramsMap`, which is defined outside this
    *                     snippet; confirm whether `data` is meant to become the query parameters.
    * @return one `toRow(record, fieldsToRetrieve)` result per returned record, in result order
    */
  def execNeo4jSearchQuery(neo4jSession: Session, data: Array[String]) = {
    val result = neo4jSession.run(neo4jQueries.searchQueryWithParams, paramsMap.asJava)
    // Materialize every record, then convert them one by one.
    val records = result.list().asScala
    records.toList.map(record => toRow(record, fieldsToRetrieve))
  }

这是运行流处理的 main 方法:

def main(args: Array[String]): Unit = {
  /* Streaming entry point: reads comma-separated messages from Kafka and runs a Neo4j search
   * for each message.
   *
   * args(0) - Spark master URL passed to SparkConf.setMaster.
   * args(1) - value handed to neo4jConfigs.setNeo4jSparkConfig together with the SparkConf
   *           (presumably selects the Neo4j connection settings — confirm).
   */
  require(args.length >= 2, "usage: <spark-master> <neo4j-config-arg>")

  val sparkConf = new SparkConf()
    .setMaster(args(0))
    .setAppName("SparkStreaming")
  neo4jConfigs.setNeo4jSparkConfig(args(1), sparkConf)

  // The SparkSession supersedes the deprecated `new SQLContext(sc)`. To build DataFrames
  // inside `foreachRDD`, use it directly, e.g. `sparkSession.createDataFrame(rowRdd, schema)`.
  val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()

  val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
  streamingContext.sparkContext.setLogLevel("ERROR")

  val topics = Array("topic1")

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "group2",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // One direct stream is sufficient. It already creates one Spark partition per Kafka
  // partition, so unioning several streams adds no parallelism. Several direct streams
  // sharing a single group.id also collide in Spark's Kafka consumer cache, which is keyed
  // by (topic-partition, group.id).
  val messages = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

  // The record value is already a String; split each message into tokens on ", ".
  val wordsArrays = messages.map(record => record.value().split(", "))

  wordsArrays.foreachRDD { rdd =>
    rdd.foreach { data =>
      // NOTE(review): the query result is discarded here, and getNeo4jConfig() is called once
      // per record with nothing closing the Session it returns. To obtain a DataFrame, run
      // the queries via rdd.mapPartitions (one session per partition), then convert the
      // resulting RDD on the driver with sparkSession.createDataFrame.
      execNeo4jSearchQuery(getNeo4jConfig(), data)
    }
  }

  // NOTE(review): enable.auto.commit is false and no offsets are committed, so every restart
  // falls back to auto.offset.reset ("latest"). Consider CanCommitOffsets.commitAsync after
  // processing each batch if at-least-once delivery is wanted.
  streamingContext.start()
  streamingContext.awaitTermination()
}
}

标签: scala, apache-spark, apache-kafka, spark-streaming

解决方案


推荐阅读