Run a function in parallel in Scala

Problem description

I have a Spark SQL function which generates temp files in an HDFS directory. I want to print all the directories and files while the function is running.

Here is the function:

spark.sql(s"INSERT INTO ${table} VALUES ....")

While the function/query is running, I want to see the files generated under the HDFS directory. Since the files are temporary, I want to list the directory several times as the query runs.

FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("hdfs:///mypath")).foreach(x => println(x.getPath))

I am new to Scala programming and can't really find a way to run this in parallel.

Tags: scala, apache-spark, parallel-processing

Solution


Sure. You can wrap that spark.sql(query) call in a scala.concurrent.Future[Unit].

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val insert = Future {
  spark.sql(s"INSERT INTO ${table} VALUES ....")
} // starts executing immediately on the global execution context

Then while it executes, you can see the files it creates.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val fs = FileSystem.get(sc.hadoopConfiguration)
val path = new Path("hdfs:///mypath")
while (!insert.isCompleted) {
  Thread.sleep(1000) // sleep to limit how often the listing prints
  fs.listStatus(path).foreach(x => println(x.getPath))
}
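The loop exits as soon as the future completes, whether the query succeeded or failed. To surface any exception the INSERT may have thrown, you can block on the future afterwards; this is a sketch reusing the `insert` value above, and the timeout value is an arbitrary assumption:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// The future is already complete at this point, so Await.result returns
// immediately -- but it rethrows any exception raised inside the Future,
// so a failed INSERT doesn't go unnoticed.
Await.result(insert, 10.seconds)
```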

Keep in mind you'll see the whole list of files on each iteration, not just the newly created ones.
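If the repeated full listing is too noisy, one option (my own sketch, not part of the original answer) is to remember which paths have already been printed and report only new ones. This reuses the `fs`, `path`, and `insert` values defined above:

```scala
import scala.collection.mutable

val seen = mutable.Set.empty[String]
while (!insert.isCompleted) {
  Thread.sleep(1000)
  for (status <- fs.listStatus(path)) {
    val p = status.getPath.toString
    // Set.add returns true only the first time a path is seen
    if (seen.add(p)) println(s"new file: $p")
  }
}
```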

