java - spring-kafka 中的 seekToTimestamp
问题描述
我试图寻找时间戳功能,但由于某种原因,它对我不起作用。
在我的制片人中,我有下一个代码:
ProducerRecord<String, Obj> producer = new ProducerRecord<>("topic", 0, System.currentTimeMillis() - 10000, "key", obj);
kafkaTemplate.send(producer);
在我的 Kafka 监听器中,我试图从高于上述时间戳的某个时间戳中寻找偏移量:
@Component
@RequiredArgsConstructor
@KafkaListener(id = "container",
topics = "topic",
clientIdPrefix = "init_client",
autoStartup = "true")
public class KafkaList implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = System.currentTimeMillis()+60*1000;
log.info("Search for a time that is great or equal then {}", timestamp);
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
@KafkaHandler
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, Obj obj,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
log.info("Received message timestamp: {}, date: {}", timestamp,
Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDate());
}
}
在日志中,我看到下一个输出:
Search for a time that is great or equal then 1613079865328
Received message timestamp: 1613079798676, date: 2021-02-11
Kafka 主题 1613079798676 中的时间戳值低于我的搜索值 1613079865328 那么为什么消费者会选择这个偏移量?
解决方案
我刚刚用你的代码测试了它,它按我的预期工作;我在日志中看到了这个...
2021-02-16 11:30:16.587 INFO 36721 --- [o66163492-0-C-1] com.example.demo.So66163492Application:搜索大于或等于 1613493076587 的时间
2021-02-16 11:30:16.590 INFO 36721 --- [o66163492-0-C-1] Oakclients.consumer.KafkaConsumer:[Consumer clientId=consumer-so66163492-1,groupId=so66163492] 寻求为分区偏移 1所以66163492-0
2021-02-16 11:30:16.590 信息 36721 --- [o66163492-0-C-1] osklKafkaMessageListenerContainer:so66163492:分配的分区:[so66163492-0]
2021-02-16 11:30:16.611 INFO 36721 --- [o66163492-0-C-1] com.example.demo.So66163492Application:收到的消息时间戳:1613529016472 qux
@SpringBootApplication
public class So66163492Application extends AbstractConsumerSeekAware {
private static final Logger log = LoggerFactory.getLogger(So66163492Application.class);
public static void main(String[] args) {
SpringApplication.run(So66163492Application.class, args);
}
@KafkaListener(id = "so66163492", topics = "so66163492")
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String obj,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp) {
log.info("Received message timestamp: {} {}", timestamp, obj);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66163492").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() - 10_000, "foo", "bar"));
template.send(new ProducerRecord<>("so66163492", 0, System.currentTimeMillis() + 36_000_000, "baz", "qux"));
};
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
long timestamp = System.currentTimeMillis() + 60 * 1000;
log.info("Search for a time that is great or equal then {}", timestamp);
callback.seekToTimestamp(new ArrayList<>(assignments.keySet()), timestamp);
}
}
推荐阅读
- python - Twython - 我怎样才能加快获得用户的关注者?
- java - 如何创建可执行的可执行 jar (Maven-Jfoenix-Hibernate-IntelliJ) __ 有人可以解决/关闭该主题吗
- jmeter - 需要通过 JMeter 发送多个 PUT 请求
- php - PHP mail()函数不发送特定文本
- c# - 如何处理 Swashbuckle 中的路径/路由参数?
- javascript - 获取另一个页面的 DOM,不同的服务器
- reactjs - 如何在 react.js 中使用(graphql query + relay)获取所有数据?
- django - 如何在 Django 中获取仅属于某个国家/地区的城市名称?
- regex - 正则表达式删除除大多数外部括号外的所有括号
- python - 如果 int() 提供非整数,如何处理 ValueError