Tags: java, amazon-web-services, amazon-s3, apache-flink, flink-streaming

How can I pass AWS Configs to Flink's flink-s3-fs-hadoop plugin in local tests?


Problem Summary

  • I am trying to run a local integration test for my Flink app, which reads parquet files from AWS S3, does some transformations, and writes the output back to S3; a sketch of the kind of pipeline involved is shown after this list
  • Because the Flink app interacts with S3 (I believe reading the parquet files requires it), I have flink-s3-fs-hadoop as a test dependency in my project
  • The issue:
    • There seems to be no way to pass all required AWS configs to the S3 client that the flink-s3-fs-hadoop plugin uses
    • Because of this, a local test using Flink and AWS S3 is not currently possible; I run into an AmazonS3Exception (403 Forbidden) when my source tries to read the parquet files that I put in LocalStack S3
    • The AWS configs I need to pass are below:
      • s3.access-key
      • s3.secret-key
      • s3.endpoint
      • s3.endpoint.region
      • s3.path.style.access
  • Note: the application works in "prod" because it runs on EMR; it's just locally that this issue occurs
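
For reference, here is a minimal sketch of the kind of pipeline described above, assuming the parquet files are read with Flink's Avro Parquet reader (the schema, bucket, and paths are illustrative placeholders, not the original code):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParquetS3Job {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical Avro schema for the records stored in the parquet files
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Event\","
                    + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

            // Reading s3:// paths is what pulls in the flink-s3-fs-hadoop plugin
            FileSource<GenericRecord> source = FileSource
                    .forRecordStreamFormat(
                            AvroParquetReaders.forGenericRecord(schema),
                            new Path("s3://my-bucket/input/"))
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source")
                    .map(GenericRecord::toString) // stand-in for the real transformations
                    .sinkTo(FileSink
                            .forRowFormat(new Path("s3://my-bucket/output/"),
                                    new SimpleStringEncoder<String>())
                            .build());

            env.execute("parquet-s3-job");
        }
    }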

What I Have Tried

  1. Adding AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL, and AWS_REGION as environment variables (IntelliJ run configurations)

  2. Adding a conf/flink-conf.yaml under my test resources folder and setting the environment variable FLINK_CONF_DIR=src/test/resources/conf

    • It seemed that Flink could find the file, but it did not pass any of the values to the plugin
  3. Creating a local execution environment with the AWS configs specified (see code below)

        import java.util.Properties;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.configuration.ConfigurationUtils;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        Properties props = new Properties();
        props.put("s3.access-key", "test");
        props.put("s3.secret-key", "test");
        props.put("s3.endpoint", "http://localstack:4566");
        props.put("s3.endpoint.region", "us-east-1");
        props.put("s3.path.style.access", "true");
        Configuration configuration = ConfigurationUtils.createConfiguration(props);
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(16, configuration);
  4. I spun up a Flink cluster (jobmanager + taskmanager) locally using docker-compose, added the AWS configs as key-value pairs within the FLINK_PROPERTIES environment variable (which appends them to the flink-conf), enabled the flink-s3-fs-hadoop plugin, and confirmed that the flink app works correctly when submitted to this cluster (i.e. this confirmed that the app works when the configs are added and available to the plugin); the docker-compose file is below
    • Note: even though this approach works, it is not good enough for my use case; I need to be able to run the test within an automated test suite like JUnit
services:
  localstack:
    container_name: "awslocal"
    image: localstack/localstack:1.4.0
    ports:
      - "127.0.0.1:4510-4559:4510-4559"
      - "127.0.0.1:4566:4566"
      - "127.0.0.1:4571:4571"
    environment:
      - SERVICES=s3
      - DEBUG=1
      - LS_LOG=trace
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_DEFAULT_REGION=us-east-1
      - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp}/localstack:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  jobmanager:
    container_name: "jobmanager"
    image: flink:1.17.0-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
    volumes:
      - /Users/myname/IdeaProjects/my-flink-app/target:/opt/flink/usrlib
  taskmanager:
    container_name: "taskmanager"
    image: flink:1.17.0-java11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar

Solution

  • You could try this:

    1- Create your test config file with the following content (the file must be named flink-conf.yaml, since that is the file name GlobalConfiguration looks for in the directory you point it at)

    ## S3 config
    s3.access-key: test
    s3.secret-key: test
    s3.endpoint: http://awslocal:4566
    s3.endpoint.region: us-east-1
    s3.path.style.access: true
    

    2- Load this configuration with

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Pass the directory that contains flink-conf.yaml, not the file itself
    Configuration loadedConfig = GlobalConfiguration.loadConfiguration("directory/where/is/flinkconfig");
    // Hand the s3.* options to the file systems before any s3:// path is accessed
    FileSystem.initialize(loadedConfig, null);
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(loadedConfig);
    

    The important part is FileSystem.initialize(loadedConfig, null); passing the configuration to the execution environment alone is not enough, because the S3 file system only picks up the s3.* options when the file systems are explicitly initialized with that configuration.
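
    To use this from an automated test suite, the same two calls can be wrapped in a JUnit test. A sketch, assuming JUnit 5, a config directory under src/test/resources/conf, and a pre-created LocalStack bucket named test-bucket (all of these names are assumptions); note that a test running on the host usually needs s3.endpoint: http://localhost:4566 rather than the compose-network hostname awslocal:

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.junit.jupiter.api.Test;

    class S3IntegrationTest {

        @Test
        void flinkCanTalkToLocalstackS3() throws Exception {
            // Loads flink-conf.yaml from the given directory (directory is an assumption)
            Configuration loadedConfig =
                    GlobalConfiguration.loadConfiguration("src/test/resources/conf");

            // The crucial call: hands the s3.* options to the file-system plugins.
            // It is process-wide, so invoke it once, before any s3:// path is touched.
            FileSystem.initialize(loadedConfig, null);

            // Sanity check: the s3:// scheme should now resolve against LocalStack
            FileSystem fs = new Path("s3://test-bucket/").getFileSystem();
            assertTrue(fs.exists(new Path("s3://test-bucket/")));

            // The job itself can then run in a local environment as usual:
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(loadedConfig);
            // ... build the pipeline against s3:// paths and call env.execute() ...
        }
    }

    Since FileSystem.initialize registers the configuration globally for the JVM, a @BeforeAll hook is a natural place for it when several tests share the same LocalStack setup.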