apache-kafka - 如何确定在 kafka 流中完成的会话
问题描述
我被卡在 kafka 流中,无法使用 DSL 处理场景。有人可以帮忙吗。
场景:我有一个主题 timeOff,它有一个 key timeOffId 和 object 类型的值。对象还包含代表该员工休假的员工 ID。因此,一名员工可以有多次休假。
TimeOffs
timeoff1 {status:PENDING, employee: 1}
timeoff2 {status:PENDING, employee: 2}
timeoff3 {status:PENDING, employee: 3}
timeoff1 {status:APPROVED, employee: 1}
timeoff5 {status:PENDING, employee: 2}
timeoff3 {status:APPROVED, employee: 3}
timeoff6 {status:PENDING, employee: 1}
timeoff7 {status:PENDING, employee: 1}
timeoff8 {status:PENDING, employee: 2}
我想要如下所示的结果,以便员工只能有他的待定休假:
employee1: [timeoff6, timeoff7] //as timeoff1 is already approved so don't need this now.
employee2: [timeoff2, timeoff5, timeoff8] //as all timeoffs for employee2 are pending
employee3: [] //No pending timeoffs
我该怎么做。我开始做类似下面的代码的事情,但我不知道我是否以正确的方式做这件事。
我不需要代码,只是建议我通过 kafka 流 DSL 执行此操作的正确/好的方法。谢谢你。在下面的示例中,我正在流式传输主题,并按employeeId 对时间进行分组。但在那种情况下,我如何获得更新的暂停状态。我很困惑。任何人都可以帮忙。
KStream<String, TimeOff> source = builder.stream(topic);
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));
解决方案
我认为最好的方法是使用处理器 API。
您应该实现您的自定义org.apache.kafka.streams.processor.Processor
. Processor
将有状态存储以保持TimeOffs
待处理状态,当状态到达时Timeoff
,APPROVED
将删除来自状态存储的条目。
它会是这样的:
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class CustomProcessor implements Processor<String, Timeoff> {
protected KeyValueStore<String, List<Timeoff>> stateStore;
private String storeName;
public CustomProcessor(String storeName) {
this.storeName = storeName;
}
@Override
public void init(ProcessorContext context) {
stateStore = (KeyValueStore<String, List<Timeoff>>) context.getStateStore(storeName);
}
@Override
public void process(String employeeId, Timeoff timeoff) {
List<Timeoff> newTimeoffs = Optional.ofNullable(stateStore.get(employeeId)).map(timeoffs -> {
if ("APPROVED".equals(timeoff.getStatus()))
timeoffs.remove(employeeId);
else
timeoffs.add(timeoff);
return timeoffs;
}).orElse(Collections.singletonList(timeoff));
stateStore.put(employeeId, newTimeoffs);
}
...
}
推荐阅读
- javascript - 为什么此代码更改字符串但又恢复到原始状态
- odoo - 如何禁用其他用户的更改密码选项?
- typescript - Typescript assumes the property type to be T | Properties["key"] instead of just T
- c++ - 就地创建和填充数组
- antlr4 - 如何解析无法转换为解析器规则的长词法规则的标记?
- hybris - 基于从 FlexQuery 获得的 PK 导入 Hybris Impex
- python-3.x - Python,将字典转换为逗号分隔值
- model - 如何使用 OR-Tools for python 描述目标大于零
- autodesk-forge - 模型衍生 API 对象 ID 与 PropertyDatabase 对象 ID 不匹配
- python - 从字符串中删除小数,但在 Python 中保留整数