首页 > 解决方案 > Spark Streaming - 加入多个kafka流操作很慢

问题描述

我有 3 个 kafka 流,每个流有 600k+ 条记录,火花流需要 10 多分钟来处理流之间的简单连接。

星火集群配置:

火花大师用户界面

这就是我在 spark(scala) 中读取 kafka 流到 tempviews 的方式

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

我使用 spark spark sql 加入 3 个 TABLES

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

作业的执行:

作业界面

我是否错过了一些我必须研究的火花配置?

标签: scalaapache-sparkapache-kafkaspark-structured-streaming

解决方案


不幸的是,没有您期望的任何测试数据或结果数据,所以我可以玩,所以我无法给出确切的正确答案。

@Asteroid 注释是有效的,因为我们看到每个阶段的任务数是 1。通常 Kafka 流使用接收器来消费主题;每个接收者只创建一个任务。一种方法是使用多个接收器/拆分分区/增加资源(核心数量)以增加并行度。

如果这仍然不起作用,另一种方法是使用 Kafka API 创建DirectStream。根据文档https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html,这个创建了一个输入流,直接从 Kafka 中提取消息经纪人不使用任何接收器。

我初步制作了一个示例代码,用于在下面创建直接流。您可能想了解这一点以根据自己的喜好进行定制。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "KAFKASERVER",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest"
)

val topics = Array(TOPIC1)
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val schema = StructType(StructField('data', StringType, True))
val df = spark.createDataFrame([], schema)
val dstream = stream.map(_.value())
dstream.forEachRDD(){rdd:RDD[String], time:Time} => {
    val tdf = spark.read.schema(schema).json(rdd)
    df = df.union(tdf)
    df.createOrReplaceTempView("TABLE1")
}

一些相关材料:

https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-2/(向下滚动到 Kafka 消费者代码部分。其他部分无关)

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html(用于创建直接流的 Spark Doc)

祝你好运!


推荐阅读