首页 > 解决方案 > Python并行运行多个程序

问题描述

假设我有数千个传感器读数被添加到 Redis 或 Apache Kafka 等队列中,并使用 foreach sensor_id 获取其自己的工作人员,该工作人员对历史读数进行计算。每个 sensor_id 都将发布到一个新主题,然后开始流式传输读数。

每个工作人员将首先从数据库中读取以实例化其一些可变阈值。除了使用 sensor_id 作为查找键检索到的 sensor_id 和可变阈值之外,每个 worker 的代码都是相同的。一旦将 sensor_id 添加到队列中,就会为其分配一个永久运行的工作人员。

这样做会更有效吗:

  1. multiprocessing.Pool 启动数千个无限期运行的工作人员?
  2. 或者“python script.py param1 param2 &”数千次,只是改变参数来实例化?(我会以编程方式生成一个 bash 脚本以包含所有这些逻辑仅供参考)

我希望所有工作人员尽可能快地并行运行,这些是 CPU 绑定任务,而不是 I/O 绑定。启动所有工作人员的最佳方式是什么?

说明:每个 sensor_id 每秒都会生成传感器读数,因此对于 3000 个传感器,每秒会生成 3000 个事件,队列中会生成 3000 个主题。有效负载示例是 JSON {sensor_id: "hash", temperature: 85, rpm: 1200, ...}。我的理想设置是每个工作人员在内存中保留最后 200 个左右的读数以运行计算。另一种方法是一个中央队列,其中循环工作人员必须首先建立一个数据库连接来读取它从队列中弹出的 sensor_id 的 200 个读数,但这需要时间。

标签: pythonparallel-processingmultiprocessingpython-multiprocessing

解决方案


我的理解是Redis列表比发布/订阅更可靠,更适合单个消息需要被多个消费者消费的情况。如果所有传感器都写入同一个列表,那么您的应用程序也将大大简化,然后您可以拥有一个由相同工作人员组成的处理池循环读取该列表。该消息自然会识别涉及哪个传感器,并且当工作人员读取它以前见过的新传感器 ID 的消息时,它必须通过从数据库中读取相关信息来对该传感器进行“第一次”初始化,并且将其保存在由传感器 ID 键入的字典中。最终,这本词典最终可能有 3,000 个条目。由此得出的结论是,池应该使用包含所有 3,000 个条目的字典来初始化一次,所有工作人员甚至在开始阅读消息之前都可以访问这些条目。

但是,如果出于某种原因所有 3,000 个传感器必须写入 3,000 个不同的Redis列表(如果您使用列表Redis开始),则可以使用以下代码。然后的想法是找到一种方法,以某种方式从 3,000 个列表中“同时”读取消息,在消息可用时检索它们并将它们写入工作人员可以从中读取的单个队列,从而简化工作人员逻辑。此代码基于12.13。David Beazley 和 Brian K. Jones在Python Cookbook 第 3 版中轮询多线程队列可在此处获得。这已适用于轮询 3,000 个Redis列表并将读取的项目发送到单个multiprocessing.Queue实例。此代码还包括一个producer工作人员处理池,模拟传感器读数的创建,这些读数被添加到 3,000 个列表之一,您在此处的实际代码中不会有这些。但是PollableList,负责获取传感器读数的任何代码都必须可以访问这 3,000 个实例。我们这里也只有一次Process读取消息队列,即对象,并打印它们,以保持打印“有序”。实际上,正如我所提到的,您将拥有一个进程池。

不幸的是,Cookbook 中描述的技术似乎对传递给select的列表的大小有限制,因此为了安全起见,被迫将列表大小限制为 500,这意味着我不得不打破3,000 个Redis列表分为 6 组,每组 500 个列表。

import redis
import socket
import os
import json
import select
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from functools import partial

class PollableList():
    r = redis.Redis()

    def __init__(self, idx):
        self.idx = idx
        self.list_name = f'sensor_{idx}'
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        PollableList.r.rpush(self.list_name, json.dumps(item))
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return json.loads(PollableList.r.lpop(self.list_name).decode())

def producer(lists):
    """ emulate the 3000 sensors """
    # Feed data to the lists (we won't run indefinitely)
    for _ in range(3):
        for lst in lists:
            lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})

def consumer(q, list_names):
    '''
    Consumer that reads data on multiple lists simultaneously
    '''
    while True:
        can_read, _, _ = select.select(list_names, [], [])
        for r in can_read:
            item = r.get()
            q.put(item)

# in actual use case, there would be a pool of workers:
def worker(q):
    message_number = 0
    while True:
        item = q.get()
        message_number += 1
        print(message_number, item)


def main():
    lists = [PollableList(i) for i in range(0, 3000)]
    # select cannot handle all 3000 sockets at once:
    lists1 = lists[0:500]
    lists2 = lists[500:1000]
    lists3 = lists[1000:1500]
    lists4 = lists[1500:2000]
    lists5 = lists[2000:2500]
    lists6 = lists[2500:3000]

    p0 = Process(target=producer, args=(lists,))
    p0.daemon = True
    p0.start()

    q = Queue()
    thread_pool = ThreadPool(6)
    thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])

    # This would in reality be a process pool of workers reading from q:
    p1 = Process(target=worker, args=(q,))
    p1.daemon = True
    p1.start()

    # wait for all 9000 messages to be displayed by worker:
    input('Hit enter to terminate...\n')

# required for Windows:
if __name__ == '__main__':
    main()

推荐阅读