scala - 在流式传输 Scala 时将 List[List[String]] 转换为 Spark 数据帧
问题描述
在我的应用程序中,我Array[String]
从流中获取一个并将其传递给方法(作为查询参数),该方法又在其上运行 Neo4j 查询。我想将查询 (a List[List[String]]
) 的结果转换为 Spark DataFrame
。如何在运行 Spark Streaming 时做到这一点?
这是我执行 Neo4j 查询的方法:
def execNeo4jSearchQuery(neo4jSession: Session, data: Array[String]) = {
neo4jSession.run(neo4jQueries.searchQueryWithParams, paramsMap.asJava)
.list()
.asScala
.map(toRow(_, fieldsToRetrieve))
.toList
}
这是流本身的主要方法:
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setMaster(args(0))
.setAppName("SparkStreaming")
neo4jConfigs.setNeo4jSparkConfig(args(1), sparkConf)
val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
val sparkContext = streamingContext.sparkContext
sparkContext.setLogLevel("ERROR")
val sqlContext = new SQLContext(sparkContext)
val numStreams = 2
val topics = Array("topic1")
def kafkaParams(i: Int) = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group2",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = (1 to numStreams).map(i => KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams(i))
))
val messages = streamingContext.union(lines)
val values = messages
.map(record => record.value().toString)
val wordsArrays = values.map(_.split(", "))
wordsArrays.foreachRDD(rdd => rdd.foreach(
data => {
execNeo4jSearchQuery(getNeo4jConfig(), data)
}
))
streamingContext.start()
streamingContext.awaitTermination()
}
}
解决方案
推荐阅读
- hardware - SMBIOS 硬盘
- python - 样式化后如何使滚动条句柄可点击?
- python - Python JSON 数据转换成 HTML 表
- java - Android WireApp 在尝试视频通话时记录 UnsatisfiedLink 错误:“wcall_set_video_send_state”
- ssh - 树莓派 ssh 访问被拒绝
- c++ - 上限/下限不像我预期的那样工作,不明白为什么
- c - 在 C 中有效存储参数
- vba - 尝试从其他工作表 vba 生成图形时发生错误
- python-3.x - 在单独的数据框中的列中查找变量值列表
- python - h2o 聚合方法“none”将未知词映射到 NAN 而不是向量