python - 芹菜:找不到模块
问题描述
我正在使用开放语义搜索 (OSS),我想使用Flower 工具监控其流程。Celery 需要的工人应该按照 OSS 在其网站上的说明提供
工作人员将执行排队文件的分析和索引等任务。工作程序由 etl/tasks.py 实现,并将在启动时由服务 opensemanticsearch 自动启动。
此 tasks.py 文件如下所示:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Queue tasks for batch processing and parallel processing
#
# Queue handler
from celery import Celery
# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS
verbose = True
quiet = False
app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1
etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()
#
# Delete document with URI from index
#
@app.task(name='etl.delete')
def delete(uri):
etl_delete.delete(uri=uri)
#
# Index a file
#
@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):
if wait:
time.sleep(wait)
etl_file = Connector_File()
if config:
etl_file.config = config
etl_file.index(filename=filename)
#
# Index file directory
#
@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):
from etl_filedirectory import Connector_Filedirectory
connector_filedirectory = Connector_Filedirectory()
result = connector_filedirectory.index(filename)
return result
#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):
if wait:
time.sleep(wait)
result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)
return result
#
# Index full website
#
@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):
import etl_web_crawl
result = etl_web_crawl.index(uri, crawler_type)
return result
#
# Index webpages from sitemap
#
@app.task(name='etl.index_sitemap')
def index_sitemap(uri):
from etl_sitemap import Connector_Sitemap
connector_sitemap = Connector_Sitemap()
result = connector_sitemap.index(uri)
return result
#
# Index RSS Feed
#
@app.task(name='etl.index_rss')
def index_rss(uri):
result = etl_rss.index(uri)
return result
#
# Enrich with / run plugins
#
@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):
if wait:
time.sleep(wait)
etl = ETL()
etl.read_configfile('/etc/opensemanticsearch/etl')
etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')
etl.config['plugins'] = plugins.split(',')
filename = uri
# if exist delete protocoll prefix file://
if filename.startswith("file://"):
filename = filename.replace("file://", '', 1)
parameters = etl.config.copy()
parameters['id'] = uri
parameters['filename'] = filename
parameters, data = etl.process (parameters=parameters, data={})
return data
#
# Read command line arguments and start
#
#if running (not imported to use its functions), run main function
if __name__ == "__main__":
from optparse import OptionParser
parser = OptionParser("etl-tasks [options]")
parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")
(options, args) = parser.parse_args()
if options.verbose == False or options.verbose==True:
verbose = options.verbose
etl_delete.verbose = options.verbose
etl_web.verbose = options.verbose
etl_rss.verbose = options.verbose
if options.quiet == False or options.quiet==True:
quiet = options.quiet
app.worker_main()
我阅读了多个关于 Celery 的教程,据我了解,这条线应该可以完成这项工作
celery -A etl.tasks flower
但它没有。结果是语句
错误:无法加载 celery 应用程序。未找到模块 etl。
同样适用
celery -A etl.tasks worker --loglevel=debug
所以芹菜本身似乎造成了麻烦,而不是花。我也尝试过例如 celery -A etl.index_filedirectory worker --loglevel=debug 但结果相同。
我错过了什么?我是否必须以某种方式告诉 Celery 在哪里可以找到 etl.tasks?在线研究并没有真正显示出类似的情况,大多数“未找到模块”错误似乎都是在导入内容时发生的。所以这可能是一个愚蠢的问题,但我无法在任何地方找到解决方案。我希望你们能帮助我。不幸的是,我要到星期一才能回复,提前抱歉。
解决方案
我遇到了同样的问题,我按如下方式安装和配置了我的队列,它可以工作。
安装 RabbitMQ
苹果系统
brew install rabbitmq
sudo vim ~/.bash_profile
在bash_profile
添加以下行:
PATH=$PATH:/usr/local/sbin
然后更新bash_profile
:
sudo source ~/.bash_profile
Linux
sudo apt-get install rabbitmq-server
配置 RabbitMQ
启动队列:
sudo rabbitmq-server
在另一个终端中,配置队列:
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
启动芹菜
我建议进入包含task.py
并使用以下命令的文件夹:
celery -A task worker -l info -Q celery --concurrency 5
推荐阅读
- c# - 实体框架返回所有包含子字符串的项目
- swift - 如何在 SwiftUI 中创建自定义单元格?其中每个单元格颜色来自特定的十六进制数字列表
- android - 在设备上同时安装 2 个不同版本的 Flutter?
- typescript - `{a: number; 之间的任何行为差异;b: string}` 和 `{a: number} & {b: string}`?
- mysql - 带有弹簧启动的 Ec2 或 Azure
- oracle - PERL 在数据库准备中使用 WITH 子句
- python - 使用python预处理数据框中多列的文本数据
- python - 使用 Pandas.remove_duplicates() 时出错
- apache-nifi - Apache NiFi:将流文件属性添加到 ExecuteStreamCommand 处理器
- python - Pandas 无法访问计数列,尽管它显示在打印头中