Search code examples
pythonapache-beampython-typingmypy

How to type hint the Apache Beam default DoFn.TimestampParam


I'm struggling to annotate an extra parameter of a DoFn process method, specifically a timestamp parameter.

Minimal example:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import TimestampTypes

class Do(beam.DoFn):
    def process(
        self,
        element: int,
        timestamp: TimestampTypes = beam.DoFn.TimestampParam,
    ) -> Iterable[TimestampedValue[int]]:
        yield TimestampedValue(element, timestamp)

Note: TimestampTypes has a type of Union[int, float, Timestamp]

This results in mypy stating that the parameter type is incorrect:

Incompatible default for argument "timestamp" (default has type "_DoFnParam", argument has type "Union[int, float, Timestamp]")

However, if I annotate the parameter as indicated, the resulting timestamp type is then incorrect:

import apache_beam as beam
from apache_beam.transforms.core import _DoFnParam
from apache_beam.transforms.window import TimestampedValue

class Do(beam.DoFn):
    def process(
        self,
        element: int,
        timestamp: _DoFnParam = beam.DoFn.TimestampParam,
    ) -> Iterable[TimestampedValue[int]]:
        yield TimestampedValue(element, timestamp)

Argument 2 to "TimestampedValue" has incompatible type "_DoFnParam"; expected "Union[int, float, Timestamp]"

Has anyone resolved this discrepancy successfully, or is this a limitation of type hinting in Beam that I should ignore checking for now?


Solution

  • As mentioned by @chepner and @user3412205 in the comments, this can be solved by:

    1. Typing the timestamp parameter as: Union[TimestampTypes, _DoFnParam]
    2. Adding a check in the process method like:
    if not isinstance(timestamp, (int, float)):
        raise TypeError()
    

    The full minimal example that passes type checking (mypy==1.11.1 and apache-beam==2.58.0) looks like:

    from typing import Iterable, Union
    
    import apache_beam as beam
    from apache_beam.transforms.core import _DoFnParam
    from apache_beam.transforms.window import TimestampedValue
    from apache_beam.utils.timestamp import TimestampTypes
    
    
    class Do(beam.DoFn):
        def process(
            self,
            element: int,
            timestamp: Union[TimestampTypes, _DoFnParam] = beam.DoFn.TimestampParam,
        ) -> Iterable[TimestampedValue[int]]:
            if not isinstance(timestamp, (int, float)):
                raise TypeError()
    
            yield TimestampedValue(element, timestamp)
    

    Posting the answer as community wiki for the benefit of the community that might encounter this use case in the future. Feel free to edit this answer for additional information.