scala - Spark 数据集到 parquet 文件的转换
问题描述
我是 spark/scala 编程的新手。我正在尝试将 spark 数据集保存到 parquet 文件,但只创建 parquet 目录,其中没有任何子目录或文件。
代码片段:

object Dimension1 {

  /**
   * One row of the date dimension, parsed from a pipe-delimited input line.
   *
   * Field order matches the column order of the input file (positions 0..33).
   * NOTE(review): `week_st_date_key` is a String while the surrounding
   * `*_date_key` fields are Int — kept as-is to match the source data,
   * but worth confirming against the upstream feed.
   */
  case class DIM_DATE_PR(
    date_key: Int,
    date_fld: Timestamp,
    date_name: String,
    date_key_ly: Int,
    date_fld_ly: Timestamp,
    day_in_year: Int,
    day_in_month: Int,
    day_in_week: Int,
    day_type: String,
    year_of_day: Int,
    year_name: Int,
    half_in_year: Int,
    half_year_name: String,
    quarter_in_year: Int,
    quarter_name: String,
    month_in_quarter: Int,
    month_in_year: Int,
    month_name: String,
    week_in_year: String,
    week_name_in_year: String,
    week_in_month: String,
    week_name_in_month: String,
    week_st_date_key: String,
    week_end_date_key: Int,
    month_st_date_key: Int,
    month_end_date_key: Int,
    week_number_this_year: Int,
    year: Int,
    Quarter: Int,
    ca_date_key: Int,
    day_type_seq: Int,
    year_st_date_key: Int,
    date_key_ly_day2day: Int,
    date_key_ct_ly: Int)

  /**
   * Parses one pipe-delimited line into a [[DIM_DATE_PR]].
   *
   * Fields are consumed positionally: column i of the line maps to the
   * i-th constructor parameter (0..33). Nullable numeric columns default
   * to 0 when empty; the nullable timestamp column defaults to
   * 2012-04-01 00:00:00.
   *
   * Fix vs. the original: `fields(15)` was used for both month_in_quarter
   * and month_in_year, shifting every later column by one (and applying
   * `.toInt` to the String column week_st_date_key, which cannot compile).
   * Indices are now consecutive 0..33.
   */
  def mapper(line: String): DIM_DATE_PR = {
    // limit -1 keeps trailing empty fields, so positional indexing stays aligned
    val fields = line.split("\\|", -1)
    // SimpleDateFormat is not thread-safe; build one per call rather than sharing
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Empty numeric columns default to 0 (matches the original's if/else guards).
    def intOrZero(s: String): Int = if (s.isEmpty) 0 else s.toInt
    // Parse a timestamp column in the fixed "yyyy-MM-dd HH:mm:ss" format.
    def ts(s: String): Timestamp = new Timestamp(formatter.parse(s).getTime)
    // Empty timestamp columns fall back to a sentinel date (as in the original).
    def tsOrDefault(s: String): Timestamp =
      if (s.isEmpty) ts("2012-04-01 00:00:00") else ts(s)

    DIM_DATE_PR(
      date_key              = fields(0).toInt,
      date_fld              = ts(fields(1)),
      date_name             = fields(2),
      date_key_ly           = intOrZero(fields(3)),
      date_fld_ly           = tsOrDefault(fields(4)),
      day_in_year           = fields(5).toInt,
      day_in_month          = fields(6).toInt,
      day_in_week           = fields(7).toInt,
      day_type              = fields(8),
      year_of_day           = fields(9).toInt,
      year_name             = fields(10).toInt,
      half_in_year          = fields(11).toInt,
      half_year_name        = fields(12),
      quarter_in_year       = fields(13).toInt,
      quarter_name          = fields(14),
      month_in_quarter      = fields(15).toInt,
      month_in_year         = fields(16).toInt,
      month_name            = fields(17),
      week_in_year          = fields(18),
      week_name_in_year     = fields(19),
      week_in_month         = fields(20),
      week_name_in_month    = fields(21),
      week_st_date_key      = fields(22),
      week_end_date_key     = fields(23).toInt,
      month_st_date_key     = fields(24).toInt,
      month_end_date_key    = fields(25).toInt,
      week_number_this_year = intOrZero(fields(26)),
      year                  = intOrZero(fields(27)),
      Quarter               = intOrZero(fields(28)),
      ca_date_key           = intOrZero(fields(29)),
      day_type_seq          = intOrZero(fields(30)),
      year_st_date_key      = fields(31).toInt,
      date_key_ly_day2day   = intOrZero(fields(32)),
      date_key_ct_ly        = intOrZero(fields(33)))
  }

  /** Entry point: reads the pipe-delimited text file, parses each line into
   *  a Dataset[DIM_DATE_PR], and writes it out as Parquet.
   *
   *  NOTE(review): a Parquet directory created with no part-files on Windows
   *  is the classic symptom of a missing winutils.exe / HADOOP_HOME setup —
   *  check that before suspecting this code.
   */
  def main(args: Array[String]): Unit = {
    // Only surface errors from Spark's own logging.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      // Works around a Windows bug in Spark 2.0.0; omit if not on Windows.
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    // Needed for the RDD -> Dataset conversion (.toDS()).
    import spark.implicits._

    val datardd_tim = spark.sparkContext.textFile("D:/Project_Scala_Code_Optimizor/Input.txt")
    val timedimds = datardd_tim.map(mapper).toDS().cache()
    timedimds.write.parquet("D:/Project_Scala_Code_Optimizor/dimension.parquet")

    spark.stop()
  }
}
解决方案
推荐阅读
- elasticsearch - 如何保留某个索引比另一个更长的时间?
- python-3.x - model.fit_generator 中的“TypeError:'NoneType' 对象不可调用”
- c# - 有没有办法在 Linqpad 中创建 DbSet?
- python - 根据行中的索引添加列
- opencv - Rasberry pi 交叉编译设置中的 Config.cmake 缺失错误
- javascript - 如何从 React Burger 菜单中删除左侧阴影?
- javascript - 将箭头函数映射到映射中(映射不是函数)
- c# - WebAPI 切换数据库集合
- azure - azure function 和 azure static web app 之间的区别?这个怎么运作?
- python - 使用 ctypes 将 cupy 指针传递给 CUDA 内核