首页 > 解决方案 > 可以在 s3(csv 文件) 之间创建一个 worflow(airflow) 以将其存储在 mongodb 中吗?

问题描述

我想创建一个从 s3 获取文件并将数据存储到 mongodb 的工作流,到目前为止,我采用了这种方法:

dag = DAG('s3_to_mongo',
        schedule_interval='@daily',
        catchup=False)


first_task = DummyOperator(task_id='dummy_task', dag=dag)

s3_mongo_task = S3ToMongoOperator(s3_conn_id='', s3_bucket='', s3_key='',
                                mongo_conn_id='', mongo_collection='', mongo_method='insert',
                                mongo_db=None, mongo_replacement_filter=None, upsert=False, dag=dag)

first_task >> s3_mongo_task

我正在使用运营商的官方文档:https ://github.com/airflow-plugins/mongo_plugin/blob/master/operators/s3_to_mongo_operator.py

标签: pythonamazon-web-servicesamazon-s3airflow

解决方案


Airflow 和 Mongo 插件不支持 S3 上的 CSV 文件。您需要编写自己的运算符。

class S3CsvToMongoOperator(S3ToMongoOperator):
    def __init__(*args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        s3 = S3Hook(self.s3_conn_id)
        mongo = MongoHook(conn_id=self.mongo_conn_id)

        data = (s3
                .get_key(self.s3_key,
                         bucket_name=self.s3_bucket)
                .get_contents_as_string(encoding='utf-8'))

        lines = data.split('\n')
        docs = [doc for doc in csv.DictReader(lines)]

        self.method_mapper(mongo, docs)

推荐阅读