java - 如何编写脚本/代码来阻止 apache kafka 消费者使用消息?
问题描述
我需要编写一个 Java 代码来启动 kafka 消费者使用消息。一旦消费者从命令行启动。但不确定阻止消费者处理的标准方法。
在我的本地 Windows 机器上,我编写了简单的独立生产者和消费者。现在我是否要阻止独立使用者使用不同的代码/脚本进行进一步处理。` private static final String TOPIC = "conftest"; 私有最终静态字符串 BOOTSTRAP_SERVERS = "localhost:9092";
private static Consumer<String, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer using props.
consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
public static void main(String args[]) throws Exception {
runConsumer();
}
private static void runConsumer(){
Consumer<String, String> consumer = createConsumer();
try{
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.printf("partition = %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
}catch(WakeupException wue){
System.out.println("Wake Up Exception Occured");
wue.printStackTrace();
}
catch(Exception e){
e.printStackTrace();
}finally {
consumer.close();
}
}`
解决方案
使用close
方法关闭KafkaConsumer
Just 调用consumer.close();
公共无效关闭()
关闭消费者,等待最多 30 秒的默认超时以进行任何需要的清理。如果启用了自动提交,这将在默认超时内尽可能提交当前偏移量。有关详细信息,请参阅 close(long, TimeUnit)。请注意,wakeup() 不能用于中断关闭。
public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)指定时间
尝试在指定的超时时间内干净地关闭消费者。此方法等待消费者完成挂起的提交并离开组的超时。如果启用了自动提交,这将在超时内尽可能提交当前偏移量。如果消费者无法在超时到期之前完成偏移提交并优雅地离开组,则消费者将被强制关闭。请注意,wakeup() 不能用于中断关闭。