首页 > 解决方案 > 在过滤器方法中使用随机后修改的火花数据

问题描述

    val list = List("A", "B", "B", "B", "C", "C", "C", "C", "C")
    val df1 = sc.parallelize(list).toDF("key").withColumn("feature", lit("u4")).select("feature", "key").groupBy(col("feature"), col("key")).count()
    val df2 = sc.parallelize(list).toDF("key").withColumn("feature", lit("u5")).select("feature", "key").groupBy(col("feature"), col("key")).count()
    val df = df1.unionAll(df2)
    df.show

    val udf_sample = udf { (x: String) => {
      Math.random() < 0.3
    }
    }
    df.filter(udf_sample(col("feature"))).show()

输出是:

+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  A|    1|
|     u4|  B|    3|
|     u4|  C|    5|
|     u5|  A|    1|
|     u5|  B|    3|
|     u5|  C|    5|
+-------+---+-----+

+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  A|    1|
|     u4|  C|    2|
|     u5|  A|    1|
|     u5|  B|    1|
|     u5|  C|    2|
+-------+---+-----+

请注意,在第二个数据框中,计数列值与第一个数据框中的值不同。虽然我希望随机过滤一些行。

标签: apache-spark

解决方案


它在分组依据之前过滤特征。Spark 是惰性的,因此在找到操作(打印、计数)之前不会执行任何转换(过滤器、组)。有一篇关于这个的好文章here

当找到一个动作时,Spark 会尝试优化要应用的转换,因此它会首先运行过滤器以尝试减少需要处理的数据量:

虽然缓存不是一个动作,但它也可以解决问题,因为当找到一个动作时,它会计算 DAG 直到缓存,然后将结果用于进一步计算

您可以缓存df到 groupBy 之后执行的过滤器

输入

df.show
+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  C|    5|
|     u4|  A|    1|
|     u4|  B|    3|
|     u5|  C|    5|
|     u5|  A|    1|
|     u5|  B|    3|
+-------+---+-----+

定义 UDF

scala> val udf_sample = udf { (x: String) => {
     |       Math.random() < 0.3
     |     }
     |     }
udf_sample: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))

输出

df.cache

df.filter(udf_sample(col("feature"))).show()
+-------+---+-----+
|feature|key|count|
+-------+---+-----+
|     u4|  C|    5|
|     u4|  B|    3|
|     u5|  C|    5|
+-------+---+-----+

推荐阅读