我正在尝试根据这篇教程实现一个数据抓取脚本:https://public.tableau.com/de-de/s/blog/2013/08/data-scraping-part-iii-python

(原始脚本是用 Python 2.7 编写的,我尝试将其移植到 3.9)


(其余代码已被注释掉以便调试。)另外,我也尝试过使用如今通用的 `requests` 库;但是,该站点的 SSL 证书似乎不完整或已损坏,因此出于务实的考虑,我仍然使用较旧的库。



import datetime
import requests
from os import rename
from os.path import splitext
from calendar import month_name
from http.client import HTTPConnection
from urllib.request import urlretrieve
from zipfile import ZipFile
from argparse import ArgumentParser
from post import POST

# Host that serves the RITA / TranStats download form.
HOSTNAME = "www.transtats.bts.gov"

# URL template of the download endpoint; ``{host}`` is filled in by get_data.
# NOTE(review): in the source this assignment ended in a dangling ``+ \``
# line continuation, i.e. the query-string part of the URL is missing.
# The continuation was removed so the module parses; restore the query
# string from the tutorial script before relying on this URL.
TRANSTAT_URL = "https://{host}/DownLoad_Table.asp?"

# NOTE(review): get_data uses ``FREQUENCY`` as a default argument, but no
# definition of it is visible in this file -- it has to be restored as well.

# the available data set is generally about 3 months behind, make
# sure that we never ask for data that is within the last 3 months
MAX_DATE = datetime.date.today() - datetime.timedelta(days=90)

def main(opts):
    """Download the RITA data for every requested month, optionally merging it.

    Args:
        opts: Parsed command-line options. ``opts.years`` and ``opts.months``
            are lists of integers; a two-element list is treated as an
            inclusive ``[first, last]`` range. ``opts.concat`` is the name of
            the file all downloaded CSV files are concatenated into, or
            ``None`` to leave the per-month files as they are.

    Returns:
        None.
    """
    years = opts.years
    months = opts.months

    # a two-element list denotes an inclusive range, expand it
    if len(years) == 2:
        years = list(range(years[0], years[1] + 1))
    if len(months) == 2:
        months = list(range(months[0], months[1] + 1))

    max_year = MAX_DATE.year
    max_month = MAX_DATE.month

    list_of_files = []

    for year in years:
        # skip all years greater than the cutoff year
        if year > max_year:
            continue
        for month in months:
            # in the cutoff year, skip months later than cutoff month + 1
            if year == max_year and month > max_month + 1:
                continue
            print("Collecting RITA for: {}/{}".format(month, year))

            list_of_files.append(get_data(month, year))

    if opts.concat is None:
        return None

    # since we did specify to concatenate the files, we do that here
    write_header = True
    with open(opts.concat, 'w') as target:
        for csv_file in list_of_files:
            # skip entries for which get_data returned no file name
            if csv_file is None:
                continue
            with open(csv_file, 'r') as source:
                lines = source.readlines()
            if write_header:
                # only the first file contributes its header row
                write_header = False
            else:
                lines = lines[1:]
            # drop blank lines and append the rest to the combined file
            target.writelines(line for line in lines if line.strip() != '')

    # report the name that was actually opened for writing
    print("File {} written".format(opts.concat))

def get_data(month, year, post=POST, host=HOSTNAME,
             url=TRANSTAT_URL, frequency=FREQUENCY):
    """Request the RITA data for one month and download the zip package.

    The POST body is built from the ``post`` template and sent to ``host``.
    The server is expected to answer with a 302 redirect whose ``location``
    header names the zip file, which is then saved as ``MM-YYYY.zip``.

    Args:
        month: Month number, used as an index into ``calendar.month_name``.
        year: Year of the requested data.
        post: POST body template; it is formatted with the keyword arguments
            ``year``, ``month`` (the month name) and ``frequency``.
        host: Host name the connection is opened to.
        url: URL template, formatted with the keyword argument ``host``.
        frequency: Value passed to the POST template as ``frequency``.

    Returns:
        None. The unzip step that would produce and return the CSV file name
        is commented out below (disabled for debugging).

    Raises:
        Exception: If the response status of the POST is not 302.
    """
    # setup the POST
    post = post.format(year=year, month=month_name[month], frequency=frequency)
    # make sure post has no EOLs
    post = post.replace('\n', '')

    # headers sent along with the POST
    # NOTE(review): this dict was truncated in the source (never closed); any
    # further entries, e.g. a browser-like User-Agent, must be restored here.
    user_agent_string = {
        "Content-Type": "application/x-www-form-urlencoded",
    }

    # Prepare names for output files
    output_file = '{0}-{1}'.format(str(month).zfill(2), year)
    zip_file_name = '{}.zip'.format(output_file)
    csv_file_name = '{}.csv'.format(output_file)  # used by the disabled unzip step

    # now collect data
    print("Collecting RITA data for {0},{1}".format(month_name[month], year))

    # lets get some data, open the connection
    connection = HTTPConnection(host)
    try:
        # set up the url
        url = url.format(host=host)
        print("Sending POST to {}".format(host))

        connection.request("POST", url, post, user_agent_string)

        # get the response from the POST
        response = connection.getresponse()
        print("Response = {}".format(response))
        if response.status != 302:
            raise Exception(
                "Request was not successful with response {}".format(
                    response.status))

        # get location from headers
        remote_file = response.getheader('location')
    finally:
        # always release the socket, also when the request failed
        connection.close()

    print("Host = {}".format(host))
    print("Remote File = {}".format(remote_file))
    # download the zip package (the connection object itself is not callable)
    urlretrieve(remote_file, zip_file_name)

    # --- disabled for debugging: unzip and return the CSV file name ---
    # now unzip the file
    # print("Unzipping {}".format(zip_file_name))
    # z = ZipFile(zip_file_name)
    #
    # extract the csv file from the zip
    # for zip_file in z.namelist():
    #     if splitext(zip_file)[-1] != '.csv': continue
    #     z.extract(zip_file)
    #     rename(zip_file, csv_file_name)
    #     print("RITA file is: {}".format(csv_file_name))
    #     break
    #
    # Done, now return the CSV file name
    # print("Collected {}".format(csv_file_name))
    # return csv_file_name
def usage():
    """Define the command-line interface and parse the command line.

    Returns:
        The ``argparse`` namespace with ``years`` and ``months`` converted to
        lists of integers by ``parse_date_range`` and ``concat`` holding the
        target file name or ``None``.
    """
    parser = ArgumentParser(description=__doc__)

    # NOTE(review): the opening line of this call was missing from the source;
    # flag names follow ``opts.years`` as used in main, and the option is
    # required because main cannot run without years -- confirm.
    parser.add_argument('-y', '--years', dest='years', required=True,
                        type=parse_date_range,
                        help='Year(s) for collecting the RITA data')
    # argparse also runs a string default through ``type``, so "1,12"
    # becomes [1, 12], i.e. the full range of months.
    # NOTE(review): the help text of this option was missing from the source.
    parser.add_argument('-m', '--months', dest='months', default="1,12", required=False,
                        type=parse_date_range,
                        help='Month(s) for collecting the RITA data')
    parser.add_argument('-c', '--concat', default=None,
                        help="Concat data into single file with specified name, "
                             "without this option, the month-year.csv files are left as is.")

    opts = parser.parse_args()
    return opts

def parse_date_range(args):
    """Turn a comma-separated string such as "1,12" into a list of ints."""
    pieces = args.split(',')
    return list(map(int, pieces))

if __name__ == "__main__":
    # parse the command line, then run the download with those options
    opts = usage()
    main(opts)

