java - Incompatible equality constraint when using Akka Kafka Streams
Question
I am trying to use Akka Kafka Streams by following the Akka Kafka Streams documentation. Here is my code:
ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings
    .create(actorSystem, new ByteArrayDeserializer(), new KafkaJacksonSerializer<>(ETLProcessMessage.class))
    .withBootstrapServers(kafkaServers)
    .withGroupId(consumerGroupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
    .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> {
        handlePartitionedRequest(msg.record().value());
        return Done.getInstance();
    }))
    .runWith(Sink.ignore(), materializer);
Here is the code for KafkaJacksonSerializer:
import com.adaequare.mapro.common.exception.AppException;
import com.adaequare.mapro.config.jackson.PostConstructDeserializer;
import com.adaequare.mapro.model.transformer.JSONTransformer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.io.CharStreams;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
public class KafkaJacksonSerializer<T> implements Serializer<T>, Deserializer<T> {
    private ObjectReader objectReader;
    private ObjectWriter objectWriter;
    private ObjectMapper objectMapper;

    public KafkaJacksonSerializer() {
    }

    public KafkaJacksonSerializer(Class<T> persistentClass) {
        objectMapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        // Wrap every bean deserializer so that post-construct hooks run after deserialization.
        module.setDeserializerModifier(new BeanDeserializerModifier() {
            @Override
            public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config,
                    BeanDescription beanDesc, final JsonDeserializer<?> deserializer) {
                return new PostConstructDeserializer(deserializer);
            }
        });
        objectMapper.registerModule(module);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Map fields directly and ignore getters, setters and creators.
        objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker()
                .withFieldVisibility(JsonAutoDetect.Visibility.ANY)
                .withGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withSetterVisibility(JsonAutoDetect.Visibility.NONE)
                .withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
        objectReader = objectMapper.readerFor(persistentClass);
        objectWriter = objectMapper.writer();
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Check the payload itself: a freshly constructed stream is never null,
        // and a null payload would make ByteArrayInputStream throw.
        if (data == null) {
            return null;
        }
        InputStream stream = new ByteArrayInputStream(data);
        try {
            // Decode with an explicit charset instead of the platform default.
            String json = CharStreams.toString(
                    new InputStreamReader(stream, java.nio.charset.StandardCharsets.UTF_8));
            return objectReader.readValue(json);
        } catch (IOException e) {
            throw AppException.forException("Error while unmarshalling AssetData JSON: " + e.getMessage(), e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectWriter.writeValueAsBytes(data);
        } catch (IOException e) {
            throw AppException.forException("Error while marshalling JSON: " + e.getMessage(), e);
        }
    }

    @Override
    public void close() {
    }
}
I am not sure what exactly the problem is. However, the following code does not show any error:
ConsumerSettings newconsumerSettings = ConsumerSettings
    .create(actorSystem, new ByteArrayDeserializer(), new StringDeserializer())
    .withBootstrapServers(kafkaServers)
    .withGroupId(consumerGroupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Consumer.committableSource(newconsumerSettings, Subscriptions.topics("topic2"))
    .mapAsync(3, msg -> CompletableFuture.supplyAsync(() -> Done.getInstance()))
    .runWith(Sink.ignore(), materializer);
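Can someone help me figure out what is going wrong here?
Answer
There was an Akka version mismatch among the dependencies I had added. Once I aligned them to the same version, the compile error went away.
These are the dependencies I was using: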
compile 'com.typesafe.akka:akka-actor_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster_2.12:2.5.4'
compile 'com.typesafe.akka:akka-cluster-tools_2.12:2.5.4'
compile 'com.typesafe.akka:akka-slf4j_2.12:2.5.4'
This is what I newly added for reactive Kafka:
compile 'com.typesafe.akka:akka-stream-kafka_2.12:0.21'
After upgrading the Akka (actor/cluster-related) dependencies to 2.5.9, the compile error disappeared.
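A mismatch like this can be spotted by listing the resolved dependency coordinates and checking whether the Akka core modules all share one version. The following is only a minimal sketch of that idea, not part of the original fix: the class name AkkaVersionCheck, the method coreVersions and the list of core module names are hypothetical, and the akka-stream 2.5.9 entry is an assumed transitive dependency of akka-stream-kafka 0.21, inferred from the upgrade described above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class AkkaVersionCheck {

    // Assumption: these artifacts are released together and must share one version.
    // akka-stream-kafka is deliberately absent because it follows its own version line.
    private static final Set<String> CORE_MODULES = new TreeSet<>(Arrays.asList(
            "akka-actor", "akka-stream", "akka-cluster", "akka-cluster-tools", "akka-slf4j"));

    /** Collects the distinct versions of Akka core modules from "group:artifact:version" strings. */
    public static Set<String> coreVersions(List<String> coordinates) {
        Set<String> versions = new TreeSet<>();
        for (String coordinate : coordinates) {
            String[] parts = coordinate.split(":");
            if (parts.length != 3 || !"com.typesafe.akka".equals(parts[0])) {
                continue; // skip malformed entries and other groups
            }
            // Strip the Scala binary suffix, e.g. "akka-actor_2.12" becomes "akka-actor".
            String artifact = parts[1];
            int suffixStart = artifact.lastIndexOf('_');
            String module = suffixStart >= 0 ? artifact.substring(0, suffixStart) : artifact;
            if (CORE_MODULES.contains(module)) {
                versions.add(parts[2]);
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // Hypothetical resolved dependency list for the build described above.
        List<String> resolved = Arrays.asList(
                "com.typesafe.akka:akka-actor_2.12:2.5.4",
                "com.typesafe.akka:akka-cluster_2.12:2.5.4",
                "com.typesafe.akka:akka-stream_2.12:2.5.9", // assumed transitive dependency
                "com.typesafe.akka:akka-stream-kafka_2.12:0.21");

        Set<String> versions = coreVersions(resolved);
        if (versions.size() > 1) {
            System.out.println("Akka core version mismatch: " + versions);
        } else {
            System.out.println("Akka core versions are aligned: " + versions);
        }
    }
}
```

In a real build the coordinate list would come from the build tool's dependency report rather than being hard-coded. More than one version in the result suggests that the explicitly declared Akka modules need to be moved to the version the Kafka connector brings in.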