PySpark NameError: reading a CSV file from the local machine into a PySpark DataFrame

Problem description

I am using PySpark and trying to run it locally on my desktop. I import the libraries like this:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext
import pandas as pd

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

Reading the CSV:

df = sqlContext.read.load('D:/Databases/Datasets/file.csv', 
                      header='true', 
                      inferSchema='true') 

The output after the read completes:

Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/D:/Databases/Datasets/file.csv; isDirectory=false; length=3293305101; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:526)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:513)
    at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
    at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
    at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
    at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
    at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
    at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
    at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
    at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: file:/D:/Databases/Datasets/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [0, 0, 0, 0]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)

There seems to be a problem reading the file into the Spark DataFrame. When I access the DataFrame like this:

df.show(2,truncate= True)

I get the following error:

NameError                                 Traceback (most recent call last)
<ipython-input-20-c8f1d4ce926c> in <module>()
----> 1 df.show(2,truncate= True)

NameError: name 'df' is not defined

Am I missing a step when reading a CSV into a PySpark DataFrame locally?

Tags: python, python-3.x, apache-spark, pyspark, anaconda

Solution
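
The stack trace points at the cause: sqlContext.read.load() was called without a format, so Spark fell back to its default data source (Parquet, per spark.sql.sources.default) and tried to read Parquet footers from a CSV file, hence "is not a Parquet file. expected magic number at tail [80, 65, 82, 49]". Because that load raised an exception, the assignment to df never ran, which is why the later df.show(2,truncate= True) fails with NameError: name 'df' is not defined.

Below is a minimal sketch of one likely fix, assuming Spark 2.x or later; the SparkSession setup and the app name are illustrative, not taken from the question. The idea is simply to tell the reader that the file is CSV, either with format='csv' on load() or with the dedicated csv() reader.

from pyspark.sql import SparkSession

# Spark 2.x+ entry point; the app name is just a placeholder.
spark = SparkSession.builder.appName("local-csv-read").getOrCreate()

# Option 1: keep load(), but state the format explicitly so Spark does not
# fall back to the default Parquet data source.
df = spark.read.load('D:/Databases/Datasets/file.csv',
                     format='csv',
                     header='true',
                     inferSchema='true')

# Option 2: use the dedicated CSV reader (equivalent result).
# df = spark.read.csv('D:/Databases/Datasets/file.csv',
#                     header=True, inferSchema=True)

df.show(2, truncate=True)

Once the read itself succeeds, df is actually assigned, so the subsequent df.show() call no longer raises the NameError. If you prefer to stay with the SQLContext API from the question, the same idea applies: pass format='csv' to sqlContext.read.load(), or use sqlContext.read.format('csv').options(header='true', inferSchema='true').load(path).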

