python - 烧瓶应用程序启动后如何自动命中端点?
问题描述
我正在开发一个烧瓶应用程序,它不断检查来自 AWS SQS 队列的新消息并接收它们。为此,我编写了一个可以由端点触发的函数。- /start
。为简单起见,在代码中,我没有发布从 SQS 队列接收消息的所有逻辑。只需打印日期时间并等待 3 秒。
一旦烧瓶服务器启动并准备好为请求提供服务,如何使该端点被命中?
是否有任何装饰器或其他东西可以使这成为可能?
quque_services.py:
from datetime import datetime
import time
def receive_messages():
print(datetime.now())
time.sleep(3)
__init__.py:
from flask import Flask, request, jsonify
from app.workload.services.queue_services import receive_messages
def create_app(**kwargs):
app = Flask(__name__, **kwargs)
@app.route('/start')
def queue_receiver():
while True:
receive_messages()
return app
wsgi.py:
from app import create_app
application = create_app()
if __name__ == "__main__":
application.run()
解决方案
您可以在创建应用程序后使用烧瓶测试客户端向应用程序发送请求。我已将您的代码简化为下面演示这一点的单个自包含脚本。
from flask import Flask
def create_app(**kwargs):
app = Flask(__name__, **kwargs)
@app.route('/start')
def queue_receiver():
print("***Endpoint hit***")
return ""
return app
application = create_app()
if __name__ == "__main__":
application.test_client().get('/start')
print("***Starting server***")
application.run()
如果这不适用于您的代码,那么我怀疑这是因为您永远不会从/start
端点返回。我曾假设您while True
在处理所有消息时打破了循环,但为简洁起见没有包含代码。如果不是这种情况,那么可能需要将应用程序拆分为单独的客户端和服务器,如之前在评论中建议的那样。
推荐阅读
- html - 如何删除我的 html 表格单元格中的白线?
- google-chrome-app - 无限离线存储
- javascript - 表单没有从数据库中获取全文
- octobercms - 如何在 OctoberCMS 中使用带有主题的 ServiceWorker
- visual-c++ - Windows 进程间通信 (IPC)(命名管道)- 从属程序自随机时间以来未收到任何数据
- php - How to call a function with parameters from plsql in php?
- ios - 将 JSON 响应解析为结构
- spring - 使用 HandlerInterceptorAdapter 时如何测试 Spring REST webservice?
- laravel - 如何在 Laravel 包开发中自动加载服务提供者
- html - 从 DOM 中提取值