首页 > 解决方案 > 使用 Apache Camel 的 Kafka 消费者

问题描述

我是 Apache 骆驼的新手。我们正在做 POC 以使用 Camel 开发 kafka 消费者。下面是示例代码。

    context.addRoutes(new RouteBuilder(){

      @Override
        public void configure() throws Exception {
            // TODO Auto-generated method stub

         from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" 
                     + "&consumersCount={{consumer.consumersCount}}" 
                     + "&seekTo={{consumer.seekTo}}" 
                     + "&groupId={{consumer.group}}")
             .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {

                    Message message = exchange.getIn();
                    Object data = message.getBody();

                    System.out.println(data);
                }
             })
             .to("seda:end");


  });

        context.start();

    ConsumerTemplate template=context.createConsumerTemplate();
    String info=template.receiveBody("seda:end",String.class);

    System.out.println(info);
}

我有以下问题:

  1. 上下文在启动后立即停止。
  2. 如果我使用消费者模板轮询到端点,它不会打印任何内容,而在 .process() 内部,当我在无限循环中启动上下文时,我可以打印 kafka 消息。为什么消费者模板无法打印。

标签: javaapache-kafkaapache-camel

解决方案


  1. 正如克劳斯已经评论的那样,您的骆驼上下文正在立即关闭,因为它没有阻塞。请参阅他评论中的链接。
  2. 我认为您错过了template.start();启动消费者的机会。有关示例,请参见此链接。

推荐阅读