首页 > 解决方案 > Spark-shell 不允许查询结构化流

问题描述

我正在关注Spark the Definitive Guide一书 以下代码使用 spark-shell 在本地执行

过程:在没有任何其他选项的情况下启动 spark-shell

val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema) .option("maxFilesPerTrigger",1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")
val activityCounts = streaming.groupBy("gt").count()
val activityQuery  = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()
activityQuery.awaitTermination()

书上说

执行此代码后,流计算将在后台启动

……

现在这个流正在运行,我们可以通过查询来试验结果

我的观察

执行此代码时,它不会释放 shell 让我输入命令,例如

`spark.streams.active`

因此我无法查询此流

我的研究

我试图打开一个新的 spark-shell,但在该 shell 中查询不会返回任何结果。从这个 shell 获得的流是否可以从另一个 shell 实例访问。

编辑1:

我想要内存中的表,以便我可以使用命令查询

for( i <- 1 to 5)
{
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}

标签: apache-sparkspark-structured-streaming

解决方案


几点:

1) 确保为 Spark-shell 分配了足够数量的核心

运行 Streaming / Structured Streaming 应用程序需要至少 2 个内核,以防止出现饥饿情况,即当启动流式应用程序时,1 个内核将分配给Receiver,如果您启动只有 1 个内核的 spark 应用程序,将没有内核可用于为执行者处理收到的消息。

要检查 spark-shell 中的内核数:

spark.conf.get("spark.master")

使用 4 核启动 spark-shell

spark-shell --master local[4]

2)您正在将流写入内存,这不会在控制台中显示输出,要显示您必须注册表然后查询。

相反,您可以将格式从内存更改为控制台以查看控制台中的内容。

val activityQuery  = activityCounts.writeStream.queryName("activity_counts").format("console").outputMode("complete").start()

推荐阅读