apache-spark - Spark-Kafka: org.apache.kafka.common.errors.TimeoutException when writing a stream from Spark
Problem description
I am running into a problem when writing a stream from Spark to the Kafka topic.
import org.apache.spark.sql.types._

val mySchema = StructType(Array(
  StructField("ID", IntegerType),
  StructField("ACCOUNT_NUMBER", StringType)
))

val streamingDataFrame = spark.readStream
  .schema(mySchema)
  .option("delimiter", ",")
  .csv("file:///opt/files")

streamingDataFrame
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "testing")
  .option("kafka.bootstrap.servers", "10.55.55.55:9092")
  .option("checkpointLocation", "file:///opt/")
  .start()
  .awaitTermination()
Error:
2018-09-12 11:09:04,344 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,358 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
2018-09-12 11:09:04,359 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
2018-09-12 11:09:04,370 ERROR streaming.StreamExecution: Query [id = 866e4416-138a-42b6-82fd-04b6ee1aa638, runId = 4dd10740-29dd-4275-97e2-a43104d71cf5] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 38 record(s) for testing-0: 30016 ms has passed since batch creation plus linger time
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
My sbt dependency details:
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"
However, when I send messages to the same broker with bin/kafka-console-producer.sh and read them with bin/kafka-console-consumer.sh, I can send and receive messages without any problem.
Solution
You need to increase the value of request.timeout.ms on the client.

Kafka groups records into batches to increase throughput. When a new record is added to a batch, it must be sent within a time limit. request.timeout.ms is the configurable parameter (default: 30 seconds) that controls this limit. When a batch sits in the queue for longer than that, a TimeoutException is thrown and the records are removed from the queue, so the messages are never delivered.
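With the Structured Streaming Kafka sink, producer properties are forwarded to the underlying KafkaProducer when they are prefixed with kafka. in the writer options. A minimal sketch of the write from the question with the timeout raised is shown below; the 120000 ms value is only an illustrative choice, not a recommendation from the original answer:

streamingDataFrame
  .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("topic", "testing")
  .option("kafka.bootstrap.servers", "10.55.55.55:9092")
  // request.timeout.ms defaults to 30000 ms; 120000 here is an example value, tune as needed
  .option("kafka.request.timeout.ms", "120000")
  .option("checkpointLocation", "file:///opt/")
  .start()
  .awaitTermination()

If the timeouts persist even with a larger value, it is also worth checking that the broker at 10.55.55.55:9092 is actually reachable from the Spark executors, since the same error appears when the producer cannot get batches acknowledged at all.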