首页 > 技术文章 > spark 广播变量

learn-bigdata 2019-04-30 11:11 原文

Spark广播变量

使用广播变量来优化,广播变量的原理是:

在每一个Executor中保存一份全局变量,task在执行的时候需要使用和这一份变量就可以,极大的减少了Executor的内存开销。

Executor中task在执行的时候如果使用到了广播变量,会找Executor里面的BlockManager来获取广播变量。

如果BlockManager中没有这个关闭变量,会从driver端拉取关闭变量。

在Driver端也有一个blockManagerMaster,其他的task执行的时候直接使用blockmanager中的广播变量就可以。

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

public class BroadCast {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("BroadCast");
        JavaSparkContext sc = new JavaSparkContext(conf);
        /*
        * 使用广播变量,广播变量的定义必须在driver端,因为sc没有被序列化不能被发送到Executor端
        * */
        Broadcast<String> blackname = sc.broadcast("dwj3");
        List<String> name = Arrays.asList(
                "dwj1",
                "dwj2",
                "dwj3");
        //String blackName = "dwj3";
        JavaRDD<String> nameRDD = sc.parallelize(name);
        JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                String blacknames = blackname.getValue();
                return !blacknames.equals(s);
            }
        });
        List<String> lastname = namefilter.collect();
        for(String str:lastname){
            System.out.println(str);
        }
    }
}

注意:在声明广播变量的时候,必须在driver端,因为sc没有被序列化,是不能被发送到Executor端的。

推荐阅读