首页 > 解决方案 > 如何gzip查询结果-Python/Airflow

问题描述

我正在尝试对我的查询结果进行 GZIP 压缩并将其写入 Airflow 中的某个位置。但是我得到了错误

TypeError:memoryview:需要一个类似字节的对象,而不是'str'

每当我运行我的代码时。

查看我的代码中的 fp 变量:

def create_tunnel_postgres():
    try:
        tunnel = SSHTunnelForwarder((ssh_host, 22),
                                    ssh_username=ssh_username,
                                    ssh_private_key=pkf,
                                    remote_bind_address=(psql_host,
                                    5432))

            # local_bind_address=('localhost',6543) # could be any available port
        # Start the tunnel

        tunnel.start()
    except:
        print 'connection'
    else:
        conn = psycopg2.connect(database='my_db', user='user',
                                password='my_pwd',
                                host=tunnel.local_bind_host,
                                port=tunnel.local_bind_port)

        cur = conn.cursor()
        cur.execute("""
            select * from pricing.public.seller_tiers ;
            """)
        result = cur.fetchall()

        # Getting Field Header names

        column_names = [i[0] for i in cur.description]
        fp = gzip.open(path, 'wb')
        myFile = csv.writer(fp, delimiter=',')
        myFile.writerow(column_names)
        myFile.writerows(result)
        fp.close()
        conn.close
        tunnel.stop

有什么想法或建议吗?我是 python/airflow 的新手,所以任何事情都会有所帮助。

标签: pythonpostgresqlcsvgzipairflow

解决方案


我认为错误与您使用 gzip 编写器文件内容的方式有关。

您正在尝试以字节模式打开 gzip 文件并在此处写入字符串fp = gzip.open(path, 'wb')

根据此处的python文档,说明:

'rt'、'at'、'wt' 或 'xt' 用于文本模式。

更改您的代码以使用wt写入文本或使用编码功能编码为字节:

 import gzip
 import csv

 with gzip.open("sample.gz", "wt") as gz_fp:
     fieldnames = ['first_name', 'last_name']
     writer = csv.writer(gz_fp, delimiter=",")

     writer.writerow({'first_name': 'Baked', 'last_name': 'Beans'})
     writer.writerow({'first_name': 'Lovely', 'last_name': 'Spam'})
     writer.writerow({'first_name': 'Wonderful', 'last_name': 'Spam'})

如果你只想写字节:

with gzip.open('file.gz', 'wb') as f:
    f.write('Hello world!'.encode())

推荐阅读