python - Airflow Fernet_Key issue when trying to query an MSSQL database
Problem description
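I'm very new to Airflow. I've read through the documentation several times, and I've read many S/O questions and random articles online, but I still haven't fixed this issue. I have a feeling I'm doing something wrong that is super simple. I have Docker for Windows; I pulled the puckel/docker-airflow image and ran a container with the ports exposed so that I can reach the UI from my host. I have another container running mcr.microsoft.com/mssql/server, on which I restored the WideWorldImporters sample database. From the Airflow UI I have been able to successfully create a connection to this database, and I can even query it from the Data Profiling section. See the images below:
(image: connection creation)
(image: successful query on the connection)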
So while this works, my DAG fails at the second task, sqlData. Here is the code:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime

copyData = DAG(
    dag_id='copyData',
    schedule_interval='@once',
    start_date=datetime(2019,1,1)
)

printHelloBash = BashOperator(
    task_id = "print_hello_Bash",
    bash_command = 'echo "Lets copy some data"',
    dag = copyData
)

mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
    task_id="select_some_data",
    mssql_conn_id=mssqlConnection,
    database="WideWorldImporters",
    dag = copyData,
    depends_on_past=True
)

queryDataSuccess = BashOperator(
    task_id = "confirm_data_queried",
    bash_command = 'echo "We queried data!"',
    dag = copyData
)

printHelloBash >> sqlData >> queryDataSuccess
The initial error was:
[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
    _fernet = Fernet(fernet_key.encode('utf-8'))
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)
  File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
    return b64decode(s)
  File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
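I noticed that this had to do with cryptography, so I went ahead and ran pip install cryptography and pip install airflow[crypto]. Both returned the exact same result, informing me that the requirement was already satisfied. Finally, I found something that said I just needed to generate a fernet_key. The default key in my airflow.cfg file was fernet_key = $FERNET_KEY. So from the CLI in the container I ran: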
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
and got a key, which I used to replace $FERNET_KEY. I restarted the container and re-ran the DAG, and now my error is:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
    h.verify(data[-32:])
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
    ctx.verify(signature)
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
    raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
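From an initial scan of the cryptography docs, this seems to have something to do with compatibility?
I'm at a loss now, and decided to ask this question to see whether I might be going down the wrong path in trying to resolve this. Any help would be greatly appreciated, as Airflow seems awesome.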
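For readers hitting the same two tracebacks, here is a minimal sketch of one plausible reading of them, which the post itself does not confirm. It assumes the first error came from the literal placeholder $FERNET_KEY being used as the key, and that the second came from a value encrypted under one key being checked against a different key. A Fernet key is 32 random bytes in url-safe base64, and the first 16 of those bytes are used for HMAC-SHA256 signing. The helper names below are hypothetical, and the standard-library hmac module is used as a stand-in for Fernet's own signature check, not as a reimplementation of it.

```python
import base64
import binascii
import hashlib
import hmac
import os


def looks_like_fernet_key(value: str) -> bool:
    """Return True if value is url-safe base64 that decodes to exactly 32 bytes."""
    try:
        raw = base64.urlsafe_b64decode(value.encode("utf-8"))
    except (binascii.Error, ValueError):
        # An unexpanded placeholder such as "$FERNET_KEY" ends up here.
        return False
    return len(raw) == 32


def new_key() -> str:
    """Build a key with the same shape as Fernet.generate_key(): 32 random bytes, base64."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


def sign(key: str, payload: bytes) -> bytes:
    """HMAC-SHA256 the payload with the signing half (first 16 bytes) of the key."""
    signing_key = base64.urlsafe_b64decode(key)[:16]
    return hmac.new(signing_key, payload, hashlib.sha256).digest()


def verify(key: str, payload: bytes, signature: bytes) -> bool:
    """Check a signature; a different key gives a different digest, so this fails."""
    return hmac.compare_digest(sign(key, payload), signature)
```

Under these assumptions, looks_like_fernet_key("$FERNET_KEY") is False, and a signature produced with one key does not verify with another. If that is what happened here, connections saved in the UI before the key was replaced would have to be re-entered so that they are encrypted under the new key.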
Solution
Thanks to some side communication from @Tomasz, I finally got my DAG to work. He recommended I try docker-compose, which is also listed in the puckel/docker-airflow GitHub repo. I ended up using the docker-compose-LocalExecutor.yml file rather than the Celery Executor one, though. There was some small troubleshooting and more configuration I had to go through as well. To begin, I took my existing MSSQL container that had the sample db in it and turned it into an image using docker commit mssql_container_name. The only reason I did this was to save the time of restoring the sample db backups; you could always copy the backups into the container and restore them later if you want. Then I added my new image to the existing docker-compose-LocalExecutor.yml file like so:
version: '2.1'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow

    mssql:
        image: dw:latest
        ports:
            - "1433:1433"

    webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        depends_on:
            - postgres
            - mssql
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
        #volumes:
            #- ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3
Mind you, dw is what I named the new image that was based on the mssql container. Next, I renamed the file to just docker-compose.yml so that I could easily run docker-compose up (not sure if there is a command to point directly to a different YAML file). Once everything was up and running, I navigated to the Airflow UI and configured my connection. Note: since you are using docker-compose, you don't need to know the IP addresses of the other containers, because they use DNS service discovery.
Then, to test the connection, I went to Data Profiling to run an ad hoc query, but the connection wasn't there. This is because the puckel/docker-airflow image doesn't have pymssql installed. So just bash into the container with docker exec -it airflow_webserver_container bash and install it with pip install pymssql --user. Exit the container and restart all services using docker-compose restart. After a minute everything was up and running. My connection showed up in Ad Hoc Query and I could successfully select data. Finally, I turned my DAG on, the scheduler picked it up, and everything was successful! Super relieved after spending weeks googling. Thanks to @y2k-shubham for helping out, and some super huge appreciation to @Tomasz, whom I actually reached out to initially after his awesome and thorough post about Airflow on the r/datascience subreddit.
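The two things that tripped up this setup (the missing pymssql driver and reaching the database by service name rather than by IP) can be checked from inside the webserver container before touching the UI. The following is only a rough sketch using the standard library; the function names are hypothetical, and it assumes the compose file above, where the database service is called mssql and listens on port 1433.

```python
import importlib.util
import socket


def driver_installed(module_name: str) -> bool:
    """Return True if a top-level module (e.g. "pymssql") can be imported here."""
    return importlib.util.find_spec(module_name) is not None


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout.

    The host may be a docker-compose service name; name resolution failures
    and refused connections both surface as OSError and yield False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run from a Python shell inside the webserver container, driver_installed("pymssql") and can_connect("mssql", 1433) should both come back True once pymssql has been installed and the services are up. A False from the first points at the missing package; a False from the second points at the network or service name.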