java - Kafka 生产者无法通过 Bootstrap 服务器生成消息。如何设置代理主机?
问题描述
使用 Kafka Producer API 获取超时异常。
生产Kafka消息的手动过程
我们通过 SSH 连接到 kafka 服务器:xxxx 在那里我们为生产者输入以下命令
/kafka/bin/kafka-console-producer.sh --broker BrokerHostAddress:9092 --topic TestTopic
{ValidJsonData}
Kafka 服务器和 BrokerHostAddress 是不同的地址。
尝试通过 Java + kafka-clients (2.1.0) 生成有关上述主题的消息:java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: 60000 毫秒后更新元数据失败。
String bootstrap_Server = "x.x.x.x:port"
private static KafkaProducer<String, String> producer = null;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
producer = new KafkaProducer<String, String>(props);
kafkaInTopicName = "TestTopic";
Key = "123123";
value = "{ValidJsonData}";
producer.send(new ProducerRecord<String, String>(kafkaInTopicName, key, value)).get(); ```
解决方案
从线路来看props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_Server);
,String bootstrap_Server = "x.x.x.x"
您的引导服务器似乎不包含该端口,因此它应该是“xxxx:9092”,其中“xxxx”是集群中某个 Kafka 代理的 IP 地址。
推荐阅读
- python - 在 Pandas 列中输入列表值
- github - 如何定义 GitHub 操作?
- javascript - HTML span 标签自动滚动
- python - 将字符串与大句子进行比较并获得相似度百分比
- python-3.x - 如何在 Python 中将函数参数作为字典中的键传递
- postgresql - Aurora Postgres 和 DynamoDb 之间的数据迁移解决方案
- qt - 更新独立的应用程序
- javascript - 可以使用 Webpack 一次导入一个模块并让方法可用于多个模块
- python - 使用 HDF5 格式将 pandas 数据帧写入 S3
- c - 打印文件中的字符但省略括号中的字符