Flink: joining streams with KeyedCoProcessFunction

Question

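For a 1:1 join I am using KeyedCoProcessFunction. I have two streams: a lookup stream (100 records per second) and a clickstream (10,000 records per second). In processElement2 I look up the key in a MapState<Long, Row>; if it is found, I use it to enrich the clickstream record, otherwise I emit the record to a side output and sink that side output to Kafka. I am not using any windows on either input stream. In the Kafka DLQ topic I constantly see 1-2 records per second. How can I make processElement2 wait a few milliseconds for the lookup id before pushing the record to the side output?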

val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
            .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
            .process(new EnrichJoinFunction());
public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    private MapState<Long, Row> map = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      val MapStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      /*MapStateDescriptor.setQueryable("test");*/
      map = getRuntimeContext().getMapState(MapStateDescriptor);
    }

    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      val id = lookupRow.<Long>getFieldAs("id");
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }
    }

    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      val id = clickRow.<Long>getFieldAs("id");

      if (map.contains(id)) {
          // enrich join
          val joinRow = join(clickRow, map.get(id));
          out.collect(joinRow);
      } else {
        // lookup entry not yet arrived, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setter join output
      return joinedRow;
    }
}}

Tags: apache-flink, flink-streaming

Answer


You can use the TimerService to achieve this.

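The idea is to store clickstream rows that have no immediately matching lookup data in a dedicated MapState<Long, Row> and to register a timer (registerProcessingTimeTimer or registerEventTimeTimer) that fires after some delay. In the timer callback you try to join the lookup data and the click data again. If there is still no match, you finally send the click event to the side output.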

It could look like this:

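import java.text.ParseException;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {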


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    private MapState<Long, Row> map = null;
    private MapState<Long, Row> clickstreamState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      // state holding the lookup records, kept for 15 days
      MapStateDescriptor<Long, Row> lookupStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      lookupStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      /*lookupStateDescriptor.setQueryable("test");*/
      map = getRuntimeContext().getMapState(lookupStateDescriptor);

      // state holding click events that are still waiting for their lookup record
      MapStateDescriptor<Long, Row> clickstreamStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "clickstreamState",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      clickstreamStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
      clickstreamState = getRuntimeContext().getMapState(clickstreamStateDescriptor);
    }

    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      Long id = lookupRow.<Long>getFieldAs("id");
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }

      // join immediately any matching click events, waiting for counterpart
      if (clickstreamState.contains(id)) {
          // enrich join
          Row joinRow = join(clickstreamState.get(id), lookupRow);
          out.collect(joinRow);
          clickstreamState.remove(id);
      }
    }

    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      // the click stream is keyed by "lookupid", so the current key is the lookup id
      Long id = ctx.getCurrentKey();

      if (map.contains(id)) {
          // enrich join
          Row joinRow = join(clickRow, map.get(id));
          out.collect(joinRow);
      } else {
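        // no lookup record yet: buffer the click and check again in 1 second
        clickstreamState.put(id, clickRow);
        // ctx.timestamp() can be null when records carry no event-time timestamp,
        // so base the processing-time timer on the current processing time
        long fireAt = ctx.timerService().currentProcessingTime() + 1000;
        ctx.timerService().registerProcessingTimeTimer(fireAt);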
      }
    }

    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setter join output
      return joinedRow;
    }

    @Override
    public void onTimer(
        long timestamp,
        KeyedCoProcessFunction<Long, Row, Row, Row>.OnTimerContext ctx,
        Collector<Row> out)
        throws Exception {
      Long id = ctx.getCurrentKey();
      Row clickRow = clickstreamState.get(id);
      if (clickRow == null) {
        // the click was already joined in processElement1 (or expired via TTL)
        return;
      }
      if (map.contains(id)) {
        // enrich join
        Row joinRow = join(clickRow, map.get(id));
        out.collect(joinRow);
      } else {
        // lookup entry has not arrived even after 1 second, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
      clickstreamState.remove(id);
    }
}}
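
The buffer-then-timeout logic described in this answer can be tried out without a Flink cluster. The following is a minimal plain-Java sketch of that logic only; it uses no Flink classes, and every name in it (DeferredJoinSketch, Pending, onLookup, onClick, advanceTo) is hypothetical. Rows are modelled as strings and the clock is advanced by hand. As an assumption that goes beyond the answer's code, it keeps a list of pending clicks per lookup id, each with its own deadline, because a single-entry map would overwrite an earlier pending click when several clicks share one lookup id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of the buffer-then-timeout join; it uses no Flink classes. */
final class DeferredJoinSketch {

    /** A click waiting for its lookup record, with the time at which it gives up. */
    private static final class Pending {
        final String click;
        final long deadline;

        Pending(String click, long deadline) {
            this.click = click;
            this.deadline = deadline;
        }
    }

    private final long waitMillis;
    private final Map<Long, String> lookups = new HashMap<>();        // lookup id -> lookup record
    private final Map<Long, List<Pending>> pending = new HashMap<>(); // lookup id -> waiting clicks
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>(); // fire time -> lookup ids

    final List<String> joined = new ArrayList<>();      // stands in for the main output
    final List<String> deadLetters = new ArrayList<>(); // stands in for the side output

    DeferredJoinSketch(long waitMillis) {
        this.waitMillis = waitMillis;
    }

    /** Lookup side: keep the first record per id and join every click waiting for it. */
    void onLookup(long id, String lookup) {
        lookups.putIfAbsent(id, lookup);
        List<Pending> waiting = pending.remove(id);
        if (waiting != null) {
            for (Pending p : waiting) {
                joined.add(p.click + "+" + lookups.get(id));
            }
        }
    }

    /** Click side: join immediately if possible, otherwise buffer and schedule a timer. */
    void onClick(long id, String click, long now) {
        String lookup = lookups.get(id);
        if (lookup != null) {
            joined.add(click + "+" + lookup);
            return;
        }
        long deadline = now + waitMillis;
        pending.computeIfAbsent(id, k -> new ArrayList<>()).add(new Pending(click, deadline));
        timers.computeIfAbsent(deadline, t -> new ArrayList<>()).add(id);
    }

    /** Moves the clock forward and fires every timer due at or before {@code now}. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<Long>> due = timers.pollFirstEntry();
            for (long id : due.getValue()) {
                expire(id, due.getKey());
            }
        }
    }

    /** Dead-letters the clicks for this id whose deadline has passed. */
    private void expire(long id, long fireTime) {
        List<Pending> waiting = pending.get(id);
        if (waiting == null) {
            return; // the lookup arrived in the meantime and the clicks were joined
        }
        Iterator<Pending> it = waiting.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (p.deadline <= fireTime) {
                deadLetters.add(p.click);
                it.remove();
            }
        }
        if (waiting.isEmpty()) {
            pending.remove(id);
        }
    }

    public static void main(String[] args) {
        DeferredJoinSketch join = new DeferredJoinSketch(1000);
        join.onLookup(1L, "L1");
        join.onClick(1L, "c1", 0);   // lookup already present: joined at once
        join.onClick(2L, "c2", 0);   // buffered, deadline 1000
        join.onClick(3L, "c3", 100); // buffered, deadline 1100
        join.onLookup(2L, "L2");     // late lookup: c2 is joined before its timer fires
        join.advanceTo(2000);        // c3 never got a lookup and is dead-lettered
        System.out.println(join.joined);      // [c1+L1, c2+L2]
        System.out.println(join.deadLetters); // [c3]
    }
}
```

In a real job the per-key list would presumably live in keyed state (for example a ListState) and the timers would be registered through the TimerService, as in the answer's code; the sketch only illustrates the order of events.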
