Flink sink never executes

Problem description

I have a program that streams cryptocurrency prices into a Flink pipeline and prints the highest bid for each time window.

Main.java

public class Main {
    private final static Logger log = LoggerFactory.getLogger(Main.class);
    private final static DateFormat dateFormat = new SimpleDateFormat("y-M-d H:m:s");
    private final static NumberFormat numberFormat = new DecimalFormat("#0.00");
    public static void main(String[] args) throws Exception {
        MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);

        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.getConfig().setGlobalJobParameters(multipleParameterTool);
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        streamExecutionEnvironment.addSource(new GdaxSourceFunction())
        .name("Gdax Exchange Price Source")
        .assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
            @Override
            public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessGenerator();
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
        .trigger(EventTimeTrigger.create())
        .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
                value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
        .addSink(new SinkFunction<TickerPrice>() {
             @Override
             public void invoke(TickerPrice value, Context context) throws Exception {
                 String dateString = dateFormat.format(context.timestamp());
                 String valueString = "$" + numberFormat.format(value.getHighestBid());
                 log.info(dateString + " : " + valueString);
             }
        }).name("Highest Bid Logger");

        streamExecutionEnvironment.execute("Gdax Highest bid window calculator");
    }

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
    public static class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<TickerPrice> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public void onEvent(TickerPrice event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark as current highest timestamp minus the out-of-orderness bound
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
        }
    }
}

GdaxSourceFunction.java

public class GdaxSourceFunction extends WebSocketClient implements SourceFunction<TickerPrice> {
    private static String URL = "wss://ws-feed.gdax.com";
    private static Logger log = LoggerFactory.getLogger(GdaxSourceFunction.class);
    private static String subscribeMsg = "{\n" +
            "    \"type\": \"subscribe\",\n" +
            "    \"product_ids\": [<productIds>],\n" +
            "    \"channels\": [\n" +
            //TODO: uncomment to re-enable order book tracking
            //"        \"level2\",\n" +
            "        {\n" +
            "            \"name\": \"ticker\",\n" +
            "            \"product_ids\": [<productIds>]\n" +
            "        }\n"+
            "    ]\n" +
            "}";
    SourceContext<TickerPrice> ctx;

    @Override
    public void run(SourceContext<TickerPrice> ctx) throws Exception {
        this.ctx = ctx;
        openConnection().get();
        while(isOpen()) {
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {

    }


    @Override
    public void onMessage(String message) {
        try {
            ObjectNode objectNode = objectMapper.readValue(message, ObjectNode.class);
            String type = objectNode.get("type").asText();
            if("ticker".equals(type)) {
                TickerPrice tickerPrice = new TickerPrice();
                String productId = objectNode.get("product_id").asText();
                String[] currencies = productId.split("-");
                tickerPrice.setFromCurrency(currencies[1]);
                tickerPrice.setToCurrency(currencies[0]);
                tickerPrice.setHighestBid(objectNode.get("best_bid").asDouble());
                tickerPrice.setLowestOffer(objectNode.get("best_ask").asDouble());
                tickerPrice.setExchange("gdax");
                String time = objectNode.get("time").asText();
                Instant instant = Instant.parse(time);
                ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());
            }
            //log.info(objectNode.toString());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void onOpen(Session session) {
        super.onOpen(session);

        //Authenticate and ensure we can properly connect to Gdax Websocket
        //construct auth message with list of product ids

        StringBuilder productIds = new StringBuilder("");
        productIds.append("" +
                "\"ETH-USD\",\n" +
                "\"ETH-USD\",\n" +
                "\"BTC-USD\"");

        String subMsg = subscribeMsg.replace("<productIds>", productIds.toString());

        try {
            userSession.getAsyncRemote().sendText(subMsg).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String getUrl() {
        return URL;
    }
}

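But the sink function is never called. I have verified that the reducer is executing (very fast, every 100 milliseconds). If I remove the windowing part and just print the bid for every record, the program works. But I have followed all the tutorials on windowing, and I see no difference between what I am doing here and what the tutorials show. I don't know why the Flink sink does not execute in windowed mode.

I copied the BoundedOutOfOrdernessGenerator class directly from the tutorial. It should work for my use case. Within 3600 milliseconds I should see my first record in the log, but I don't. I debugged the program, and the sink function never executes. If I remove these lines: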

.assignTimestampsAndWatermarks(new WatermarkStrategy<TickerPrice>() {
            @Override
            public WatermarkGenerator<TickerPrice> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new BoundedOutOfOrdernessGenerator();
            }
        })
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
        .trigger(EventTimeTrigger.create())
        .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
                value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)      

so that the stream creation code looks like this:

streamExecutionEnvironment.addSource(new GdaxSourceFunction())
        .name("Gdax Exchange Price Source")
        .addSink(new SinkFunction<TickerPrice>() {
             @Override
             public void invoke(TickerPrice value, Context context) throws Exception {
                 String dateString = dateFormat.format(context.timestamp());
                 String valueString = "$" + numberFormat.format(value.getHighestBid());
                 log.info(dateString + " : " + valueString);
             }
        }).name("Highest Bid Logger");

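the sink executes, but of course the results are not windowed, so they don't suit my use case. This shows that something is wrong with my windowing logic, but I don't know what it is.

Versions:

JDK 1.8, Flink 1.11.2

Tags: apache-flink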

Solution


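I believe the cause of this problem is that the timestamps produced by your custom source are in seconds, while window durations are always in milliseconds. Try changing

ctx.collectWithTimestamp(tickerPrice, instant.getEpochSecond());

to

ctx.collectWithTimestamp(tickerPrice, instant.toEpochMilli());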
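To make the unit mismatch concrete, here is a minimal plain-Java sketch with no Flink dependency. The class name, the `windowStart` helper, and the sample time are hypothetical and exist only for illustration; the helper assumes non-negative timestamps and a zero window offset. It compares which 100-unit tumbling window two events one minute apart fall into, first with epoch seconds and then with epoch milliseconds.

```java
import java.time.Instant;

// Plain-Java sketch with no Flink dependency. The class name, helper, and
// sample time are hypothetical and chosen only for illustration.
class TimestampUnitsDemo {

    // Start of the tumbling window that contains a non-negative timestamp,
    // assuming a zero window offset. Both arguments are in milliseconds.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2020-10-01T12:00:00Z");
        Instant later = first.plusSeconds(60); // one minute of real time later

        long windowSize = 100; // matches Time.milliseconds(100) in the question

        // Epoch seconds misread as milliseconds: one minute moves the
        // timestamp by only 60 units.
        System.out.println("seconds, same window: "
                + (windowStart(first.getEpochSecond(), windowSize)
                   == windowStart(later.getEpochSecond(), windowSize)));

        // Epoch milliseconds: one minute moves the timestamp by 60,000 units.
        System.out.println("millis, same window:  "
                + (windowStart(first.toEpochMilli(), windowSize)
                   == windowStart(later.toEpochMilli(), windowSize)));
    }
}
```

With second-granularity values, event time advances a thousand times more slowly than the window size assumes, so events a full minute apart can still share one 100-unit window.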

I would also suggest a few other (largely unrelated) changes.

streamExecutionEnvironment.addSource(new GdaxSourceFunction())
    .name("Gdax Exchange Price Source")
    .uid("source")
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<TickerPrice>forBoundedOutOfOrderness(Duration.ofMillis(3500))
    )
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(100)))
    .reduce((ReduceFunction<TickerPrice>) (value1, value2) ->
            value1.getHighestBid() > value2.getHighestBid() ? value1 : value2)
    .uid("window")
    .addSink(new SinkFunction<TickerPrice>() { ... })
    .uid("sink");

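Note the following suggestions:

  • Remove BoundedOutOfOrdernessGenerator. There is no need to reimplement the built-in bounded-out-of-orderness watermark generator.
  • Remove the window trigger. There appears to be no need to override the default trigger, and getting it wrong causes problems.
  • Add UIDs to every stateful operator. You will need these if you ever want to do a stateful upgrade of the application after changing the job topology. (Your current sink is not stateful, but adding a UID to it does no harm.)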
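The first two suggestions rest on how the watermark and the default event-time trigger interact. The following plain-Java simulation is only a sketch under stated assumptions, not Flink code: the class and method names are hypothetical, the watermark is taken to be the highest timestamp seen minus the bound minus 1 (as in the question's generator), and a window is taken to fire once the watermark reaches the window end minus 1. It counts how far event time must advance before the first window fires.

```java
// Plain-Java simulation with no Flink dependency. All names are hypothetical.
class WindowFiringDemo {
    static final long MAX_OUT_OF_ORDERNESS = 3500; // same bound as in the question
    static final long WINDOW_SIZE = 100;           // same window size as in the question

    // Assumed watermark rule: highest timestamp seen, minus the bound, minus 1.
    static long watermarkFor(long maxTimestamp) {
        return maxTimestamp - MAX_OUT_OF_ORDERNESS - 1;
    }

    // Assumed firing rule: the window fires once the watermark reaches
    // the last timestamp that belongs to it (window end - 1).
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Smallest timestamp advance after which the window that contains
    // firstTimestamp fires (non-negative timestamps, zero offset).
    static long advanceNeededToFire(long firstTimestamp) {
        long windowEnd = firstTimestamp - (firstTimestamp % WINDOW_SIZE) + WINDOW_SIZE;
        long advance = 0;
        while (!windowFires(windowEnd, watermarkFor(firstTimestamp + advance))) {
            advance++;
        }
        return advance;
    }

    public static void main(String[] args) {
        // A first event that sits exactly on a window boundary.
        long first = 1_601_553_600L;
        System.out.println("timestamp units needed: " + advanceNeededToFire(first));
    }
}
```

Under these assumptions a window-aligned first event needs an advance of 3600 timestamp units. With millisecond timestamps that is the 3.6 seconds the question expected. With second-granularity timestamps the same 3600 units take about an hour of real time to accumulate, which would look like a sink that never runs.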
