apache-kafka - Flink Kafka 连接器到 eventthub
问题描述
我正在使用 Apache Flink,并尝试通过使用 Apache Kafka 协议从它接收消息来连接到 Azure eventthub。我设法连接到 Azure eventthub 并接收消息,但我无法使用此处所述的 flink 功能“setStartFromTimestamp(...)”(https://ci.apache.org/projects/flink/flink-docs-stable /dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。当我试图从时间戳中获取一些消息时,Kafka 说代理端的消息格式在 0.10.0 之前。有人面临这个吗?Apache Kafka 客户端版本是 2.0.1 Apache Flink 版本是 1.7.2
更新:尝试在消费者包中使用 Azure-Event-Hub 快速入门示例 ( https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java ) 添加代码以抵消时间戳,如果消息版本低于 0.10.0 kafka 版本,它将按预期返回 null。
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
System.out.println(offsetAndTimestamp);
解决方案
Sorry we missed this. Kafka offsetsForTimes() is now supported in EH (previously unsupported).
Feel free to open an issue against our Github in the future. https://github.com/Azure/azure-event-hubs-for-kafka
推荐阅读
- javascript - 使用这个访问 Vue 实例
- mysql - 获取与列表(数组)中精确值匹配的记录
- javascript - 在 JSON 中绑定没有“值”的选项
- java - java.util.NoSuchElementException 错误(可能是因为 Scanner)
- vb.net - 保存数据以访问不起作用,尽管没有错误
- encryption - oiosaml 密钥库中是否可以有多个密钥?
- python - Kafka Python,如何跟踪在不同进程中开始的消费者
- android - Gradle sync failed - Failed to resolve: appcompat-v7:26
- ethereum - 如何在输入数据中显示调用的智能合约函数的名称
- kubernetes - 如何为公共和私有域运行多个入口网关