首页 > 解决方案 > Spark 结构化流不会从先前的偏移量重新开始

问题描述

我有一个流式火花应用程序,它从 kafka 流中读取消息,如下所示;

    m_kafkaEvents = m_sparkSession.readStream().format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("subscribe", InsightConstants.IP_INSIGHT_EVENT_TOPIC)
        .option("maxOffsetsPerTrigger", "100000")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        .select(
            functions.col("key").cast("string").as(InsightConstants.IP_SYS_INSTANCE_ID), 
            functions.col("value").cast("string").as("event"),
            functions.col("partition"),
            functions.col("offset"))
        .as(ExpressionEncoder.javaBean(InputEvent.class));

然后我处理消息并写入 Kakfa 输出接收器,如下所示;

 DataStreamWriter<SessionUpdate> eventsStream = sessionUpdates
        .writeStream()
        .queryName(strQueryName)
        .outputMode("append")        .trigger(Trigger.ProcessingTime(m_insightDeployment.getIdentifierProcessor().getTriggerInterval()))
        .option("checkpointLocation", strCheckpointLocation)
        .foreach(foreachWriter);

在 ForeachWriter 中,

我将消息推送到kafka主题中,如下所示;

    try
    {
      String strKafkaAddress = m_strKafkaHost + ":" + m_iKafkaPort.toString();
      Properties oProperties = new Properties();
      oProperties.put("bootstrap.servers", strKafkaAddress);
      oProperties.put("acks",              "all");
      oProperties.put("retries",           3);
      oProperties.put("batch.size",        16384);
      oProperties.put("linger.ms",         1);
      oProperties.put("buffer.memory",     33554432);
      oProperties.put("key.serializer",    "org.apache.kafka.common.serialization.StringSerializer");
      oProperties.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer");

      m_oProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(oProperties);
    }
    catch (Throwable oThrowable)
    {
      oThrowable.printStackTrace();
    }
  }

    ProducerRecord<String, String> oMessage =
        new ProducerRecord<String, String>(in_strTopic, in_strKey, in_strPayload);

      m_oProducer.send(oMessage);

使用此代码在 spark 应用程序重新启动时(我通过控制台手动将其杀死,一段时间后 spark 应用程序由驱动程序自行重新启动),它将再次从 start 读取消息,而不是基于检查点的先前偏移量。我看到检查点位置已创建,看起来 spark 应用程序没有从该检查点位置读取。因此,我看到重复的消息。

看起来我在这里遗漏了一些东西。有人可以推荐这里缺少的部分吗?

提前感谢您的帮助。

标签: apache-sparkapache-spark-sqlspark-streaming

解决方案


推荐阅读