首页 > 解决方案 > 如何将 Flask connexion 与 redis 一起使用?

问题描述

这是我的main.py文件:

from flask_redis import FlaskRedis
from controllers.utils import redis_conn # get Redis URL
import connexion

BASE_PATH = "/"

def create_app(*specs):

    _app = connexion.App(__name__)

    for s in specs:
        logger.info("Adding specs {}".format(s))
        _app.add_api(s, validate_responses=True)

    return _app

app = create_app("specs.yaml")

rd_app = app.app
rd_app.config['REDIS_URL'] = redis_conn()

redis_client = FlaskRedis(rd_app)


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000, debug = True)

似乎 Redis 有问题并产生此错误:

ImportError:无法从部分初始化的模块“main”导入名称“redis_client”(很可能是由于循环导入)

我找不到任何使用 Redis 连接的教程。用法示例get_fruit.py

from main import redis_client

def get_fruit(colour, shape, taste):

    hash_name = rd_collections[colour+'_'+shape]
    key_name = '{}:{}'.format(hash_name, taste)
    response_redis = redis_client.get(name=key_name)

    if response_redis is None:

        result = get_fruit_name(colour, shape, taste)

        logger.debug("Updating Redis by adding {}".format(location_id))

        redis_client.set(name=key_name, value=json.dumps(result['fruit_id']), ex=60*60)
        result = OrderedDict({'Result': result})

        return result

    else:
        ...

更新:

按照建议尝试:

def create_app(*specs):
    """
    Running apps using connexion
    """
    _app = connexion.App(__name__)

    rd_app = _app.app
    rd_app.config['REDIS_URL'] = redis_conn()
    rd_client = FlaskRedis(rd_app)

    for s in specs:
        logger.info("Adding specs {}".format(s))
        _app.add_api(s, validate_responses=True)

    return _app, rd_client

app, redis_client = create_app("specs.yaml")

但仍然产生同样的错误。

标签: pythonflaskredisconnexion

解决方案


避免循环导入的标准方法是以下一种。

使用您的配置创建一个 config.py 文件。

from controllers.utils import redis_conn # get Redis URL

class MyConfig(object):
    REDIS_URL = redis_conn()

创建一个 wsgi.py 文件。这将是您的应用程序的起点。

from config import MyConfig
import app_factory

app = app_factory.create_app(MyConfig)

if __name__ == "__main__":
    with app.app_context():
        app.run()

使用 App Factory 模式创建 app_factory.py 文件:

redis_client = FlaskRedis()

def create_app(config):
    app = Flask(__name__)
    app.config.from_object(config)
    redis_client.init_app(app)
    return app

使用您的路线创建文件 routes.py:

from app_factory import redis_client

@bp.route('/')
def index():
    return redis_client.get('potato')

推荐阅读