java - Flink 1.13.1 Kafka producer error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Problem description
I am trying to use the Flink Kafka producer as shown below:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "<Server details>");
    // Explicitly configure byte[] serializers for key and value
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

private static class SerializationSchema implements KafkaSerializationSchema<SelfDescribingMessageDO> {

    final String topic;
    final int numPartitions;

    public SerializationSchema(final String topic, final int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm, @Nullable Long aLong) {
        // Key: UTF-8 bytes of the hash key; value: the serialized message bytes
        return new ProducerRecord<>(topic,
                KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
                sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
                sdm.toByteArray());
    }
}
I get the following exception when trying to deploy the Flink job. I did not see this error during unit tests.
2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source: MetricSource -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 (5764a387ede7d6710bcf3ad4e2222248) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
I have added flink-connector-kafka as a dependency in my application code.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
flink-connector-kafka is the only non-test Kafka dependency in my pom:
ubuntu@vrni-platform:~$ mvn dependency:tree | grep -i kafka
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
[INFO] | +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
[INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
[INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
[INFO] | +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
[INFO] | +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
[INFO] | +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
[INFO] | +- org.apache.kafka:connect-api:jar:2.4.0:test
[INFO] | +- org.apache.kafka:connect-json:jar:2.4.0:test
[INFO] | \- org.apache.kafka:connect-runtime:jar:2.4.0:test
[INFO] | +- org.apache.kafka:kafka-tools:jar:2.4.0:test
[INFO] | | +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
[INFO] | +- org.apache.kafka:connect-transforms:jar:2.4.0:test
Can someone suggest what might be going wrong?
Solution
There are two approaches worth trying.
Approach 1: It may be a dependency conflict. If flink-connector-kafka.jar is already present in the cluster's flink/lib directory, change the Kafka connector dependency in your pom to provided scope, as shown below.
Approach 2: It is related to Flink's class loading. The default value of classloader.resolve-order in flink-conf.yaml is child-first; try changing it to parent-first, as shown below.