首页 > 解决方案 > Airflow PostgresToGCSOperator - 以 EPOCH 格式导出时间戳列

问题描述

我使用 PostgresToGCSOperator [v1.10v2.0 ] 来导出数据。我知道该运算符中有一个映射。我已经改变了它并作为自定义运算符。如果 Postgres 有时间戳,那么在 BQ 中使用 DATETIME 作为时间戳z 使用 TIMESTAMP。


       1114: 'DATETIME',
        1184: 'TIMESTAMP',
        1082: 'DATE',
        1083: 'TIME',
        1005: 'INTEGER',
        1007: 'INTEGER',
        1016: 'INTEGER',
        20: 'INTEGER',
        21: 'INTEGER',
        23: 'INTEGER',
        16: 'BOOLEAN',
        700: 'FLOAT',
        701: 'FLOAT',
        1700: 'FLOAT'

但是在以 CSV 格式导出此数据时,它不会考虑这一点并以 EPOCH 格式导出值。我如何解决它?

标签: pythonairflow

解决方案


我为此找到了一个棘手的解决方法。row_to_json我们可以使用函数将 postgres 本身的输出转换为 JSON 。然后将其导出到 GCS。

但是有一些小故障。

  1. schema file只有一列。所以你必须处理它。
  2. 它有row_to_json一个标题。
  3. 加载时使用源格式 as NEWLINE_DELIMITED_JSON,但在导出时使用CSV分隔符,|否则它将添加不必要的引号。
  4. 装箱schema file manually并使用它。
  5. 使用max bad records as 1因为标题名称不会被解析。
  6. 加载时使用autodetect=False

样本:

sql='SELECT row_to_json(t) FROM (SELECT * FROM mytbl) t'

输出:

row_to_json
{'id': 1, 'c1': '2020-10-16', 'c2': '10:20:30', 'c3': '2020-10-16T10:20:30', 'c4': '2020-10-16T04:50:30+00:00'}

推荐阅读