apache-flink - 如何使用 Flink 中的内存数据创建可刷新的表以进行连接?
问题描述
我有一个依赖 Table API 的 Flink 应用程序。我确实有一个创建表格的 Kafka 主题。然后,我们为 IP 地址列表和一些元数据信息维护一个 S3 对象。
我们还想在这个 S3 对象上创建一个表。S3 对象路径是静态的,不会更改,但我可以覆盖 S3 对象,并且我想用新数据刷新此表。
基本上,我有一个从 S3 对象读取的内存集合。如何最有效地创建表并加入 Kafka 表?当 S3 对象有更新时,应刷新表。
解决方案
如果您使用FileSystem SQL 连接器创建由 S3 对象支持的表,它可能会满足您的需求。但是请注意,文件系统源尚未完全开发,您可能会遇到影响您的用例的限制。
您可以改为使用StreamExecutionEnvironment#readFile
( docs ),并将DataStream
其生成的转换为表格。请注意,如果您使用readFile
while usingFileProcessingMode.PROCESS_CONTINUOUSLY
模式读取文件,然后修改该文件,则将重新摄取整个文件。
推荐阅读
- asp.net - blazor 项目中的自定义 AuthenticationStateProvider 在服务器端不起作用
- openstreetmap - Openstreetmap - 从给定城市检索附近有大学的城市
- python - CDLIB:NMI(归一化互信息)功能不起作用,有人可以帮忙吗?
- azure - Strimzi 无法调整 PV 的大小
- firebase - 如何将当前 DateTime 添加到 firestore 文档?
- pandas - 如何在带有熊猫的 csv 文件中输出带间距的列?
- centos7 - 如何解决错误:无法初始化 NSS 库?
- angular - 使用 Angular 的 Excel 插件:找不到 dist/functions.json
- spring - spring security OAuth2中如何生成client-id和client-secret
- python - 使用 river.stream.iter_csv() 循环数据集的 MemoryError