python - 如何使用 Python 将 Jinja2 传递给 HiveQL
问题描述
我使用 Gcloud Composer 作为我的气流。当我尝试在我的 HQL 代码中使用 Jinja 时,它没有正确翻译它。我知道HiveOperator有一个 Jinja 翻译器,因为我已经习惯了,但DataProcHiveOperator没有。
我尝试将 HiveConf 直接用于我的 HQL 文件,但是当将这些值设置为我的 Partition (ie INSERT INTO TABLE abc PARTITION (ds = ${hiveconf:ds})
)`时,它不起作用。我还在我的 HQL 文件中添加了以下内容:
SET ds=to_date(current_timestamp());
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
但它不起作用,因为 HIVE 正在将上面的公式转换为字符串。
所以我的想法是结合这两个运算符让 Jinja 翻译器正常工作,但是当我这样做时,我收到以下错误:ERROR - submit() takes from 3 to 4 positional arguments but 5 were given
.
我对 Python 编码不是很熟悉,任何帮助都会很棒,请参阅下面我正在尝试构建的运算符的代码;
Python 文件的标头(请注意该文件包含此问题中未提及的其他运算符):
import ntpath
import os
import re
import time
import uuid
from datetime import timedelta
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from googleapiclient.errors import HttpError
from airflow.utils import timezone
from airflow.utils.operator_helpers import context_to_airflow_vars
修改后的 DataprocHiveOperator:
class DataProcHiveOperator(BaseOperator):
template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
ui_color = '#0273d4'
@apply_defaults
def __init__(
self,
query=None,
query_uri=None,
hiveconfs=None,
hiveconf_jinja_translate=False,
variables=None,
job_name='{{task.task_id}}_{{ds_nodash}}',
cluster_name='cluster-1',
dataproc_hive_properties=None,
dataproc_hive_jars=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
region='global',
job_error_states=['ERROR'],
*args,
**kwargs):
super(DataProcHiveOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.query = query
self.query_uri = query_uri
self.hiveconfs = hiveconfs or {}
self.hiveconf_jinja_translate = hiveconf_jinja_translate
self.variables = variables
self.job_name = job_name
self.cluster_name = cluster_name
self.dataproc_properties = dataproc_hive_properties
self.dataproc_jars = dataproc_hive_jars
self.region = region
self.job_error_states = job_error_states
def prepare_template(self):
if self.hiveconf_jinja_translate:
self.query_uri= re.sub(
"(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", "{{ \g<3> }}", self.query_uri)
def execute(self, context):
hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)
job = hook.create_job_template(self.task_id, self.cluster_name, "hiveJob",
self.dataproc_properties)
if self.query is None:
job.add_query_uri(self.query_uri)
else:
job.add_query(self.query)
if self.hiveconf_jinja_translate:
self.hiveconfs = context_to_airflow_vars(context)
else:
self.hiveconfs.update(context_to_airflow_vars(context))
job.add_variables(self.variables)
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region, self.job_error_states)
我希望能够在我的 HQL 代码中使用 Jinja 模板,以允许对我的数据管道进行分区自动化。
PS:我将主要将 Jinja 模板用于 Partition DateStamp
有谁知道我收到的错误消息是什么+帮我解决它?
ERROR - submit() takes from 3 to 4 positional arguments but 5 were given
谢谢!
解决方案
这是因为第 5 个参数job_error_states
仅在 master 中而不在当前稳定版本中(1.10.1
)。
所以删除该参数,它应该可以工作。
推荐阅读
- python - 是否有可能使用 SymPy 找到复杂的特征值?
- c# - MS Dynamics 365 CRM 在线 - 转储实体
- c++ - STD 函数返回子序列开始的第一个迭代器
- android - 带有进度乘数的搜索栏
- python - 为什么 np.empty() 和 np.zeros() 返回不同的值?
- sql-server - SSIS-SQL 部署
- javascript - 每个 Cypress.io cy.route() 调用的随机响应
- intellij-idea - 如何在 Mac 上的 Intellij/Pycharm 运行/调试配置中配置 Azure 函数项目
- c# - 向数据库添加条目后 Blazor 组件不刷新
- linux - 虚拟地址空间