How is the number of Tasks and Partitions set when using MemoryStream?

Problem Description

I'm trying to understand a strange behavior that I observed in my Spark Structured Streaming application, which is running in local[*] mode.

I have 8 cores on my machine. While the majority of my batches have 8 partitions, every once in a while I get 16, 32, 56, and so on partitions/tasks; I noticed that it is always a multiple of 8. Opening the Stages tab, I noticed that when this happens, it is because there are multiple LocalTableScans.

That is, if I have 2 LocalTableScans, then the micro-batch job will have 16 tasks/partitions, and so on.

I mean, Spark could well perform the two scans, combine the two batches, and feed the result to the micro-batch job. However, it does not: it results in a micro-batch job whose number of tasks = number of cores * number of scans.
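
For context, the recurring factor of 8 matches Spark's default parallelism, which in local[*] mode equals the number of cores on the machine. A quick check, assuming a SparkSession named spark:

// In local[*] mode, defaultParallelism equals the machine's core count; each
// LocalTableScan in a micro-batch plan appears to contribute up to this many partitions.
println(spark.sparkContext.defaultParallelism) // prints 8 on an 8-core machine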

Here is how I set up my MemoryStream:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._ // implicit Encoder for Map[String, String]

implicit val sqlCtx = spark.sqlContext // MemoryStream requires an implicit SQLContext
val rows = MemoryStream[Map[String, String]]
val df = rows.toDF()
val rdf = df.mapPartitions { it => { ..... } }(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))

Right after that, I have a Future that feeds my MemoryStream:

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

Future {
  blocking {
    for (i <- 1 to 100000) {
      rows.addData(maps) // `maps` is the Map[String, String] payload built elsewhere
      Thread.sleep(3000)
    }
  }
}
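
Note that with a 3-second sleep and a 1-second trigger, most micro-batches should see at most one addData call; the multi-scan batches appear when several addData calls land in the same trigger interval (for example, while a slow batch is still running). A hypothetical way to provoke the 16-task case deliberately:

// Hypothetical: two blocks added between two consecutive triggers; the next
// micro-batch then unions two LocalTableScan nodes (2 scans x 8 cores = 16 tasks).
rows.addData(maps)
rows.addData(maps)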

and then my query:

import org.apache.spark.sql.streaming.Trigger

rdf.writeStream
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .format("console")
  .outputMode("append")
  .queryName("SourceConvertor1")
  .start()
  .awaitTermination()

I wonder why the number of tasks varies. How is it supposed to be determined by Spark?
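
One way to watch this directly is to log the partition count of each micro-batch. A minimal sketch, assuming Spark 2.4+ (for foreachBatch), the rdf defined above, and a hypothetical query name:

import org.apache.spark.sql.{Dataset, Row}

rdf.writeStream
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
    // The partition count of the batch equals the number of tasks the micro-batch job runs.
    println(s"batch $batchId: ${batch.rdd.getNumPartitions} partitions")
  }
  .queryName("PartitionProbe") // hypothetical probe, standing in for the console sink
  .start()
  .awaitTermination()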

Tags: scala, apache-spark, spark-structured-streaming

Solution

