首页 > 解决方案 > Flink Input 输出记录时序

问题描述

我有这个管道:KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer

我正在尝试提取管道每个阶段的记录时间:

在 Flink java 应用程序中,我做了这样的事情:

inputstream.

                // To calculate flink input time
                map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("source time : %d\n",System.nanoTime());
                writeDataLineByLine("flinkinput_data.csv",-1,System.nanoTime());
                return s;
            }
        }).

                // Process
                map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws InterruptedException {
                for(int i=0;i<2;i++)
                    Thread.sleep(1);
                return record + " mapped";
            }
        }).

                // To calculate flink output time
                map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.printf("sink time : %d\n",System.nanoTime());
                writeDataLineByLine("flinkoutput_data.csv",-1,System.nanoTime());
                return s;
            }
        }).
                addSink(producer);

虽然这在 Intellij 的迷你集群中工作,但它不适用于独立集群。有人可以向我解释为什么打印和写入 csv 行被忽略了吗?

标签: apache-flinkflink-streaming

解决方案


无论任务管理器向标准输出写入什么,都会进入每个任务管理器节点上 Flink 日志目录中的文件。


推荐阅读