首页 > 解决方案 > 触发定时器回调

问题描述

我有一个有状态的 DoFn,我在其中批处理元素一段时间,然后使用计时器触发批处理刷新。就像是:

class BatchToPubSub(DoFn):
    HEAP_STATE_SPEC = ReadModifyWriteStateSpec("events_heap", CDCEventHeapCoder())
    TIMER = TimerSpec("timer", TimeDomain.WATERMARK)

    BATCH_SIZE = 512

    def process(
        self,
        element,
        events_heap_state=DoFn.StateParam(HEAP_STATE_SPEC),
        timer=DoFn.TimerParam(TIMER),
        t=DoFn.TimestampParam,
    ):
        _, event = element
        events_heap = events_heap_state.read() or CDCEventHeap()
        events_heap.push(event)
        self.incoming_count.inc()
        events_heap_state.write(events_heap)

        if len(events_heap) >= self.BATCH_SIZE:
            timer.set(time.time())   # here I want to force timer callback
            # self.expire(...) manual call to callback
        else:
            timer.set(t + Duration(seconds=ALLOWED_LATENESS))

通过设置计时器触发回调的最佳方法是什么?我注意到,如果将计时器设置为当前时间戳,则永远不会触发回调。另一种方法是手动调用回调,我对此很好,但我在想是否最好让计时器触发它?

更新计时器规范和状态规范的添加代码。

标签: google-cloud-dataflowapache-beam

解决方案


推荐阅读