java - 如何将 Flink 中的时间窗口保存到文本文件中?
问题描述
我开始在 Java 中的 ApacheFlink 中工作。
我的目标是在一分钟的时间窗口内使用 ApacheKafka 主题,这将应用非常基本的信息并将每个窗口的结果记录在一个文件中。
到目前为止,我设法对收到的内容应用了文本转换简化,我应该使用 apply 或 process 来写入文件,结果我有点迷失了窗口。
到目前为止,这是我的代码
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.time.ZoneId;
import java.util.Date;
import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ExceptionEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import scala.util.parsing.json.JSONObject;
public class BatchJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "latest");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("topic-basic-test", new SimpleStringSchema(), properties);
DataStream<String> data = env.addSource(consumer);
data.flatMap(new JSONparse()).timeWindowAll(Time.minutes(1))."NEXT ??" .print()
System.out.println("Hola usuario 2");
env.execute("Flink Batch Java API Skeleton");
}
public static class JSONparse implements FlatMapFunction<String, Tuple2<String, String>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
System.out.println(s);
s = s + "ACA PODES JUGAR NDEAH";
collector.collect(new Tuple2<String,String>("M",s));
}
}
}
解决方案
如果您希望每个一分钟窗口的结果转到其自己的文件,您可以查看使用StreamingFileSink
一分钟存储桶——它应该可以满足您的需求,或者非常接近。
我认为您实际上最终会为每个窗口创建一个目录,该目录包含来自窗口的每个并行实例的文件——但是当您使用timeWindowAll
不并行运行的 时,每个存储桶将只有一个文件,除非结果太大以至于文件翻滚。
顺便说一句,在 FlatMap 中进行 JSON 解析会表现得相当糟糕,因为这最终会为每个事件实例化一个新的解析器,这反过来又会导致大量的 GC 活动。最好使用 RichFlatMap 并在 open() 方法中创建一个解析器,您可以为每个事件重用该解析器。甚至更好的是,使用 aJSONKeyValueDeserializationSchema
而不是 a SimpleStringSchema
,并让 kafka 连接器为您处理 json 解析。
推荐阅读
- monaco-editor - Monaco-editor:如何自定义建议小部件面板
- javascript - 如果resolve(...) 在setTimeout 内,await promise 如何解决?
- python - 将 dict 的 Python 列表转换为 2D 列表
- python - 如何循环遍历具有不同索引的不同字符串并每次反转它们的 item[i] 和 item[i+1]?
- vue.js - Vue - 同时轮询多个组件的相同 API 端点的最佳方式
- java - @JmsListener 不能与 @EnableAsync 一起使用
- node.js - socket.io 在生产中使用 node.js 并做出反应
- java - 替换 Intstream 中的特定元素并将 IntStream 转换为 String
- sockets - (Python)我该怎么做才能停止收到此错误?[WinError 10013] 试图以访问权限禁止的方式访问套接字
- java - 在 Webview 中下载 pdf 不起作用(错误只能下载 HTTP/HTTPS Uri: Blob)