首页 > 解决方案 > Windows 上的 Python3 多处理使在 Linux 上运行的 ZeroMQ 代码崩溃

问题描述

我正在编写一个基于 ZeroMQ 的服务器,它需要实例化一些我通过multiprocessing.Queue-s 与之通信的工作人员(由他们自己的脚本定义)。

基本上,我有:
- 一个主类处理与其他环境的所有通信,其中有很多事情,特别包括:
- 工作人员列表,他们每个人都通过队列获得一些指令。

有趣的部分是:我需要有几个进程能够并行地与这些工作人员进行通信。(例如,实现“安全停止”功能)

在 Linux 上一切正常,但在 Windows 上我遇到了很多问题。特别是我收到了这个错误,它似乎来自multiprocessing.spawn.py

no default __reduce__ due to non-trivial __cinit__

我用这个最小的工作代码重现了错误

from multiprocessing import Process
import numpy as np
import zmq
import time

class myClass():
    def __init__(self):
        self.context = zmq.Context()
        #many zmq stuff
    def foo(self, bar):
        print( bar )
    def run(self):
        while True :
            time.sleep(1)
            a = np.array([1,2,3])
            dico = {"a":a}
            Process(target=self.foo, args=(dico,)).start()

if __name__ == "__main__":
    b = myClass()
    b.run()

我查了一下,似乎每次调用“运行”时都需要重新定义上下文,这是无法完成的,因为我需要通过这些队列高速发送大量数据。

如果有人知道该怎么做...

标签: python-3.xwindowsmultiprocessingzeromqcrash-reports

解决方案


如果有人知道该怎么做...

一、欢迎来到Zen-of-Zero的领域

如果您从未使用过 ZeroMQ,
您可能会喜欢先看看“ZeroMQ原则在不到五秒内
,然后再深入了解更多细节


如果问题仅出现在 Windows 中(不在 linux-class O/S-es 中),人们会倾向于提出最直接的步骤,而不是......

然而,我会抵制这种诱惑,将从根本原因隔离步骤开始:

Windows 类 O/S-es 使用其他形式的进程实例化multiprocessing

最好的下一步:评估您的整个代码执行生态系统:

可以使用下面放置的测试模板来进一步添加与课程相关的实验。

ZeroMQ 部分困扰着我。鉴于Context()-instance 在 -spawned- 期间创建,.__init__()multiprocessingProcess()Win-class O/S-es 上执行调用进程的自上而下的完整副本(是的,整个 python-interpreter-process,复制所有变量,解释器的资源的状态完整数据,... windows-mandatory spawn-method(不是任何非 win 可用的效率forkforserver在 win 中可用)只是复制所有内容 - 包括 Context()-instance...)

如果Process-instances 在 的整个生命周期内保持半持久性(并且可能保持这种状态以备将来可能重复使用)__main__,则Context()-instance 将保持在预生成.__init__()-call 的状态,并试图命令“共享” -Context()-线程副本(隐藏在每个__main__进程副本中)。到目前为止没有冲突,但#many zmq stuff可能会导致问题,因为ZeroMQ .Socket()-instances 不是线程安全的(就像在 API 文档中经常警告的那样)但也是 State-Full-entities,所以“复制”-full-copy 可能很容易把事情搞砸了

待办事项:尝试记录和隔离问题演变 -

最好通过 POSACK 报告(几乎)每行执行,直到崩溃前执行和报告的最后一行(上面介绍的事后追溯太模棱两可,无法决定。__cinit__可能与它做的地方有关实际上失败了)

这个模板可能有助于做到这一点:

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # 

def test_function( i = -1 ):
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
    pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
           "invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
            )
    time.sleep( 10 )
    pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
    pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
    print( "{0:_>30.10f} ::".format(              time.monotonic() ),
           "PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
           "{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
            )

if __name__ == '__main__':
    print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
    print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
    print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
    print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
    print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
    #------mp.Pool()-----------------------------------------------------
    pool = mp.Pool( mp.cpu_count() )
    print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
    #---.map()--------------------------------------?
    #---.map(                                       ?
    pool.map( test_function, [i for i in range(4) ] )
    #---.map(                                       ?
    #---.map()--------------------------------------?
    print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
    pool.close()
    #---.close()
    print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
    pool.join()
    #---.join()
    print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
    print( "EXECUTED on {0:}".format(              platform.version()        ) )
    print( "USING: python-{0:}:".format(           platform.python_version() ) )

在 linux-class O/S 上可能看起来像这样:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d

EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

推荐阅读