python - Python code calling a SQL Server stored procedure to ingest a CSV takes HOURS to execute
Problem Description
I am using Python to read a CSV with Pandas, fix some fields, and then write the data row by row to a table in SQL Server. Bulk import is disabled on the server, partly because there will eventually be dozens of these files and the download and ingestion need to be automated. I could see this taking a few minutes, but it runs for HOURS.
I know I could bulk-load this data in seconds if bulk import were enabled, but that is probably not an option.
The problem is that with Python each run can take 1 to 3 hours, which is unacceptable. I would like to know whether there is a faster way to do this upload: either something I can do to the table to speed up the import, or a different way to code it.
Here is an example of the kind of code I am using:
import os
import time

import pandas as pd
import pyodbc


def ingest_glief_reporting_exceptions_csv():
    global conn
    global cursor
    filename = r"20200824-0800-gleif-goldencopy-repex-golden-copy.csv"
    # filename = r"repex_1K.csv"
    full_filename = os.path.join(raw_data_dir, filename)
    sql_str = "exec dbo.util_get_gleif_last_reporting_exception"
    cursor.execute(sql_str)
    last_lei = ''
    for result in cursor.fetchall():
        last_lei = result[0]
    # "repex" is short for "reporting exceptions", shorten the headers
    repex_headers = [
        'LEI',
        'ExceptionCategory',
        'ExceptionReason1',
        'ExceptionReason2',
        'ExceptionReason3',
        'ExceptionReason4',
        'ExceptionReason5',
        'ExceptionReference1',
        'ExceptionReference2',
        'ExceptionReference3',
        'ExceptionReference4',
        'ExceptionReference5'
    ]
    df = pd.read_csv(full_filename, header=0, quotechar='"')
    # Change to the column headers generated in VBA
    df.columns = repex_headers
    for colname in df.columns:
        df[colname] = df[colname].astype(str)
        df[colname] = df[colname].replace({'nan': ''})
    place_holder = '?,?'
    for i in range(1, len(repex_headers)):
        place_holder += ',?'
    sql_str = "exec save_gleif_reporting_exception " + place_holder
    row_count = 0
    row = dict()
    do_not_upload = True
    if last_lei == '':
        do_not_upload = False  # There was no last uploaded record, so we can start now
    for index, row in df.iterrows():
        row_count += 1
        if do_not_upload:
            if row['LEI'] == last_lei:
                do_not_upload = False
                continue
            else:
                continue
        values = (
            row['LEI'],
            row['ExceptionCategory'],
            row['ExceptionReason1'],
            row['ExceptionReason2'],
            row['ExceptionReason3'],
            row['ExceptionReason4'],
            row['ExceptionReason5'],
            row['ExceptionReference1'],
            row['ExceptionReference2'],
            row['ExceptionReference3'],
            row['ExceptionReference4'],
            row['ExceptionReference5'],
            filename
        )
        if index % 1000 == 0:
            print("Imported %s rows" % (index))
        # print(values)
        # print("processing row ", row_count)
        # return_key is the unique ID the database generated as it inserted this row of data.
        error_sql_str = "exec log_message ?,?,?,?,?, ?,?,?,?"
        connection_failures = 0
        connection_failing = True
        while connection_failures < 3 and connection_failing:
            try:
                return_key = cursor.execute(sql_str, values).fetchval()
            except pyodbc.OperationalError as e:
                connection_failures += 1
                connection_failing = True
                print("Connection issue. connection failures = ", connection_failures)
                time.sleep(30)  # wait 30 seconds and go to the top of the loop to try again.
                continue
            except pyodbc.ProgrammingError as e:
                print("Bad field ", values)
                error_values = (
                    'ERROR',
                    __file__,
                    filename,
                    'gleif_reporting_exceptions',
                    row['LEI'],
                    '',
                    '',
                    '',
                    str(e)
                )
                return_key = cursor.execute(error_sql_str, error_values).fetchval()
                connection_failures = 0
            connection_failures = 0
            connection_failing = False
        if connection_failures >= 3:
            print("Unable to reconnect after 3 tries")
            exit(1)
    conn.close()
    return
I open the database connection like this:
def init_connection(server_name, db_name):
    """
    Connect to SQL Server database
    :param server_name:
    :param db_name:
    :return:
    """
    pyodbc.pooling = False
    try:
        conn = pyodbc.connect(
            r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name +
            ';Database=' + db_name + ';Trusted_Connection=yes;',
            timeout=5, autocommit=True)
    except Exception as e:
        print("Unable to connect to database [" + db_name + '] and server [' + server_name + ']')
        print(e)
        exit(1)
    cursor = conn.cursor()
    return [conn, cursor]
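For context, the function is called like this, unpacking the returned pair into the module-level conn and cursor used above (the server and database names here are placeholders, not values from the question):

conn, cursor = init_connection('MY_SERVER', 'my_database')  # placeholder names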
The table is defined as follows:
CREATE TABLE [dbo].[gleif_reporting_exceptions](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [ida_last_update_date] [datetime] NULL,
    [ida_last_update_source_file] [nvarchar](500) NULL,
    [LEI] [nvarchar](500) NULL,
    [ExceptionCategory] [nvarchar](500) NULL,
    [ExceptionReason1] [nvarchar](500) NULL,
    [ExceptionReason2] [nvarchar](500) NULL,
    [ExceptionReason3] [nvarchar](500) NULL,
    [ExceptionReason4] [nvarchar](500) NULL,
    [ExceptionReason5] [nvarchar](500) NULL,
    [ExceptionReference1] [nvarchar](500) NULL,
    [ExceptionReference2] [nvarchar](500) NULL,
    [ExceptionReference3] [nvarchar](500) NULL,
    [ExceptionReference4] [nvarchar](500) NULL,
    [ExceptionReference5] [nvarchar](500) NULL
) ON [PRIMARY]
GO
Here is some sample data:
LEI,Exception.Category,Exception.Reason.1,Exception.Reason.2,Exception.Reason.3,Exception.Reason.4,Exception.Reason.5,Exception.Reference.1,Exception.Reference.2,Exception.Reference.3,Exception.Reference.4,Exception.Reference.5
004L5FPTUREIWK9T2N63,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
00EHHQ2ZHDCFXJCPCL46,DIRECT_ACCOUNTING_CONSOLIDATION_PARENT,NON_CONSOLIDATING,,,,,,,,,
And here is the corresponding stored procedure that I call to store a record in the table:
ALTER PROCEDURE [dbo].[save_gleif_reporting_exception]
    @LEI [nvarchar](500) = NULL,
    @ExceptionCategory [nvarchar](500) = NULL,
    @ExceptionReason1 [nvarchar](500) = NULL,
    @ExceptionReason2 [nvarchar](500) = NULL,
    @ExceptionReason3 [nvarchar](500) = NULL,
    @ExceptionReason4 [nvarchar](500) = NULL,
    @ExceptionReason5 [nvarchar](500) = NULL,
    @ExceptionReference1 [nvarchar](500) = NULL,
    @ExceptionReference2 [nvarchar](500) = NULL,
    @ExceptionReference3 [nvarchar](500) = NULL,
    @ExceptionReference4 [nvarchar](500) = NULL,
    @ExceptionReference5 [nvarchar](500) = NULL,
    @ida_last_update_source_file [nvarchar](500) = NULL
AS
BEGIN
    -- SET NOCOUNT ON added to prevent extra result sets from
    -- interfering with SELECT statements.
    SET NOCOUNT ON;

    -- Insert statements for procedure here
    INSERT INTO dbo.gleif_reporting_exceptions(
        [LEI],
        [ExceptionCategory],
        [ExceptionReason1],
        [ExceptionReason2],
        [ExceptionReason3],
        [ExceptionReason4],
        [ExceptionReason5],
        [ExceptionReference1],
        [ExceptionReference2],
        [ExceptionReference3],
        [ExceptionReference4],
        [ExceptionReference5],
        [ida_last_update_date],
        [ida_last_update_source_file]
    )
    VALUES (
        @LEI,
        @ExceptionCategory,
        @ExceptionReason1,
        @ExceptionReason2,
        @ExceptionReason3,
        @ExceptionReason4,
        @ExceptionReason5,
        @ExceptionReference1,
        @ExceptionReference2,
        @ExceptionReference3,
        @ExceptionReference4,
        @ExceptionReference5,
        GETDATE(),
        @ida_last_update_source_file
    )
    SELECT @@IDENTITY
END
Note 1: Although I declare the strings as nvarchar(500), most of them are nowhere near that long. I don't think this matters: I tried shorter string definitions and the routine still took just as long to run.
Note 2: This is just one of 7 examples so far. The smallest tables have a few tens of thousands of rows, the largest a few million. The number of columns varies from 7 to around 230.
Solution
Turn off autocommit:
conn = pyodbc.connect(
    r'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server_name +
    ';Database=' + db_name + ';Trusted_Connection=yes;',
    timeout=5, autocommit=False)
and commit in this block, plus once more after the loop ends:
if index % 1000 == 0:
    print("Imported %s rows" % (index))
With autocommit on, you have to wait for the transaction log to be flushed to disk after every single row.
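Concretely, the tail of the write loop would become something like this (a sketch based on the question's function, with the retry handling elided; the two conn.commit() calls are the new part):

for index, row in df.iterrows():
    # ... build values and call the stored procedure exactly as before ...
    return_key = cursor.execute(sql_str, values).fetchval()
    if index % 1000 == 0:
        conn.commit()  # one log flush per 1,000 rows instead of one per row
        print("Imported %s rows" % (index))
conn.commit()  # commit the final partial batch after the loop ends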
For a further speedup, if you are on SQL Server 2016 or later, send batches of rows to the server as JSON and parse them server-side with OPENJSON.
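A minimal sketch of that approach, assuming SQL Server 2016+ (database compatibility level 130 or higher) and the table and columns from the question; the upload_batch helper and its inline INSERT ... OPENJSON statement are illustrative, not code from the answer:

import json

# The 12 CSV columns from the question.
REPEX_COLUMNS = [
    'LEI', 'ExceptionCategory',
    'ExceptionReason1', 'ExceptionReason2', 'ExceptionReason3',
    'ExceptionReason4', 'ExceptionReason5',
    'ExceptionReference1', 'ExceptionReference2', 'ExceptionReference3',
    'ExceptionReference4', 'ExceptionReference5',
]

def upload_batch(cursor, rows, filename):
    """Insert a list of row dicts in a single round trip: the rows travel
    as one JSON array and the server shreds them with OPENJSON."""
    col_list = ", ".join("[%s]" % c for c in REPEX_COLUMNS)
    with_clause = ", ".join(
        "[%s] nvarchar(500) '$.%s'" % (c, c) for c in REPEX_COLUMNS)
    sql = (
        "INSERT INTO dbo.gleif_reporting_exceptions (%s, "
        "[ida_last_update_date], [ida_last_update_source_file]) "
        "SELECT %s, GETDATE(), ? FROM OPENJSON(?) WITH (%s)"
        % (col_list, col_list, with_clause)
    )
    # Parameters bind in order of appearance: the file name first, then the JSON text.
    cursor.execute(sql, (filename, json.dumps(rows)))

With the question's DataFrame already cleaned up, the row-by-row loop then collapses to chunked calls, one round trip per 1,000 rows:

rows = df.to_dict(orient='records')
for start in range(0, len(rows), 1000):
    upload_batch(cursor, rows[start:start + 1000], filename)
conn.commit()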