Uploading an Airflow XCom as a JSON File to Google Cloud Storage

Problem Description

In Airflow, I have a task with ID customer_schema whose XCom value I want to write to a JSON file named final_schema.json and upload to Google Cloud Storage. My bucket in Google Cloud Storage is named northern_industrial_customer. I tried the following with FileToGoogleCloudStorageOperator, but it did not work.

Does anyone know how I can get the XCom value of customer_schema into Google Cloud Storage as a JSON file named final_schema.json?

transfer_to_gcs = FileToGoogleCloudStorageOperator(
    task_id='transfer_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    google_cloud_storage_conn_id=conn_id_gcs)

Tags: google-cloud-platform, google-cloud-firestore, google-bigquery, google-cloud-storage, airflow

Solution


There is no built-in Airflow operator that does this: FileToGoogleCloudStorageOperator uploads a local file, and its src parameter expects a path on disk, whereas your XCom pull renders to the schema content itself. Airflow is extensible, though, and you can write your own custom operator:

import tempfile

# Hook import path as used in this answer; in Airflow 1.10.x releases the
# hook lives at airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook, and
# in Airflow 2.x at airflow.providers.google.cloud.hooks.gcs.GCSHook.
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ContentToGoogleCloudStorageOperator(BaseOperator):
    """
    Uploads text content to Google Cloud Storage.
    Optionally compresses the content for upload.

    :param content: The content to upload. (templated)
    :type content: str
    :param dst: Destination path within the specified bucket, it must be the full file path
        to destination object on GCS, including GCS object (ex. `path/to/file.txt`) (templated)
    :type dst: str
    :param bucket: The bucket to upload to. (templated)
    :type bucket: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param mime_type: The mime-type string
    :type mime_type: str
    :param delegate_to: The account to impersonate, if any
    :type delegate_to: str
    :param gzip: Allows for file to be compressed and uploaded as gzip
    :type gzip: bool
    """
    template_fields = ('content', 'dst', 'bucket')

    @apply_defaults
    def __init__(self,
                 content,
                 dst,
                 bucket,
                 gcp_conn_id='google_cloud_default',
                 mime_type='application/octet-stream',
                 delegate_to=None,
                 gzip=False,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.content = content
        self.dst = dst
        self.bucket = bucket
        self.gcp_conn_id = gcp_conn_id
        self.mime_type = mime_type
        self.delegate_to = delegate_to
        self.gzip = gzip

    def execute(self, context):
        """
        Uploads the content to Google Cloud Storage.
        """
        # google_cloud_storage_conn_id is the pre-Airflow-2.0 name of this
        # hook parameter; see the note after this listing.
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to
        )

        # NamedTemporaryFile opens in binary mode by default; use text mode
        # so the rendered (string) content can be written directly.
        with tempfile.NamedTemporaryFile(mode="w", prefix="gcs-local") as file:
            file.write(self.content)
            file.flush()
            hook.upload(
                bucket_name=self.bucket,
                object_name=self.dst,
                mime_type=self.mime_type,
                filename=file.name,
                gzip=self.gzip,
            )
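
With the custom operator defined, the original task can be rewritten to pass the XCom template through the templated content field: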

transfer_to_gcs = ContentToGoogleCloudStorageOperator(
    task_id='transfer_to_gcs',
    content="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    gcp_conn_id=conn_id_gcs)

Note that in Airflow 2.0 the google_cloud_storage_conn_id parameter of FileToGoogleCloudStorageOperator is deprecated; you should use gcp_conn_id instead.
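
For completeness: if you are already on Airflow 2.x with the Google provider installed, you may not need the custom operator or the temporary file at all, since recent versions of the provider's GCSHook.upload accept in-memory data. A minimal sketch under that assumption (the data= parameter and import paths below come from the apache-airflow-providers-google package; conn_id_gcs is the same connection variable used above):

from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def _upload_schema_to_gcs(ti):
    # Pull the schema string from XCom and upload it as a GCS object; the
    # data= parameter lets the hook upload from memory, no temp file needed.
    hook = GCSHook(gcp_conn_id=conn_id_gcs)
    hook.upload(
        bucket_name='northern_industrial_customer',
        object_name='final_schema.json',
        data=ti.xcom_pull(task_ids='customer_schema'),
        mime_type='application/json',
    )


transfer_to_gcs = PythonOperator(
    task_id='transfer_to_gcs',
    python_callable=_upload_schema_to_gcs,
)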

