python-3.x - Windows 上的 Python3 多处理使在 Linux 上运行的 ZeroMQ 代码崩溃
问题描述
我正在编写一个基于 ZeroMQ 的服务器,它需要实例化一些我通过multiprocessing.Queue
-s 与之通信的工作人员(由他们自己的脚本定义)。
基本上,我有:
- 一个主类处理与其他环境的所有通信,其中有很多事情,特别包括:
- 工作人员列表,他们每个人都通过队列获得一些指令。
有趣的部分是:我需要有几个进程能够并行地与这些工作人员进行通信。(例如,实现“安全停止”功能)
在 Linux 上一切正常,但在 Windows 上我遇到了很多问题。特别是我收到了这个错误,它似乎来自multiprocessing.spawn.py
:
no default __reduce__ due to non-trivial __cinit__
我用这个最小的工作代码重现了错误
from multiprocessing import Process
import numpy as np
import zmq
import time
class myClass():
def __init__(self):
self.context = zmq.Context()
#many zmq stuff
def foo(self, bar):
print( bar )
def run(self):
while True :
time.sleep(1)
a = np.array([1,2,3])
dico = {"a":a}
Process(target=self.foo, args=(dico,)).start()
if __name__ == "__main__":
b = myClass()
b.run()
我查了一下,似乎每次调用“运行”时都需要重新定义上下文,这是无法完成的,因为我需要通过这些队列高速发送大量数据。
如果有人知道该怎么做...
解决方案
如果有人知道该怎么做...
一、欢迎来到Zen-of-Zero的领域
如果您从未使用过 ZeroMQ,
您可能会喜欢先看看“ZeroMQ原则在不到五秒内”
,然后再深入了解更多细节
如果问题仅出现在 Windows 中(不在 linux-class O/S-es 中),人们会倾向于提出最直接的步骤,而不是......
然而,我会抵制这种诱惑,将从根本原因隔离步骤开始:
Windows 类 O/S-es 使用其他形式的进程实例化multiprocessing
最好的下一步:评估您的整个代码执行生态系统:
可以使用下面放置的测试模板来进一步添加与课程相关的实验。
ZeroMQ 部分困扰着我。鉴于Context()
-instance 在 -spawned- 期间创建,.__init__()
应multiprocessing
在Process()
Win-class O/S-es 上执行调用进程的自上而下的完整副本(是的,整个 python-interpreter-process,复制所有变量,解释器的资源的状态完整数据,... windows-mandatory spawn
-method(不是任何非 win 可用的效率fork
,forserver
在 win 中可用)只是复制所有内容 - 包括 Context()-instance...)
如果Process
-instances 在 的整个生命周期内保持半持久性(并且可能保持这种状态以备将来可能重复使用)__main__
,则Context()
-instance 将保持在预生成.__init__()
-call 的状态,并试图命令“共享” -Context()-线程副本(隐藏在每个__main__
进程副本中)。到目前为止没有冲突,但#many zmq stuff
可能会导致问题,因为ZeroMQ .Socket()
-instances 不是线程安全的(就像在 API 文档中经常警告的那样)但也是 State-Full-entities,所以“复制”-full-copy 可能很容易把事情搞砸了
待办事项:尝试记录和隔离问题演变 -
最好通过 POSACK 报告(几乎)每行执行,直到崩溃前执行和报告的最后一行(上面介绍的事后追溯太模棱两可,无法决定。__cinit__
可能与它做的地方有关实际上失败了)
这个模板可能有助于做到这一点:
import multiprocessing as mp # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect #
def test_function( i = -1 ):
pass; thisframerecord = inspect.stack()[0] # 0 represents this line
pass; callerframerecord = inspect.stack()[1] # 1 represents line at caller
pass; _INFO_ = inspect.getframeinfo( thisframerecord[0] )
pass; _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
print( "{0:_>30.10f} ::".format( time.monotonic() ),
"PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i ),
"invoked from {0:}()-LINE[{1:_>4d}]".format( _CALLER_.function, _CALLER_.lineno )
)
time.sleep( 10 )
pass; thisframerecord = inspect.stack()[0] # 0 represents this line
pass; _INFO_ = inspect.getframeinfo( thisframerecord[0] ) # 1 represents line at caller
print( "{0:_>30.10f} ::".format( time.monotonic() ),
"PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i )
)
if __name__ == '__main__':
print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
#------mp.Pool()-----------------------------------------------------
pool = mp.Pool( mp.cpu_count() )
print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
#---.map()--------------------------------------?
#---.map( ?
pool.map( test_function, [i for i in range(4) ] )
#---.map( ?
#---.map()--------------------------------------?
print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
pool.close()
#---.close()
print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
pool.join()
#---.join()
print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic() ) )
print( "EXECUTED on {0:}".format( platform.version() ) )
print( "USING: python-{0:}:".format( platform.python_version() ) )
在 linux-class O/S 上可能看起来像这样:
(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d
EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
推荐阅读
- sql - ORACLE - 物化视图
- redis - 无法连接到 Redis;嵌套异常是 io.lettuce.core.RedisConnectionException 使用 ReactiveRedisTemplate
- xml - 用一个最深的循环限制递归并为所有元素分配准确的 id
- c - 如何使用此方法将 C 宏扩展为结构和函数
- anaconda - 无法使用“c:\users\hp\anaconda3\python.exe”创建进程
- swiftui - 将 SFSymbol 添加到 VStack 会创建奇数间距
- java - Java中的2个比较器类
- xcode - 由于 Xcodebuild,每个命令的终端都停止
- google-apps-script - 满足某些条件时如何使用 GmailApp.Sendemail?
- python - imshow() 没有正确显示图像