pyspark - Kafka Direct Stream 的容错功能不起作用。检查点目录不存在
问题描述
我为从 Kafka 主题读取数据编写应用程序。如果驱动程序发生故障,我无法实现容错。该应用程序使用 spark submit 在 k8s 集群中运行。当我第一次运行我的应用程序时,一切都很顺利,但是当我从集群中删除 pod 时,重新启动应用程序会导致错误。Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
. 我使用容错存储。下面是一段代码和更详细的错误。谢谢您的帮助。让我知道是否有一些细节。
def functionToCreateContext():
sc = SparkContext("spark-master", "kafka_spark")
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc, 20)
kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(lambda x, y: x + y,
lambda x, y: x - y,60, 20)
top = statistic_window.transform(found)
top.pprint()
ssc.checkpoint(cpd)
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
追溯:
20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
File "/app-spark/kafka_spark.py", line 75, in <module>
ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
at scala.Predef$.require(Predef.scala:224)
解决方案
我猜您收到此错误是因为您的检查点目录不是持久卷。因此,当您删除 pod 时,指定的目录也会被删除,并且您会收到该Checkpoint directory does not exist
错误。
解决方案是使用Persistent Volume来检查点目录。
在这里,您还可以找到完全相同主题的示例。
推荐阅读
- azure-cosmosdb - 在 cosmos db 中转换 JSON 结构
- ios - 此应用无法在中国激活 CallKit 功能时获得批准。请进行适当的更改并重新提交此应用以供审核
- c# - 根据输出的xml修改xslt
- turi-create - 如何在 mac 中安装 turicreate
- c# - 从 ado.net 而不是 DataTable 返回任何类型的数据列表
- python-3.x - Numpy 数组中对象的大小:Sympy Points 示例
- ruby-on-rails - 如何在 Rails 项目中使用 secure_yaml 解密和使用加密的 database.yml?
- asp.net-core - 在 .NET Core 2.1.0-rc1-final 中使用 System.Net.Http.SocketsHttpHandler
- python - 如何在 Tornado 中获取请求的身份验证标头
- c# - ASP.NET Web api 2- 从数组中删除