首页 > 解决方案 > 读取包含错误编码的某些行的 csv 文件并向用户返回不正确的行

问题描述

我有一些用户上传要摄取的 csv 文件。在 Python 2 中,我能够以二进制格式打开文件,将其传递给 a unicodecsv.DictReader,如果某些行存在编码问题,例如由于客户使用 CP1251 或其他内容而导致的无效 Unicode 字符,我可以记录这些行并准确返回哪个行有问题。

使用 py3.7,我似乎无法做到这一点——该csv模块需要对文件进行解码,如果我改为将它传递给类似的生成器(line.decode('utf8') for line in my_binary_file),我不能让它只为坏行抛出异常并继续追求。我尝试使用unicodecsv,即使它已经四年多没有提交并且在技术上不支持 py > 3.5,而且它似乎也不起作用——迭代器在坏行之后停止。

我可以看到两种解决方法,这两种方法都不吸引人:
1)事先逐行解码文件并找到错误的行,这很浪费,或者
2)编写我自己的 CSV 解析器,它允许跳过错误的行,这看起来像自找麻烦。

我可以用另一种方式做到这一点吗?

作为参考,这里是在 py2 中工作的示例代码:

def unicode_safe_iterator(reader):
    while True:
        try:
            yield True, next(reader)
        except UnicodeDecodeError as exc:
            yield False, 'UnicodeDecodeError: %s' % str(exc)
        # uncomment for py3:
        # except StopIteration:
        #     return

def get_data_iter_from_csv(csv_file, ...):
    reader = unicodecsv.DictReader(csv_file)
    error_messages = []
    line_num = 1
    for valid, row in unicode_safe_iterator(reader):
        line_num += 1
        if not valid:
            error_messages.append(dict(line_number=line_num, error=row))
        else:
            row_data = validate_row_data(row)  # check for errors other than encoding, etc.
        if not error_messages:
            # stop yielding in case of errors, but keep iterating to find all errors.
            yield row_data
    if error_messages:
        raise ValidationError(Errors.CSV_FILE_ERRORS, error_items=error_messages)


data_iter = get_data_iter_from_csv(open(path_to_csv, 'rb'), ...)

标签: pythonpython-3.xcsvunicodepython-unicode

解决方案


这是一种解决方法。我们将文件作为字节流读取,将其拆分为新行并尝试将行转换为 utf8 字符串。如果失败,尝试将不正确的部分转换为 cp1251 字符串。Therafter 可以使用 io.StringIO 来模拟打开一个文件。

import csv, io

def convert(bl):

    rslt=[]
    done=False
    pos=0
    while not done:
        try:
            s=bl[pos:].decode("utf8")
            rslt.append(s)
            done=True
        except UnicodeDecodeError as ev:
            abs_start, abs_end= pos+ev.start, pos+ev.end
            rslt.append(bl[pos:abs_start].decode("utf8"))
            rslt.append(bl[abs_start:abs_end].decode("cp1251",errors="replace"))
            pos= abs_end
            if pos>= len(bl):
                done=True

    return "".join(rslt)


with open(path_to_csv,"rb") as ff:

    data= ff.read().split(b'\x0a')
    text= [ convert(line)  for line in data ]

text="\n".join(text)
print(text)

rdr= csv.DictReader(io.StringIO(text))

它也可以一次完成,而不是逐行完成:

with open(path_to_csv,"rb") as ff:  
    text= convert( ff.read() )

rdr= csv.DictReader(io.StringIO(text))

推荐阅读