Apache Beam: WriteToBigQuery does not work if preceded by a stateful transform unless re-windowing is applied

I have a simple pipeline that does not behave as expected, and none of the documentation I can find explains this behaviour. In short, WriteToBigQuery in streaming mode fails if it is preceded by a stateful transform such as GroupIntoBatches, unless global windowing is re-applied before writing to BigQuery. Does anyone have a meaningful explanation?

This does work:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
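
For reference, `_choose_table` and `_choose_schema` are callables passed to WriteToBigQuery so that the destination table and schema are chosen dynamically. A minimal sketch of what they might look like (the project, dataset, field names, and schema below are placeholders, not the real ones, and the schema callable is assumed to receive the destination returned by the table callable):

    def _choose_table(element):
        # Hypothetical routing: pick a destination table based on a field of the element.
        return "myproject:mydataset.events_{}".format(element["event_type"])

    def _choose_schema(destination):
        # Hypothetical schema: return the schema for the destination chosen above.
        return "id:STRING,payload:STRING,created_at:TIMESTAMP"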

This fails:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Group into Batches"
            >> beam.GroupIntoBatches(
                max_buffering_duration_secs=self.window_size,
                batch_size=self.batch_size,
            )
        | "Discard Dummy Key" >> beam.MapTuple(lambda _, val: val)**
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

This works again:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Group into Batches"
            >> beam.GroupIntoBatches(
                max_buffering_duration_secs=self.window_size,
                batch_size=self.batch_size,
            )
        | "Discard Dummy Key" >> beam.MapTuple(lambda _, val: val)
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Re-window" >> beam.WindowInto(window.GlobalWindows())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
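
For completeness, all three snippets assume a streaming pipeline reading from Pub/Sub. The surrounding boilerplate would look roughly like this (a sketch; only the options these snippets rely on are shown):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
    from apache_beam.transforms import window

    options = PipelineOptions()
    # ReadFromPubSub is an unbounded source, so the pipeline must run in streaming mode.
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        ...  # one of the pipelines shown above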