首页 > 解决方案 > 使用 RQ 工作者、队列和作业时管理路径的正确方法是什么

问题描述

我的第一个问题/帖子......请善待......

我正在从事一个个人项目,其中一个模块在循环中运行以收集数据。当数据进入时,它将数据插入数据库的工作交给队列中的一个函数,一个侦听的 rq 工作人员将其拾取并处理该函数。数据库使用 SQLAlchemy 进行管理,这意味着它必须生成引擎、会话并定义数据库表。

代码文件的结构是:

--/home/..../collect-view/  (this is the project folder)
    -- DataCollection
        -- main_client.py  (main loop waiting for user data)
        -- collect_data.py (contains the database insertion function)
        -- base.py         (the base file for SQLAlchemy database definition)
        -- tables.py       (the file which sets up the table name and definition)
    -- app.db                  (the database file)

注意:数据库文件位于更高级别的目录中,因为它也被位于该级别的另一个应用程序(Flask 应用程序)访问

要实现此代码,“collect_data”必须导入“base”和“tables”,“tables”必须导入“base”。这被证明是一个问题,因为一旦 worker 运行 collect_data 函数(称为“transfer”),它就无法再找到要导入的文件,并且 worker 会吐出一个异常,说它无法导入“base” ”。我在网上搜索了答案,最终在 Github 上从 nvie 找到了一个答案,其中提到使用 --path 选项将工作人员引导到正确的路径。我通过实施使其工作:

$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection

然后我遇到了另一个与路径相关的故障,工作人员说它找不到我试图插入数据的数据库表。所以我更改了引擎创建步骤以包含我的完整路径......

base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')

这个问题让我更加困惑,因为我的工作人员已经在我的 DataCollection 目录中工作,所以我认为 ('sqlite:///../app.db') 将是定位数据库的正确方法(因为它在没有 rq 工作人员的情况下进行测试)。

所以,经过长时间的解释,我的问题是:在这种情况下管理路径的正确方法是什么?对我来说,我必须使用来自 /home 的完整路径似乎是错误的......我是否遗漏了有关路径和/或 rq worker(和类似物)如何工作的信息?

我的代码文件的摘录如下:

main_client.py

from redis import Redis
import rq
from collect_data import transfer

redis_url = Redis.from_url('redis://')  #(config['REDIS_URL'])
queue = rq.Queue('rq_worker-data2db', connection=redis_url)

#.....
#.....

def have_data(data):

    rq_job = queue.enqueue('collect_data.transfer', data)

#.....
#.....

收集数据.py

from base import Session, engine, Base
from tables import FieldData
import time
from datetime import datetime

def transfer(info):
    timestamp_in = datetime.utcnow()
    session = Session()
    data1 = FieldData(data=info, timestamp=timestamp_in)
    session.add(data1)
    session.commit()

基础.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
base_url = '/home/.../collect_view/'
engine = create_engine ('sqlite:///' + base_url + 'app.db')
Session = sessionmaker(bind=engine)
Base = declarative_base()

表格.py

from sqlalchemy import  Column, String, Float, Integer, Date, DateTime, Table, ForeignKey
from base import Base
from datetime import datetime
# .....
#.....
class FieldData(Base):
    __tablename__ = 'field_data'
    id = Column(Integer, primary_key=True)
    data = Column(String(20))
    timestamp = Column(DateTime, index=True, default=datetime.utcnow)

    def __init__(self, data, timestamp):
        self.data = data
        self.timestamp = timestamp

在终端中,我首先让 Redis 运行,然后运行 ​​worker:

$ rq worker rq_worker_data2db --path /home/../../collect_view/DataCollection(其中 rq_worker_data2db 是 worker 名称)

标签: pythonsqlalchemytask-queuepython-rq

解决方案


推荐阅读