scala - How are the numbers of Tasks and Partitions set when using MemoryStream?
Question
I'm trying to understand a strange behavior that I observed in my Spark Structured Streaming application, which runs in local[*] mode.
My machine has 8 cores. While the majority of my batches have 8 partitions, every once in a while I get 16, 32, 56, and so on partitions/tasks; I notice it is always a multiple of 8. Opening the Stages tab, I saw that when this happens, it is because there are multiple LocalTableScan operators in the plan.
That is, if there are 2 LocalTableScans, the micro-batch job will have 16 tasks/partitions, and so on.
I would have expected Spark to do the two scans, combine the two batches, and feed the result to the micro-batch job. Instead, it results in a micro-batch job whose number of tasks = number of cores × number of scans.
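The pattern described above can be expressed as a simple relation (a sketch of the observed behavior only, not of Spark internals; the function name is mine):

```scala
// Observed relation from the question: each LocalTableScan in the
// micro-batch plan appears to contribute one partition per core,
// so the task count scales multiplicatively with the number of scans.
def observedTaskCount(cores: Int, localTableScans: Int): Int =
  cores * localTableScans

// With 8 cores: 1 scan -> 8 tasks, 2 scans -> 16 tasks, 7 scans -> 56 tasks.
```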
Here is how I set up my MemoryStream:
val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))
Right after that, I have a Future that feeds my MemoryStream:
Future {
  blocking {
    for (i <- 1 to 100000) {
      rows.addData(maps)
      Thread.sleep(3000)
    }
  }
}
and then my query:
rdf.writeStream
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .format("console")
  .outputMode("append")
  .queryName("SourceConvertor1")
  .start()
  .awaitTermination()
I wonder why the number of tasks varies. How is it supposed to be determined by Spark?
Solution