apache-spark - Spark-shell 不允许查询结构化流
问题描述
我正在关注Spark the Definitive Guide一书 以下代码使用 spark-shell 在本地执行
过程:在没有任何其他选项的情况下启动 spark-shell
val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema) .option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
activityQuery.awaitTermination()
书上说
执行此代码后,流计算将在后台启动
……
现在这个流正在运行,我们可以通过查询来试验结果
我的观察:
执行此代码时,它不会释放 shell 让我输入命令,例如
`spark.streams.active`
因此我无法查询此流
我的研究
我试图打开一个新的 spark-shell,但在该 shell 中查询不会返回任何结果。从这个 shell 获得的流是否可以从另一个 shell 实例访问。
编辑1:
我想要内存中的表,以便我可以使用命令查询
for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}
解决方案
几点:
1) 确保为 Spark-shell 分配了足够数量的核心
运行 Streaming / Structured Streaming 应用程序需要至少 2 个内核,以防止出现饥饿情况,即当启动流式应用程序时,1 个内核将分配给Receiver,如果您启动只有 1 个内核的 spark 应用程序,将没有内核可用于为执行者处理收到的消息。
要检查 spark-shell 中的内核数:
spark.conf.get("spark.master")
使用 4 核启动 spark-shell
spark-shell --master local[4]
2)您正在将流写入内存,这不会在控制台中显示输出,要显示您必须注册表然后查询。
相反,您可以将格式从内存更改为控制台以查看控制台中的内容。
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete").start()
推荐阅读
- apache - Apache 服务器错误:在此服务器上找不到请求的 URL /ena/home/home.php
- git - 将 GitHub 存储库镜像到 VPN 后面的 GitLab 存储库
- python - 我可以用 python 读取 Windows 应用程序 (c++) 的给定变量吗?
- flutter -
fetch 调用返回 'Snippet' 的实例 - twilio - Twilio - 如果未将其标记为失败,如何检查收集的语音结果是否具有预期值
- javascript - JavaScript 的 Map.has() 在计算大 O 时算作循环吗?
- javascript - 带有亚洲字符的 Javascript TextDecoder 中的解码如何工作?
- javascript - 将 IStreamResult 转换为 RxJs Observable
- reactjs - react js react-boostrap切换菜单按钮不起作用
- r - 用 R 将向量对象的名称转换为向量列表的标签