首页 > 技术文章 > SparkSQL---UDAF

shuaidong 2018-11-07 08:21 原文

package sqlspark.Day04

import java.lang

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object UdafText05 {
def main(args: Array[String]): Unit = {
//创建sparksession
val spark: SparkSession = SparkSession.builder().appName("ipdemo2").master("local[*]").getOrCreate()
//2.创建分布式的数据集合
val number: Dataset[lang.Long] = spark.range(1, 11)

//转换为df
val numberDF: DataFrame = number.toDF()
//number.show()
//3.注册为视图
numberDF.createTempView("v_number")
//注册UDAF函数
spark.udf.register("geo",new GemMean05_1())
//sq
spark.sql("select geo(id) result from v_number").show()
spark.stop()
}
}


package sqlspark.Day04

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

class GemMean05_1 extends UserDefinedAggregateFunction {
//运算输入数据的类型
override def inputSchema: StructType = StructType(List(
StructField("value", DataTypes.DoubleType)

))

//产生中间结果的数据类型
override def bufferSchema: StructType = StructType(List(
StructField("product", DataTypes.DoubleType),
StructField("counts", DataTypes.LongType)
))

//最终结果的数据类型
override def dataType: DataType = DataTypes.DoubleType

//数据一致性
override def deterministic: Boolean = true

//分区运算的初始值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//buffer(0) 存放的分区预算数据数量的初始值
//buffer(0) = 0L
buffer(0) = 1.0
//buffer(1) 存放的是分区运算乘积的初始值
// buffer(1) = 1.0
buffer(1) = 0L
}

//没计算一条数据,更新中间结果
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
//累加计算个数
// buffer(0) = buffer.getLong(0) + 1L
buffer(1) = buffer.getLong(1) + 1L
//累乘 数字
// buffer(1) =buffer.getDouble(0) * input.getDouble(0)
buffer(0) = buffer.getDouble(0) * input.getDouble(0)

}

//合并 将各个分区的计算结果累加和累乘
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}

//最终计算
override def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(0), 1.toDouble / buffer.getLong(1))
}
}



推荐阅读