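google-cloud-dataflow - Triggering a timer callback
Problem description
I have a stateful DoFn that buffers elements for a while and then uses a timer to trigger a flush of the batch. Something like: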
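import time

from apache_beam import DoFn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec
from apache_beam.utils.timestamp import Duration

# CDCEventHeap, CDCEventHeapCoder and ALLOWED_LATENESS are the asker's own
# definitions (not shown).


class BatchToPubSub(DoFn):
    HEAP_STATE_SPEC = ReadModifyWriteStateSpec("events_heap", CDCEventHeapCoder())
    TIMER = TimerSpec("timer", TimeDomain.WATERMARK)
    BATCH_SIZE = 512

    def process(
        self,
        element,
        events_heap_state=DoFn.StateParam(HEAP_STATE_SPEC),
        timer=DoFn.TimerParam(TIMER),
        t=DoFn.TimestampParam,
    ):
        _, event = element  # stateful DoFns take keyed (key, value) input
        events_heap = events_heap_state.read() or CDCEventHeap()
        events_heap.push(event)
        self.incoming_count.inc()  # Metrics counter, created elsewhere (not shown)
        events_heap_state.write(events_heap)
        if len(events_heap) >= self.BATCH_SIZE:
            # time.time() is wall-clock seconds, while this timer is in the
            # WATERMARK (event-time) domain.
            timer.set(time.time())  # here I want to force timer callback
            # self.expire(...) manual call to callback
        else:
            timer.set(t + Duration(seconds=ALLOWED_LATENESS))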
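To reason about what the two `timer.set(...)` calls above do, here is a pure-Python toy model. It is not the Beam API: the class `ToyWatermarkTimer` and its methods are hypothetical. It assumes, as we understand Beam's per-key, per-window timers, that a timer holds at most one pending timestamp, that a later `set()` replaces an earlier one, and that a `TimeDomain.WATERMARK` timer fires only once the watermark reaches its timestamp (a simplification), not when `set()` is called.

```python
from typing import Callable, Optional


class ToyWatermarkTimer:
    """Toy model (not Beam API) of one event-time timer for one key/window.

    Assumptions: a single pending timestamp, set() replaces it, and the
    callback runs only when the watermark reaches that timestamp.
    """

    def __init__(self, callback: Callable[[float], None]) -> None:
        self._callback = callback
        self._fire_at: Optional[float] = None

    def set(self, timestamp: float) -> None:
        # Re-setting overwrites the pending firing instead of adding a second one.
        self._fire_at = timestamp

    def advance_watermark(self, watermark: float) -> None:
        # Event-time timers fire only when the watermark reaches their
        # timestamp, never merely because set() was called.
        if self._fire_at is not None and watermark >= self._fire_at:
            fire_at, self._fire_at = self._fire_at, None
            self._callback(fire_at)


fired = []
timer = ToyWatermarkTimer(fired.append)
timer.set(100.0)           # "flush now" for an element at event time 100
timer.set(100.0 + 60.0)    # a later element overwrites it with t + lateness
timer.advance_watermark(120.0)
print(fired)               # [] : the 100.0 firing was replaced
timer.advance_watermark(160.0)
print(fired)               # [160.0]
```

Under these assumptions, `set()` alone never runs the callback; the firing waits for the watermark, which tracks event time rather than `time.time()`.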
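What is the best way to trigger the callback by setting the timer? I noticed that if I set the timer to the current timestamp, the callback is never triggered. The alternative is to call the callback manually, which I'm fine with, but I'm wondering whether it's better to let the timer trigger it.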
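The manual alternative mentioned in the question can be sketched as a shared flush helper: when the batch is full, `process()` flushes inline, and the timer (the `expire` callback) only handles whatever is left once the watermark passes `t + ALLOWED_LATENESS`. This is again a pure-Python toy model with hypothetical names (`ToyBatcher`, `add`, `flush`), not Beam code; in a real stateful DoFn the buffer would live in the state cell, and the same flush logic would be called from `process()` and from the `@on_timer` method so both paths emit and clear the batch the same way.

```python
from typing import List, Optional


class ToyBatcher:
    """Toy model (not Beam API) of 'flush inline when full, timer for leftovers'."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.buffer: List[str] = []

    def add(self, event: str) -> Optional[List[str]]:
        # Stands in for process(): buffer the element and flush inline once
        # the batch is full, instead of asking a timer to fire "now".
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            return self.flush()
        return None

    def flush(self) -> List[str]:
        # Shared by the inline path and the timer callback: emit and clear.
        batch, self.buffer = self.buffer, []
        return batch


batcher = ToyBatcher(batch_size=3)
outputs = [b for b in (batcher.add(e) for e in "abcde") if b is not None]
print(outputs)          # [['a', 'b', 'c']]
print(batcher.flush())  # ['d', 'e'] : what the timer callback would emit
```

With this split, a full batch goes out on the element that filled it, without depending on when the watermark advances.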
Update: added the code for the timer spec and the state spec.
Solution