python - 使用气流蜂巢运算符并输出到文本文件
问题描述
您好我想使用气流蜂巢运算符执行蜂巢查询并将结果输出到文件。我不想在这里使用 INSERT OVERWRITE 。
hive_ex = HiveOperator(
task_id='hive-ex',
hql='/sql/hive-ex.sql',
hiveconfs={
'DAY': '{{ ds }}',
'YESTERDAY': '{{ yesterday_ds }}',
'OUTPUT': '{{ file_path }}'+'csv',
},
dag=dag
)
做这个的最好方式是什么?
我知道如何使用 bash 运算符执行此操作,但想知道我们是否可以使用 hive 运算符
hive_ex = BashOperator(
task_id='hive-ex',
bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }}
/file_{{ds}}.json',
dag=dag
)
解决方案
由于它是一个非常自定义的用例,因此最好的方法是扩展 Hive 运算符(或创建您自己的 Hive2CSVOperator)。实现将取决于您是否可以通过 CLI 或 HiveServer2 访问 hive。
蜂巢 CLI
我将首先尝试配置 Hive CLI 连接并hive_cli_params
根据Hive CLI 钩子代码添加,如果这不起作用,请扩展 Hook(这将使您可以访问所有内容)。
蜂巢服务器2
这种情况有一个单独的钩子(链接)。它更方便一点,因为它有一个get_results
方法(source)或to_csv
方法(source)。
操作员代码中的execute
可能看起来与此类似:
def execute():
...
self.hook = HiveServer2Hook(...)
self.conn = self.hook.get_conn()
self.conn.to_csv(hql=self.hql, csv_filepath=self.output_filepath, ...)
推荐阅读
- javascript - 如何根据属性的值从对象中提取
- python - 如何使用python在csv中查找重复项,然后更改行
- encryption - AES 128 CTR 纯文本大小不是块大小的倍数
- r - 如何将数据列表分成两列?
- python - 在 Jupyter 笔记本中使用 SCOOP
- c++ - 普通程序会知道 cgroups 的内存限制吗?
- kotlin - Kotlin Coroutines - 无限流以扇出批次
- spring-kafka - 如何使用密钥库中的特定密钥在 Kafka 中配置 SSL
- docker - 未在 localhost 上运行的 Docker 容器
- bash - 从参数中读取 JSON