scala - 从 Kafka 获取错误的偏移量
问题描述
我正在尝试将一些 Avro 数据写入 Kafka,并将其用作从数据库中读取记录的偏移量。所以我不想在每个数据处理阶段之后读取所有记录,我将偏移量保存到 Kafka,然后从它开始我的数据读取过程。我已经为此定义了读取数据方法和最新的偏移量检索方法。问题是,当我第一次添加新数据并保存偏移量时,它没有被正确检索或类似的东西。当我第二次添加数据时,数据是从前一个偏移量而不是当前偏移量开始检索的。预期结果:
第一批数据包括主键1-20,偏移主键20的记录数据,从1开始检索数据。第二批数据包括主键21-30,偏移主键30的记录数据,数据从 20(前一个偏移量)开始检索。第三批数据包括主键31-40,偏移主键40的记录数据,从30开始检索数据(上一个偏移量)。等等。
实际行为如下所示:
第一批数据包括主键1-20,偏移主键20的记录数据,从1开始检索数据。第二批数据包括主键20-30,偏移主键30的记录数据,无数据被检索。第三批数据包括主键31-40,偏移记录数据为主键40,从20开始取数据(???)。
那么可能是什么原因呢?Kafka 的某些配置或整体代码逻辑是否存在问题?
这是我从 Kafka 检索最新偏移量的方式
这是读取我保存到 Kafka 的状态的代码:
def getLatestOffset() = {
val kafkaProperties = new Properties()
kafkaProperties.putAll(kafkaParams)
val topicAndPartition = new TopicPartition(kafkaTopic, 0)
val consumer = new KafkaConsumer[String,GenericRecord](kafkaProperties)
consumer.subscribe(java.util.Arrays.asList(kafkaTopic))
val consumerRecords = consumer
.poll(10000)
.records(kafkaTopic)
.asScala
.toList
val partitionsAssigned = consumer.assignment()
val endOffset = consumer.endOffsets(partitionsAssigned)
.get(topicAndPartition)
endOffset
}
def readComplexStateFromKafka(sparkSession: SparkSession, dayColumn: String, endingOffset: Long) = {
logger.debug(s"Reading from Kafka topic: $kafkaTopic")
val offsetRanges = Array(
OffsetRange(kafkaTopic, 0, endingOffset-1, endingOffset)
)
val dataRDD = KafkaUtils.createRDD(
sparkSession.sparkContext,
sparkAppConfig.kafkaParams.asJava,
offsetRanges,
LocationStrategies.PreferConsistent
)
val genericRecordsValues = dataRDD
.map(record =>
record
.value()
.asInstanceOf[GenericRecord]
)
val genericRecordsFields = genericRecordsValues
.map(record =>
(record.get("table_name").toString,
record.get("code").toString,
new Timestamp(record.get(dayColumn).asInstanceOf[Long]).toString)
)
genericRecordsFields.first()
}
Kafka 参数如下所示:
bootstrapServers = "kafka-headless.default.svc.cluster.local:9092"
schemaRegistryUrl = "http://cp-schema-registry:8081"
topic = "tableName_offsets"
keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
valueDeserializer = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
valueSerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
clientId = "table_state"
groupId = "avro_data"
autoOffsetReset = "latest"
enableAutoCommit = true
auto.commit.interval.ms = 10000
解决方案
推荐阅读
- netlogo - 我如何让乌龟计算其他乌龟的数量比自己的乌龟拥有值大?
- node.js - 查找(查询)返回空表
- c++ - C ++中的安全交叉编译器ABI?
- arrays - mongodb查询搜索后从字段中提取值
- webhooks - Microsoft Logic App 中的 Webhook 触发器不会触发
- java - 片段中的 java.util.ConcurrentModificationException
- linux - Ansible 禁用 repo 未按预期运行
- asp.net - Angular HttpClient 在真实设备中不起作用
- c++ - libharu 中的 utf8:嵌入字体真的有必要吗?
- javascript - Firestore:按文档数组中的地图查询