python - Python时间和大小批处理器
问题描述
我需要一个小工具来按计数或持续时间来批处理消息,以先到者为准(应用程序:向 Kinesis 发送消息,如果生产缓慢,则一次发送一条消息,或者如果突然有很多消息要发送,则分批发送)。
实现的方法有很多,但我想出了以下方法,它使用一个 `deque` 和 `threading.Timer`。问题是:
- 它安全吗(这是由主线程使用的)?
- 有没有更简单或更pythonic的方式来做到这一点?
- 分析表明,获取 `_thread.lock` 以及调用 `_thread.start_new_thread` 需要相当一部分时间;有没有更快的方法?(注:如果使用 `Batcher(..., seconds=None)`,则没有这项开销。)
import threading
import time
from collections import deque
class Batcher:
    """Collect items and deliver them in batches, by count or by timeout.

    A batch is flushed to ``callback`` either when it reaches ``size``
    items, or ``seconds`` after the first item was added — whichever
    comes first.  Timed flushes run on a ``threading.Timer`` thread, so
    all shared state is guarded by a lock.
    """

    def __init__(self, size=None, seconds=None, callback=None):
        # Pending items; drained atomically (under the lock) on flush.
        self.batch = deque()
        self.size = size          # flush when this many items are buffered (None = never)
        self.seconds = seconds    # flush this long after the first add (None = never)
        self.callback = callback  # callable(list) invoked with each drained batch
        self.thread = None        # the pending Timer, or None when no timer is armed
        # Guards batch/thread: flush() may run on the Timer thread while
        # add() runs on the caller's thread; without this, the original
        # list(batch)/clear() pair could drop items appended in between.
        self._lock = threading.Lock()

    def flush(self):
        """Cancel any pending timer and deliver the buffered items, if any."""
        with self._lock:
            if self.thread:
                self.thread.cancel()
                self.thread = None
            pending = list(self.batch)
            self.batch.clear()
        # Call the callback outside the lock so it may safely call add().
        if pending and self.callback:
            self.callback(pending)

    def add(self, e):
        """Buffer *e*; flush on reaching ``size``, else arm the timeout timer."""
        flush_now = False
        with self._lock:
            self.batch.append(e)
            if self.size is not None and len(self.batch) >= self.size:
                flush_now = True
            elif self.seconds is not None and self.thread is None:
                # First item of a new batch: start the countdown.
                self.thread = threading.Timer(self.seconds, self.flush)
                self.thread.start()
        if flush_now:
            self.flush()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Deliver whatever is left when the context exits.
        self.flush()
简单测试:
origin = time.time()


def walltime(origin):
    """Return the elapsed time since *origin*, formatted for display."""
    elapsed = time.time() - origin
    return f'{elapsed:6.3f} s'


def foo(batch):
    """Batcher callback: report the batch together with the wall clock."""
    print(f'now={walltime(origin)}, batch={batch}')


with Batcher(size=3, seconds=0.5, callback=foo) as b:
    for item in range(7):
        b.add(f'at {walltime(origin)}: {item}')
        time.sleep(0.3)
Out[ ]:
now= 0.501 s, batch=['at 0.000 s: 0', 'at 0.301 s: 1']
now= 1.101 s, batch=['at 0.601 s: 2', 'at 0.902 s: 3']
now= 1.702 s, batch=['at 1.202 s: 4', 'at 1.503 s: 5']
now= 2.103 s, batch=['at 1.803 s: 6']
速度测试:
In[ ]:
%%time
batch_stats = []


def proc(batch):
    """Batcher callback: record how many items each delivered batch held."""
    batch_stats.append(len(batch))


with Batcher(size=100, seconds=5, callback=proc) as b:
    for value in range(120164):
        b.add(value)
Out[ ]:
CPU times: user 166 ms, sys: 74.7 ms, total: 240 ms
Wall time: 178 ms
In[ ]:
Counter(batch_stats)
Out[ ]:
Counter({100: 1201, 64: 1})
解决方案
代码在 `lock.acquire` 和 `start_thread` 中花费如此多时间的原因是:每次需要启动延迟发送的计时器时,都会创建并启动一个新线程。
这是一个线程在后台持续运行的解决方案。两个条件用于两个触发并等待请求的延迟。在我的计时测试中它快了一倍,但我只能在一台机器上测试:
class Batcher:
    """Batch items by count or timeout using one long-lived worker thread.

    Instead of spawning a ``threading.Timer`` per delayed flush, a single
    background thread waits on a condition until ``add`` requests a timed
    send, sleeps up to ``seconds``, then delivers whatever accumulated.
    Size-triggered sends happen synchronously in ``add`` and cancel the
    pending timed send.
    """

    def __init__(self, size=None, seconds=None, callback=None):
        self._batch = []
        self._seconds = seconds
        self._size = size
        self._callback = callback
        # True when add() requested a timed send while the worker was not
        # yet blocked on _timer_condition (prevents a lost wake-up).
        self._cyclic_requested = False
        self._wait_for_start = False   # worker is blocked on _timer_condition
        self._cancelled = False        # a size-triggered send superseded the timer
        self._timer_started = False    # worker is inside its timed wait
        self._lock = threading.RLock()
        # BUG FIX: was bare ``Condition(...)`` — a NameError, since only
        # ``import threading`` is in scope.
        self._timer_condition = threading.Condition(self._lock)
        self._finished = threading.Condition(self._lock)
        self._finished_flag = False
        # daemon=True so an abandoned Batcher cannot block interpreter exit.
        self._thread = threading.Thread(target=self._cycle_send, daemon=True)
        self._thread.start()

    def _cycle_send(self):
        """Worker loop: await a send request, wait out the delay, deliver."""
        while True:
            with self._lock:
                # Advertise that we are (about to be) waiting for a start signal.
                self._wait_for_start = True
                # If a timed send was requested while we were busy, skip the
                # wait and go straight to the delay.
                if not self._cyclic_requested:
                    self._timer_condition.wait()
                if self._finished_flag:
                    return
                # Reset the handshake flags for this cycle.
                self._cyclic_requested = False
                self._wait_for_start = False
                self._cancelled = False
                self._timer_started = True
                # Sleep for the batching window; a size-triggered send or a
                # shutdown wakes us early via _finished.
                self._finished.wait(self._seconds)
                if self._finished_flag:
                    return
                self._timer_started = False
                # A size-triggered flush already took the batch; start over.
                if self._cancelled:
                    continue
                batch = self._batch
                self._batch = []
            # Deliver outside the lock so the callback cannot block add().
            self._send_batch(batch)

    def _send_batch(self, batch):
        """Invoke the callback, but never with an empty batch."""
        # The empty guard also keeps __exit__ from emitting a spurious [].
        if self._callback and batch:
            self._callback(batch)

    def add(self, e):
        """Buffer *e*; deliver at once on reaching size, else arm the timer."""
        batch = None
        with self._lock:
            # Unconditionally append to the batch.
            self._batch.append(e)
            if self._size is not None and len(self._batch) >= self._size:
                # Size reached: take the batch and cancel the pending
                # timed send by waking the worker with _cancelled set.
                batch = self._batch
                self._batch = []
                self._cancelled = True
                self._cyclic_requested = False
                self._finished.notify_all()
            elif not self._timer_started:
                # Arm the timed send: wake the worker if it is waiting,
                # otherwise record the request so it is not lost.
                if self._wait_for_start:
                    self._timer_condition.notify_all()
                else:
                    self._cyclic_requested = True
        # Sending is done outside the lock to avoid holding it during the callback.
        if batch is not None:
            self._send_batch(batch)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with self._lock:
            # Tell the worker to exit whichever wait it is currently in.
            self._finished_flag = True
            self._timer_condition.notify_all()
            self._finished.notify_all()
        # Join outside the lock: Condition.wait must reacquire it to return.
        self._thread.join()
        # Deliver whatever is left (the empty case is guarded inside).
        self._send_batch(self._batch)
推荐阅读
- laravel - 如何将表单数据和此 URL 保存到数据库中.. 我可以将文件上传到 cloudinary
- c++ - 如何在虚幻引擎 C++ 中检测演员是否被击中?
- c# - Encoding.UTF8.GetString 在读取字节对象时将拉丁字母 (ç,ã) 转换为问号
- javascript - Vue 组件没有使用 props 渲染
- azure - webrequest.getrequeststream 在以“nt_authority\”系统运行时抛出异常
- jekyll - YML 内容未显示在 Jekyll 集合中
- python - 为什么 QImage 的像素函数超出范围?
- jsf-2.2 - myfaces JSF2.2中的自定义属性
- android - 使用bottomsheet的动作打开另一个对话框片段而不会崩溃android导航组件
- apache-spark - 从 Spark Streaming 中的 ConsumerRecord 值创建 RDD