首页 > 解决方案 > 线程化独立函数并等待它们完成

问题描述

如果我用 启动一个线程start_new_thread,并且有 n 个要同时运行的独立函数,我会执行以下操作:

def foo1(x):
    print "foo1"
    time.sleep(5)

def foo2(x):
    print "foo2"
    time.sleep(1)

func_list = [foo1,foo2]

for k,j in enumerate(func_list):
    thread.start_new_thread(func_list[k],(1 ,))

这两个函数具有完全相同的代码,但两个函数也是独立的,因为它们将消息发送到独立的 ZMQ 套接字(依次等待来自外部 API 的响应,然后再发回消息以在内部处理foo)。

foo1可能需要 5 秒才能完成处理,具体取决于 API 的响应时间和有效负载的大小,因此问题是如果我尝试在新线程中再次触发它,同时它仍在处理,ZMQ 套接字会抛出一个异常(已经看过 git,这不是错误)

因此,如果 foo1 很忙,则 foo2 可用,如果 foo2 很忙,则 foo(n) 可能可用(最多 foo15),因此有很多可用的工人。但是我如何判断哪个函数正忙,如果它正忙等待它完成,或者如果其他工作人员可用,请改用它们?

请记住,我不能只处理同一函数的 15 个线程,因为出于所有意图和目的,它们都是独立的。

有人可以帮忙吗,这是我为自己创造的一个非常令人困惑的问题。谢谢。

编辑@马蒂诺-

我有我导入的套接字列表,我希望我不必这样做,但我使用的 API 没有 http 连接的限制(在合理范围内),但每个可以处理的请求数量有限制。因此,更多的连接是提高速度的唯一途径。

以下是作业的设置 - 我一次处理 10 条记录,对应于我通过 API 保持活动状态的 10 个连接。我只是将线程池化,如果一个线程很忙(这有点太复杂),我会放弃让另一个线程运行的幽灵,因此如果一个线程需要 5 秒,它将延迟下一批 10 个。这是一种妥协。

import socket_handler_a, socket_handler_b ...

def multi_call(reduce_kp, exe_func):

        def trd_call_a(x,y):
                exe_func(socket_handler_a(x),y)

        def trd_call_b(x,y):
                exe_func(socket_handler_b(x),y)

        def trd_call_c(x,y):
                exe_func(socket_handler_c(x),y)

        def trd_call_d(x,y):
                exe_func(socket_handler_d(x),y)

        def trd_call_e(x,y):
                exe_func(socket_handler_e(x),y)

        def trd_call_f(x,y):
                exe_func(socket_handler_f(x),y)

        def trd_call_g(x,y):
                exe_func(socket_handler_g(x),y)

        def trd_call_h(x,y):
                exe_func(socket_handler_h(x),y)

        def trd_call_i(x,y):
                exe_func(socket_handler_i(x),y)

        def trd_call_j(x,y):
                exe_func(socket_handler_j(x),y)

        func_list = [trd_call_a, trd_call_b,
                     trd_call_c, trd_call_d,
                     trd_call_e, trd_call_f,
                     trd_call_g, trd_call_h,
                     trd_call_i, trd_call_j]

        def chunks_(l, n):
                for i in range(0, len(l), n):
                    yield l[i:i+n]

        threads = []
        for query_lst in chunks_([i for i in reduce_kp], 10):
                for k, j in enumerate(query_lst):

                    thread1 = threading.Thread(target=func_list[k], args=(j[0] ,j[1]))
                    thread1.start()
                    threads.append(thread1)

                for thread in threads: thread.join()

这就是所谓的:

def test_case(q_list):

        reduce_kp   = []
        for k in q_list: 
                reduce_kp.append([{'QTE':'EUR_USD'}, [k,'BAL'] ])
        multi_call(reduce_kp, test_case_resp)

并且响应是从线程调用的,即

def test_case_resp(resp,x):
   #process resp

标签: pythonmultithreadingpython-2.7waitpyzmq

解决方案


推荐阅读