apache-flink pyflink amazon-kinesis-analytics

Adding external dependencies in PyFlink and using them in a UDF


I am writing a UDF that makes an API call and returns the JSON payload. Here is what it looks like:

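import logging
import requests
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def get_data():
    response = requests.get("https:api_endpoint")
    logging.info(response)
    return response.text  # result type is STRING, so return the JSON text rather than a parsed dict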

table_env.create_temporary_function("get_data", get_data)

In the source table I define the column as get_data AS get_data(), and in the S3 sink table I have get_data VARCHAR.

I have all the dependencies in a requirements.txt file, and I install them into the project root with pip install -r requirements.txt --target=.

Then I zip the contents with zip -r pyflink.zip *. When I run my Flink application, it cannot find the dependencies listed in requirements.txt.

What am I missing? Eventually I also want to include boto3 to interact with other services.


Solution

  • Following this documentation resolved the issue. Install the dependencies into a dedicated directory instead of the project root:

    pip install -r requirements.txt --target=my_deps

    Then add a runtime property to the group kinesis.analytics.flink.run.options with the key pyFiles and the value my_deps/

    With this in place, Flink finds the dependencies and the job runs without errors.
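
The runtime property described above can be written out as a property group. This is a minimal sketch, not the exact configuration from the answer. The group ID, the pyFiles key, and the my_deps/ value come from the answer. The "python" entry and the file name main.py are hypothetical, added only to show where the entry-point script would usually sit alongside pyFiles. The helper to_runtime_properties is also just an illustrative name.

```python
import json

# Group ID, "pyFiles" key, and "my_deps/" value are taken from the answer above.
# The "python" key with "main.py" is a hypothetical entry point for illustration.
RUN_OPTIONS_GROUP = {
    "PropertyGroupId": "kinesis.analytics.flink.run.options",
    "PropertyMap": {
        "python": "main.py",    # hypothetical: script inside the zip to run
        "pyFiles": "my_deps/",  # directory created by pip install --target=my_deps
    },
}


def to_runtime_properties(groups):
    """Serialize a list of property groups to a JSON string."""
    return json.dumps(groups, indent=2)


# Print the JSON so it can be compared with the application's runtime properties.
print(to_runtime_properties([RUN_OPTIONS_GROUP]))
```

The my_deps directory has to be part of the zip that is uploaded, so that the relative path in pyFiles resolves inside the package.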