首页 > 解决方案 > Kafka 生产者无法通过 Bootstrap 服务器生成消息。如何设置代理主机?

问题描述

使用 Kafka Producer API 获取超时异常。

生产Kafka消息的手动过程

我们通过 SSH 连接到 kafka 服务器:xxxx 在那里我们为生产者输入以下命令

/kafka/bin/kafka-console-producer.sh --broker BrokerHostAddress:9092 --topic TestTopic
{ValidJsonData}

Kafka 服务器和 BrokerHostAddress 是不同的地址。

尝试通过 Java + kafka-clients (2.1.0) 生成有关上述主题的消息:java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。

String bootstrap_Server = "x.x.x.x:port"
private static KafkaProducer<String, String> producer = null;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
producer = new KafkaProducer<String, String>(props);

kafkaInTopicName = "TestTopic";
Key = "123123";
value = "{ValidJsonData}";
producer.send(new ProducerRecord<String, String>(kafkaInTopicName, key, value)).get(); ```


标签: javaapache-zookeeperkafka-producer-api

解决方案


从线路来看props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server);String bootstrap_Server = "x.x.x.x"您的引导服务器似乎不包含该端口,因此它应该是“xxxx:9092”,其中“xxxx”是集群中某个 Kafka 代理的 IP 地址。


推荐阅读