Tags: apache-flink, flink-sql, hive-metastore, apache-iceberg

Flink with Iceberg Catalog and Hive Metastore: org.apache.hadoop.fs.s3a.S3AFileSystem not found


I'm trying to set up Flink SQL with the Apache Iceberg catalog and Hive Metastore, but I'm having no luck. Below are the steps I've taken on a clean Flink 1.18.1 installation, and the resulting error that I get.

Set up components

Run Hive Metastore using Docker:

docker run --rm --detach --name hms-standalone \
           --publish 9083:9083 \
           ghcr.io/recap-build/hive-metastore-standalone:latest 
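
Before going further, it can be worth a quick sanity check that the Metastore's Thrift port is actually answering (a minimal sketch, assuming nc is available; the container name and port are as above):

# Wait until the Metastore answers on its Thrift port
until nc -z localhost 9083; do
  echo "waiting for Hive Metastore on :9083 ..."
  sleep 2
done
# Eyeball the container logs for startup errors
docker logs --tail 10 hms-standalone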

Run MinIO using Docker:

docker run --rm --detach --name minio \
            -p 9001:9001 -p 9000:9000 \
            -e "MINIO_ROOT_USER=admin" \
            -e "MINIO_ROOT_PASSWORD=password" \
            minio/minio server /data --console-address ":9001"

Provision a bucket:

docker exec minio \
    mc config host add minio http://localhost:9000 admin password
docker exec minio \
    mc mb minio/warehouse
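
To confirm the alias and bucket were actually created, you can list the buckets with the same mc client:

docker exec minio mc ls minio/
# expect the warehouse/ bucket in the listing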

Add the required MinIO configuration to ./conf/flink-conf.yaml:

cat >> ./conf/flink-conf.yaml <<EOF
fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true
EOF
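
A typo in these keys tends to fail silently later, so it's worth confirming the append landed where you expect:

tail -n 4 ./conf/flink-conf.yaml
# should print exactly the four fs.s3a.* lines above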

Add JARs to Flink

Flink's S3 plugin:

mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/
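
Flink loads each subdirectory of ./plugins with its own isolated plugin classloader at startup, which is why the JAR goes into its own folder rather than into ./lib; a quick check:

ls -l ./plugins/s3-fs-hadoop/
# expect just flink-s3-fs-hadoop-1.18.1.jar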

Flink's Hive connector:

mkdir -p ./lib/hive
curl -s https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar -o ./lib/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar

Dependencies for Iceberg:

mkdir ./lib/iceberg
curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar -o ./lib/iceberg/iceberg-flink-runtime-1.17-1.4.3.jar

mkdir -p ./lib/aws
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -o ./lib/aws/hadoop-aws-3.3.6.jar
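
curl will happily save an HTML error page under a .jar name if a URL is wrong, so a quick integrity check on everything just downloaded can save confusion later (a minimal sketch, run from the Flink home directory):

for j in ./lib/hive/*.jar ./lib/iceberg/*.jar ./lib/aws/*.jar; do
  # `jar tf` fails on anything that isn't a valid JAR/zip
  jar tf "$j" > /dev/null && echo "OK: $j" || echo "BROKEN: $j"
done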

Run it

Set the Hadoop classpath:

export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.2/bin/hadoop classpath)
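
Since this pulls in an entire separate Hadoop installation, it can be worth checking what S3-related JARs that classpath already contributes; duplicate or mismatched versions against ./lib/aws are a plausible source of classpath trouble (entries are often globs like .../lib/*, hence the expansion below):

# List any hadoop-aws / AWS SDK JARs the Hadoop classpath brings in
echo "$HADOOP_CLASSPATH" | tr ':' '\n' | while read -r entry; do
  ls $entry 2>/dev/null    # unquoted on purpose so globs expand
done | grep -iE 'hadoop-aws|aws-java-sdk'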

Launch SQL Client:

./bin/sql-client.sh
Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>    'type' = 'iceberg',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3a://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Flink SQL>

Full stacktrace

Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation b685c995-3280-4a9e-b6c0-18ab9369d790.
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not execute CREATE DATABASE: (catalogDatabase: [{}], catalogName: [c_iceberg_hive], databaseName: [db_rmoff], ignoreIfExists: [
        at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:90)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1092)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:556)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:444)
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:207)
        at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
        at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
        ... 7 more
Caused by: java.lang.RuntimeException: Failed to create namespace db_rmoff in Hive Metastore
        at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
        at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:222)
        at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:213)
        at org.apache.flink.table.catalog.CatalogManager.createDatabase(CatalogManager.java:1381)
        at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:84)
        ... 14 more
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
        at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
        at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
        at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
        at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
        ... 18 more

Diagnostics

Verify that hadoop-aws is on the classpath:

❯ ps -ef|grep sql-client|grep hadoop-aws
  501 51499 45632   0  7:38pm ttys007    0:06.81 /Users/rmoff/.sdkman/candidates/java/current/bin/java -XX:+IgnoreUnrecognizedVMOptions --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -Dlog.file=/Users/rmoff/flink/flink-1.18.1/log/flink-rmoff-sql-client-asgard08.log -Dlog4j.configuration=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlog4j.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/logback.xml -classpath /Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.6.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/l[…]

Confirm that the JAR holds the S3AFileSystem class:

❯ jar tvf lib/aws/hadoop-aws-3.3.6.jar|grep -i filesystem.class
157923 Sun Jun 18 08:56:00 BST 2023 org/apache/hadoop/fs/s3a/S3AFileSystem.class
  3821 Sun Jun 18 08:56:02 BST 2023 org/apache/hadoop/fs/s3native/NativeS3FileSystem.class

I get the same error if I strip the CREATE CATALOG back to bare-bones too:

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'warehouse' = 's3a://warehouse',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive2;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)

Java version:

❯ java --version
openjdk 11.0.21 2023-10-17
OpenJDK Runtime Environment Temurin-11.0.21+9 (build 11.0.21+9)
OpenJDK 64-Bit Server VM Temurin-11.0.21+9 (build 11.0.21+9, mixed mode)

Edit 01

Other things that I've tried:

  1. Using Flink 1.17.1 (to align with the 1.17 version in the Iceberg JAR)
  2. Using Hadoop 3.3.4 components throughout
  3. Moving the JARs directly into ./lib instead of into subfolders
  4. Removing the Flink s3-fs-hadoop plugin
  5. Adding iceberg-aws-bundle-1.4.3.jar and aws-java-sdk-bundle-1.12.648.jar (separately, and together)
  6. Using the same setup to write to S3 (MinIO) in Parquet format, which works fine

More diagnostics:

If I add the three SQL statements (CREATE CATALOG / USE CATALOG / CREATE DATABASE) to a file and launch SQL Client with verbose class logging:

JVM_ARGS=-verbose:class ./bin/sql-client.sh -f ../iceberg.sql > iceberg.log

The resulting log output shows that the hadoop-aws JAR just isn't picked up, even though it's on the classpath.
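
For reference, a -verbose:class log can be interrogated directly to see which JAR (if any) a class was loaded from; something along these lines (the log file name matches the redirect above):

# Was S3AFileSystem ever loaded, and from which JAR?
grep 'org.apache.hadoop.fs.s3a.S3AFileSystem' iceberg.log
# Compare against a Hadoop class that definitely loads
grep 'org.apache.hadoop.conf.Configuration' iceberg.log | head -n 3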

If I add Flink's s3-fs-hadoop plugin back in, the class-loading log shows it being picked up, but I still get the same failure.


Edit 02

If I switch the warehouse URI scheme from s3a to s3 I get a different error ¯\_(ツ)_/¯

Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
>     'type' = 'iceberg',
>     'client.assume-role.region' = 'us-east-1',
>     'warehouse' = 's3://warehouse',
>     's3.endpoint' = 'http://localhost:9000',
>     's3.path-style-access' = 'true',
>     'catalog-type'='hive',
>     'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.

Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:Got exception: org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3")

If I add in io-impl I get yet another different error, which again suggests that the JARs under ./lib aren't being picked up (software.amazon.awssdk.services.s3.model.S3Exception ships in the AWS SDK v2 bundle, not in hadoop-aws):

Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
>    'type' = 'iceberg',
>    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
>    'client.assume-role.region' = 'us-east-1',
>    'warehouse' = 's3://warehouse',
>    's3.endpoint' = 'http://localhost:9000',
>    's3.path-style-access' = 'true',
>    'catalog-type'='hive',
>    'uri'='thrift://localhost:9083');
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.model.S3Exception

Solution

  • The error you observe originates from the Hive Metastore server, not from Flink:

    Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
            at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
            at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
            at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:566)
            at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
            at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
            at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
            at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)

    This indicates that the error was returned over the Hive Thrift API, i.e. it was raised on the Metastore server side.

    The Docker image used here to run Hive does not include hadoop-aws. You need to add it yourself, or use another Hive image that contains the required dependencies; a sketch of the first option follows.
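
    One hedged way to do that with the standalone image from above is to mount hadoop-aws plus a matching AWS SDK v1 bundle into the Metastore's library directory. The /opt/hive/lib path below is an assumption about this particular image's layout, not a verified fact: inspect the image first (e.g. docker run --rm --entrypoint ls ghcr.io/recap-build/hive-metastore-standalone:latest /opt) to find the real path. The Metastore will also need fs.s3a.* endpoint and credential settings of its own, since it is now the process talking to MinIO.

    # ASSUMPTION: /opt/hive/lib is where this image keeps its JARs -- verify first.
    # The JAR paths on the host are wherever you downloaded them.
    docker run --rm --detach --name hms-standalone \
               --publish 9083:9083 \
               --volume "$PWD/hadoop-aws-3.3.6.jar:/opt/hive/lib/hadoop-aws-3.3.6.jar" \
               --volume "$PWD/aws-java-sdk-bundle-1.12.648.jar:/opt/hive/lib/aws-java-sdk-bundle-1.12.648.jar" \
               ghcr.io/recap-build/hive-metastore-standalone:latest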