python - 使用 RQ 工作者、队列和作业时管理路径的正确方法是什么
问题描述
我的第一个问题/帖子......请善待......
我正在从事一个个人项目,其中一个模块在循环中运行以收集数据。当数据进入时,它将数据插入数据库的工作交给队列中的一个函数,一个侦听的 rq 工作人员将其拾取并处理该函数。数据库使用 SQLAlchemy 进行管理,这意味着它必须生成引擎、会话并定义数据库表。
代码文件的结构是:
--/home/..../collect-view/ (this is the project folder)
-- DataCollection
-- main_client.py (main loop waiting for user data)
-- collect_data.py (contains the database insertion function)
-- base.py (the base file for SQLAlchemy database definition)
-- tables.py (the file which sets up the table name and definition)
-- app.db (the database file)
注意:数据库文件位于更高级别的目录中,因为它也被位于该级别的另一个应用程序(Flask 应用程序)访问
要实现此代码,“collect_data”必须导入“base”和“tables”,“tables”必须导入“base”。这被证明是一个问题,因为一旦 worker 运行 collect_data 函数(称为“transfer”),它就无法再找到要导入的文件,并且 worker 会吐出一个异常,说它无法导入“base” ”。我在网上搜索了答案,最终在 Github 上从 nvie 找到了一个答案,其中提到使用 --path 选项将工作人员引导到正确的路径。我通过实施使其工作:
$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection
然后我遇到了另一个与路径相关的故障,工作人员说它找不到我试图插入数据的数据库表。所以我更改了引擎创建步骤以包含我的完整路径......
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
这个问题让我更加困惑,因为我的工作人员已经在我的 DataCollection 目录中工作,所以我认为 ('sqlite:///../app.db') 将是定位数据库的正确方法(因为它在没有 rq 工作人员的情况下进行测试)。
所以,经过长时间的解释,我的问题是:在这种情况下管理路径的正确方法是什么?对我来说,我必须使用来自 /home 的完整路径似乎是错误的......我是否遗漏了有关路径和/或 rq worker(和类似物)如何工作的信息?
我的代码文件的摘录如下:
main_client.py
from redis import Redis
import rq
from collect_data import transfer
redis_url = Redis.from_url('redis://') #(config['REDIS_URL'])
queue = rq.Queue('rq_worker-data2db', connection=redis_url)
#.....
#.....
def have_data(data):
rq_job = queue.enqueue('collect_data.transfer', data)
#.....
#.....
收集数据.py
from base import Session, engine, Base
from tables import FieldData
import time
from datetime import datetime
def transfer(info):
timestamp_in = datetime.utcnow()
session = Session()
data1 = FieldData(data=info, timestamp=timestamp_in)
session.add(data1)
session.commit()
基础.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Session = sessionmaker(bind=engine)
Base = declarative_base()
表格.py
from sqlalchemy import Column, String, Float, Integer, Date, DateTime, Table, ForeignKey
from base import Base
from datetime import datetime
# .....
#.....
class FieldData(Base):
__tablename__ = 'field_data'
id = Column(Integer, primary_key=True)
data = Column(String(20))
timestamp = Column(DateTime, index=True, default=datetime.utcnow)
def __init__(self, data, timestamp):
self.data = data
self.timestamp = timestamp
在终端中,我首先让 Redis 运行,然后运行 worker:
$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection(其中 rq_worker_data2db 是 worker 名称)
解决方案
推荐阅读
- javascript - 使用javascript的动态表单
- angularjs - 角度获取和发布方法在角度 9 中不起作用
- spring-boot - Spring Boot 应用程序查询 - H2 Db NameGenerator.generateColumnNames - null
- javascript - amp-autocomplete:使用远程 url 时结果不好
- javascript - 为什么我调用该函数时不读取该函数?
- c++ - 在跨平台代码上使用 condition_variable 等待 recursive_mutex
- php - 使用 PHP 从数据库中提取信息
- php - 黑客可以将参数传递给 $_SERVER 吗?
- javascript - 在 javascript 中,有没有办法将背景图像放在 div 上,但让它从左边开始?
- c++ - 在 Xcode 中需要哪些设置来构建此代码