Problem reading a SAS data file into a Spark DataFrame and loading it to S3

Problem description

I have Spark installed locally on a Windows 10 machine and working happily. I am trying to read a SAS data file into a Spark DataFrame, which I then want to write to a parquet file in an S3 bucket.

The code from my Jupyter Notebook is below.

The error I am getting is below the code. It relates to the saurfang package.

The code worked until I added the SparkContext() block (delimited in the code by the '<HERE---' and '>HERE-----' markers), which I believe I need in order to use S3 from my local machine (many answered questions got me this far). That is also when I added the 'df.write.mode ...' line.

I am assuming that create_spark_session() will use the context details built before its definition. I am wondering whether some part of the SparkContext() configuration is stopping this line from working properly:

.config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \

I am new to Spark, so any help would be greatly appreciated.

import configparser # to work with the configuration file
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType, IntegerType, TimestampType,FloatType)
import pandas as pd
import findspark
import pyspark


config = configparser.ConfigParser()
config.read('dl.cfg') # edit this file to include your own values.

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
os.environ['AWS_DEFAULT_REGION']=config['AWS']['AWS_DEFAULT_REGION']

SAS_DATA_LOCATION=config['OTHER']['SAS_DATA_LOCATION']

BUCKET_NAME = config.get("OTHER","BUCKET_NAME")
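
# NOTE (assumption): dl.cfg is not shown in the question; judging from the
# lookups above it presumably contains something like:
#
#   [AWS]
#   AWS_ACCESS_KEY_ID=...
#   AWS_SECRET_ACCESS_KEY=...
#   AWS_DEFAULT_REGION=us-east-1
#
#   [OTHER]
#   SAS_DATA_LOCATION=C:/path/to/sas_data/
#   BUCKET_NAME=your-bucket-name
#
# SAS_DATA_LOCATION needs a trailing slash, since the file name is
# concatenated onto it further down.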

#<HERE-------------
# see: https://gist.github.com/asmaier/5768c7cda3620901440a62248614bbd0
sc=pyspark.SparkContext()

# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# see https://stackoverflow.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf=sc._jsc.hadoopConfiguration()

# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])

# see http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos/
hadoop_conf.set("fs.s3a.connection.maximum", "100000")

# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + os.environ['AWS_DEFAULT_REGION'] + ".amazonaws.com")

#>HERE-----------

def create_spark_session():
    """Create a spark session in which to work on the data."""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .config("spark.driver.extraClassPath", "C:/Progra~1/Java/jdk1.8.0_251/postgresql-42.2.14.jar")\
        .getOrCreate()
    return spark

def spark_to_s3(read_format,fpath,tname,delimiter=','):
    """
    Create a Spark dataframe from an input file and write it to S3 as a parquet file.
    
    Args:
    
    read_format: E.g. csv.
    fpath: Full path for your input file, e.g. 'c:\your_file.csv'.
    tname: The name to use for the parquet file written to S3.
    delimiter: E.g. ','
    """
    spark = create_spark_session()
    df = spark.read.format(read_format) \
                  .option("header","true") \
                  .option("delimiter",delimiter) \
                  .load(fpath)
    print(df.printSchema())
    df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')

# build a dictionary of arguments for the four input files
parameters_dict = {'immigration': {'read_format':'com.github.saurfang.sas.spark',\
                                   'fpath':SAS_DATA_LOCATION + 'i94_apr16_sub.sas7bdat','delimiter':','}
      }

# iterate through the dictionary, writing each dataframe to a parquet file
for k in parameters_dict.keys():
    spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])

Here is the error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-9aaee5b3167c> in <module>
      9 # iterate through the dictionary, writing each dataframe to a parquet file
     10 for k in parameters_dict.keys():
---> 11     spark_to_s3(parameters_dict[k]['read_format'],parameters_dict[k]['fpath'],k,parameters_dict[k]['delimiter'])

<ipython-input-3-04778a385a6a> in spark_to_s3(read_format, fpath, tname, delimiter)
     14                   .option("header","true") \
     15                   .option("delimiter",delimiter) \
---> 16                   .load(fpath)
     17     print(df.printSchema())
     18     df.write.mode("overwrite").parquet('s3a://' + BUCKET_NAME + f'/parquet/{tname}.parquet')

~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
    164         self.options(**options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:
    168             if type(path) != list:

~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o37.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.github.saurfang.sas.spark. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.github.saurfang.sas.spark.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    ... 13 more

Tags: python, amazon-s3, pyspark, sas, parquet

Solution
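
A likely explanation, judging from the code and the traceback: spark.jars.packages only takes effect when the driver JVM is launched. Because pyspark.SparkContext() is already called at the top of the notebook, SparkSession.builder.getOrCreate() inside create_spark_session() reuses that existing context, the .config("spark.jars.packages", ...) setting is ignored, and the spark-sas7bdat jar never reaches the classpath, which is what the ClassNotFoundException for com.github.saurfang.sas.spark reports. Below is a minimal sketch of one way around this: declare the packages on a SparkConf and create the single SparkContext from it, so the SAS reader and the S3A connector are both available from the start. The hadoop-aws coordinate and its version are assumptions here and must match the Hadoop build bundled with your Spark distribution.

import os
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Package coordinates: spark-sas7bdat as in the question, plus hadoop-aws for
# s3a. The hadoop-aws version below is an assumption; it must match the Hadoop
# version your Spark build ships with.
packages = ",".join([
    "saurfang:spark-sas7bdat:2.0.0-s_2.11",
    "org.apache.hadoop:hadoop-aws:2.7.3",
])

conf = SparkConf() \
    .set("spark.jars.packages", packages) \
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .set("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID']) \
    .set("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY']) \
    .set("spark.hadoop.fs.s3a.endpoint",
         "s3." + os.environ['AWS_DEFAULT_REGION'] + ".amazonaws.com")

# Create the one and only SparkContext with the packages already declared;
# the spark.hadoop.* settings are forwarded to the Hadoop configuration.
sc = pyspark.SparkContext(conf=conf)
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# getOrCreate() now returns a session backed by that context, so the
# com.github.saurfang.sas.spark data source can be found.
spark = SparkSession.builder.getOrCreate()

An alternative with the same effect is to set os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell' before the first SparkContext is created and leave the rest of the code unchanged.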

