apache-spark - Spark 结构化流不会从先前的偏移量重新开始
问题描述
我有一个流式火花应用程序,它从 kafka 流中读取消息,如下所示;
m_kafkaEvents = m_sparkSession.readStream().format("kafka")
.option("kafka.bootstrap.servers", strKafkaAddress)
.option("subscribe", InsightConstants.IP_INSIGHT_EVENT_TOPIC)
.option("maxOffsetsPerTrigger", "100000")
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
.select(
functions.col("key").cast("string").as(InsightConstants.IP_SYS_INSTANCE_ID),
functions.col("value").cast("string").as("event"),
functions.col("partition"),
functions.col("offset"))
.as(ExpressionEncoder.javaBean(InputEvent.class));
然后我处理消息并写入 Kakfa 输出接收器,如下所示;
DataStreamWriter<SessionUpdate> eventsStream = sessionUpdates
.writeStream()
.queryName(strQueryName)
.outputMode("append") .trigger(Trigger.ProcessingTime(m_insightDeployment.getIdentifierProcessor().getTriggerInterval()))
.option("checkpointLocation", strCheckpointLocation)
.foreach(foreachWriter);
在 ForeachWriter 中,
我将消息推送到kafka主题中,如下所示;
try
{
String strKafkaAddress = m_strKafkaHost + ":" + m_iKafkaPort.toString();
Properties oProperties = new Properties();
oProperties.put("bootstrap.servers", strKafkaAddress);
oProperties.put("acks", "all");
oProperties.put("retries", 3);
oProperties.put("batch.size", 16384);
oProperties.put("linger.ms", 1);
oProperties.put("buffer.memory", 33554432);
oProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
oProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
m_oProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(oProperties);
}
catch (Throwable oThrowable)
{
oThrowable.printStackTrace();
}
}
ProducerRecord<String, String> oMessage =
new ProducerRecord<String, String>(in_strTopic, in_strKey, in_strPayload);
m_oProducer.send(oMessage);
使用此代码在 spark 应用程序重新启动时(我通过控制台手动将其杀死,一段时间后 spark 应用程序由驱动程序自行重新启动),它将再次从 start 读取消息,而不是基于检查点的先前偏移量。我看到检查点位置已创建,看起来 spark 应用程序没有从该检查点位置读取。因此,我看到重复的消息。
看起来我在这里遗漏了一些东西。有人可以推荐这里缺少的部分吗?
提前感谢您的帮助。
解决方案
推荐阅读
- java - 两个 Set 包含相同的元素,但它们不相等。为什么会有这样的结果?
- android - 如何像在 iOS 中为 iMessage 一样为 Android 创建贴纸包?
- vb.net - 类型“集合”未定义 vb BC30002
- java - 将具有动态确定类型参数的对象传递给 2 参数方法,其两个参数具有相同的类型参数
- c++ - 用于 where 子句的 Odbc 空值绑定
- c# - 无法在 LINQ to Entities 查询中构造实体或复杂类型 X
- linux - 两个 Linux 服务器之间的 PowerShell 远程执行
- javascript - AngularJS 参数“fn”不是函数,未定义
- javascript - Meteor 客户端无法访问已编译的生产文件
- linux - glew 可以与 Linux 上其他库中的 glew 冲突吗?