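python - Airflow Docker commands communicating via XCom
Problem description
I have an Airflow Docker container and two other containers (dc1 and dc2). I am trying to execute a command in dc1 (through a DockerOperator) in task1 and use its output in a dc2 command in task2.
I have a working solution, but unfortunately it is not robust :(
I am reading the dc1 log, and that works 99% of the time.
comand1.py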
# a simple version of the real script
import json
print(json.dumps({'date': '2020-05-03'}))  # JSON output, so json.loads can parse it
airflow/dags/dag1.py
import json

from airflow.operators.docker_operator import DockerOperator


# a wrapper class
class DOperator(DockerOperator):
    def __init__(self, task_id, command, dag, *args, **kwargs):
        super().__init__(
            image='docker_image:latest',
            task_id=task_id,
            command=command,
            api_version='auto',
            auto_remove=True,
            docker_url='unix://var/run/docker.sock',
            network_mode='bridge',
            tty=True,
            xcom_push=True,
            dag=dag,
            *args,
            **kwargs
        )

    def execute(self, context):
        # get the last log line from docker stdout
        docker_log = super().execute(context)
        # push one XCom per key of the JSON object
        if docker_log:
            try:
                result = json.loads(docker_log)
                for key in result.keys():
                    context['ti'].xcom_push(key=key, value=result[key])
            except (ValueError, AttributeError):
                # the last line was not a JSON object; push nothing
                pass
        return docker_log


# Docker container 1
task1 = DOperator(
    dag=dag,
    task_id='task1',
    command='python comand1.py',  # its output is {"date": "2020-05-03"}
)

# Docker container 2
task2 = DOperator(
    dag=dag,
    task_id='task2',
    command='python comand2.py --date={}'.format(
        "{{{{ task_instance.xcom_pull(dag_id='{}', task_ids='{}', key='{}') }}}}".format(
            dag.dag_id,
            task1.task_id,
            'date'
        )
    )
)

task1 >> task2
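To see what the nested str.format calls in task2 produce, the command string can be built outside Airflow. This is a minimal sketch: the dag id "dag1" is a placeholder assumed for illustration, since the question does not show it. The doubled braces are str.format escapes, so a literal {{ ... }} Jinja expression is left for Airflow to render at run time.

```python
# Placeholder values; in the DAG these come from dag.dag_id and task1.task_id.
dag_id = "dag1"
task_id = "task1"

# Doubled braces survive str.format as literal "{{" and "}}".
template = "{{{{ task_instance.xcom_pull(dag_id='{}', task_ids='{}', key='{}') }}}}".format(
    dag_id, task_id, "date"
)

# The outer format only substitutes the placeholder; it does not re-parse the argument.
command = "python comand2.py --date={}".format(template)
print(command)
```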
dc1 log
[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}
But 1% of the time it does not work.
In that case the dc1 log contains an extra empty line, and I cannot extract the output correctly.
dc1 log
[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO - {"date": "2020-05-03"}
[2020-08-31 06:50:38,868] {{docker_operator.py:242}} INFO -
So my questions are:
- Do you know how to fix this?
- Or do you know a better way to communicate between two Docker operators?
Solution
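The following should work for what you are doing. My solution uses XCom traffic to pass data between containers. It does require a task between the dockerized tasks so that the XCom can be pulled. Below is the example DAG I use to show others how to pass XCom traffic into and out of containerized Airflow tasks. I know it works with Airflow 1.10.6. I just upgraded to 1.10.12 and I am having some issues where XCOM_JSON is not being passed to the container. Hope this helps. If you have questions, please ask and I will try to elaborate.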
Example DAG with Docker
example_docker_dag.py
import time
import logging
import datetime
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, Variable, XCom
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.dates import days_ago
import ast

logger = logging.getLogger(__name__)

args = {
    'start_date': days_ago(1),
    'owner': 'mjcamp'
}

dag = DAG(
    dag_id='example-docker-dag',
    default_args=args,
    max_active_runs=6,
    schedule_interval='@daily'
)


def make_xcom_traffic(**context):
    print("some task that does things..")
    time.sleep(2)
    data = dict(test="Pass some data to the next task or tasks.", number=545)
    Variable.set("Example-XCom", data)
    return data  # conventional way of doing XCOM ... don't really have to return anything, but for uniformity's sake


def read_xcom(**context):
    print("Read XCOM from DockerOperator.. and maybe do things..")
    time.sleep(1)
    # Grab xcom from previous task (dockerized task)..
    data = context['task_instance'].xcom_pull(task_ids=context['task'].upstream_task_ids, key='return_value')
    # Airflow seems to be broken and will return all 'stdout' as the XCOM traffic so we have to parse it or
    # write our code to only `print` the serialized thing we want.. in this case we are just printing a dictionary.
    # if you do something like this and have string data in your return value.. either don't use
    # new lines for your strings or choose a different way to break things..
    xcom_data = data[0].split('\n')[-1]
    print("THIS IS WHAT I WANT:", xcom_data)
    xcom_data = ast.literal_eval(xcom_data)
    # Showing we have a python dictionary now...
    print("Int =>", xcom_data["Int"])
    print("Str =>", xcom_data["Str"])
    print("Float =>", xcom_data["Float"])


t1 = PythonOperator(
    task_id='make-xcom-traffic',
    provide_context=True,
    python_callable=make_xcom_traffic,
    dag=dag)

t2 = DockerOperator(
    environment={
        # read when the DAG file is parsed, so the Variable must already exist
        "Example-XCom": Variable.get("Example-XCom"),
    },
    task_id="docker-text",
    image="example-docker-task",
    auto_remove=True,
    xcom_push=True,
    xcom_all=False,  # <<<=== things are broken in Airflow and this doesn't do what you expect.. it does nothing
    docker_url='unix:///var/run/docker.sock',
    # docker_conn_id='containeryard',
    api_version='auto',
    dag=dag
)

t3 = PythonOperator(
    task_id='var-test',
    provide_context=True,
    python_callable=read_xcom,
    dag=dag
)

t1 >> t2 >> t3
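The read_xcom function above takes whatever follows the last newline, so a trailing blank line (the 1% case from the question) would leave it with an empty string. A minimal sketch of a more tolerant parser follows. The helper name parse_last_nonempty_line is hypothetical, and it assumes the container output has already been decoded to str.

```python
import ast


def parse_last_nonempty_line(stdout_text):
    """Return the dict printed on the last non-blank line of stdout_text."""
    # splitlines handles both "\n" and "\r\n"; blank lines are dropped.
    lines = [line.strip() for line in stdout_text.splitlines() if line.strip()]
    if not lines:
        raise ValueError("container produced no output")
    # literal_eval parses the Python dict repr that the container prints.
    return ast.literal_eval(lines[-1])


# Output with log noise first and an extra empty line at the end.
sample = "Hello, from Airflow-Worker!\n{'Int': 7, 'Str': 'x', 'Float': 1.5}\n\n"
parsed = parse_last_nonempty_line(sample)
print(parsed["Int"], parsed["Str"], parsed["Float"])
```

Inside read_xcom, this would replace the split('\n')[-1] and ast.literal_eval steps with a single call on data[0].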
Containerized task
Dockerfile
FROM python:3.6
ENV EXAMPLE_XCOM=not-set
COPY ./requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
COPY ./code.py /tmp/code.py
CMD ["/usr/local/bin/python", "/tmp/code.py"]
requirements.txt
pandas==0.24.2
numpy==1.17.1
code.py
import ast
import pandas as pd
import numpy as np
import os
from datetime import datetime
import json
import glob
xcom_json = os.getenv('XCOM_JSON')
print('Hello, from Airflow-Worker!')
print('Time on deck is...', datetime.now())
# Just to show what things get passed in as environment variables
print('My ENV is >>', os.environ.keys())
# Data being passed in as XCom like variable
print("Received EXAMPLE XCOM TRAFFIC >>", os.getenv("Example-XCom"))
xcom_data = ast.literal_eval(os.getenv("Example-XCom"))
print("The Number =>", xcom_data["number"])
intvar = np.random.randint(0,100)
strvar = "test_string ===>>" + xcom_data["test"]
floatvar = np.random.rand() * xcom_data["number"]
# make a dictionary / json like object to return in `stdout`
result = dict(Int=intvar, Str=strvar, Float=floatvar)
# return the results as stdout for airflow
print(result)
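One possible variation, not part of the tested example above, is to emit the result as a single JSON line so the downstream task can use json.loads instead of ast.literal_eval. The sketch below uses only the standard library. The function names emit_result and read_result are hypothetical, the values are placeholders, and NumPy scalar types may need converting with int() or float() before serializing.

```python
import json


def emit_result(result):
    """Print result as one compact JSON line, meant to be the last line of stdout."""
    line = json.dumps(result)
    print(line)
    return line


def read_result(line):
    """Parse the JSON line back into a dict on the Airflow side."""
    # strip() tolerates a trailing newline or carriage return.
    return json.loads(line.strip())


line = emit_result({"Int": 42, "Str": "test_string", "Float": 3.5})
roundtrip = read_result(line + "\r\n")
print(roundtrip["Int"])
```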
Airflow Variable
Key: Example-XCom
Value: {'test': 'Pass some data to the next task or tasks.', 'number': 545}
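The Value shown above is the Python string form of the dict (single quotes), which would explain why code.py parses it with ast.literal_eval rather than json.loads. A minimal sketch of that round trip in plain Python, with no Airflow involved. Here str() stands in for how the value ends up stored, which is an assumption about Variable.set when JSON serialization is not requested.

```python
import ast
import json

data = dict(test="Pass some data to the next task or tasks.", number=545)
stored = str(data)  # single-quoted Python repr, matching the Value shown above

# json.loads rejects single-quoted keys and strings...
try:
    json.loads(stored)
    parsed_as_json = True
except ValueError:
    parsed_as_json = False

# ...while ast.literal_eval restores the original dict.
restored = ast.literal_eval(stored)
print(parsed_as_json, restored == data)
```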