How do I run a large SQL query with Python and export the results to CSV?

Problem Description

I'm looking to export the result set of a SQL query to a CSV file using Python. The code below works fine for small queries, but when I try a larger query (example below) it throws the following error:

TypeError: 'NoneType' object is not iterable

SQL query - example code (very close to the actual code, but with sensitive details removed):

DECLARE @Chosen_Month DATE
SET @Chosen_Month = '2021-01-01';

IF OBJECT_ID('tempdb..#Base_Data') IS NOT NULL
DROP TABLE #Base_Data;

SELECT
     a.region
    ,a.customer_name
    ,SUM(b.transactions) AS transactions
    ,SUM(b.turnover) AS turnover
    ,SUM(b.revenue) AS revenue
INTO
    #Base_Data
FROM
    customer_table AS a
    INNER JOIN transaction_table AS b ON a.company_id = b.company_id
WHERE
    b.trans_date = @Chosen_Month
GROUP BY
    a.region
    ,a.customer_name

IF OBJECT_ID('tempdb..#Ranked_Data') IS NOT NULL
DROP TABLE #Ranked_Data;

SELECT
    *
    ,ROW_NUMBER() OVER(ORDER BY transactions DESC) AS trans_rank
    ,ROW_NUMBER() OVER(ORDER BY turnover DESC) AS turnover_rank
    ,ROW_NUMBER() OVER(ORDER BY revenue DESC) AS revenue_rank
INTO
    #Ranked_Data
FROM
    #Base_Data
    
SELECT
    *
FROM
    #Ranked_Data
WHERE
    revenue_rank <= 50
ORDER BY 
    revenue_rank ASC

I've tried looking for ways to split the SQL query into multiple executes and skip the statements that return no output, but couldn't get anything working. How do I handle a large query that also uses objects such as scalar variables? I'm quite new to Python, so any help would be much appreciated! The Python code is below:

import pyodbc
import csv

new_file_path = r'S:\Andy\Python\testdump.csv'
query_path = r'S:\Andy\Python\testquery.sql'

def read(conn):
    cursor = conn.cursor()
    with open(query_path, 'r') as sql_query_file:
        raw_data = cursor.execute(sql_query_file.read())
        
    with open(new_file_path, 'w', newline='') as csv_file:
        csv_out = csv.writer(csv_file)
        csv_out.writerow([i[0] for i in raw_data.description])
        for row in raw_data:
            csv_out.writerow(row)
            print("Finished export")
            
conn = pyodbc.connect(
    "Driver={Driver_name_here};"
    "Server=server_name_here;"
    "Database=database_name_here;"
    "Trusted_Connection=yes;"
)

read(conn)
conn.close()

Tags: python sql csv pyodbc

Solution


Consider pure SQL with a CTE and date parameterization, and avoid any temp tables. With large volumes of data you may also run into timeout issues; see the pyodbc.connect documentation on using the timeout argument.
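For context, the TypeError almost certainly comes from the header-row line: when the whole multi-statement script is executed as one batch, the first results SQL Server returns are the row-less outcomes of the DECLARE / DROP TABLE / SELECT ... INTO statements, so cursor.description is still None when the list comprehension tries to iterate it. If you really wanted to keep the temp-table script, a rough workaround sketch (the SET NOCOUNT ON prefix and the execute_batch helper are illustrative, not part of this answer) is to suppress the row-count results and advance with Cursor.nextset() until a real result set appears:

def execute_batch(cursor, sql_text):
    # SET NOCOUNT ON stops SQL Server from returning "rows affected" results
    # for the set-up statements in the batch
    cursor.execute("SET NOCOUNT ON;\n" + sql_text)

    # skip any remaining row-less results until one with column metadata
    # (a non-None description) is available
    while cursor.description is None:
        if not cursor.nextset():
            raise RuntimeError("The batch returned no result set")
    return cursor

That said, the single-statement CTE query below avoids the problem entirely: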

SQL

WITH Base_Data AS (
    SELECT
          a.region
        , a.customer_name
        , SUM(b.transactions) AS transactions
        , SUM(b.turnover) AS turnover
        , SUM(b.revenue) AS revenue
    FROM
        customer_table AS a
        INNER JOIN transaction_table AS b ON a.company_id = b.company_id
    WHERE
        b.trans_date = ?        -- PARAM PLACEHOLDER
    GROUP BY
          a.region
        , a.customer_name
), Ranked_Data AS (
    SELECT
          *
        , ROW_NUMBER() OVER(ORDER BY transactions DESC) AS trans_rank
        , ROW_NUMBER() OVER(ORDER BY turnover DESC) AS turnover_rank
        , ROW_NUMBER() OVER(ORDER BY revenue DESC) AS revenue_rank
    FROM
        Base_Data
)
    
SELECT
    *
FROM
    Ranked_Data
WHERE
    revenue_rank <= 50
ORDER BY 
    revenue_rank ASC

Python

def sql_to_csv(cursor):               # PASS THE CURSOR IN EXPLICITLY
    # COMBINE FILE CONTEXT MANAGERS
    with open(query_path, 'r') as sql_query_file, \
         open(new_file_path, 'w', newline='') as csv_file:
       
        # BIND PARAM TO QUERY
        raw_data = cursor.execute(sql_query_file.read(), ['2021-01-01'])
        
        csv_out = csv.writer(csv_file)
        csv_out.writerow([i[0] for i in raw_data.description])

        for row in raw_data:
            csv_out.writerow(row)
         print("Finished export")     # DE-INDENT STATUS PRINT OUTSIDE LOOP

conn = pyodbc.connect(
    "Driver={Driver_name_here};"
    "Server=server_name_here;"
    "Database=database_name_here;"
    "Trusted_Connection=yes;",
    timeout=3                         # SECONDS - ADJUST ACCORDINGLY
)
cursor = conn.cursor()

try:                                  # EXCEPTION HANDLING TO ALWAYS CLOSE DB OBJECTS
    sql_to_csv(cursor)
except Exception as e:
    print(e)
finally:
    cursor.close()
    conn.close()
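One last note for very large exports: iterating the cursor row by row works, but fetching in chunks and writing each chunk at once is usually faster and keeps memory bounded. A minimal sketch, assuming the query has already been executed on the cursor (the chunk size and helper name are illustrative, not from the original answer):

def cursor_to_csv(cursor, csv_path, chunk_size=50000):
    with open(csv_path, 'w', newline='') as csv_file:
        csv_out = csv.writer(csv_file)
        # header row built from the cursor's column metadata
        csv_out.writerow([col[0] for col in cursor.description])
        while True:
            rows = cursor.fetchmany(chunk_size)   # next chunk of rows
            if not rows:
                break
            csv_out.writerows(rows)               # write the whole chunk in one call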
