apache-flink - 在 Flink Kafka Producer 中发送密钥
问题描述
我是 Flink Stream 处理的新手,需要 Flink Kafka 生产者的一些帮助,因为经过一段时间的搜索后找不到与之相关的太多东西。我目前正在从 Kafka 主题读取流,然后在执行一些计算后,我想将其写入 Kafka 中的新单独主题。但我面临的问题是我无法将密钥发送到 Kafka 主题。我正在使用 Flink Kafka 连接器,它为我提供了 FlinkKafkaConsumer 和 FlinkKafkaProducer。以下是我的代码的更详细信息,我可以在我的代码中更改它可以工作的内容,目前在 Kafka 上,我正在生成我的消息在 Key 中使用 null ,因为值是我需要的:
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", serverURL);
consumerProperties.setProperty("group.id", groupID);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
new SimpleStringSchema(), consumerProperties);
kafkaConsumer.setStartFromEarliest();
DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
final int[] tVoteCount = {0};
DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws InterruptedException, IOException {
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
Tcount = Tcount + jsonNode.get(key1).asInt();
int nameCandidate = jsonNode.get(key2).asInt();
System.out.println(Tcount);
String tCountT = Integer.toString(Tcount);
//tVoteCount = tVoteCount + voteCount;
//waitForEventTime(timeStamp);
return tCountT;
}
});
kafkaConsumerStream.print();
System.out.println("sdjknvksjdnv"+Tcount);
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", serverURL);
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
new SimpleStringSchema(), producerProperties);
kafkaProducerStream.addSink(kafkaProducer);
env.execute();
谢谢。
解决方案
在此博客中,您将找到一个有关如何将 key 和 topic 写入主题的示例:
您需要用new FlinkKafkaProducer
以下内容替换您创建的 a :
FlinkKafkaProducer<KafkaRecord> kafkaProducer =
new FlinkKafkaProducer<KafkaRecord>(
producerTopicName,
((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())),
producerProperties
);
推荐阅读
- python - 根据文本文件中的第一个字母对单词进行排序,python
- windows - 使用本机(Windows)桌面客户端在哪里存储 oauth2 访问/刷新令牌?
- git - 如何将分支或从另一个存储库提交到当前存储库?
- wpf - 缩放后着色器变得模糊
- graphql - 如何使用 graphql-tools 对 GraphQL 模式指令做出反应
- reporting-services - SSRS - 在每个组中为每个页面重复标题行
- excel - Excel检查多个单元格是否包含文本
- python - Sublime Text 4 中的虚拟环境
- spring - 使用 Scala 的 Spring Boot
- python - 在不使用 selenium 的情况下提取网络活动及其响应标头