java - 在 KafkaProducer.send(message) 上,我收到“序列化 Avro 消息的异常错误”
问题描述
我正在使用 Avro 生成类。这是我在生产者中的代码看起来像
TweetInfo tweetInfo = TweetInfo.newBuilder()
.setTweetId(status.getId())
.setTweetCreatedAt(status.getCreatedAt().toString())
.setTweetMessage(status.getText())
.setUserId(user.getId())
.setUserCreatedAt(user.getCreatedAt().toString())
.setUserName(user.getName())
.setUserScreenName(user.getScreenName())
.build();
ProducerRecord<String, TweetInfo> data = new ProducerRecord(KafkaConstants.TOPIC, tweetInfo);
producer.send(data);
TweetInfo 是由 Avro 模式生成的类。当我运行程序时,我看到一个堆栈跟踪如下
2018-12-11 01:51:58.138 WARN 16244 --- [c Dispatcher[0]] o.i.service.kafka.TweetKafkaProducer : exception Error serializing Avro message
2018-12-11 01:51:59.162 ERROR 16244 --- [c Dispatcher[0]] i.c.k.s.client.rest.RestService : Failed to send HTTP request to endpoint: http://localhost:8081/subjects/twitterData-value/versions
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_152]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_152]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_152]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_152]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_152]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_152]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_152]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_152]
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[na:1.8.0_152]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463) ~[na:1.8.0_152]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558) ~[na:1.8.0_152]
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[na:1.8.0_152]
at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[na:1.8.0_152]
at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334) ~[na:1.8.0_152]
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309) ~[na:1.8.0_152]
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:178) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) [kafka-schema-registry-client-5.0.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) [kafka-avro-serializer-5.0.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) [kafka-avro-serializer-5.0.1.jar:na]
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) [kafka-clients-2.1.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879) [kafka-clients-2.1.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841) [kafka-clients-2.1.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728) [kafka-clients-2.1.0.jar:na]
at org.interview.service.kafka.TweetKafkaProducer$1.onStatus(TweetKafkaProducer.java:95) [classes/:na]
at twitter4j.StatusStreamImpl.onStatus(StatusStreamImpl.java:75) [twitter4j-stream-4.0.6.jar:4.0.6]
at twitter4j.StatusStreamBase$1.run(StatusStreamBase.java:105) [twitter4j-stream-4.0.6.jar:4.0.6]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
我有动物园管理员和卡夫卡在运行。我还需要运行模式注册表吗?如果是,那么是否有指南可以做到这一点?我找不到任何东西。
解决方案
无法向端点发送 HTTP 请求
Confluent Schema Registry Server 需要运行。您可能想尝试自己访问 HTTP 端点(请参阅下面的文档)。
不知道你是如何开始的,但你可以下载 Confluent OSS,将它解压到某个地方,然后在终端中,你会想要导航到解压文件夹的bin
位置并运行confluent start schema-registry
. 注意:这仅适用于 Linux。
或者,如果您想要“生产部署”配置,则需要先编辑文件etc
夹中的属性文件,然后使用各自的脚本运行 Zookeeper、Kafka 和 Registry。
文档:运行模式注册表
关于评论
当我尝试运行文章中的命令时,它给出了 bin 是无效命令的错误
$ bin/...
首先假设您已cd
进入confluent-x.x.x
提取的文件夹
顺便说一句,现有的 Kafka Connect项目与 Twitter API 交互。
推荐阅读
- build - 如何在 CentOS 8 上使用 EPEL 的 boost169-devel 正确构建?
- react-native - 当前的 Android SDK 支持是什么?- 世博会
- python - 如何在弹出窗口中保存我的复选框的值
- r - 在闪亮的服务器上运行闪亮的应用程序
- c# - Xamarin.Forms 编译绑定不适用于 DataTemplate
- oracle - 在 Oracle 存储过程中为参数添加前缀 N
- python - 如何在python中计算向量和矩阵之间的相似距离之前对数据进行归一化?
- javascript - React Shows 中的 Reducer:无法将 undefined 或 null 转换为对象
- android - 如何在android中的按钮单击上放置外部链接?
- javascript - 有没有办法通过node.js访问不是主要方法的Java类?