LaVOZs

The World’s Largest Online Community for Developers

'; google cloud dataflow - Correct syntax for defining custom trigger with OrFinally in Apache Beam in Python? - LavOzs.Com

I am trying to define a custom trigger for a sliding window that triggers repeatedly for every element, but also triggers finally at the end of the watermark. I've looked around documentation for almost an hour now but have yet to find any example :(.

        | beam.WindowInto(
            beam.window.SlidingWindows(60, 10),
            trigger= Repeatedly(
                (AfterCount(1), OrFinally(AfterWatermark()))
            ),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
        )

This is what I'm trying right now and clearly this doesn't work but I am extremely lost in what the correct syntax is.

Can you try changing the trigger like below and see

trigger=OrFinally(Repeatedly(AfterCount(1)), AfterWatermark()),
accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
Related
Proper way to declare custom exceptions in modern Python?
Peak detection in a 2D array
How to define a two-dimensional array in Python
Early triggering and chained aggregations in Apache Beam
Apache Beam - Sliding Windows outputting multiple windows
Apache Beam/ Google Cloud Dataflow - Any solution for regularly loading reference table in pipelines?
triggering at fixed intervals in apache beam streaming
Apache Beam: Trigger for Fixed Window
Apache Beam - fixed window with default trigger producing early results