scala - Flink:流拓扑中没有定义操作符。不能执行
问题描述
我正在尝试设置一个非常基本的 flink 作业。当我尝试运行时,出现以下错误:
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)
该错误是由以下代码引起的:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
streamExecutionEnvironment.execute("Test Job")
print()
当我将调用添加到流的末尾时,错误消失了:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()
streamExecutionEnvironment.execute("Test Job")
我很困惑为什么要print()
解决这个问题。在引入接收器之前,流拓扑不会处理其任何运算符的想法吗?print()
在这里充当水槽吗?任何帮助,将不胜感激。谢谢。
解决方案
在编程语言理论中,惰性求值或按需调用是一种求值策略,它将表达式的求值延迟到需要它的值为止,并且还避免了重复求值。惰性求值的反面是急切求值,有时也称为严格求值。惰性求值的好处包括:
- 将控制流(结构)定义为抽象而不是原语的能力。
- 定义潜在无限数据结构的能力。这允许更直接地实现某些算法。
- 通过避免不必要的计算和在评估复合表达式时避免错误条件来提高性能。
延迟评估可以减少内存占用,因为值是在需要时创建的。然而,惰性求值很难与异常处理和输入/输出等命令性特性结合起来,因为操作的顺序变得不确定。
通常,Flink 将操作分为两类:转换操作和接收器操作。正如您所猜测的,Flink 转换是惰性的,这意味着它们在调用 sink 操作之前不会执行。
Flink 程序是在分布式集合上实现转换(例如,过滤、映射、更新状态、加入、分组、定义窗口、聚合)的常规程序。集合最初是从源创建的(例如,通过从文件、Kafka 主题或本地、内存中的集合中读取)。结果通过接收器返回,例如,可以将数据写入(分布式)文件或标准输出(例如,命令行终端)。
推荐阅读
- rest - 如何使用 REST API 将文件和附件上传到 sobject 记录?
- jovo-framework - Jovo 部署无法找到 ask-clik
- swift - didRegisterForRemoteNotificationsWithDeviceToken 没有被触发 - 推送通知不起作用
- react-native - Expo共享链接不可点击
- java - 从 Java 8 中的对象列表中获取多个属性列表
- javascript - 程序错误时在外部模块中完成写入流
- javascript - 从 node.js 中的 .MHT 文件中提取图像
- javascript - 有没有办法将 React 组件代码显示为
tag?
- hpc - Singularity 容器中的文件所有权和权限
- vue.js - 如何从 vuejs 根路由器中删除#?