apache-flink - Flink 使用 KeyedCoProcessFunction 连接流
问题描述
对于 1:1 加入,我使用 KeyedCoProcessFunction,我有两个流,查找流(每秒 100 条记录)和点击流流(每秒 10000 条记录)。在processElement2
方法中,我正在寻找关键MapState<Long,Row>
,如果找到,用它来丰富点击流数据,否则将此记录设置为侧输出,然后将侧输出下沉到 kafka。我没有在两个输入流上使用任何窗口。对于 kakfa 中的 dlq 主题,我不断地看到每秒产生 1-2 条记录,我怎么能processElement2
在将其推送到侧面输出之前以某种方式等待方法中的查找 id 几毫秒。
val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
.connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
.process(new EnrichJoinFunction());
public static class EnrichJoinFunction
extends KeyedCoProcessFunction<Long, Row, Row, Row> {
final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
private MapState<Long, Row> map = null;
@Override
public void open(Configuration parameters) throws Exception {
val MapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"state",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
/*MapStateDescriptor.setQueryable("test");*/
map = getRuntimeContext().getMapState(MapStateDescriptor);
}
@Override
public void processElement1(
Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
val id = lookupRow.<Long>getFieldAs("id");
if (!map.contains(id)) {
map.put(id, lookupRow);
}
}
@Override
public void processElement2(
Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
val id = clickRow.<Long>getFieldAs("id");
if (map.contains(id)) {
// enrich join
val joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// lookup entry not yet arrived, send it to side output - dlq
ctx.output(outputTag, clickRow);
}
}
public Row join(Row clickRow, Row lookupRow) throws ParseException {
Row joinedRow = new Row(RowKind.INSERT, 13);
// row setter join ouput
return joinedRow;
}
}}
解决方案
您可以使用TimerService来实现这一点。
因此,想法是将没有立即匹配的查找数据的点击流行存储在专用MapState<Long,Row>
和注册processingTimeTimer/eventTimeTimer
计时器中,该计时器将在一段时间后触发。在计时器回调时,您可以尝试在那里加入查找数据和点击流数据。如果再次找不到匹配项,则最后将此单击事件发送到侧面输出。
它可能如下所示:
public static class EnrichJoinFunction
extends KeyedCoProcessFunction<Long, Row, Row, Row> {
final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
private MapState<Long, Row> map = null;
private MapState<Long, Row> clickstreamState = null;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<Long, Row> MapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"state",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
/*MapStateDescriptor.setQueryable("test");*/
map = getRuntimeContext().getMapState(MapStateDescriptor);
MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
new MapStateDescriptor<Long, Row>(
"clickstreamState",
TypeInformation.of(Long.class),
TypeInformation.of(new TypeHint<Row>() {}));
clickstreamState MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
}
@Override
public void processElement1(
Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
Long id = lookupRow.<Long>getFieldAs("id");
if (!map.contains(id)) {
map.put(id, lookupRow);
}
// join immediately any matching click events, waiting for counterpart
if (clickstreamState.contains(id)) {
// enrich join
Row joinRow = join(clickstreamState.get(id), lookupRow);
out.collect(joinRow);
clickstreamState.remove(id)
}
}
@Override
public void processElement2(
Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
throws Exception {
log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
Long id = clickRow.<Long>getFieldAs("id");
if (map.contains(id)) {
// enrich join
Row joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// put in state and check in 1 second
clickstreamState.put(id, clickRow)
Long currTimestamp = ctx.timestamp()
ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000)
}
}
public Row join(Row clickRow, Row lookupRow) throws ParseException {
Row joinedRow = new Row(RowKind.INSERT, 13);
// row setter join ouput
return joinedRow;
}
@Override
public void onTimer(
Long timestamp,
KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out
) {
Long id = ctx.getCurrentKey
Row clickRow = clickstreamState.get(id)
if (map.contains(id)) {
// enrich join
val joinRow = join(clickRow, map.get(id));
out.collect(joinRow);
} else {
// lookup entry not arrived even in 1 second, send it to side output - dlq
ctx.output(outputTag, clickRow);
}
clickstreamState.remove(id)
}
}}
推荐阅读
- blockchain - 如何使用以太坊私有网络交换代币?
- php - 如何替换数组中的数组关键字
- python - 在迭代列表时,一旦满足条件,如何用数字填充剩余部分?
- azure - 如何建立从 Azure 功能到本地或 AWS SQS 队列的私有连接?
- mysql - 如何使用按放置日期分组返回没有任何订单的天数
- validation - 如果用户名已经存在,则 Virtuemart 注册表单每次都会重置整个表单
- reporting-services - SSRS 中的聚合总和
- r - 重命名因子水平而不参考因子名称
- c - 修改输入的 gmock 模拟函数
- linux - 在 bash 中使用 if 后预期的二元运算符