首页 > 解决方案 > 芹菜:找不到模块

问题描述

我正在使用开放语义搜索 (OSS),我想使用Flower 工具监控其流程。Celery 需要的工人应该按照 OSS 在其网站上的说明提供

工作人员将执行排队文件的分析和索引等任务。工作程序由 etl/tasks.py 实现,并将在启动时由服务 opensemanticsearch 自动启动。

此 tasks.py 文件如下所示:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

#
# Queue tasks for batch processing and parallel processing
#

# Queue handler
from celery import Celery

# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS


verbose = True
quiet = False

app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1

etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()


#
# Delete document with URI from index
#

@app.task(name='etl.delete')
def delete(uri):
    etl_delete.delete(uri=uri)


#
# Index a file
#

@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):

    if wait:
        time.sleep(wait)

    etl_file = Connector_File()

    if config:
        etl_file.config = config

    etl_file.index(filename=filename)

#
# Index file directory
#

@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):

    from etl_filedirectory import Connector_Filedirectory

    connector_filedirectory = Connector_Filedirectory()

    result = connector_filedirectory.index(filename)

    return result


#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):

    if wait:
        time.sleep(wait)

    result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)

    return result


#
# Index full website
#

@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):

    import etl_web_crawl

    result = etl_web_crawl.index(uri, crawler_type)

    return result


#
# Index webpages from sitemap
#

@app.task(name='etl.index_sitemap')
def index_sitemap(uri):

    from etl_sitemap import Connector_Sitemap

    connector_sitemap = Connector_Sitemap()

    result = connector_sitemap.index(uri)

    return result


#
# Index RSS Feed
#

@app.task(name='etl.index_rss')
def index_rss(uri):

    result = etl_rss.index(uri)

    return result


#
# Enrich with / run plugins
#

@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):

    if wait:
        time.sleep(wait)

    etl = ETL()
    etl.read_configfile('/etc/opensemanticsearch/etl')
    etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')

    etl.config['plugins'] = plugins.split(',')

    filename = uri

    # if exist delete protocoll prefix file://
    if filename.startswith("file://"):
        filename = filename.replace("file://", '', 1)

    parameters = etl.config.copy()

    parameters['id'] = uri
    parameters['filename'] = filename

    parameters, data = etl.process (parameters=parameters, data={})

    return data


#
# Read command line arguments and start
#

#if running (not imported to use its functions), run main function
if __name__ == "__main__":

    from optparse import OptionParser 

    parser = OptionParser("etl-tasks [options]")
    parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")

    (options, args) = parser.parse_args()

    if options.verbose == False or options.verbose==True:
        verbose = options.verbose
        etl_delete.verbose = options.verbose
        etl_web.verbose = options.verbose
        etl_rss.verbose = options.verbose

    if options.quiet == False or options.quiet==True:
        quiet = options.quiet

    app.worker_main()

我阅读了多个关于 Celery 的教程,据我了解,这条线应该可以完成这项工作

celery -A etl.tasks flower

但它没有。结果是语句

错误:无法加载 celery 应用程序。未找到模块 etl。

同样适用

celery -A etl.tasks worker --loglevel=debug

所以芹菜本身似乎造成了麻烦,而不是花。我也尝试过例如 celery -A etl.index_filedirectory worker --loglevel=debug 但结果相同。

我错过了什么?我是否必须以某种方式告诉 Celery 在哪里可以找到 etl.tasks?在线研究并没有真正显示出类似的情况,大多数“未找到模块”错误似乎都是在导入内容时发生的。所以这可能是一个愚蠢的问题,但我无法在任何地方找到解决方案。我希望你们能帮助我。不幸的是,我要到星期一才能回复,提前抱歉。

标签: pythoncelery

解决方案


我遇到了同样的问题,我按如下方式安装和配置了我的队列,它可以工作。

安装 RabbitMQ

苹果系统

brew install rabbitmq
sudo vim ~/.bash_profile

bash_profile添加以下行:

PATH=$PATH:/usr/local/sbin

然后更新bash_profile

sudo source ~/.bash_profile

Linux

sudo apt-get install rabbitmq-server

配置 RabbitMQ

启动队列:

sudo rabbitmq-server

在另一个终端中,配置队列:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

启动芹菜

我建议进入包含task.py并使用以下命令的文件夹:

celery -A task worker -l info -Q celery --concurrency 5


推荐阅读