scala - Spark Streaming - 加入多个kafka流操作很慢
问题描述
我有 3 个 kafka 流,每个流有 600k+ 条记录,火花流需要 10 多分钟来处理流之间的简单连接。
星火集群配置:
这就是我在 spark(scala) 中读取 kafka 流到 tempviews 的方式
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")
我使用 spark spark sql 加入 3 个 TABLES
select COL1, COL2 from TABLE1
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK
作业的执行:
我是否错过了一些我必须研究的火花配置?
解决方案
不幸的是,没有您期望的任何测试数据或结果数据,所以我可以玩,所以我无法给出确切的正确答案。
@Asteroid 注释是有效的,因为我们看到每个阶段的任务数是 1。通常 Kafka 流使用接收器来消费主题;每个接收者只创建一个任务。一种方法是使用多个接收器/拆分分区/增加资源(核心数量)以增加并行度。
如果这仍然不起作用,另一种方法是使用 Kafka API 创建DirectStream。根据文档https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html,这个创建了一个输入流,直接从 Kafka 中提取消息经纪人不使用任何接收器。
我初步制作了一个示例代码,用于在下面创建直接流。您可能想了解这一点以根据自己的喜好进行定制。
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "KAFKASERVER",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"startingOffsets" -> "earliest",
"endingOffsets" -> "latest"
)
val topics = Array(TOPIC1)
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val schema = StructType(StructField('data', StringType, True))
val df = spark.createDataFrame([], schema)
val dstream = stream.map(_.value())
dstream.forEachRDD(){rdd:RDD[String], time:Time} => {
val tdf = spark.read.schema(schema).json(rdd)
df = df.union(tdf)
df.createOrReplaceTempView("TABLE1")
}
一些相关材料:
https://mapr.com/blog/monitoring-real-time-uber-data-using-spark-machine-learning-streaming-and-kafka-api-part-2/(向下滚动到 Kafka 消费者代码部分。其他部分无关)
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html(用于创建直接流的 Spark Doc)
祝你好运!
推荐阅读
- c# - 如何强制派生类覆盖仅返回其类名的方法?
- python - 找到积雪最多的前 3 个位置并将其存储
- c++ - 缺少成员初始值设定项列表的示例。184 的编程原理和使用 C++ 的实践,第 2 版
- cdi - 将 JSF1.2 应用程序转换为 JSF2.2 Glassfish 部署错误:加载应用程序时出现异常:CDI 部署失败:WELD-001408:
- python - 如何在python中只读取串口的数字?
- python - while 在正确的地方是真的吗?另外我如何让用户提示用户添加一些东西,然后在while循环中显示它?
- rust - rust E0597:借来的值不够活 lnog
- c# - DataGrid鼠标双击事件在组合框WPF中填充数据
- python - 如何将 Numpy / PyTorch 数组的下 n 个元素设置为某个值?
- ios - performSegue 在主屏幕顶部拉出第二个窗口而不是替换它?