scala - 使用 Intellij IDEA 在 Maven 中使用 Scala 的 Spark 工作流和调度框架
问题描述
我用 Scala 创建了一个 spark 项目。它是一个在 POM 中配置了所有依赖项的 Maven 项目。
Spark 我用作 ETL。源是API生成的文件,spark中的各种转换然后将其加载到cassandra。
是否有任何 Workflow 软件,可以使用 jar 来通过电子邮件触发、成功或失败工作流来自动化流程。
可能有人请帮助我.....气流是否可以用于此目的,我使用的是SCALA 而不是 Python
请分享您的想法。
解决方案
Spark 中没有内置机制可以提供帮助。对于您的情况,cron 工作似乎是合理的。如果您发现自己不断向计划作业添加依赖项,请尝试 Azkaban
shell脚本的一个这样的例子是:-
#!/bin/bash
cd /locm/spark_jobs
export SPARK_HOME=/usr/hdp/2.2.0.0-2041/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_USER_NAME=hdfs
export HADOOP_GROUP=hdfs
#export SPARK_CLASSPATH=$SPARK_CLASSPATH:/locm/spark_jobs/configs/*
CLASS=$1
MASTER=$2
ARGS=$3
CLASS_ARGS=$4
echo "Running $CLASS With Master: $MASTER With Args: $ARGS And Class Args: $CLASS_ARGS"
$SPARK_HOME/bin/spark-submit --class $CLASS --master $MASTER --num-executors 4 --executor-cores 4 "application jar file"
您甚至可以尝试使用 spark-launcher,它可用于以编程方式启动 spark 应用程序:-
首先创建一个示例 spark 应用程序并为其构建一个 jar 文件。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object SparkApp extends App{
val conf=new SparkConf().setMaster("local[*]").setAppName("spark-app")
val sc=new SparkContext(conf)
val rdd=sc.parallelize(Array(2,3,2,1))
rdd.saveAsTextFile("result")
sc.stop()
}
这是我们简单的 spark 应用程序,使用 sbt 程序集制作这个应用程序的 jar,现在我们制作一个 scala 应用程序,通过它我们启动这个 spark 应用程序,如下所示:
import org.apache.spark.launcher.SparkLauncher
object Launcher extends App {
val spark = new SparkLauncher()
.setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
.setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
.setMainClass("SparkApp")
.setMaster("local[*]")
.launch();
spark.waitFor();
}
在上面的代码中,我们使用 SparkLauncher 对象并为其设置值
setSparkHome(“/home/knoldus/spark-1.4.0-bin-hadoop2.6”) 用于设置spark home,内部用于调用spark submit。
.setAppResource(“/home/knoldus/spark_launcher-assembly-1.0.jar”) 用于指定我们的 spark 应用程序的 jar。
.setMainClass(“SparkApp”) spark程序即驱动程序的入口点。
.setMaster(“local[*]”) 设置master的地址,现在我们在本地机器上运行它。
.launch() 只是启动我们的 spark 应用程序。
这是一个最低要求,您还可以设置许多其他配置,例如传递参数、添加 jar、设置配置等。
推荐阅读
- sql - 使用新表 id 更新事务表数据
- android - 将多个 JSON 对象转换为单个 JSON 对象
- python - 按下 tkinter 按钮自动更新标签
- c++11 - 运行 ska::flat_hash_map 时,std::hash 没有名为“hash_policy”的类型
- algorithm - 在 golang 中的结构之间共享信息
- c# - 如何链接曲线
- c# - DbSet 之间的区别
属性和集合 EF Core 中的 () 函数? - azure - MS 应用程序 - 不使用 /common 端点支持多个租户
- javascript - 使用相同的表单创建和编辑数据(Angular5)编辑数据时清除验证器
- java - 谷歌地图标记选项大小