How to define UDFs strictly using Spark SQL per the requirements below

Problem Description

Duration – Duration of the trip
Start Date – Includes start date and time
End Date – Includes end date and time
Start Station – Includes starting station name and number
End Station – Includes ending station name and number
Bike Number – Includes the ID number of the bike used for the trip
Member Type – Indicates whether the user was a "registered" member (Annual Member, 30-Day Member or Day Key Member) or a "casual" rider (Single Trip, 24-Hour Pass, 3-Day Pass or 5-Day Pass)

The question is how to define UDFs strictly using Spark SQL to:
○ Convert Start Station and End Station to uppercase.
○ Generate the columns
■ 'start_day' in the format "mm-dd-YY",
■ 'quarter_of_day',
■ 'is_weekend'
... all of the above using the 'Start Date' column.

I have tried various approaches, but nothing has worked.

Tags: apache-spark

Solution


In the example below I used some sample data. I implemented the UDF, registered it with the SparkSession, and called it from Spark SQL.

I made up the sample data from the information you provided. You could also post your code so we can see what is missing.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object SparkSqlUdfExample {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName("Spark-SQL-UDF-Example")
      .getOrCreate()


    import spark.implicits._


    val tripDF1 = Seq(
      ("8", Timestamp.valueOf("2017-02-12 03:04:00"), Timestamp.valueOf("2017-02-12 03:12:00"), "washington", "roslyn", "01010101", "Annual Member"),
      ("60", Timestamp.valueOf("2017-02-13 11:04:00"), Timestamp.valueOf("2017-02-13 12:04:00"), "reston", "ashburn", "01010102", "casual"),
      ("20", Timestamp.valueOf("2017-02-14 19:04:00"), Timestamp.valueOf("2017-02-14 19:24:00"), "Boston ", "roslyn", "01010103", "30 Day Member"),
      ("30", Timestamp.valueOf("2017-02-12 03:14:00"), Timestamp.valueOf("2017-02-12 03:44:00"), "Philadelphia ", "Washington", "01010104", "Annual Member"),
      ("17", Timestamp.valueOf("2017-02-11 12:04:00"), Timestamp.valueOf("2017-02-11 12:21:00"), "Baltimore", "Washington", "01010105", "casual"),
      ("30", Timestamp.valueOf("2017-02-15 05:00:00"), Timestamp.valueOf("2017-02-15 05:30:00"), "washington", "Miami ", "01010106", "30 Day Member"),
      ("20", Timestamp.valueOf("2017-02-16 07:10:00"), Timestamp.valueOf("2017-02-16 07:30:00"), "Cincinnati", "Chicago", "01010107", "casual"),
      ("10", Timestamp.valueOf("2017-02-17 17:10:00"), Timestamp.valueOf("2017-02-17 17:20:00"), "Raleigh", "Charlotte", "01010108", "30 Day Member"),
      ("30", Timestamp.valueOf("2017-02-15 05:00:00"), Timestamp.valueOf("2017-02-15 05:30:00"), "washington", "Miami ", "01010106", "30 Day Member"),
      ("20", Timestamp.valueOf("2017-02-16 07:10:00"), Timestamp.valueOf("2017-02-16 07:30:00"), "Cincinnati", "Chicago", "01010107", "casual"),
      ("10", Timestamp.valueOf("2017-02-17 17:10:00"), Timestamp.valueOf("2017-02-17 17:20:00"), "Raleigh", "Charlotte", "01010108", "30 Day Member"),
      ("30", Timestamp.valueOf("2017-02-15 05:00:00"), Timestamp.valueOf("2017-02-15 05:30:00"), "washington", "Miami ", "01010106", "30 Day Member"),
      ("20", Timestamp.valueOf("2017-02-16 07:10:00"), Timestamp.valueOf("2017-02-16 07:30:00"), "Cincinnati", "Chicago", "01010107", "casual"),
      ("10", Timestamp.valueOf("2017-02-17 17:10:00"), Timestamp.valueOf("2017-02-17 17:20:00"), "Raleigh", "Charlotte", "01010108", "30 Day Member")
    ).toDF("Duration", "StartDate", "EndDate", "StartStation", "EndStation", "BikeNumber", "MemberType")



    // Register the UDF with the SparkSession so it can be called by name from Spark SQL.
    spark.udf.register("upperCase", (inputString: String) => inputString.toUpperCase())

    tripDF1.show()

    // Expose the DataFrame as a temporary view so SQL queries can reference it.
    tripDF1.createOrReplaceTempView("TRIPS")

    spark.sql(" Select " +
      "Duration, " +
      "StartDate, " +
      "to_date(cast(unix_timestamp(StartDate, 'MM/dd/yyyy') as TIMESTAMP)) as converted_StartDate, " +
      "EndDate, " +
      "to_date(CAST(unix_timestamp(EndDate, 'MM/dd/yyyy') as TIMESTAMP)) as converted_EndDate,  " +
      "date_format(EndDate, 'E') as day_of_week, " +

      "case date_format(EndDate, 'E') " +
      "when 'Sat' then 'Yes' " +
      "when 'Sun' then 'Yes' " +
      "else 'No' end as Derived_WeekDayOrNot, " +
      "upperCase(StartStation), " +
      "upperCase(EndStation), " +
      "BikeNumber, " +
      "MemberType  " +
      "from TRIPS").show()
  }
}
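The query above covers the uppercase conversion and the is_weekend flag, but the question also asks for 'start_day' in "mm-dd-YY" format and 'quarter_of_day', both derived from Start Date. Here is a minimal sketch of how those two could be registered as additional UDFs and called from the same TRIPS view; the UDF names and the four six-hour buckets for quarter_of_day are my assumptions, since the question does not define the term.

    // Hypothetical helper UDFs; the names and the 6-hour bucketing are assumptions.
    // start_day: format the timestamp as month-day-two-digit-year, e.g. "02-12-17".
    spark.udf.register("startDay", (ts: Timestamp) =>
      new java.text.SimpleDateFormat("MM-dd-yy").format(ts))

    // quarter_of_day: 1 = 00:00-05:59, 2 = 06:00-11:59, 3 = 12:00-17:59, 4 = 18:00-23:59
    spark.udf.register("quarterOfDay", (ts: Timestamp) => ts.toLocalDateTime.getHour / 6 + 1)

    spark.sql(
      """SELECT
        |  StartDate,
        |  startDay(StartDate)     AS start_day,
        |  quarterOfDay(StartDate) AS quarter_of_day
        |FROM TRIPS""".stripMargin).show()

These calls assume the same SparkSession and TRIPS view as above, so they can simply be added before the end of main.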
