python - 可以在 s3(csv 文件) 之间创建一个 worflow(airflow) 以将其存储在 mongodb 中吗?
问题描述
我想创建一个从 s3 获取文件并将数据存储到 mongodb 的工作流,到目前为止,我采用了这种方法:
dag = DAG('s3_to_mongo',
schedule_interval='@daily',
catchup=False)
first_task = DummyOperator(task_id='dummy_task', dag=dag)
s3_mongo_task = S3ToMongoOperator(s3_conn_id='', s3_bucket='', s3_key='',
mongo_conn_id='', mongo_collection='', mongo_method='insert',
mongo_db=None, mongo_replacement_filter=None, upsert=False, dag=dag)
first_task >> s3_mongo_task
我正在使用运营商的官方文档:https ://github.com/airflow-plugins/mongo_plugin/blob/master/operators/s3_to_mongo_operator.py
解决方案
Airflow 和 Mongo 插件不支持 S3 上的 CSV 文件。您需要编写自己的运算符。
class S3CsvToMongoOperator(S3ToMongoOperator):
def __init__(*args, **kwargs):
super().__init__(*args, **kwargs)
def execute(self, context):
s3 = S3Hook(self.s3_conn_id)
mongo = MongoHook(conn_id=self.mongo_conn_id)
data = (s3
.get_key(self.s3_key,
bucket_name=self.s3_bucket)
.get_contents_as_string(encoding='utf-8'))
lines = data.split('\n')
docs = [doc for doc in csv.DictReader(lines)]
self.method_mapper(mongo, docs)
推荐阅读
- html - 如何用不同的颜色为css中的每个li着色
- python - 如何在 if/else 结构中提问?
- arrays - 将具有 uint8 元素的数组重新解释为具有 uint16 元素的数组
- javascript - javascript 中的错误和 Xquery 中的查询
- html - 如何制作带有标题文本/正文的div框
- javascript - 何时从 Node.js Firebase 函数“返回”
- reactjs - Facebook Pixel 中如何计算 PageView?
- python - AttributeError:模块“tensorflow.python.framework.ops”没有属性“_TensorLike”
- typescript - 没有 React 的 Typescript 和 JSX:子类型
- google-cloud-platform - 如何使用 GCP REST API 执行查询