Splitting a very large CSV in chunks, using a condition on the nth character of a string

Problem description

I am trying to split a very large (22 GB) CSV file in chunks, using a condition on the nth character of a given column in the data.

I have been trying, without much success, to piece something together from this: Python: Split CSV file based on first character of the first column

Something along those lines exists, but I hit a wall. I have a condition on a column not being empty, but I want to split my file based on the nth character of a given column.

Is there any way to create smaller CSV files based on such a condition? Any help would be greatly appreciated.

A sample of my data looks like this:

sourcename   date_naissance   date_deces   date_mariage   place
dgfkf47      YYYYMMDD         YYYYMMDD     etc.           etc.
fhfidk67     YYYYMMDD         YYYYMMDD     etc.           etc.
kgodj45      YYYYMMDD                      etc.           etc.
paoror76     YYYYMMDD         www          etc.           etc.
poldidj90    YYYYMMDD

What I want to do is create a series of smaller files that I can analyse later, by splitting the data according to the 7th character of the ID column. I know how to do this at the 5x10 scale, since that fits in my memory: I just use groupby. But I am stuck at this much larger scale, as Dask does not seem to let me iterate over a groupby.

My current strategy is to do all the cleaning operations in Dask, including creating a new column that contains only the 7th character, and then to output smaller files that can be loaded in pandas and grouped by that column.
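For reference, the classic chunked-pandas pattern for this kind of split reads the CSV in pieces and appends each group to a per-key output file, so the 22 GB file never has to fit in memory. A minimal sketch, assuming the key column is sourcename and using made-up file names:

import os
import pandas as pd

for chunk in pd.read_csv('big_file.csv', sep=';', encoding='iso-8859-1',
                         dtype=str, chunksize=500_000):
    # keep only rows whose key is present and long enough to have a 7th character
    chunk = chunk[chunk['sourcename'].str.len() > 6]
    # group the chunk by the 7th character (index 6) of the key column
    for key, group in chunk.groupby(chunk['sourcename'].str[6]):
        out = 'split_source_{}.csv'.format(key)
        # append to the per-key file, writing the header only the first time
        group.to_csv(out, mode='a', header=not os.path.exists(out),
                     index=False, sep=';', encoding='iso-8859-1')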

This is what I have so far, but I would love to know if there is a simpler way to do it:

import dask.dataframe as dd
import glob, os
import pandas as pd
import pyarrow as pa
PATH = r"/RAW DATA/TEST/"
os.chdir(PATH)
for file in glob.glob("**/*_clean.csv", recursive=True):
    ddf = dd.read_csv(file, encoding='iso-8859-1', sep=';', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object'})
    # write the cleaned CSV out to parquet
    ddf.to_parquet('file_clean.csv.parquet', engine='pyarrow', schema={'insee': pa.string(),
       'ref_document': pa.string(), 'ref_interne_sourcename': pa.string(),
       'sexe': pa.string(), 'profession': pa.string(), 'date_naissance': pa.string(), 'date_mariage': pa.string(), 'date_deces': pa.string()})
    ddf = dd.read_parquet('file_clean.csv.parquet', engine='pyarrow')
    # parse the date strings and split them into day/month/year columns
    ddf['date_naissance'] = dd.to_datetime(ddf['date_naissance'], format='%Y%m%d', errors='coerce')
    ddf['date_mariage'] = dd.to_datetime(ddf['date_mariage'], format='%Y%m%d', errors='coerce')
    ddf['date_deces'] = dd.to_datetime(ddf['date_deces'], format='%Y%m%d', errors='coerce')
    ddf['jour_naissance'] = ddf['date_naissance'].dt.day
    ddf['mois_naissance'] = ddf['date_naissance'].dt.month
    ddf['annee_naissance'] = ddf['date_naissance'].dt.year
    ddf['jour_mariage'] = ddf['date_mariage'].dt.day
    ddf['mois_mariage'] = ddf['date_mariage'].dt.month
    ddf['annee_mariage'] = ddf['date_mariage'].dt.year
    ddf['jour_deces'] = ddf['date_deces'].dt.day
    ddf['mois_deces'] = ddf['date_deces'].dt.month
    ddf['annee_deces'] = ddf['date_deces'].dt.year
    # drop the original date columns
    del ddf['date_naissance']
    del ddf['date_mariage']
    del ddf['date_deces']
    # create IDTAG column (7th character of sourcename)
    ddf['IDTAG'] = ddf['sourcename'].str[6].fillna('')
    namefile = os.path.splitext(file)[0]
    ddf.to_csv(namefile)
    for partfile in glob.glob('**/*.part'):
        os.rename(partfile, (os.path.splitext(partfile)[0]) + "_{}_part.csv".format(namefile))
    for partfilecsv in glob.glob("**/*_part.csv", recursive=True):
        part_df = pd.read_csv(partfilecsv, encoding='iso-8859-1', sep=';')
        # get a list of columns
        cols = list(part_df)
        # move IDTAG to the head of the list
        cols.insert(0, cols.pop(cols.index('IDTAG')))
        # reorder the columns
        part_df = part_df.loc[:, cols]
        # group rows by the first character of the first column (IDTAG)
        def firstletter(Index):
            firstentry = part_df.iloc[Index, 0]
            return firstentry[0]
        for letter, group in part_df.groupby(firstletter):
            group.to_csv((os.path.splitext(partfilecsv)[0]) + '_source_{}.csv'.format(letter))
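As an aside on the point that Dask does not let you iterate over a groupby: one workaround is to compute the (small) set of distinct key values first, then filter and write each subset in turn. A minimal sketch, assuming the ddf with its IDTAG column from above and a made-up output name:

# compute the distinct tag values; this result is tiny even when ddf is huge
keys = ddf['IDTAG'].unique().compute()
for key in keys:
    subset = ddf[ddf['IDTAG'] == key]
    # single_file=True collapses the Dask partitions into one CSV per key
    subset.to_csv('split_source_{}.csv'.format(key), single_file=True, index=False)

Each pass re-scans the data, so this trades speed for memory; with only a handful of distinct tag values that is usually acceptable.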

Tags: pandas, csv, chunks, file-splitting

Solution


Well, this is how I managed to do it, followed by another script to merge the smaller files produced. Surely a more elegant solution exists; happy to learn.

import dask.dataframe as dd
import glob, os
import pandas as pd
import pyarrow as pa
PATH = "path"
os.chdir(PATH)
for file in glob.glob("**/*_clean.csv", recursive=True):
    ddf = dd.read_csv(file, encoding='iso-8859-1', sep=';', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object', 'lon': 'float'})
    # write the cleaned CSV out to parquet
    ddf.to_parquet('file_clean.csv.parquet', engine='pyarrow', schema={'insee': pa.string(),
       'ref_document': pa.string(), 'ref_interne_sourcename': pa.string(),
       'sexe': pa.string(), 'profession': pa.string(), 'date_naissance': pa.string(), 'date_mariage': pa.string(), 'date_deces': pa.string()})
    # dtypes travel with the parquet schema, so read_parquet takes no dtype argument
    ddf = dd.read_parquet('file_clean.csv.parquet', engine='pyarrow')
    # parse the date strings and split them into day/month/year columns
    ddf['date_naissance'] = dd.to_datetime(ddf['date_naissance'], format='%Y%m%d', errors='coerce')
    ddf['date_mariage'] = dd.to_datetime(ddf['date_mariage'], format='%Y%m%d', errors='coerce')
    ddf['date_deces'] = dd.to_datetime(ddf['date_deces'], format='%Y%m%d', errors='coerce')
    ddf['jour_naissance'] = ddf['date_naissance'].dt.day
    ddf['mois_naissance'] = ddf['date_naissance'].dt.month
    ddf['annee_naissance'] = ddf['date_naissance'].dt.year
    ddf['jour_mariage'] = ddf['date_mariage'].dt.day
    ddf['mois_mariage'] = ddf['date_mariage'].dt.month
    ddf['annee_mariage'] = ddf['date_mariage'].dt.year
    ddf['jour_deces'] = ddf['date_deces'].dt.day
    ddf['mois_deces'] = ddf['date_deces'].dt.month
    ddf['annee_deces'] = ddf['date_deces'].dt.year
    # drop the original date columns
    del ddf['date_naissance']
    del ddf['date_mariage']
    del ddf['date_deces']
    # create IDTAG column (7th character of sourcename)
    ddf['IDTAG'] = ddf['sourcename'].str[6].fillna('')
    # save the cleaned frame as CSV part files
    namefile = os.path.splitext(file)[0]
    ddf.to_csv(namefile, index=False, encoding='iso-8859-1')
    for partfile in glob.glob('**/*.part'):
        os.rename(partfile, (os.path.splitext(partfile)[0]) + "_{}_part.csv".format(namefile))
# load each part CSV into a pandas dataframe
for partfilecsv in glob.glob("**/*_part.csv", recursive=True):
    part_df = pd.read_csv(partfilecsv, encoding='iso-8859-1', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object', 'lon': 'float', 'IDTAG': 'float'})
    # get a list of columns
    cols = list(part_df)
    # move IDTAG to the head of the list
    cols.insert(0, cols.pop(cols.index('IDTAG')))
    # reorder the columns
    part_df = part_df.loc[:, cols]
    # group by the new IDTAG column
    for idtag, group in part_df.groupby('IDTAG'):
        group.to_csv((os.path.splitext(partfilecsv)[0]) + '_source_{}.csv'.format(idtag), index=False)
    # remove the intermediate part file
    os.remove(partfilecsv)
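The follow-up merge script mentioned above is not shown in the answer; a minimal sketch of what it might look like, assuming the _source_ naming scheme produced by the code and a made-up merged_source_ output prefix:

import glob
import pandas as pd

# collect the distinct tag values from the generated file names
files = glob.glob('**/*_source_*.csv', recursive=True)
tags = {f.rsplit('_source_', 1)[1][:-len('.csv')] for f in files}
for tag in tags:
    parts = sorted(glob.glob('**/*_source_{}.csv'.format(tag), recursive=True))
    # concatenate all parts that share a tag and write one merged file per tag
    merged = pd.concat((pd.read_csv(p, encoding='iso-8859-1') for p in parts),
                       ignore_index=True)
    merged.to_csv('merged_source_{}.csv'.format(tag), index=False, encoding='iso-8859-1')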
