首页 > 解决方案 > Flink Kafka 连接器到 eventthub

问题描述

我正在使用 Apache Flink,并尝试通过使用 Apache Kafka 协议从它接收消息来连接到 Azure eventthub。我设法连接到 Azure eventthub 并接收消息,但我无法使用此处所述的 flink 功能“setStartFromTimestamp(...)”(https://ci.apache.org/projects/flink/flink-docs-stable /dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。当我试图从时间戳中获取一些消息时,Kafka 说代理端的消息格式在 0.10.0 之前。有人面临这个吗?Apache Kafka 客户端版本是 2.0.1 Apache Flink 版本是 1.7.2

更新:尝试在消费者包中使用 Azure-Event-Hub 快速入门示例 ( https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java ) 添加代码以抵消时间戳,如果消息版本低于 0.10.0 kafka 版本,它将按预期返回 null。

        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);

标签: apache-kafkaapache-flinkazure-eventhub

解决方案


Sorry we missed this. Kafka offsetsForTimes() is now supported in EH (previously unsupported).

Feel free to open an issue against our Github in the future. https://github.com/Azure/azure-event-hubs-for-kafka


推荐阅读