scala - 将内部文件转换为数据帧到另一个数据帧或 RDD
问题描述
我有一个每 5、10 或 20 分钟生成文件的过程。然后另一个进程列出绝对路径并每小时将它们保存在一个文件中。结构如下
logan@Everis-PC ~/Datasets/dev/path > cat path1
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_V200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_V200_20190809.DAT
我的代码如下
val pathFile = "/home/logan/Datasets/dev/path"
sc.wholeTextFiles(pathFile).collect.foreach {
hdfspartition =>
val a = sc.parallelize(Seq(hdfspartition._2)).toDF
a.show(false)
}
但我得到一个数据框,其中的数据位于一行中。
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPo_20190801_001808_V200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_D200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_S200_20190809.DAT
/home/logan/Datasets/novum_dev/in/TasPr_20190801_001828_V200_20190809.DAT
|
+------------------------------------------------------------------------------+
嗨,我需要提取在“pathFile”中找到的文件的内容。pathFile" 具有包含更多文件列表的文件。.DAT 文件 (/../../novum_dev/in/TasPo_20190801_001808_D200_20190809.DAT) 具有要分析的数据。我试图将第一个 DF (wholeTextFiles) 转换为字符串数组,然后到一个由 (,) 分割的字符串
sc.wholeTextFiles(pathFile).collect.foreach {
hdfspartition =>
val fa = hdfspartition._2.split("\\r?\\n")
val fs = fa.mkString(",")
val cdr = sc.textFile(fs).map(line => line.split("|", -1))
.map(x => Row.fromSeq(x))
解决方案
您可能应该使用 spark.read.format("text")
:
import org.apache.spark.sql._
val spark = SparkSession.builder.getOrCreate();
val pathFile = "/home/logan/Datasets/dev/path"
val dataset = spark.read.format("text").load(pathFile)
dataset.show()
推荐阅读
- c++ - GetFunctionPointerForDelegate 回调只工作一次
- reactjs - MapBox + React:标记有时只加载
- wordpress - 防止共享页面 url 的最佳实践
- c++ - 将 SOIL2 链接到 Visual Studio 项目时遇到问题
- delphi - Delphi Resizing Form 弄乱了组件
- ip - IPV6 扩展头长度
- php - 正则表达式不匹配所有必需的字符。我做错什么了?
- node.js - 如何在我的 React 应用程序中使用我的 SQLite 查询?
- css - 通过 url() 使用剪辑路径时遇到问题
- javascript - 在使用 waypoints.js 时,如何绕过由于高度或边距而导致的 div 跳跃?