python - 带有 Docker 的 Python Luigi - 线程/信号问题
问题描述
概述
我们正在使用Luigi在 Docker 容器内构建管道。这是我第一次使用 Luigi,我试图让它运行,但我遇到了 Python 线程/信号错误。
我们正在构建什么
我们有一个运行 setup.py 脚本作为入口点的容器。这个脚本导入我的 Luigi 任务,但它的主要功能是打开一个 PubSub 通道到谷歌云服务。当它在该通道上接收到消息时,它会启动一系列任务。
错误。
我直接从 Python 调用 Luigi,试验了这个命令的变体:
luigi.build([GetImageSet()], workers=2, local_scheduler=True, no_lock=True)
并收到此错误:
ValueError: signal only works in main thread
Signal和Luigi的背景
来自 Python Signal 模块文档:
signal.signal:这个函数只能从主线程调用;尝试从其他线程调用它会导致引发 ValueError 异常。
从这里
的 Luigi worker.py 脚本中,
Luigi 提供了 no_install_shutdown_handler 标志(默认为 false)。'如果为真,SIGUSR1 关闭处理程序将不会安装在工作人员上'。这也是发生错误的地方(第 538 行)。该脚本在运行 signal.signal() 之前检查 no_install_shutdown_handler 的配置标志是否(默认)为 false。到目前为止,我未能让 Luigi 读取我的 client.cfg 文件并将该标志设置为 true,而 Docker 可能是罪魁祸首。
从这里的 Luigi interface.py 脚本
如果您不想从命令行运行 luigi。您可以使用此模块中定义的方法以编程方式运行 luigi。在这个脚本中,我可以提供一个自定义的工人调度工厂,但我还不能解决这个问题。
本地与全局 Luigi 调度
器 Luigi 为运行任务提供了两个调度器选项。本地的
Dockerfile woes:在我的 Dockerfile 中,我通过 pip 安装 Luigi,但没有做太多其他事情。在 github 上查看了这个和这个docker/luigi 实现之后,我开始担心我在 Dockerfile 中做得不够。
我认为发生错误的可能原因
- pub-sub 频道订阅者是非阻塞的,所以我正在做一些可能很糟糕的事情来阻止主线程在我们在后台等待消息时退出。这似乎是我的线程问题的可能来源。
- no_install_shutdown_handler 标志未成功设置为 True,这有望规避错误,但不一定是我想要的
- 本地任务调度程序。我应该使用全局调度程序而不是本地调度程序。无论如何,我最终将不得不让这个工作用于生产......
- 从 Python 而不是命令行运行脚本
- 使用
luigi.build
. 相反,我应该使用luigi.run
,但基于从 Python 运行构建的文档页面,“如果您想从另一个源(例如数据库)获取一些动态参数,或者在开始任务之前提供额外的逻辑,这很有用。” 这听起来很适合我的用例(在从发布-订阅通道接收消息后触发任务,该消息传递了运行第一个任务所需的变量)
反正我做错了吗? 如果您对实施我所描述的系统有任何建议,请告诉我。我还将根据要求发布我的 Dockerfile 和 setup.py 尝试。
一些代码示例
这是 Dockerfile
# ./Dockerfile
# sfm-base:test is the container with tensorflow & our python sfm-library. It installs Ubuntu, Python, pip etc.
FROM sfm-base:test
LABEL maintainer "---@---.io"
# Install luigi, google-cloud w/ pip in module mode
RUN python -m pip install luigi && \
python -m pip install --upgrade google-cloud
# for now at least, start.sh just calls setup.py and sets google credentials. Ignore that chmod bit if it's bad I don't know.
COPY start.sh /usr/local/bin
RUN chmod -R 755 "/usr/local/bin/start.sh"
ENTRYPOINT [ "start.sh" ]
WORKDIR /src
COPY . .
# this was my attempt at setting the Luigi client.cfg in the container
# all I'm having the config do for now is set [core] no_install_shutdown_handler: true
ENV LUIGI_CONFIG_PATH /src/client.cfg
这是 setup.py (针对 SO 进行了编辑)
# setup.py
from google.cloud import pubsub_v1
from time import sleep
import luigitasks
import luigi
import logging
import json
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
'servicename', 'pubsubcommand')
# Example task. These are actually in luigitasks.py
class GetImageSet(luigi.Task):
uri = luigi.Parameter(default='')
def requires(self):
return []
def output(self):
# write zip to local
return
def run(self):
# use the URI to retrieve the ImageSet.zip from the bucket
logging.info('Running getImageSet')
# Pubsub message came in
def onMessageReceived(message):
print('Received message: {}'.format(message))
if message.attributes:
for key in message.attributes:
if key == 'ImageSetUri':
value = message.attributes.get(key)
# Kick off the pipeline starting with the GetImageSet Task
# I've tried setting no_lock, take_lock, local_scheduler...
# General flags to try and prevent the thread issues
luigi.build([GetImageSet()], workers=3, local_scheduler=True, no_lock=False)
message.ack()
subscriber.subscribe(subscription_path, callback=onMessageReceived)
# The subscriber is non-blocking, so I keep the main thread from
# exiting to allow it to process messages in the background. Is this
# the cause of my woes?
print('Listening for messages on {}'.format(subscription_path))
while True:
sleep(60)
解决方案
发生这种情况是因为subscriber.subscribe
启动了一个后台线程。当该线程调用时,luigi.build
将引发异常。
这里最好的解决方案是使用subscriber.pull
. 请参阅文档中的示例。
推荐阅读
- c# - 将 ScriptableObject 列表保存到 json
- python - 了解 tensorflow CNN 分类的 fc1000_softmax 输出意味着什么
- javascript - 3 显示 div jquery 的超时间隔
- react-native - 在 react-native 中按下图像时使动画视差标题可点击
- python - 在 Tkinter 中使用网格制作 5x5 板
- java - 作业错误 java.lang.IllegalAccessException
- mysql - mysql支持这种“with”sql吗?
- javascript - 为什么每次发送请求时都刷新访问令牌是个坏主意?
- java - Assertj-swagger 在执行测试时抛出 org.assertj.core.error.AssertJMultipleFailuresError 多次失败
- c++ - 在 C++ 中,如何初始化指向 wchar_t* 的指针数组(导致 wchar_t**)?