python - 实现对 Python XML-RPC 服务器的每个函数的最大并发调用数
问题描述
我有一个SimpleXMLRPCServer
与此类似的基于 Python 的:
from multiprocessing import Process
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer
class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
pass
# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/RPC2',)
def main():
server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
server.register_function(tester1)
server.register_function(tester2)
print("Server running...")
server.serve_forever()
def tester1(id):
p = Process(target=my_func1, args=(id,))
p.start()
return True
def tester2(id):
p = Process(target=my_func2, args=(id,))
p.start()
return True
我想实现一种方法来跟踪当前正在执行的并发进程数量tester1
和tester2
,并且如果仍在执行的最大(用户定义)数量以上,则将每个新请求排队并在数量下降时执行低于阈值。
也许Pool
每个功能都有一个共享?
解决方案
以下似乎符合我的要求:
from multiprocessing import Process, JoinableQueue
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer
import threading
tester1_queue = JoinableQueue()
tester2_queue = JoinableQueue()
tester1_max_concurrent = 10
tester2_max_concurrent = 10
class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
pass
# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/RPC2',)
def main():
# spin up tester1 queue watcher threads
for i in range(tester1_max_concurrent):
worker = threading.Thread(target=tester1_queue_watcher, args=(tester1_queue,))
worker.daemon = True
worker.start()
# spin up tester2 queue watcher threads
for i in range(tester2_max_concurrent):
worker = threading.Thread(target=tester2_queue_watcher, args=(tester2_queue,))
worker.daemon = True
worker.start()
server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
server.register_function(tester1_handler, 'tester1')
server.register_function(tester2_handler, 'tester2' )
print("Server running...")
server.serve_forever()
def tester1_handler(id):
tester1_queue.put((id,))
return True
def tester1_queue_watcher(q):
while True:
id = q.get()
p = Process(target=tester1, args=(id,))
p.start()
p.join()
q.task_done()
def tester1(id):
# do stuff
def tester2_handler(id):
tester2_queue.put((id,))
return True
def tester2_queue_watcher(q):
while True:
id = q.get()
p = Process(target=tester2, args=(id,))
p.start()
p.join()
q.task_done()
def tester2(id):
# do stuff
推荐阅读
- c# - 在 Visual Studio 中仅显示 ASP.NET Core NuGet 包的 LTS 更新
- javascript - TypeError:无法读取未定义的属性“setValues”| 反应
- python - 在 MultiPolygon 中用匀称填充一个洞 - 荷兰 2 位邮政编码
- c# - 静态类中的 Asp.Net Core 配置
- lua - 我想在 AwesomeWM 的第二个屏幕上打开一个程序,但前提是第二个屏幕可用
- java - 构建表示前缀表达式的树的算法
- assembly - 如何在汇编中进行直接内存寻址
- visual-studio-code - 在 vscode 的基于 json 的设置编辑器中对设置进行排序
- ios - 方法不是公认的客观 c 方法
- gitlab - 上次部署信息不可用 - cpanel