Problem Summary

I am getting an `AmazonS3Exception: 403 Forbidden` when my source tries to read the parquet files that I put in LocalStack S3. The job runs from an integration test, with `flink-s3-fs-hadoop` as a test dependency in my project, so the S3 filesystem is provided by the `flink-s3-fs-hadoop` plugin.

What I Have Tried
- Adding `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_ENDPOINT_URL`, and `AWS_REGION` as environment variables (IntelliJ run configurations).
- Adding a `conf/flink-conf.yaml` under my test resources folder and setting the environment variable `FLINK_CONF_DIR=src/test/resources/conf`.
- Creating a local execution environment with the AWS configs specified (see code below).
```java
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// S3 settings for the flink-s3-fs-hadoop plugin, pointing at LocalStack.
Properties props = new Properties();
props.put("s3.access-key", "test");
props.put("s3.secret-key", "test");
props.put("s3.endpoint", "http://localstack:4566");
props.put("s3.endpoint.region", "us-east-1");
props.put("s3.path.style.access", "true");

Configuration configuration = ConfigurationUtils.createConfiguration(props);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(16, configuration);
```
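
For illustration, a read against this environment might look like the following (a made-up bucket and a plain text format stand in for the real parquet source):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Bounded file source over a hypothetical LocalStack bucket; this is the
// kind of read that fails with the 403.
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://test-bucket/input/"))
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-source").print();
env.execute("localstack-s3-read");
```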
- Standing up a local Flink cluster with docker-compose (file below), enabling the `flink-s3-fs-hadoop` plugin, and confirming that the Flink app works correctly when submitted to this cluster (i.e. this confirmed that the app works when the configs are added and available to the plugin).
```yaml
services:
  localstack:
    container_name: "awslocal"
    image: localstack/localstack:1.4.0
    ports:
      - "127.0.0.1:4510-4559:4510-4559"
      - "127.0.0.1:4566:4566"
      - "127.0.0.1:4571:4571"
    environment:
      - SERVICES=s3
      - DEBUG=1
      - LS_LOG=trace
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_DEFAULT_REGION=us-east-1
      - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp}/localstack:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  jobmanager:
    container_name: "jobmanager"
    image: flink:1.17.0-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
    volumes:
      - /Users/myname/IdeaProjects/my-flink-app/target:/opt/flink/usrlib
  taskmanager:
    container_name: "taskmanager"
    image: flink:1.17.0-java11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
```
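
For reference, this is roughly how the parquet files end up in LocalStack before the job runs. A minimal seeding sketch, assuming the AWS SDK v1 is on the test classpath (the bucket name, key, and local file are made up):

```java
import java.io.File;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Client pointed at LocalStack as published on the host (127.0.0.1:4566).
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
        .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4566", "us-east-1"))
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("test", "test")))
        .withPathStyleAccessEnabled(true)
        .build();

// Hypothetical bucket/key; the job would then read from s3://test-bucket/input/.
s3.createBucket("test-bucket");
s3.putObject("test-bucket", "input/data.parquet", new File("src/test/resources/data.parquet"));
```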
You could try this:

1- Create your test config file (`flink-conf.yaml`; note that `GlobalConfiguration` only picks up a file with exactly this name) with:

```yaml
## S3 config
s3.access-key: test
s3.secret-key: test
s3.endpoint: http://awslocal:4566
s3.endpoint.region: us-east-1
s3.path.style.access: true
```

(If the test JVM runs on the host rather than inside the compose network, use `http://localhost:4566` as the endpoint; the `awslocal` hostname only resolves between containers.)
2- Load this configuration with:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Loads flink-conf.yaml from the given directory.
Configuration loadedConfig = GlobalConfiguration.loadConfiguration("directory/where/is/flinkconfig");

// Hands the s3.* settings to Flink's filesystem factories before any s3:// path is resolved.
FileSystem.initialize(loadedConfig, null);

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(loadedConfig);
```

The important part is `FileSystem.initialize(loadedConfig, null);`: in a local environment nothing else passes the S3 credentials and endpoint to the filesystem plugin, which is the likely reason the requests came back as 403 Forbidden.
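
Putting the two steps together, a minimal JUnit 5 sketch (reusing the `src/test/resources/conf` directory from the question; the pipeline body is elided) could look like:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;

class S3ReadIT {

    @Test
    void readsParquetFromLocalStack() throws Exception {
        // Load flink-conf.yaml from the test resources and register the S3
        // settings before any s3:// path is resolved.
        Configuration conf = GlobalConfiguration.loadConfiguration("src/test/resources/conf");
        FileSystem.initialize(conf, null);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
        // ... build the source that reads from s3:// and assert on its output ...
        env.execute("s3-read-it");
    }
}
```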