首页 > 解决方案 > 如何确定在 kafka 流中完成的会话

问题描述

我被卡在 kafka 流中,无法使用 DSL 处理场景。有人可以帮忙吗。

场景:我有一个主题 timeOff,它有一个 key timeOffId 和 object 类型的值。对象还包含代表该员工休假的员工 ID。因此,一名员工可以有多次休假。

TimeOffs 

timeoff1 {status:PENDING, employee: 1}
timeoff2 {status:PENDING, employee: 2}
timeoff3 {status:PENDING, employee: 3}
timeoff1 {status:APPROVED, employee: 1}
timeoff5 {status:PENDING, employee: 2}
timeoff3 {status:APPROVED, employee: 3}
timeoff6 {status:PENDING, employee: 1}
timeoff7 {status:PENDING, employee: 1}
timeoff8 {status:PENDING, employee: 2}

我想要如下所示的结果,以便员工只能有他的待定休假:

employee1: [timeoff6, timeoff7] //as timeoff1 is already approved so don't need this now.
employee2: [timeoff2, timeoff5, timeoff8] //as all timeoffs for employee2 are pending
employee3: [] //No pending timeoffs

我该怎么做。我开始做类似下面的代码的事情,但我不知道我是否以正确的方式做这件事。

我不需要代码,只是建议我通过 kafka 流 DSL 执行此操作的正确/好的方法。谢谢你。在下面的示例中,我正在流式传输主题,并按employeeId 对时间进行分组。但在那种情况下,我如何获得更新的暂停状态。我很困惑。任何人都可以帮忙。

KStream<String, TimeOff> source = builder.stream(topic);
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
        .aggregate(ArrayList::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                }, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));

标签: apache-kafkaapache-kafka-streams

解决方案


我认为最好的方法是使用处理器 API。

您应该实现您的自定义org.apache.kafka.streams.processor.Processor. Processor将有状态存储以保持TimeOffs待处理状态,当状态到达时TimeoffAPPROVED将删除来自状态存储的条目。

它会是这样的:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class CustomProcessor implements Processor<String, Timeoff> {

    protected KeyValueStore<String, List<Timeoff>> stateStore;
    private String storeName;

    public CustomProcessor(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        stateStore = (KeyValueStore<String, List<Timeoff>>) context.getStateStore(storeName);
    }

    @Override
    public void process(String employeeId, Timeoff timeoff) {
        List<Timeoff> newTimeoffs = Optional.ofNullable(stateStore.get(employeeId)).map(timeoffs -> {
            if ("APPROVED".equals(timeoff.getStatus()))
                timeoffs.remove(employeeId);
            else
                timeoffs.add(timeoff);
            return timeoffs;
        }).orElse(Collections.singletonList(timeoff));
        stateStore.put(employeeId, newTimeoffs);
    }

    ...
}

推荐阅读