How to read CSVs efficiently in Python?

Problem description

We are asked to gather Hong Kong's 2020 COVID-19 data from https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports.

As you can see, there are 345 CSV files that meet the requirement, but we only need a single row from each one. Reading each CSV with pandas, locating the row whose Province/State is "Hong Kong", and appending it to another dataframe is very time-consuming.

Is there a better solution for this task?

import pandas as pd

# hrefs here is the list containing all the CSV URLs
df1 = pd.read_csv(hrefs[0])
df1 = df1[df1["Province/State"] == "Hong Kong"]
myL = []
for i in range(1, len(hrefs)):
    df2 = pd.read_csv(hrefs[i])
    if "Province/State" in df2.columns:
        # older daily reports use slash-style column names
        df2 = df2.loc[df2["Province/State"] == "Hong Kong"]
    else:
        # newer daily reports use underscores; filter, then normalize the names
        df2 = df2.loc[df2["Province_State"] == "Hong Kong"]
        df2.rename(
            columns={
                "Country_Region": "Country/Region",
                "Province_State": "Province/State",
                "Last_Update": "Last Update",
            },
            inplace=True,
        )
        df2 = df2[["Province/State", "Country/Region", "Last Update", "Confirmed", "Deaths", "Recovered"]]
    myL.append(df2)

# append() returns a new frame (and is deprecated), so concat the collected rows instead
df1 = pd.concat([df1, *myL], ignore_index=True)
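For illustration only (this is not the accepted solution below): most of the time in the loop above is spent waiting on the network, so even keeping the per-file pandas filtering as-is but downloading the files in a thread pool already removes most of the wait. A minimal sketch, assuming hrefs is the same URL list as above:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def load_hong_kong_rows(url):
    # download one daily report and keep only the Hong Kong row(s)
    df = pd.read_csv(url)
    col = "Province/State" if "Province/State" in df.columns else "Province_State"
    return df.loc[df[col] == "Hong Kong"]

with ThreadPoolExecutor(max_workers=8) as pool:
    frames = list(pool.map(load_hong_kong_rows, hrefs))

result = pd.concat(frames, ignore_index=True)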

Tags: pandas, dataframe

Solution


First, run pip install httpx convtools.

The first handles the asynchronous requests; the second handles the data processing (it generates the processing code under the hood).
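To get a feel for the convtools Table API used in the solution, here is a minimal sketch fed an in-memory list of CSV lines (the same way the solution feeds response.iter_lines()); the column names and values here are made up for illustration:

from convtools import conversion as c
from convtools.contrib.tables import Table

csv_lines = [
    "Province_State,Confirmed",
    "Hong Kong,10",
    "Hubei,444",
]
rows = list(
    Table.from_csv(csv_lines, header=True)          # parse an iterable of CSV lines
    .rename({"Province_State": "Province/State"})   # normalize the column name
    .filter(c.col("Province/State") == "Hong Kong")
    .into_iter_rows(include_header=True)            # first row is the header
)
# rows[0] is the header row, the rest are the matching data rows
print(rows)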

Hopefully this pandemic leaves us soon, and hopefully the following helps.

import asyncio
import logging
from datetime import date, timedelta

import httpx

from convtools import conversion as c
from convtools.contrib.tables import Table


logging.basicConfig(level=logging.DEBUG)


async def fetch_urls(urls, max_connections):
    """Fetches urls asynchronously"""
    # TODO: implement local caching here
    results = []
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_connections=max_connections),
        timeout=10,
    ) as client:
        for url in urls:
            # create one request coroutine per url; httpx enforces max_connections
            results.append(client.get(url))

        # as_completed schedules the requests and yields responses as they finish
        for f in asyncio.as_completed(results):
            try:
                response = await f
            except Exception as e:
                logging.exception(e)
                continue
            yield response


async def filter_and_chain_csvs(responses, province, output_file):
    """Reads csv content from responses, renames columns, filters by province
    and puts together the resulting report"""
    table = None
    number = 0
    # newer daily reports use underscored column names; map them back so the
    # per-day tables can be chained into a single report
    rename_columns = {
        "Province_State": "Province/State",
        "Country_Region": "Country/Region",
        "Case_Fatality_Ratio": "Case-Fatality_Ratio",
        "Lat": "Latitude",
        "Long_": "Longitude",
        "Incident_Rate": "Incidence_Rate",
        "Last_Update": "Last Update",
    }
    async for response in responses:
        # materialize the filtered rows while the response body is being read,
        # then rebuild a Table from them
        table_ = Table.from_rows(
            list(
                Table.from_csv(response.iter_lines(), header=True)
                .rename(rename_columns)
                .update(source_url=str(response.url))
                .filter(c.col("Province/State") == province)
                .into_iter_rows(include_header=True)
            ),
            header=True,
        )

        if table is None:
            table = table_
        else:
            table = table.chain(table_)
        number += 1

    if table:
        table.into_csv(output_file)


async def fetch_info(
    start_date, end_date, province, output_file, max_connections=5
):
    # one daily report per date, named MM-DD-YYYY.csv in the repository
    urls = [
        (start_date + timedelta(days=i)).strftime(
            "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/%m-%d-%Y.csv"
        )
        for i in range((end_date - start_date).days + 1)
    ]
    streams = fetch_urls(urls, max_connections)
    await filter_and_chain_csvs(streams, province, output_file)


asyncio.run(
    fetch_info(
        date(2020, 1, 22),
        date(2021, 1, 15),
        "Hong Kong",
        "results.csv",
        max_connections=3,
    )
)
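
Once results.csv has been written, it can be loaded back with pandas for further analysis. A minimal sketch, assuming the output contains the columns produced by the rename_columns mapping above:

import pandas as pd

df = pd.read_csv("results.csv")
print(df[["Last Update", "Confirmed", "Deaths", "Recovered"]].tail())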

