apache-flink - Flink Input 输出记录时序
问题描述
我有这个管道:KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer
我正在尝试提取管道每个阶段的记录时间:
在 Flink java 应用程序中,我做了这样的事情:
inputstream.
// To calculate flink input time
map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.printf("source time : %d\n",System.nanoTime());
writeDataLineByLine("flinkinput_data.csv",-1,System.nanoTime());
return s;
}
}).
// Process
map(new MapFunction<String, String>() {
@Override
public String map(String record) throws InterruptedException {
for(int i=0;i<2;i++)
Thread.sleep(1);
return record + " mapped";
}
}).
// To calculate flink output time
map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.printf("sink time : %d\n",System.nanoTime());
writeDataLineByLine("flinkoutput_data.csv",-1,System.nanoTime());
return s;
}
}).
addSink(producer);
虽然这在 Intellij 的迷你集群中工作,但它不适用于独立集群。有人可以向我解释为什么打印和写入 csv 行被忽略了吗?
解决方案
无论任务管理器向标准输出写入什么,都会进入每个任务管理器节点上 Flink 日志目录中的文件。