google-analytics - 气流连接构建凭据
问题描述
我已经编写了一个可以从命令行正常工作的 python 脚本('my-analytics.json' 文件存储在与脚本相同的文件夹中。现在我将此脚本移动到我将要移动的 AirFlow(云作曲家)这段代码到 PythonOperator。
注意(上下文):此脚本发送 API 请求以从 Google Analytics 中删除用户。
SCOPES = ['https://www.googleapis.com/auth/analytics.user.deletion']
SERVICE_ACCOUNT_FILE = 'my-analytics.json'
credentials = service_account.Credentials.from_service_account_file(
SERVICE_ACCOUNT_FILE,
scopes=SCOPES
)
analytics_client = googleapiclient.discovery.build(
'analytics',
'v3',
credentials=credentials
)
user_deletion_request_resource = analytics_client.userDeletion().userDeletionRequest()
def delete_users(id):
return user_deletion_request_resource.upsert(
body = {
"deletionRequestTime": str(datetime.datetime.now()),
"kind": "analytics#userDeletionRequest",
"id": {
"userId": id,
"type": "CLIENT_ID", # Type of user (APP_INSTANCE_ID,CLIENT_ID or USER_ID)
},
"webPropertyId": "UA-XXXXX-YY" # Web property ID of the form UA-XXXXX-YY.
}
).execute()
我已经创建了一个像这样的 Google Analytics 连接并将 json 存储到 KeyFile JSON 字段中。
我的问题是如何从这个连接中建立“凭证”?我不知道如何SERVICE_ACCOUNT_FILE = 'my-analytics.json'
用 AirFlow 连接替换。非常感谢任何帮助/指导。
解决方案
在 Airflow 中,最好使用钩子和运算符进行编码,这样连接将开箱即用,无需所有额外的手动工作。
无论如何,至于你的问题。与此处的答案相似 不同之处在于 Google 连接具有唯一字段,因此您需要使用 GoogleBaseHook。
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
gcp_hook = GoogleCloudBaseHook(gcp_conn_id="your_conn")
scope = gcp_hook._get_field('scope') # or gcp_hook.scope
keyfile = gcp_hook._get_field('keyfile_dict')
keyfile_path = gcp_hook._get_field('key_path')