首页 > 解决方案 > Python时间和大小批处理器

问题描述

我需要一个小工具,按消息条数或持续时间对消息进行批处理,以先满足的条件为准(应用场景:向 Kinesis 发送消息——消息产生得慢时逐条发送,突然有大量消息要发送时则分批发送)。

实现方法有很多,但我想出了下面这种使用 `threading.Timer` 的方法。我的问题是:

  1. 它是线程安全的吗(它是在主线程中使用的)?
  2. 有没有更简单或更pythonic的方式来做到这一点?
  3. 性能分析显示,`_thread.lock` 的 `acquire` 和 `_thread.start_new_thread` 花费了不少时间;有没有更快的方法?(注:使用 `Batcher(..., seconds=None)` 时没有这部分开销。)

import threading
import time
from collections import deque

class Batcher():
    """Group items into batches and hand each batch to ``callback``.

    A batch is emitted as soon as it holds ``size`` items, or ``seconds``
    after the first item of the batch was added, whichever happens first.
    Either limit may be ``None`` to disable it.  Leaving the ``with`` block
    (or calling :meth:`flush`) emits whatever is still pending.

    The timeout is implemented with :class:`threading.Timer`, so a flush can
    run on the timer thread while :meth:`add` runs on the caller's thread.
    All shared state is therefore guarded by a re-entrant lock.  The callback
    is invoked while that lock is held: this serialises callbacks and keeps
    batches in order, but a slow callback on the timer thread makes
    concurrent :meth:`add` calls wait for it.
    """

    def __init__(self, size=None, seconds=None, callback=None):
        self.batch = deque()
        self.size = size
        self.seconds = seconds
        self.callback = callback
        self.thread = None  # pending threading.Timer, or None
        # Re-entrant so that add() -> flush(), and a callback that calls
        # add()/flush() on the same thread, do not deadlock.
        self._lock = threading.RLock()
        # Bumped for every timer started, so a timer that fired but lost the
        # race for the lock can recognise that it is stale.
        self._timer_gen = 0

    def flush(self):
        """Cancel any pending timeout and emit the current batch, if any."""
        with self._lock:
            if self.thread is not None:
                self.thread.cancel()
                self.thread = None
            if self.batch:
                a = list(self.batch)
                # Safe: no add() can slip in between the copy and the clear
                # while the lock is held.
                self.batch.clear()
                if self.callback:
                    self.callback(a)

    def _on_timeout(self, gen):
        """Timer-thread entry point: flush unless this timer is stale."""
        with self._lock:
            # If a size-triggered flush (and possibly a new timer) happened
            # after this timer fired, flushing now would cut the next batch
            # short, so only the current, still-registered timer may flush.
            if self.thread is not None and gen == self._timer_gen:
                self.flush()

    def add(self, e):
        """Append ``e``; emit the batch if full, otherwise arm the timeout."""
        with self._lock:
            self.batch.append(e)
            if self.size is not None and len(self.batch) >= self.size:
                self.flush()
            elif self.seconds is not None and self.thread is None:
                self._timer_gen += 1
                self.thread = threading.Timer(
                    self.seconds, self._on_timeout, args=(self._timer_gen,))
                self.thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.flush()

简单测试:

origin = time.time()


def walltime(origin):
    """Format the seconds elapsed since ``origin`` as e.g. ``' 0.301 s'``."""
    return f'{time.time() - origin:6.3f} s'


def foo(batch):
    """Demo callback: print the elapsed time and the batch it received."""
    stamp = walltime(origin)
    print(f'now={stamp}, batch={batch}')

with Batcher(size=3, seconds=0.5, callback=foo) as b:
    # One message every 0.3 s: the 0.5 s timeout, not the size limit of 3,
    # ends each batch (see the output below).
    for k in range(7):
        message = f'at {walltime(origin)}: {k}'
        b.add(message)
        time.sleep(0.3)

Out[ ]:
now= 0.501 s, batch=['at  0.000 s: 0', 'at  0.301 s: 1']
now= 1.101 s, batch=['at  0.601 s: 2', 'at  0.902 s: 3']
now= 1.702 s, batch=['at  1.202 s: 4', 'at  1.503 s: 5']
now= 2.103 s, batch=['at  1.803 s: 6']

速度测试:

In[ ]:
%%time
batch_stats = []


def proc(batch):
    """Speed-test callback: record only the size of each emitted batch."""
    batch_stats.append(len(batch))


with Batcher(size=100, seconds=5, callback=proc) as b:
    add = b.add  # bound once; the loop below is what the cell times
    for k in range(120164):
        add(k)

Out[ ]:
CPU times: user 166 ms, sys: 74.7 ms, total: 240 ms
Wall time: 178 ms

In[ ]:
from collections import Counter

# Histogram of batch sizes: 120164 = 1201 * 100 + 64, so expect 1201 full
# batches plus one final partial batch of 64 items.
Counter(batch_stats)

Out[ ]:
Counter({100: 1201, 64: 1})

标签: python

解决方案


代码在 `lock.acquire` 和 `start_new_thread` 上花费这么多时间,是因为每次需要启动延迟发送的计时器时,都会新启动一个线程。

下面的方案让一个线程在后台持续运行。它用两个条件变量分别处理两种触发(开始计时、提前结束计时),并等待所需的延迟。在我的计时测试中它快了一倍,但我只在一台机器上测试过:

class Batcher:
    """Group items into batches using one long-lived background thread.

    A batch is emitted as soon as it holds ``size`` items, or ``seconds``
    after the first item of the batch was added, whichever happens first.
    Either limit may be ``None`` to disable it.  Instead of starting a new
    :class:`threading.Timer` per batch, a single worker thread sleeps on a
    condition variable until the current batch's deadline.

    Use it as a context manager: the worker is a non-daemon thread that only
    stops in ``__exit__``, which also emits whatever is still pending.

    ``callback`` is called outside the lock, either on the thread calling
    :meth:`add` (size limit reached) or on the worker thread (timeout), so
    two batches may be delivered concurrently.  An exception raised by the
    callback on the worker thread ends the worker; later timeouts then no
    longer fire.
    """

    def __init__(self, size=None, seconds=None, callback=None):
        self._batch = []
        self._seconds = seconds
        self._size = size
        self._callback = callback
        # time.monotonic() value at which the pending batch must be sent,
        # or None when no timeout is armed.
        self._deadline = None
        self._finished_flag = False
        self._lock = threading.RLock()
        # One condition suffices: the worker re-checks all state (deadline,
        # shutdown flag) under the lock after every wake-up, so no
        # notification can be lost.
        self._cond = threading.Condition(self._lock)
        self._thread = threading.Thread(target=self._cycle_send)
        self._thread.start()

    def _cycle_send(self):
        """Worker loop: send the pending batch whenever its deadline expires."""
        while True:
            with self._cond:
                while not self._finished_flag:
                    if self._deadline is None:
                        self._cond.wait()
                        continue
                    remaining = self._deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    # Also returns early on notify; the loop re-evaluates.
                    self._cond.wait(remaining)
                if self._finished_flag:
                    # __exit__ sends whatever is left after joining us.
                    return
                batch = self._batch
                self._batch = []
                self._deadline = None
            self._send_batch(batch)

    def _send_batch(self, batch):
        if self._callback:
            self._callback(batch)

    def add(self, e):
        """Append ``e``; send the batch if it is full, else arm the timeout."""
        batch = None
        with self._cond:
            self._batch.append(e)
            if self._size is not None and len(self._batch) >= self._size:
                batch = self._batch
                self._batch = []
                # Disarm the timeout.  No notify needed: a worker sleeping
                # until the old deadline will find it gone when it wakes.
                self._deadline = None
            elif self._seconds is not None and self._deadline is None:
                # First item of a new batch: arm its timeout.
                self._deadline = time.monotonic() + self._seconds
                self._cond.notify_all()
        # Sending is done outside the lock so a slow callback does not
        # block the worker or other producers.
        if batch is not None:
            self._send_batch(batch)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with self._cond:
            # Let the worker terminate.
            self._finished_flag = True
            self._cond.notify_all()
        self._thread.join()
        with self._cond:
            batch, self._batch = self._batch, []
        # Send what is left, but never an empty batch.
        if batch:
            self._send_batch(batch)

推荐阅读