首页 > 解决方案 > org.apache.kafka.common.errors.TimeoutException:使用 jaas SASL 配置身份验证获取 Kafka 集群的主题元数据时超时

问题描述

我正在尝试部署一个 Google Cloud Dataflow 管道,该管道从 Kafka 集群中读取数据,处理其记录,然后将结果写入 BigQuery。但是,我在尝试部署时不断遇到以下异常:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster

Kafka 集群需要使用 JAAS 配置进行身份验证,我使用下面的代码设置 KafkaIO.read Apache Beam 方法所需的属性:

// Kafka properties
    Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
        put("request.timeout.ms", 900000);
        put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
        put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
    }};

    // Build & execute pipeline
    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers(properties.getProperty("kafka.servers"))
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))

Dataflow 管道将在禁用公共 IP 的情况下部署,但从我们的 Google Cloud VPC 网络到 Kafka 集群已建立 VPN 隧道,并且配置了双方私有 ip 所需的路由,并将它们的 IP 列入白名单。我能够使用与要部署的 Dataflow 作业位于同一 VPN 子网中的 Compute Engine 虚拟机 ping 并连接到 Kafka 服务器的套接字。

我当时认为配置存在问题,但我无法确定我是否缺少其他字段,或者现有字段之一是否配置错误。有谁知道我如何进一步诊断问题,因为抛出的异常并不能真正确定问题?任何帮助将不胜感激。

编辑: 我现在能够成功部署 Dataflow 作业,但是看起来好像读取功能不正常。在查看日志检查Dataflow作业中的错误后,我可以看到在发现kafka主题的组协调器后,在警告日志语句之前没有其他日志语句说关闭空闲阅读器超时:

Close timed out with 1 pending requests to coordinator, terminating client connections

随后是一个未捕获的异常,其根本原因是:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined

然后有一个错误说明:

Execution of work for P0 for key 0000000000000001 failed. Will retry locally.

由于 Kafka 主题实际上在消息中没有键,这可能是键定义的问题吗?当我在 Kafka Tool 中查看主题时,在数据中观察到的唯一列包括 Offset、Message 和 Timestamp。

标签: javagoogle-cloud-platformapache-kafkagoogle-cloud-dataflowapache-beam

解决方案


根据最后一条评论,我假设您在执行 Dataflow 作业运行程序与 Kafka 代理的连接方面遇到的问题更多的是网络堆栈,然后是最初寻找 Dataflow 管道中缺少的任何配置。

基本上,当您为 Dataflow 工作人员使用公共 IP地址池时,您有一种最简单的方式来访问外部 Kafka 集群,而无需在双方应用额外配置,因为您不需要在各方之间启动VPC 网络并执行例行网络作业来让所有路线工作。

但是,Cloud VPN带来了更多的复杂性,在双方实现 VPC 网络以及进一步调整此 VPC 的 VPN 网关、转发规则和地址池。相反,从 Dataflow 运行时的角度来看,您不需要在 Dataflow 运行器之间传播公共 IP 地址,并且无疑会降低价格。

您提到的主要问题在于 Kafka 集群方面。由于Apache Kafka是一个分布式系统,它有一个核心原则:当生产者/消费者执行时,它会请求关于哪个 broker 是分区领导者的元数据,接收具有该分区可用端点的元数据,因此客户端然后确认这些端点连接到特定的代理。据我了解,在您的情况下,与领导者的连接是通过 绑定到外部网络接口的侦听server.properties器执行的,在代理设置中配置。

因此,您可能会考虑在绑定到云 VPC 网络接口中创建一个单独的侦听器(如果它不存在),listeners并在必要时advertised.listeners使用返回客户端的元数据进行传播,包括用于连接到特定代理的数据。


推荐阅读