apache-kafka - 在 HDP Sandbox 2.6.5 中从 localhost 生产到 Kafka 不起作用
问题描述
我将 Kafka 客户端生产者写为:
public class BasicProducerExample {
public static void main(String[] args){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//props.put(ProducerConfig.
props.put("batch.size","16384");// maximum size of message
Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
Random rnd = new Random();
for (long i = 0; i < 2 ; i++) {
//ProducerRecord<String, String> data = new ProducerRecord<String, String>("dke", "key-" + i, "message-"+i );
//Topci and Message
ProducerRecord<String, String> data = new ProducerRecord<String, String>("dke", ""+i);
producer.send(data, callback);
}
producer.close();
}
private static class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error while producing message to topic :" + recordMetadata);
e.printStackTrace();
} else {
String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}
输出:向主题生成消息时出错:null org.apache.kafka.common.errors.TimeoutException:60000 毫秒后更新元数据失败。
注意:代理端口:localhost:6667 正在工作。
解决方案
我在 Hortonworks(HDP 2.X 版本)安装上使用 Apache Kafka。遇到的错误消息意味着 Kafka 生产者无法将数据推送到段日志文件。从命令行控制台,这意味着两件事:
- 您为经纪人使用了不正确的端口
- server.properties 中的侦听器配置不起作用
如果您在通过 scala api 编写时遇到错误消息,请另外检查与 kafka 集群的连接使用telnet <cluster-host> <broker-port>
注意:如果您使用 scala api 创建主题,代理需要一些时间才能了解新创建的主题。因此,在主题创建后,生产者可能会因错误而失败Failed to update metadata after 60000 ms.
为了解决这个问题,我做了以下检查:
我通过 Ambari 检查后的第一个区别是 Kafka 代理6667
在 HDP 2.x 上侦听端口(apache kafka 使用 9092)。
listeners=PLAINTEXT://localhost:6667
接下来,使用 ip 而不是 localhost。我执行了netstat -na | grep 6667
tcp 0 0 192.30.1.5:6667 0.0.0.0:* LISTEN
tcp 1 0 192.30.1.5:52242 192.30.1.5:6667 CLOSE_WAIT
tcp 0 0 192.30.1.5:54454 192.30.1.5:6667 TIME_WAIT
因此,我修改了生产者调用以使用 IP 而不是 localhost:
./kafka-console-producer.sh --broker-list 192.30.1.5:6667 --topic rdl_test_2
要监控是否有新记录正在写入,请监控/kafka-logs
文件夹。
cd /kafka-logs/<topic name>/
ls -lart
-rw-r--r--. 1 kafka hadoop 0 Feb 10 07:24 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
一旦生产者成功写入,段日志文件00000000000000000000.log
的大小就会增加。
请参阅以下尺寸:
-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
-rw-r--r--. 1 kafka hadoop **45** Feb 10 09:16 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
此时,您可以运行 consumer-console.sh:
./kafka-console-consumer.sh --bootstrap-server 192.30.1.5:6667 --topic rdl_test_2 --from-beginning
response is hello world
在此步骤之后,如果您想通过 Scala API 生成消息,则更改listeners
值(从 localhost 到公共 IP)并通过 Ambari 重新启动 Kafka 代理:
listeners=PLAINTEXT://192.30.1.5:6667
样品生产者如下:
package com.scalakafka.sample
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
class SampleKafkaProducer {
case class KafkaProducerConfigs(brokerList: String = "192.30.1.5:6667") {
val properties = new Properties()
val batchsize :java.lang.Integer = 1
properties.put("bootstrap.servers", brokerList)
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
// properties.put("serializer.class", classOf[StringDeserializer])
properties.put("batch.size", batchsize)
// properties.put("linger.ms", 1)
// properties.put("buffer.memory", 33554432)
}
val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)
def produce(topic: String, messages: Iterable[String]): Unit = {
messages.foreach { m =>
println(s"Sending $topic and message is $m")
val result = producer.send(new ProducerRecord(topic, m)).get()
println(s"the write status is ${result}")
}
producer.flush()
producer.close(10L, TimeUnit.MILLISECONDS)
}
}
希望这可以帮助某人。
推荐阅读
- ios - canOpenURL 当返回 false 时打印出不必要的错误消息
- javers - 分布式系统上 Javers 中的缓存机制
- javascript - html javascript jquery 选择单击选项并加载
- vba - 通过在 excel 电子表格中查找特定单词来填充用户表单中的列表框
- c# - c# selenium firefox 打开新标签不起作用
- c# - c#调用c++ dll传输struct数组发生异常
- python-3.x - 复杂的python3 csv刮板
- php - 从 API 回调 url 中查找值
- python - 如何获取类型(c1)-> 列表,而不是类型 str
- facebook - Facebook 如何提交 App Review 以获得 user_events 权限