java - 如何从 JavaRDD 中读取 csv 格式的数据在 Spark Java 中使用 StructType Schema
问题描述
使用 Spark Java,我正在尝试使用 StructType 动态模式读取具有 JavaRDD 形式的逗号分隔值的数据。
我知道我可以使用以下传递模式的方法读取 json(其中 schemaStr->StructType 和 javaRDD->JavaRDD):
Dataset<Row> df = spark.read().schema(schemaStr).json(javaRDD);
javaRDD has value as: name1,address11,city111
请建议如何使用 StructType 模式读取 JavaRDD,因为我有逗号分隔的数据作为 JavaRDD。而且我需要使用 StructType 模式读取数据,以将其转换为数据框,因为我有一个动态模式生成器实用程序。
解决方案
所以如果我理解正确,你想转换JavaRDD
为Dataset<Row>
?如果是,您可以JavaRDD<Row>
通过拆分您的创建JavaRDD<String>
并将其spark.createDataFrame
与您的架构一起传递给
StructType schema = new StructType(new StructField[]{
new StructField("_1", DataTypes.StringType, false, Metadata.empty()),
new StructField("_2", DataTypes.StringType, false, Metadata.empty()),
new StructField("_3", DataTypes.StringType, false, Metadata.empty())
});
JavaRDD<String> rdd1 = spark
.range(5)
.javaRDD()
.map(s -> s+",b,c");
JavaRDD<Row> rdd2 = rdd1.map(s -> s.split(","))
.map(s -> RowFactory.create((Object[]) s));
Dataset<Row> df = spark.createDataFrame(rdd2, schema);
df.show();
输出:
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 0| b| c|
| 1| b| c|
| 2| b| c|
| 3| b| c|
| 4| b| c|
+---+---+---+
推荐阅读
- java - 单元测试 AWS SNS 发布消息
- join - Hive 加入分区
- python - 使用“is”标识而不是“==”相等来检查对象是否在可迭代中
- python - Python Graphene 处理多对多关系
- angular - 如何处理仅在验证成功时才应返回 Observable 的函数?
- java - TabLayout 上的标签标题未显示
- sql - 如何使用 ifelse 在 AWS QuickSight 中编写计算字段公式
- android - 使用 InputMethodManager.showSoftKeyboard() 时如何禁用自动完成/建议
- java - 如何从arrayList中获取对象的属性
- typescript - 使用 angularfirestore 获取字符串值以外的记录