Problem - Writing files back to the local system after processing

Problem description

I'm fairly new to PySpark, and I'd like to use it to speed up the runtime of a script that pulls physician data for the United States into per-state PDF files and a final CSV file for a database table. To keep things practical, since I want to scale the project up to look at regions and provinces in other countries, I'd like to be able to use PySpark for distributed computing.

I can get the processing done faster, but I can't get my files written back to the local system, where I want to use the multi_part_upload_with_s3 function I wrote in another script to push those files to S3 once I've implemented Spark more fully. Here is my original script, before implementing Spark:

import sys
import pandas as pd
import numpy as np
from tabula import read_pdf
import requests
import os
from contextlib import closing
from write_to_s3 import multi_part_upload_with_s3

def download_aamc_data(save_folder: str) -> None:
    if not os.path.exists(save_folder):
        os.makedirs(save_folder)

    base_url = 'https://www.aamc.org/system/files/2019-12/state-physician-Alabama-2019%5B1%5D.pdf'

    us_states = ["Alabama","Alaska","Arizona","Arkansas","California","Colorado",
  "Connecticut","District_of_Columbia","Delaware","Florida","Georgia","Hawaii","Idaho","Illinois",
  "Indiana","Iowa","Kansas","Kentucky","Louisiana","Maine","Maryland",
  "Massachusetts","Michigan","Minnesota","Mississippi","Missouri","Montana",
  "Nebraska","Nevada","New_Hampshire","New_Jersey","New_Mexico","New_York",
  "North_Carolina","North_Dakota","Ohio","Oklahoma","Oregon","Pennsylvania",
  "Rhode_Island","South_Carolina","South_Dakota","Tennessee","Texas","Utah",
  "Vermont","Virginia","Washington","West_Virginia","Wisconsin","Wyoming"]

    for state in us_states:
        print('Processing {}'.format(state))
        if state != 'Indiana':
            curr_url = base_url.replace('Alabama', state)
        else:
            curr_url = 'https://www.aamc.org/system/files/2019-12/state-physician-Indiana-2019_0.pdf'
        save_path = os.path.join(save_folder, state + '.pdf')
        # stream the PDF to disk in 1 KB chunks
        with closing(requests.get(curr_url, stream=True)) as res, open(save_path, 'wb') as f:
            for chunk in res.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                    f.flush()
                    os.fsync(f.fileno())


def clean_page_single_file(file_path: str) -> pd.DataFrame:
    state_abbr_dict = {'Alabama': 'AL','Alaska': 'AK','Arizona': 'AZ','Arkansas': 'AR','California': 'CA','Colorado': 'CO','Connecticut': 'CT','District_of_Columbia': 'DC','Delaware': 'DE','Florida': 'FL','Georgia': 'GA','Hawaii': 'HI','Idaho': 'ID','Illinois': 'IL','Indiana': 'IN','Iowa': 'IA','Kansas': 'KS','Kentucky': 'KY','Louisiana': 'LA','Maine': 'ME','Maryland': 'MD','Massachusetts': 'MA','Michigan': 'MI','Minnesota': 'MN','Mississippi': 'MS','Missouri': 'MO','Montana': 'MT','Nebraska': 'NE','Nevada': 'NV',
    'New_Hampshire': 'NH','New_Jersey': 'NJ','New_Mexico': 'NM', 'New_York': 'NY','North_Carolina': 'NC','North_Dakota': 'ND','Ohio': 'OH','Oklahoma': 'OK','Oregon': 'OR','Pennsylvania': 'PA','Rhode_Island': 'RI','South_Carolina': 'SC','South_Dakota': 'SD','Tennessee': 'TN','Texas': 'TX','Utah': 'UT','Vermont': 'VT','Virginia': 'VA','Washington': 'WA','West_Virginia': 'WV','Wisconsin': 'WI','Wyoming': 'WY'}
    state_name = file_path.split('/')[-1].split('.')[0]
    state_code = state_abbr_dict[state_name]
    
    df = read_pdf(file_path, pages = 'all')
    p1 = df[0].copy(deep=True)
    p2 = df[1].copy(deep=True)

    p1.columns = list('abcde')
    p1 = p1.iloc[2:]
    p1['a'] = p1['b']
    p1 = p1[['a', 'c']]
    p1 = p1.iloc[:19]
    
    p1 = p1.transpose()
    new_header = p1.iloc[0]
    p1 = p1.iloc[1:]
    p1.columns = new_header
    p1.insert(0, 'State', [state_code])
    p1 = p1.reset_index()
    del p1['index']
    p1.columns.name = None
    p1 = p1.replace('---', 'NA')
    
    p2_columns = ['Speciality', 'Physicians', 'People per Physicians', 'Total female Physicians', 'Percent Female Physicians', 'Total Physicians > Age 60', 'Percent Physicians > Age 60']
    p2.columns = p2_columns
    p2 = p2.replace('*', 'NA')
    p2 = p2[['Speciality', 'Physicians']]
    p2 = p2.transpose()
    
    new_header = p2.iloc[0]
    p2 = p2.iloc[1:]
    p2.columns = new_header
    del p2['SpecialtyPhysiciansPeople Per PhysicianNumberPercentNumberPercent']
    p2.insert(0, 'State', [state_code])
    p2 = p2.reset_index()
    del p2['index']
    p2.columns.name = None
    
    p1 = p1.join(p2, rsuffix = '_p2')
    del p1['State_p2']
    
    return(p1)

def clean_all_documents(folder_path: str, save_path: str) -> pd.DataFrame:
    assert os.path.exists(folder_path)
    
    pdf_list = os.listdir(folder_path)
    pdf_list = list(filter(lambda x: '.DS_Store' not in x, pdf_list))
    pdf_list.sort()
    print('Processing file {}'.format(pdf_list[0]))
    master_df = clean_page_single_file(os.path.join(folder_path, pdf_list[0]))

    for pdf in pdf_list[1:]:
        print('Processing file {}'.format(pdf))
        curr_path = os.path.join(folder_path, pdf)
        curr_df = clean_page_single_file(curr_path)
        master_df = pd.concat([master_df, curr_df])
    
    print('Saving csv to disk.')
    master_df.to_csv(save_path)
    master_df = master_df.reset_index()
    del master_df['index']
    
    return(master_df)

def main():
    print('Downloading data.')
    save_folder = './healthcare_pdf'
    download_aamc_data(save_folder = save_folder)
    
    print('Processing pdfs.')
    folder_path = save_folder
    save_path = './national_physicians.csv'
    clean_all_documents(folder_path= folder_path, save_path=save_path)

    multi_part_upload_with_s3("my-bucket", "pdf-directory" , "pdf")
    multi_part_upload_with_s3("my-bucket", "csv-directory", "csv")    

if __name__ == '__main__':
    main()

The new script after implementing Spark:

import sys
import pandas as pd
import numpy as np

from tabula import read_pdf
#try:
#    from tabula import read_pdf
#except ImportError:
#    import subprocess
#    subprocess.run(['pip3', 'install', '--user', 'tabula-py'], check=True)
#    from tabula import read_pdf
import requests
import os
from contextlib import closing
from write_to_s3 import multi_part_upload_with_s3
from concurrent import futures
sys.path.append('../utilities')
from utility_setup import create_spark


def generate_url(state):
    base_url = 'https://www.aamc.org/system/files/2019-12/state-physician-{}-2019%5B1%5D.pdf'
    if state != 'Indiana':
        curr_url = base_url.format(state)
    else:
        curr_url = 'https://www.aamc.org/system/files/2019-12/state-physician-Indiana-2019_0.pdf'
    return curr_url

def download_aamc_data(save_folder: str) -> None:
    if not os.path.exists(save_folder):
        os.makedirs(save_folder)

    us_states = ["Alabama","Alaska","Arizona","Arkansas","California","Colorado",
  "Connecticut","District_of_Columbia","Delaware","Florida","Georgia","Hawaii","Idaho","Illinois",
  "Indiana","Iowa","Kansas","Kentucky","Louisiana","Maine","Maryland",
  "Massachusetts","Michigan","Minnesota","Mississippi","Missouri","Montana",
  "Nebraska","Nevada","New_Hampshire","New_Jersey","New_Mexico","New_York",
  "North_Carolina","North_Dakota","Ohio","Oklahoma","Oregon","Pennsylvania",
  "Rhode_Island","South_Carolina","South_Dakota","Tennessee","Texas","Utah",
  "Vermont","Virginia","Washington","West_Virginia","Wisconsin","Wyoming"]

    urls = {state:generate_url(state) for state in us_states}

    for state, curr_url in urls.items():
        save_path = os.path.join(save_folder, state + '.pdf')
        # stream the PDF to disk in 1 KB chunks
        with closing(requests.get(curr_url, stream=True)) as res, open(save_path, 'wb') as f:
            for chunk in res.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
                    f.flush()
                    os.fsync(f.fileno())


def clean_page_single_file(file_path: str) -> pd.DataFrame:
    state_abbr_dict = {'Alabama': 'AL','Alaska': 'AK','Arizona': 'AZ','Arkansas': 'AR','California': 'CA','Colorado': 'CO','Connecticut': 'CT','District_of_Columbia': 'DC','Delaware': 'DE','Florida': 'FL','Georgia': 'GA','Hawaii': 'HI','Idaho': 'ID','Illinois': 'IL','Indiana': 'IN','Iowa': 'IA','Kansas': 'KS','Kentucky': 'KY','Louisiana': 'LA','Maine': 'ME','Maryland': 'MD','Massachusetts': 'MA','Michigan': 'MI','Minnesota': 'MN','Mississippi': 'MS','Missouri': 'MO','Montana': 'MT','Nebraska': 'NE','Nevada': 'NV',
    'New_Hampshire': 'NH','New_Jersey': 'NJ','New_Mexico': 'NM', 'New_York': 'NY','North_Carolina': 'NC','North_Dakota': 'ND','Ohio': 'OH','Oklahoma': 'OK','Oregon': 'OR','Pennsylvania': 'PA','Rhode_Island': 'RI','South_Carolina': 'SC','South_Dakota': 'SD','Tennessee': 'TN','Texas': 'TX','Utah': 'UT','Vermont': 'VT','Virginia': 'VA','Washington': 'WA','West_Virginia': 'WV','Wisconsin': 'WI','Wyoming': 'WY'}
    state_name = file_path.split('/')[-1].split('.')[0]
    state_code = state_abbr_dict[state_name]

    df = read_pdf(file_path, pages = 'all')
    p1 = df[0].copy(deep=True)
    p2 = df[1].copy(deep=True)

    p1.columns = list('abcde')
    p1 = p1.iloc[2:]
    p1['a'] = p1['b']
    p1 = p1[['a', 'c']]
    p1 = p1.iloc[:19]

    p1 = p1.transpose()
    new_header = p1.iloc[0]
    p1 = p1.iloc[1:]
    p1.columns = new_header
    p1.insert(0, 'State', [state_code])
    p1 = p1.reset_index()
    del p1['index']
    p1.columns.name = None
    p1 = p1.replace('---', 'NA')

    p2_columns = ['Speciality', 'Physicians', 'People per Physicians', 'Total female Physicians', 'Percent Female Physicians', 'Total Physicians > Age 60', 'Percent Physicians > Age 60']
    p2.columns = p2_columns
    p2 = p2.replace('*', 'NA')
    p2 = p2[['Speciality', 'Physicians']]
    p2 = p2.transpose()

    new_header = p2.iloc[0]
    p2 = p2.iloc[1:]
    p2.columns = new_header
    del p2['SpecialtyPhysiciansPeople Per PhysicianNumberPercentNumberPercent']
    p2.insert(0, 'State', [state_code])
    p2 = p2.reset_index()                                                                                                                                                                                              
    del p2['index']
    p2.columns.name = None

    p1 = p1.join(p2, rsuffix = '_p2')
    del p1['State_p2']

    return(p1)

def clean_all_documents(folder_path: str, save_path: str) -> pd.DataFrame:
    assert os.path.exists(folder_path)                                                                                                                                                                                                                                                                                                                                                                                                    
    pdf_list = os.listdir(folder_path)
    pdf_list = list(filter(lambda x: '.DS_Store' not in x, pdf_list))
    pdf_list.sort()
    # print('Processing file {}'.format(pdf_list[0]))
    # master_df = clean_page_single_file(os.path.join(folder_path, pdf_list[0]))

    with futures.ProcessPoolExecutor() as exc:
        dfs = exc.map(clean_page_single_file, [os.path.join(folder_path, pdf) for pdf in pdf_list])
    master_df = pd.concat(dfs)

    #for pdf in pdf_list[1:]:
    #    print('Processing file {}'.format(pdf))
    #    curr_path = os.path.join(folder_path, pdf)
    #    curr_df = clean_page_single_file(curr_path)
    #    master_df = master_df.append(curr_df)

    print('Saving csv to disk.')
    master_df.to_csv(save_path)
    master_df = master_df.reset_index()
    del master_df['index']

    return(master_df)


def process_on_spark():
    us_states = ["Alabama","Alaska","Arizona","Arkansas","California","Colorado",
  "Connecticut","District_of_Columbia","Delaware","Florida","Georgia","Hawaii","Idaho","Illinois",
  "Indiana","Iowa","Kansas","Kentucky","Louisiana","Maine","Maryland",
  "Massachusetts","Michigan","Minnesota","Mississippi","Missouri","Montana",
  "Nebraska","Nevada","New_Hampshire","New_Jersey","New_Mexico","New_York",
  "North_Carolina","North_Dakota","Ohio","Oklahoma","Oregon","Pennsylvania",
  "Rhode_Island","South_Carolina","South_Dakota","Tennessee","Texas","Utah",
  "Vermont","Virginia","Washington","West_Virginia","Wisconsin","Wyoming"]
    
    urls = [(state, generate_url(state)) for state in us_states]

    sc = create_spark() # app=None, mem=None)
    result = sc.sparkContext.parallelize(urls).map(download_and_extract_dataframe).collect()
    master_df = pd.concat(result)

    print('Saving csv to disk.')
    save_path = './national_physicians.csv'
    master_df.to_csv(save_path)
    master_df = master_df.reset_index()
    del master_df['index']

    multi_part_upload_with_s3("covid19datalakesafford", "/home/ubuntu/insight-fellows-de2020-pandemic-platform/data_fetching_etl/healthcare_pdf" , "pdf")
    multi_part_upload_with_s3("covid19datalakesafford", "/home/ubuntu/insight-fellows-de2020-pandemic-platform/data_fetching_etl", "csv")
    return(master_df)


def download_and_extract_dataframe(state_url):

    #try:
    #    from tabula import read_pdf
    #except ImportError:
    #    import subprocess
    #    subprocess.run(['pip3', 'install', '--user', 'tabula-py'], check=True)
    #    from tabula import read_pdf
    save_folder = '.' # /downloaded_pdfs'
    # this doesn't seem to work
    #if not os.path.isdir(save_folder):
    #    os.makedirs(save_folder)
    state, curr_url = state_url
    save_path = os.path.join(save_folder, state + '.pdf')
    # stream the PDF to disk in 1 KB chunks
    with closing(requests.get(curr_url, stream=True)) as res, open(save_path, 'wb') as f:
        for chunk in res.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
                f.flush()
                os.fsync(f.fileno())

    df = clean_page_single_file(save_path)
    return df


def main():
    print('Downloading data.')
    save_folder = './healthcare_pdf'
    download_aamc_data(save_folder = save_folder)                                                                                                                                                                  
    print('Processing pdfs.')
    folder_path = save_folder
    save_path = './national_physicians.csv'
    clean_all_documents(folder_path= folder_path, save_path=save_path)

    multi_part_upload_with_s3("my-bucket", "pdf-directory" , "pdf")
    multi_part_upload_with_s3("my-bucket", "csv-directory", "csv")

if __name__ == '__main__':
    main()
    # process_on_spark()

And my function (from another Python script):

import os
import glob
import boto3
from boto3.s3.transfer import TransferConfig
# ProgressPercentage is a progress-callback class assumed to be defined elsewhere in that script.

def multi_part_upload_with_s3(bucket, path, extension):
    """Multipart upload for files downloaded from urls/S3"""
    # create s3 object
    s3 = boto3.resource('s3')
    os.chdir(path)
    config = TransferConfig(
        multipart_threshold=2*1024*1024,
        max_concurrency=5,
        num_download_attempts=10,
        multipart_chunksize=1024*1024,
        max_io_queue=100)
    result = glob.glob('*.{}'.format(extension))

    for file in result:
        try:
            print('Upload:', file)
            s3.Bucket(bucket).upload_file(file, '{}'.format(file), Config=config, Callback=ProgressPercentage(file))
            os.remove(file)
        except Exception as e:
            print('Upload:', file, 'failed')
            print(e)
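
For reference, since this helper chdir()s into path before globbing, I call it with absolute paths from the driver, for example (the bucket name and paths below are placeholders):

multi_part_upload_with_s3('my-bucket', '/home/ubuntu/project/healthcare_pdf', 'pdf')
multi_part_upload_with_s3('my-bucket', '/home/ubuntu/project', 'csv')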

Does anyone have suggestions on how to write the Python script implementing Spark so that the processed data is brought back from my workers into files on my local instance and then written to S3 with my function?
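
To make the ask concrete, here is a rough, untested sketch of the flow I have in mind (the helper names fetch_state and run_on_driver are made up for illustration, the bucket name is a placeholder, and urls is the same list of (state, url) pairs built in process_on_spark above): each Spark task returns the raw PDF bytes along with the cleaned DataFrame, everything is collected back on the driver, the files are written there, and only then is my upload function called.

import os
import pandas as pd
import requests

def fetch_state(state_url):
    # Runs on a worker: download one state's PDF, clean it with the same
    # clean_page_single_file() shown above, and return the bytes so the
    # driver can persist the PDF itself.
    state, url = state_url
    pdf_bytes = requests.get(url).content
    tmp_path = '/tmp/{}.pdf'.format(state)
    with open(tmp_path, 'wb') as f:
        f.write(pdf_bytes)
    df = clean_page_single_file(tmp_path)
    return state, pdf_bytes, df

def run_on_driver(spark, urls, save_folder='./healthcare_pdf'):
    # Runs on the driver: everything collected here lives on the local
    # instance, so multi_part_upload_with_s3() can see the files.
    os.makedirs(save_folder, exist_ok=True)
    results = spark.sparkContext.parallelize(urls).map(fetch_state).collect()
    for state, pdf_bytes, _ in results:
        with open(os.path.join(save_folder, state + '.pdf'), 'wb') as f:
            f.write(pdf_bytes)
    csv_dir = os.getcwd()  # remember the cwd before the upload helper chdir()s
    master_df = pd.concat([df for _, _, df in results], ignore_index=True)
    master_df.to_csv(os.path.join(csv_dir, 'national_physicians.csv'))
    multi_part_upload_with_s3('my-bucket', os.path.abspath(save_folder), 'pdf')
    multi_part_upload_with_s3('my-bucket', csv_dir, 'csv')

The part I'm unsure about is whether collecting the PDF bytes back to the driver like this is a reasonable pattern, or whether there is a better way to get the worker output onto my local instance before uploading.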

Edit: I'm trying to stick with tabula-py for now. As a benchmark, here are the runtimes for processing with the original script and with the PySpark attempt:

Executing the original script as a Spark job on my cluster:

real    2m33.090s
user    2m5.460s
sys     0m5.198s

Executing the attempted Spark script on my cluster:

real    2m38.746s
user    2m6.520s
sys     0m5.339s

Tags: python, python-3.x, amazon-web-services, apache-spark, amazon-s3

Solution

