首页 > 解决方案 > 在 Flink Kafka Producer 中发送密钥

问题描述

我是 Flink Stream 处理的新手,需要 Flink Kafka 生产者的一些帮助,因为经过一段时间的搜索后找不到与之相关的太多东西。我目前正在从 Kafka 主题读取流,然后在执行一些计算后,我想将其写入 Kafka 中的新单独主题。但我面临的问题是我无法将密钥发送到 Kafka 主题。我正在使用 Flink Kafka 连接器,它为我提供了 FlinkKafkaConsumer 和 FlinkKafkaProducer。以下是我的代码的更详细信息,我可以在我的代码中更改它可以工作的内容,目前在 Kafka 上,我正在生成我的消息在 Key 中使用 null ,因为值是我需要的:

Properties consumerProperties = new Properties();
    
    consumerProperties.setProperty("bootstrap.servers", serverURL);
    consumerProperties.setProperty("group.id", groupID);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
            new SimpleStringSchema(), consumerProperties);

    kafkaConsumer.setStartFromEarliest();
    DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
    final int[] tVoteCount = {0};
    
    DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws InterruptedException, IOException {
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            Tcount = Tcount + jsonNode.get(key1).asInt();
            int nameCandidate = jsonNode.get(key2).asInt();
            System.out.println(Tcount);
            String tCountT = Integer.toString(Tcount);
            //tVoteCount = tVoteCount + voteCount;
             
            //waitForEventTime(timeStamp);
            return tCountT;
        }
    });
    kafkaConsumerStream.print();
    System.out.println("sdjknvksjdnv"+Tcount);
    Properties producerProperties = new Properties();
    producerProperties.setProperty("bootstrap.servers", serverURL);
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
            new SimpleStringSchema(), producerProperties);
    kafkaProducerStream.addSink(kafkaProducer);
    env.execute();

谢谢。

标签: apache-flinkkafka-producer-api

解决方案


在此博客中,您将找到一个有关如何将 key 和 topic 写入主题的示例:

您需要用new FlinkKafkaProducer以下内容替换您创建的 a :

FlinkKafkaProducer<KafkaRecord> kafkaProducer = 
  new FlinkKafkaProducer<KafkaRecord>(
    producerTopicName, 
    ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())), 
    producerProperties
  );

推荐阅读