首页 > 解决方案 > 如何编写脚本/代码来阻止 apache kafka 消费者使用消息?

问题描述

我需要编写一个 Java 代码来启动 kafka 消费者使用消息。一旦消费者从命令行启动。但不确定阻止消费者处理的标准方法。

在我的本地 Windows 机器上,我编写了简单的独立生产者和消费者。现在我是否要阻止独立使用者使用不同的代码/脚本进行进一步处理。` private static final String TOPIC = "conftest"; 私有最终静态字符串 BOOTSTRAP_SERVERS = "localhost:9092";

private static Consumer<String, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    // Create the consumer using props.
    consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

public static void main(String args[]) throws Exception {
    runConsumer();
}

private static void runConsumer(){
    Consumer<String, String> consumer = createConsumer();
    try{
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value()); 
            }
            consumer.commitAsync();
        }
    }catch(WakeupException wue){
        System.out.println("Wake Up Exception Occured");
        wue.printStackTrace();
    }
    catch(Exception e){
        e.printStackTrace();
    }finally {
        consumer.close();
    }
}`

标签: javaapache-kafkakafka-consumer-api

解决方案


使用close方法关闭KafkaConsumerJust 调用consumer.close();

公共无效关闭()

关闭消费者,等待最多 30 秒的默认超时以进行任何需要的清理。如果启用了自动提交,这将在默认超时内尽可能提交当前偏移量。有关详细信息,请参阅 close(long, TimeUnit)。请注意,wakeup() 不能用于中断关闭。

public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)指定时间

尝试在指定的超时时间内干净地关闭消费者。此方法等待消费者完成挂起的提交并离开组的超时。如果启用了自动提交,这将在超时内尽可能提交当前偏移量。如果消费者无法在超时到期之前完成偏移提交并优雅地离开组,则消费者将被强制关闭。请注意,wakeup() 不能用于中断关闭。


推荐阅读