首页 > 解决方案 > 需要从 Kakfa 主题消费并动态提供 cron 表达式(从 kafka 主题接收)到quartz2

问题描述

问题陈述 - 我需要从一个 kafka 主题中消费,我将 cron 表达式作为标题,需要触发 QUARTZ 并需要动态提供 cron 表达式。

这是我的代码片段 -

from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}......")
    .routeId(routeId)
    //accessing all the header value and then forming a valid cron expression
    .process(new Processor() {
        @Override 
        public void process(Exchange exchange) throws Exception {
            String sec=exchange.getIn().getHeader("second",String.class);
            String min=exchange.getIn().getHeader("minute",String.class);
            String hours=exchange.getIn().getHeader("hours",String.class);       
            body=exchange.getIn().getBody();
            log.info("The original Body : {}",body);
            // Method call to prepare the cron expression based on the headers value
            cronExp=cb.getCronExperssion(sec,min,hours,"?","*","*","*");
            //Setting up the final cron expression to the header    
            exchange.getIn().setHeader("timerObject", timerObj); 
            log.info("The timer object values are: {}", timerObj);
            log.info("The original scheduled time is : {}",startPolicy.getRouteStartTime());
        }
    })
    // because the quartz2 route can’t be at the to side, it should always be a from() that's why i am using the mock:result here, Any better way ???
    .to("mock:result");

// Here I need to give the cron expression which is being stored in the headers.
from("quartz2://group1/trigger1?cron=0/20+*+*+?+*+*+*&stateful=true")
    .routeId(“Scheduler Route”)
    .log(“Scheduler routed started …”)
    .to(“direct:httpRoute”);

我尝试了多种方法,例如丰富 ()、PollEnrich,但无法从标头值应用 cron 表达式。请帮忙 ....

谢谢迪帕克

标签: dynamicapache-kafkacronapache-camelquartz-scheduler

解决方案


推荐阅读