Formatting strings from a dictionary for use with csv.DictReader to build dynamic SQL (KeyError: 'xxx')

Problem description

I am writing a script in Python (3.6) to load CSV files into a database. I want to supply the table schema as a dictionary rather than hard-coding it.

Here is my code snippet:

import os
import csv
import sqlite3
from pathlib import Path

dirname='/path/to/folder'
dbname="{0}/mydata.db".format(dirname)

schema = {
            'tstamp': 'TIMESTAMP PRIMARY KEY',
            'col1':   'REAL',
            'col2':   'REAL',
            'col3':   'REAL',
            'col4':   'REAL',
            'col5':   'REAL',
            'col6':   'REAL'           
         }

tablename = 'foobar'
fieldnames = "({0})".format(", ".join(["'{0}'".format(x) for x in schema.keys()]) )
print("fieldnames: {0}".format(fieldnames))
schema_field_types = ", ".join(["{0} {1}".format(k,v) for k,v in schema.items()])
print("schema_field_types: {0}".format(schema_field_types))
create_table_ddl = "CREATE TABLE {0} ({1});".format(tablename, schema_field_types )
print("create_table_ddl: {0}".format(create_table_ddl))
values_string = ", ".join(['?' for i, x in enumerate(schema)])
print("values_string: {0}".format(values_string))
# the column list of an INSERT takes names only, not the type declarations
insert_table_sql = "INSERT INTO {0} ({1}) VALUES ({2});".format(tablename, ", ".join(schema.keys()), values_string)
print("insert_table_sql: {0}".format(insert_table_sql))

# if database does not exist, create it
if not os.path.isfile(dbname):
    print('mode: CREATE_DB')
    mode = CREATE_DB
    Path(dirname).mkdir(parents=True, exist_ok=True)
    Path(dbname).touch()    
else:
    print('mode: UPDATE_DB')
    mode = UPDATE_DB


con = sqlite3.connect(dbname, detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor()

if mode == CREATE_DB:
    cur.execute(create_table_ddl) 

with open(filename,'r') as fin: 
    # csv.DictReader uses first line in file for column headings by default
    dr = csv.DictReader(fin, fieldnames=fieldnames)
    s = ", ".join(["{{{0}}}".format(k) for k in schema.keys()])
    print("s: {0}".format(s))
    row = next(dr)
    print(row)
    to_db = [(s.format(*i)) for i in dr]  # <- KeyError: tstamp

Why is the KeyError exception raised, and how can I fix it?

Tags: python, csv

Solution


The problem is this line:

fieldnames = "({0})".format(", ".join(["'{0}'".format(x) for x in schema.keys()]))

csv.DictReader expects a sequence of keys, for example

[k1, k2, k3...]

but fieldnames is a single string:

"('tstamp', 'col1', 'col2', 'col3', 'col4', 'col5', 'col6')"

so DictReader iterates over it character by character and ends up with these field names:

['(', "'", 't', 's', 't', 'a', 'm', 'p', "'", ',', ' ', "'", 'c', 'o', 'l', '1', "'", ',', ' ', "'", 'c', 'o', 'l', '2', "'", ',', ' ', "'", 'c', 'o', 'l', '3', "'", ',', ' ', "'", 'c', 'o', 'l', '4', "'", ',', ' ', "'", 'c', 'o', 'l', '5', "'", ',', ' ', "'", 'c', 'o', 'l', '6', "'", ')']
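The effect can be reproduced in isolation. The following is a minimal sketch, assuming a hypothetical two-column CSV held in memory (the sample data and the shortened field list are illustrative, not taken from the question):

```python
import csv
import io

# Hypothetical one-line sample; the real file in the question has seven columns.
sample = "2020-01-01 00:00:00,1.5\n"

# A string is iterable, so DictReader treats every character as a column name.
bad_fieldnames = "('tstamp', 'col1')"
bad_row = next(csv.DictReader(io.StringIO(sample), fieldnames=bad_fieldnames))

# A list gives one column name per element, which is what DictReader expects.
good_fieldnames = ["tstamp", "col1"]
good_row = next(csv.DictReader(io.StringIO(sample), fieldnames=good_fieldnames))

print(list(bad_row))    # single-character keys such as '(' and "'"
print(dict(good_row))   # keys 'tstamp' and 'col1'
```

With the string version there is no 'tstamp' key in the row at all, which is why any later lookup by that name fails.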

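Use

fieldnames = schema.keys()

and change

to_db = [(s.format(*i)) for i in dr]

to

to_db = [(s.format(**i)) for i in dr]

so that each row is unpacked as a dict (keyword arguments) rather than as a list (positional arguments). The template s contains named fields such as {tstamp}, and str.format can only fill those from keyword arguments, so this stops the error.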
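The difference between the two unpacking forms can be checked without any file. This is a small sketch, assuming a shortened two-column schema and a hypothetical row of the shape DictReader would return:

```python
# Shortened schema for illustration; the question's schema has seven columns.
schema = {"tstamp": "TIMESTAMP PRIMARY KEY", "col1": "REAL"}

# Same construction as `s` in the question: "{tstamp}, {col1}"
s = ", ".join("{{{0}}}".format(k) for k in schema)

# Hypothetical row, shaped like one item yielded by csv.DictReader.
row = {"tstamp": "2020-01-01 00:00:00", "col1": "1.5"}

try:
    # *row passes the keys positionally, so no keyword named 'tstamp' exists.
    s.format(*row)
    missing_key = None
except KeyError as exc:
    missing_key = exc.args[0]

# **row passes each key as a keyword argument, matching the named fields.
formatted = s.format(**row)

print(missing_key)
print(formatted)
```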

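One observation: what you are doing may be easier with csv.reader than with DictReader. That way your columns are already in the order you defined, and you do not have to worry about the order of the dictionary keys.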
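One possible shape of the csv.reader approach is sketched below. It assumes a shortened three-column schema, a hypothetical headerless CSV held in memory, and an in-memory SQLite database; it also assumes the CSV columns appear in the same order as the schema keys (dict insertion order is guaranteed from Python 3.7, and is an implementation detail of CPython 3.6).

```python
import csv
import io
import sqlite3

# Shortened schema for illustration; column order must match the CSV.
schema = {"tstamp": "TIMESTAMP PRIMARY KEY", "col1": "REAL", "col2": "REAL"}
tablename = "foobar"

columns = ", ".join(schema)                                    # names only
column_defs = ", ".join("{0} {1}".format(k, v) for k, v in schema.items())
placeholders = ", ".join("?" for _ in schema)                  # one ? per column

create_table_ddl = "CREATE TABLE {0} ({1});".format(tablename, column_defs)
insert_table_sql = "INSERT INTO {0} ({1}) VALUES ({2});".format(
    tablename, columns, placeholders)

# Hypothetical CSV content without a header row.
sample = "2020-01-01 00:00:00,1.0,2.0\n2020-01-01 00:01:00,3.0,4.0\n"

con = sqlite3.connect(":memory:")
con.execute(create_table_ddl)
# csv.reader yields one list per line, which executemany binds to the ? marks.
con.executemany(insert_table_sql, csv.reader(io.StringIO(sample)))
con.commit()
rows = con.execute(
    "SELECT * FROM {0} ORDER BY tstamp".format(tablename)).fetchall()
con.close()

print(rows)
```

Because the values are bound through ? placeholders, there is no need to format each row into a string first; only the table and column names, which come from the trusted schema dictionary, are interpolated into the SQL text.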

