Kafka Direct Stream fault tolerance does not work: Checkpoint directory does not exist

Problem description

I wrote an application that reads data from a Kafka topic. I cannot get fault tolerance to work when the driver fails. The application runs in a k8s cluster via spark-submit. The first time I run the application everything works fine, but after I delete the pod from the cluster, restarting the application fails with the error: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541. I am using fault-tolerant storage. Below is a code snippet and the full error. Thanks for any help; let me know if more details are needed.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# cpd holds the checkpoint directory path; parse_reduce and found are
# application-specific transformation functions (defined elsewhere).

def functionToCreateContext():
    sc = SparkContext("spark-master", "kafka_spark")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 20)  # 20-second batch interval
    kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
    statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(
        lambda x, y: x + y,  # add values entering the window
        lambda x, y: x - y,  # subtract values leaving the window
        60, 20)              # window length 60 s, slide interval 20 s
    top = statistic_window.transform(found)
    top.pprint()
    ssc.checkpoint(cpd)  # enable checkpointing (required for windowed operations)
    return ssc

if __name__ == "__main__":
    # Recover from an existing checkpoint, or create a new context if none exists
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()

Traceback:

20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
  File "/app-spark/kafka_spark.py", line 75, in <module>
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
        at scala.Predef$.require(Predef.scala:224)

Tags: pyspark, apache-kafka, fault-tolerance, checkpoint, spark-streaming-kafka

Solution

I suspect you are getting this error because your checkpoint directory is not on a persistent volume. When you delete the pod, that directory is deleted along with it, so on restart you get the Checkpoint directory does not exist error.

The solution is to use a Persistent Volume for the checkpoint directory.
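As a minimal sketch (the claim name checkpoint-pvc, the mount path /checkpoints and the API server address below are assumptions, not taken from your setup), you could create a PersistentVolumeClaim in the job's namespace and have spark-submit mount it into the driver and executor pods via Spark's Kubernetes volume configuration:

# Assumes a PVC named "checkpoint-pvc" already exists in the job's namespace.
spark-submit \
  --master k8s://https://<api-server>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-pvc \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.mount.path=/checkpoints \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.mount.path=/checkpoints \
  kafka_spark.py

With the volume mounted, point cpd at the mounted path (for example cpd = "file:///checkpoints/kafka_spark"); the checkpoint data then survives pod deletion, and StreamingContext.getOrCreate can recover the context on restart.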

Here you can also find an example covering exactly the same topic.

