apache-flink, flink-streaming, flink-sql, pyflink

PyFlink SQL local test


So I have a simple aggregation job written with the PyFlink SQL API. The job reads data from AWS Kinesis and writes results back to Kinesis.

I am curious whether I can unit-test my pipeline with, say, pytest. I am guessing I need to mock the source and sink with the filesystem connector? But how can I create a local Flink session to run the job inside pytest? Is there a recommended best practice here?

Thanks!


Solution

  • You should take a look at how the tests for PyFlink itself are implemented. The test suite sets up various base classes for implementing table test cases; PyFlinkStreamTableTestCase might be a good place to start. Using this, it's possible to write tests like this one, which I've copied from here:

        # This test runs inside a subclass of PyFlinkStreamTableTestCase,
        # which provides self.t_env; DataTypes and source_sink_utils come
        # from pyflink.table and pyflink.testing respectively.
        def test_sql_query(self):
            t_env = self.t_env
            # In-memory rows stand in for the real source.
            source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
            field_names = ["a", "b", "c"]
            field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
            t_env.register_table_sink(
                "sinks",
                source_sink_utils.TestAppendSink(field_names, field_types))

            result = t_env.sql_query("select a + 1, b, c from %s" % source)
            result.execute_insert("sinks").wait()
            # The test sink buffers everything it receives for inspection.
            actual = source_sink_utils.results()

            expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
            self.assert_equals(actual, expected)
    

    There are many more tests where that one came from.
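
  • If you'd rather stay inside pytest without depending on Flink's internal test base classes, you can create a local TableEnvironment directly in a fixture; the job then runs in-process with no cluster and no Kinesis. The sketch below is my own, not from the Flink repo: it assumes a recent PyFlink where TableEnvironment.create() is available, swaps the Kinesis source for from_elements, and uses collect() in place of the sink. Batch mode limits the aggregation output to final rows, which keeps the assertion simple:

        import pytest
        from pyflink.table import EnvironmentSettings, TableEnvironment


        @pytest.fixture
        def t_env():
            # A local, in-process environment; no external cluster needed.
            return TableEnvironment.create(EnvironmentSettings.in_batch_mode())


        def test_word_sum(t_env):
            # In-memory rows stand in for the Kinesis source.
            source = t_env.from_elements([("a", 1), ("a", 2), ("b", 3)], ["word", "num"])
            t_env.create_temporary_view("events", source)

            result = t_env.sql_query(
                "SELECT word, SUM(num) AS total FROM events GROUP BY word")

            # collect() stands in for the Kinesis sink and pulls the
            # result rows back into the test process.
            with result.execute().collect() as rows:
                actual = {(row[0], row[1]) for row in rows}
            assert actual == {("a", 3), ("b", 6)}

    The same idea extends to the filesystem-connector mocking you mention: point your CREATE TABLE statements at a pytest tmp_path directory instead of Kinesis, run the job, and read the output files back in the assertion.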