python - Airflow PostgresToGCSOperator - 以 EPOCH 格式导出时间戳列
问题描述
我使用 PostgresToGCSOperator [v1.10和v2.0 ] 来导出数据。我知道该运算符中有一个映射。我已经改变了它并作为自定义运算符。如果 Postgres 有时间戳,那么在 BQ 中使用 DATETIME 作为时间戳z 使用 TIMESTAMP。
1114: 'DATETIME',
1184: 'TIMESTAMP',
1082: 'DATE',
1083: 'TIME',
1005: 'INTEGER',
1007: 'INTEGER',
1016: 'INTEGER',
20: 'INTEGER',
21: 'INTEGER',
23: 'INTEGER',
16: 'BOOLEAN',
700: 'FLOAT',
701: 'FLOAT',
1700: 'FLOAT'
但是在以 CSV 格式导出此数据时,它不会考虑这一点并以 EPOCH 格式导出值。我如何解决它?
解决方案
我为此找到了一个棘手的解决方法。row_to_json
我们可以使用函数将 postgres 本身的输出转换为 JSON 。然后将其导出到 GCS。
但是有一些小故障。
- 将
schema file
只有一列。所以你必须处理它。 - 它有
row_to_json
一个标题。 - 加载时使用源格式 as
NEWLINE_DELIMITED_JSON
,但在导出时使用CSV
分隔符,|
否则它将添加不必要的引号。 - 装箱
schema file manually
并使用它。 - 使用
max bad records as 1
因为标题名称不会被解析。 - 加载时使用
autodetect=False
样本:
sql='SELECT row_to_json(t) FROM (SELECT * FROM mytbl) t'
输出:
row_to_json
{'id': 1, 'c1': '2020-10-16', 'c2': '10:20:30', 'c3': '2020-10-16T10:20:30', 'c4': '2020-10-16T04:50:30+00:00'}
推荐阅读
- node.js - 每 X 秒控制台输出到 html 文件
- csv - 如何导入大小约为 300GB 的大型 CSV 文件?
- python - 我将如何暴力破解我忘记密码的 Facebook 帐户,而不会被 Facebook 阻止?
- python - 如何将发布请求查询写入烧瓶应用程序中的文件?
- c# - 我怎样才能做到这一点作为一个单一的界面?
- c# - 在用户控件文本框中设置值,选择索引更改了 Asp.net Web 表单
- javascript - 使用 POST 发送时,FormData textarea 放置 \r(回车)
- function - 用 J 写自定义动词
- python - 使用带有加载更多按钮的 python3.9 进行网页抓取
- image-processing - 如何去除 DCT 中的直流分量?