Tags: amazon-emr, aws-glue, aws-glue-data-catalog, apache-hudi

Unable to run spark.sql on AWS Glue Catalog in EMR when using Hudi


Our default data lake on AWS uses S3 for storage and the Glue Catalog as the metastore.

We are starting to use Apache Hudi and got it working by following the AWS documentation. The problem is that, with the configuration and JARs indicated in the documentation, we are unable to run spark.sql against our Glue metastore.

Here are the details.

We are creating the cluster with boto3:

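import boto3

# EMR client; region and credentials come from the default boto3 configuration
emr = boto3.client('emr')

emr.run_job_flow(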
    Name='hudi_debugging',
    LogUri='s3n://mybucket/elasticmapreduce/',
    ReleaseLabel='emr-5.28.0',
    Applications=[
        {
            'Name': 'Spark'
        },
        {
            'Name': 'Hive'
        },
        {
            'Name': 'Hadoop'
        }
    ],
    Instances={
        'InstanceGroups': [
            {
                'Name': 'Core Instance Group',
                'Market': 'SPOT',
                'InstanceCount': 3,
                'EbsConfiguration': {'EbsBlockDeviceConfigs': [
                    {'VolumeSpecification': {'SizeInGB': 16, 'VolumeType': 'gp2'},
                     'VolumesPerInstance': 1}]},
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge'
            },
            {
                'Name': 'Master Instance Group',
                'Market': 'ON_DEMAND',
                'InstanceCount': 1,
                'EbsConfiguration': {'EbsBlockDeviceConfigs': [
                    {'VolumeSpecification': {'SizeInGB': 16, 'VolumeType': 'gp2'},
                     'VolumesPerInstance': 1}]},
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge'
            }
        ],
        'Ec2KeyName': 'dataengineer',
        'Ec2SubnetId': 'mysubnet',
        'EmrManagedMasterSecurityGroup': 'mysg',
        'EmrManagedSlaveSecurityGroup': 'mysg',
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    Configurations=[
        {
            'Classification': 'hive-site',
            'Properties': {
                'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
        },
        {
            'Classification': 'spark-hive-site',
            'Properties': {
                'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}
        },
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {
                        "PYSPARK_PYTHON": "/usr/bin/python3",
                        "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
                        "PYTHONIOENCODING": "utf8"
                    }
                }
            ]
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.sql.execution.arrow.enabled": "true"
            }
        },
        {
            "Classification": "spark",
            "Properties": {
                "maximizeResourceAllocation": "true"
            }
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Bootstrap action',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/bootstrap_emr.sh',
                'Args': []
            }
        },
    ],
    Steps=[
        {
            'Name': 'Setup Hadoop Debugging',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['state-pusher-script']
            }
        }
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',
    VisibleToAllUsers=True
)

We start the pyspark shell using the example from the documentation mentioned above:

pyspark \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Then, inside the shell, when we run spark.sql("show tables") we get the following error:

Using Python version 3.7.9 (default, Aug 27 2020 21:59:41)
SparkSession available as 'spark'.
>>> spark.sql("show tables").show()
21/04/09 19:49:21 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o67.sql.
: org.apache.spark.sql.AnalysisException: java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V;
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:128)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:238)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:117)
        ...
    ... 44 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V;'

We also tried submitting it as a step, in both client and cluster deploy mode, and got a similar result:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o66.sql.
: org.apache.spark.sql.AnalysisException: java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V;
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:128)
    at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:238)
    at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:117)
    ...
    at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:239)
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
    ... 97 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mnt/tmp/spark-cd3c9851-5edd-4c6f-abf8-2f4a0af807e3/spark_sql_test.py", line 7, in <module>
    _ = spark.sql("select * from dl_raw.pagnet_clients limit 10")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V;'
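A NoSuchMethodError like the one above generally means the class was loaded at runtime from a different jar than the one the caller was compiled against, which suggests more than one copy of org.apache.http classes may be on the classpath. As a rough diagnostic, here is a minimal sketch that lists which jars bundle the class named in the error. The helper name jars_containing is hypothetical, and the search directories are assumptions based on the paths in the pyspark command; adjust them for your cluster.

```python
import zipfile
from pathlib import Path

# Class named in the NoSuchMethodError, written as a path inside a jar
CLASS_ENTRY = "org/apache/http/conn/ssl/SSLConnectionSocketFactory.class"


def jars_containing(class_entry, search_dirs):
    """Return the sorted paths of jars under search_dirs that bundle class_entry."""
    hits = []
    for directory in search_dirs:
        root = Path(directory)
        if not root.is_dir():
            # Ignore directories that do not exist on this machine
            continue
        for jar in root.rglob("*.jar"):
            try:
                with zipfile.ZipFile(jar) as archive:
                    if class_entry in archive.namelist():
                        hits.append(str(jar))
            except (zipfile.BadZipFile, OSError):
                # Skip broken symlinks and files that are not valid archives
                continue
    return sorted(hits)


if __name__ == "__main__":
    # Assumed locations; not confirmed by the question
    for path in jars_containing(CLASS_ENTRY, ["/usr/lib/hudi", "/usr/lib/spark/jars"]):
        print(path)
```

If more than one jar shows up, comparing that list with the jars passed via --jars may help narrow down which copy is being picked up.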

Solution

  • spark.sql.hive.convertMetastoreParquet needs to be set to true; the pyspark command above sets it to false.
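One way to apply this for every session on the cluster, rather than per command, is to put the setting in the spark-defaults classification that the run_job_flow call already uses. This is only a sketch: the variable name spark_defaults is made up for illustration, and moving the serializer setting into the same block is an assumption about how you might consolidate the flags, not something the answer prescribes.

```python
# Possible replacement for the "spark-defaults" entry in Configurations.
# The first property comes from the original cluster definition; the other
# two mirror the --conf flags from the pyspark command, with
# convertMetastoreParquet switched to "true" as the answer suggests.
spark_defaults = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.sql.execution.arrow.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "true",
    },
}
```

Keep in mind that a --conf flag on the command line takes precedence over spark-defaults, so the pyspark command would also need to drop --conf "spark.sql.hive.convertMetastoreParquet=false" (or set it to true) for this to take effect.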