stream - 在 Apache Flink 中的两个流上进行左连接的正确方法
问题描述
我正在使用 Apache Flink 开发欺诈检测系统,但我是初学者并且被困在这个问题上:
我想从两个流中进行左连接,一个包含当前交易,另一个包含与银行的验证交易,在那里我可以找到是否存在诸如被盗卡等错误。所以我需要加入他们以了解是否一张卡过去曾被拒绝。
DataStream<Card> currentDataStream = getCardsStream(env, Parameters.CURRENT_SOCKET)
.keyBy((card) -> card.getCardID);
DataStream<Card> historicDataStream = getCardsStream(env, Parameters.HISTORIC_SOCKET)
.keyBy((card) -> card.getCardID());
我现在正在做的是一个 RichCoFlatMapFunction,它会在每次historyDataStream 到达时更新一个名为historyList 的列表状态,并返回一个包含当前卡的元组和一个包含该ID 的所有连接事件的列表:
public class LeftJoin extends RichCoFlatMapFunction<Card, Card, Tuple2<Card, List<Card>> > {
private ValueState<Card> currentValueState;
private ListState<Card> historicListState;
@Override
public void open(Configuration parameters) throws Exception {
currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
}
@Override
public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
Iterable<Card> historicCardList = historicListState.get();
//If there is a coincidence
if (Iterables.size(historicCardList) > 0) {
out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
} else {
currentValueState.update(currentCard);
//Returning null if there are no cards for the Id
out.collect(new Tuple2<>(currentCard, null));
}
}
@Override
public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
historicListState.add(historicCard); //Updates the historicListState
}
}
问题是,List<Card>
当我想根据包含的卡片检查规则时,这给我带来了很多麻烦,因为它总是会再次获得所有卡片,并且我需要一种方法来标记我已经根据我的规则处理过的卡片,某事像这样:
//I don't like this list because it always gets me all the join coincidences
for (Card card : historicList) {
//Comparar cada regla del Broadcast state con el error que contiene el elemento card
if (rule.getBankDecision().equals(card.getErrors())) {
//Evaluate some rules
for (Long stateEventTime : windowState.keys()) {
if (isStateValueInWindow(stateEventTime, windowStartForEvent, System.currentTimeMillis())) {
aggregateValuesInState(stateEventTime, aggregator);
}
}
}
有没有更好的方法来获取加入的卡片作为流?
解决方案
我希望我理解你正确,如果不是请纠正我。
private ValueState<Card> currentValueState
是冗余的(在这个例子中你只更新它而不读取它的值)- 如果我理解正确,问题是你在整个historyListState 上发出你的规则系统,你已经检查了其中一些。为什么不从historyListState 中删除已经超过规则的卡片呢?