python - 线程化独立函数并等待它们完成
问题描述
如果我用 启动一个线程start_new_thread
,并且有 n 个要同时运行的独立函数,我会执行以下操作:
def foo1(x):
print "foo1"
time.sleep(5)
def foo2(x):
print "foo2"
time.sleep(1)
func_list = [foo1,foo2]
for k,j in enumerate(func_list):
thread.start_new_thread(func_list[k],(1 ,))
这两个函数具有完全相同的代码,但两个函数也是独立的,因为它们将消息发送到独立的 ZMQ 套接字(依次等待来自外部 API 的响应,然后再发回消息以在内部处理foo
)。
foo1
可能需要 5 秒才能完成处理,具体取决于 API 的响应时间和有效负载的大小,因此问题是如果我尝试在新线程中再次触发它,同时它仍在处理,ZMQ 套接字会抛出一个异常(已经看过 git,这不是错误)
因此,如果 foo1 很忙,则 foo2 可用,如果 foo2 很忙,则 foo(n) 可能可用(最多 foo15),因此有很多可用的工人。但是我如何判断哪个函数正忙,如果它正忙等待它完成,或者如果其他工作人员可用,请改用它们?
请记住,我不能只处理同一函数的 15 个线程,因为出于所有意图和目的,它们都是独立的。
有人可以帮忙吗,这是我为自己创造的一个非常令人困惑的问题。谢谢。
编辑@马蒂诺-
我有我导入的套接字列表,我希望我不必这样做,但我使用的 API 没有 http 连接的限制(在合理范围内),但每个可以处理的请求数量有限制。因此,更多的连接是提高速度的唯一途径。
以下是作业的设置 - 我一次处理 10 条记录,对应于我通过 API 保持活动状态的 10 个连接。我只是将线程池化,如果一个线程很忙(这有点太复杂),我会放弃让另一个线程运行的幽灵,因此如果一个线程需要 5 秒,它将延迟下一批 10 个。这是一种妥协。
import socket_handler_a, socket_handler_b ...
def multi_call(reduce_kp, exe_func):
def trd_call_a(x,y):
exe_func(socket_handler_a(x),y)
def trd_call_b(x,y):
exe_func(socket_handler_b(x),y)
def trd_call_c(x,y):
exe_func(socket_handler_c(x),y)
def trd_call_d(x,y):
exe_func(socket_handler_d(x),y)
def trd_call_e(x,y):
exe_func(socket_handler_e(x),y)
def trd_call_f(x,y):
exe_func(socket_handler_f(x),y)
def trd_call_g(x,y):
exe_func(socket_handler_g(x),y)
def trd_call_h(x,y):
exe_func(socket_handler_h(x),y)
def trd_call_i(x,y):
exe_func(socket_handler_i(x),y)
def trd_call_j(x,y):
exe_func(socket_handler_j(x),y)
func_list = [trd_call_a, trd_call_b,
trd_call_c, trd_call_d,
trd_call_e, trd_call_f,
trd_call_g, trd_call_h,
trd_call_i, trd_call_j]
def chunks_(l, n):
for i in range(0, len(l), n):
yield l[i:i+n]
threads = []
for query_lst in chunks_([i for i in reduce_kp], 10):
for k, j in enumerate(query_lst):
thread1 = threading.Thread(target=func_list[k], args=(j[0] ,j[1]))
thread1.start()
threads.append(thread1)
for thread in threads: thread.join()
这就是所谓的:
def test_case(q_list):
reduce_kp = []
for k in q_list:
reduce_kp.append([{'QTE':'EUR_USD'}, [k,'BAL'] ])
multi_call(reduce_kp, test_case_resp)
并且响应是从线程调用的,即
def test_case_resp(resp,x):
#process resp
解决方案
推荐阅读
- python - python TypeError“+的不支持的操作数类型:'int'和'str'”为什么我得到这个?
- python-3.x - 渲染 2d 高斯 - 相对于均值取梯度
- machine-learning - 逻辑回归预测错误
- spring - 将 gradle 升级到 4.7 导致 org.gradle.api.tasks.TaskExecutionException:
- xcode - 如何修复停止 Xcode 10 iOS 12 模拟器?
- cypher - 如何使用 Cypher 将两个不同的 Graph 存储为一个?
- c# - 无法获得 while 循环来检查我的用户输入是否等于某些数字
- python - HTTPS 防止 Python3 中的网站抓取
- javascript - v-on 指令可以直接修改父级的 data() 属性吗?
- c - C 编程 malloc 和 NULL