首页 > 解决方案 > 使用 Vault 后端的配置后,气流部署卡住

问题描述

我有一个在 Kubernetes 上运行的 Airflow 应用程序,它使用 Vault 作为秘密后端。最近我设法将我的配置值移动sql_alchemy_conn到 Vault,因为它包含用户的密码。我可以看到它正在从秘密后端获取值并能够连接到数据库并运行迁移作业。

但从那以后,我无法部署应用程序的其余部分,因为所有其他资源都卡在 init 容器中wait-for-airflow-migrations。我正在使用官方 helm chart 来部署应用程序,这是用于检查迁移是否运行的 python 代码

import airflow
import logging
import os
import time

from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory

from airflow import settings

package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
script_ = ScriptDirectory.from_config(config)

timeout=60

with settings.engine.connect() as connection:
    context = MigrationContext.configure(connection)
    ticker = 0
    while True:
        source_heads = set(script_.get_heads())

        db_heads = set(context.get_current_heads())
        if source_heads == db_heads:
            break

        if ticker >= timeout:
            raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
        ticker += 1
        time.sleep(1)
        logging.info('Waiting for migrations... %s second(s)', ticker)

这是wait-for-migration容器的日志

[2021-07-29 12:01:13,650] {migration.py:164} INFO - Context impl SQLiteImpl.
[2021-07-29 12:01:13,650] {migration.py:171} INFO - Will assume non-transactional DDL.
[2021-07-29 12:01:20,592] {<string>:49} INFO - Waiting for migrations... 1 second(s)
[2021-07-29 12:01:21,593] {<string>:49} INFO - Waiting for migrations... 2 second(s)
[2021-07-29 12:01:22,595] {<string>:49} INFO - Waiting for migrations... 3 second(s)
[2021-07-29 12:01:23,596] {<string>:49} INFO - Waiting for migrations... 4 second(s)
[2021-07-29 12:01:24,597] {<string>:49} INFO - Waiting for migrations... 5 second(s)
[2021-07-29 12:01:25,598] {<string>:49} INFO - Waiting for migrations... 6 second(s)
[2021-07-29 12:01:26,600] {<string>:49} INFO - Waiting for migrations... 7 second(s)
[2021-07-29 12:01:27,601] {<string>:49} INFO - Waiting for migrations... 8 second(s)
[2021-07-29 12:01:28,602] {<string>:49} INFO - Waiting for migrations... 9 second(s)
[2021-07-29 12:01:29,603] {<string>:49} INFO - Waiting for migrations... 10 second(s)
[2021-07-29 12:01:30,604] {<string>:49} INFO - Waiting for migrations... 11 second(s)
[2021-07-29 12:01:31,606] {<string>:49} INFO - Waiting for migrations... 12 second(s)
[2021-07-29 12:01:32,607] {<string>:49} INFO - Waiting for migrations... 13 second(s)
[2021-07-29 12:01:33,608] {<string>:49} INFO - Waiting for migrations... 14 second(s)
[2021-07-29 12:01:34,610] {<string>:49} INFO - Waiting for migrations... 15 second(s)
[2021-07-29 12:01:35,611] {<string>:49} INFO - Waiting for migrations... 16 second(s)
[2021-07-29 12:01:36,612] {<string>:49} INFO - Waiting for migrations... 17 second(s)
[2021-07-29 12:01:37,613] {<string>:49} INFO - Waiting for migrations... 18 second(s)
[2021-07-29 12:01:38,614] {<string>:49} INFO - Waiting for migrations... 19 second(s)
[2021-07-29 12:01:39,615] {<string>:49} INFO - Waiting for migrations... 20 second(s)
[2021-07-29 12:01:40,616] {<string>:49} INFO - Waiting for migrations... 21 second(s)
[2021-07-29 12:01:41,617] {<string>:49} INFO - Waiting for migrations... 22 second(s)
[2021-07-29 12:01:42,618] {<string>:49} INFO - Waiting for migrations... 23 second(s)
[2021-07-29 12:01:43,619] {<string>:49} INFO - Waiting for migrations... 24 second(s)
[2021-07-29 12:01:44,621] {<string>:49} INFO - Waiting for migrations... 25 second(s)
[2021-07-29 12:01:45,622] {<string>:49} INFO - Waiting for migrations... 26 second(s)
[2021-07-29 12:01:46,623] {<string>:49} INFO - Waiting for migrations... 27 second(s)
[2021-07-29 12:01:47,625] {<string>:49} INFO - Waiting for migrations... 28 second(s)
[2021-07-29 12:01:48,626] {<string>:49} INFO - Waiting for migrations... 29 second(s)
[2021-07-29 12:01:49,628] {<string>:49} INFO - Waiting for migrations... 30 second(s)
[2021-07-29 12:01:50,628] {<string>:49} INFO - Waiting for migrations... 31 second(s)
[2021-07-29 12:01:51,630] {<string>:49} INFO - Waiting for migrations... 32 second(s)
[2021-07-29 12:01:52,631] {<string>:49} INFO - Waiting for migrations... 33 second(s)
[2021-07-29 12:01:53,632] {<string>:49} INFO - Waiting for migrations... 34 second(s)
[2021-07-29 12:01:54,634] {<string>:49} INFO - Waiting for migrations... 35 second(s)
[2021-07-29 12:01:55,635] {<string>:49} INFO - Waiting for migrations... 36 second(s)
[2021-07-29 12:01:56,636] {<string>:49} INFO - Waiting for migrations... 37 second(s)
[2021-07-29 12:01:57,637] {<string>:49} INFO - Waiting for migrations... 38 second(s)
[2021-07-29 12:01:58,638] {<string>:49} INFO - Waiting for migrations... 39 second(s)
[2021-07-29 12:01:59,639] {<string>:49} INFO - Waiting for migrations... 40 second(s)
[2021-07-29 12:02:00,641] {<string>:49} INFO - Waiting for migrations... 41 second(s)
[2021-07-29 12:02:01,641] {<string>:49} INFO - Waiting for migrations... 42 second(s)
[2021-07-29 12:02:02,642] {<string>:49} INFO - Waiting for migrations... 43 second(s)
[2021-07-29 12:02:03,644] {<string>:49} INFO - Waiting for migrations... 44 second(s)
[2021-07-29 12:02:04,644] {<string>:49} INFO - Waiting for migrations... 45 second(s)
[2021-07-29 12:02:05,646] {<string>:49} INFO - Waiting for migrations... 46 second(s)
[2021-07-29 12:02:06,647] {<string>:49} INFO - Waiting for migrations... 47 second(s)
[2021-07-29 12:02:07,648] {<string>:49} INFO - Waiting for migrations... 48 second(s)
[2021-07-29 12:02:08,649] {<string>:49} INFO - Waiting for migrations... 49 second(s)
[2021-07-29 12:02:09,651] {<string>:49} INFO - Waiting for migrations... 50 second(s)
[2021-07-29 12:02:10,651] {<string>:49} INFO - Waiting for migrations... 51 second(s)
[2021-07-29 12:02:11,652] {<string>:49} INFO - Waiting for migrations... 52 second(s)
[2021-07-29 12:02:12,654] {<string>:49} INFO - Waiting for migrations... 53 second(s)
[2021-07-29 12:02:13,655] {<string>:49} INFO - Waiting for migrations... 54 second(s)
[2021-07-29 12:02:14,656] {<string>:49} INFO - Waiting for migrations... 55 second(s)
[2021-07-29 12:02:15,657] {<string>:49} INFO - Waiting for migrations... 56 second(s)
[2021-07-29 12:02:16,659] {<string>:49} INFO - Waiting for migrations... 57 second(s)
[2021-07-29 12:02:17,659] {<string>:49} INFO - Waiting for migrations... 58 second(s)
[2021-07-29 12:02:18,660] {<string>:49} INFO - Waiting for migrations... 59 second(s)
[2021-07-29 12:02:19,662] {<string>:49} INFO - Waiting for migrations... 60 second(s)
Traceback (most recent call last):
  File "<string>", line 46, in <module>
TimeoutError: There are still unapplied migrations after 60 seconds.

我做了一些调试,发现它使用的是sql_alchemy_conn配置文件中的默认值,这基本上是使用 sqlite 引擎。

我应该更新这个脚本并从 Vault 后端手动获取值,还是我缺少一些配置?

标签: airflowhashicorp-vault

解决方案


我通过从 Vault 后端手动获取配置值并创建引擎,而不是使用settings.engine.connect(). 这是我的wait-for-migration-command模板

{{ define "wait-for-migrations-command" }}
  {{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}}
  - python
  - -c
  - |
        import airflow
        import logging
        import os
        import time

        from airflow.providers.hashicorp.secrets.vault import VaultBackend
        from alembic.config import Config
        from alembic.runtime.migration import MigrationContext
        from alembic.script import ScriptDirectory
        from sqlalchemy.engine import create_engine

        try:
            sql_alchemy_conn = VaultBackend(
                config_path="applications/secrets/airflow/config/",
                auth_type="kubernetes",
                kubernetes_role="kubernetes_role",
            ).get_config(key="sql_alchemy_conn")
        except Exception:
            raise

        package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
        directory = os.path.join(package_dir, "migrations")
        config = Config(os.path.join(package_dir, "alembic.ini"))
        config.set_main_option("script_location", directory)
        config.set_main_option("sqlalchemy.url", sql_alchemy_conn)
        script_ = ScriptDirectory.from_config(config)

        engine = create_engine(sql_alchemy_conn)

        timeout = 60
        with engine.connect() as connection:
            context = MigrationContext.configure(connection)
            ticker = 0
            while True:
                source_heads = set(script_.get_heads())

                db_heads = set(context.get_current_heads())
                if source_heads == db_heads:
                    break

                if ticker >= timeout:
                    raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
                ticker += 1
                time.sleep(1)
                logging.info("Waiting for migrations... %s second(s)", ticker)
{{- end }}

我确信必须有其他方法来解决这个问题。会继续找。


推荐阅读