Tags: java, docker, pyspark, airflow, minio

Airflow container running a Spark job cannot connect to an S3 bucket


I have a Docker Compose file, e.g.:

services:
  minio:
    image: "quay.io/minio/minio"
    command: server --console-address ":9090" /data
    ports:
      - "9000:9000"
      - "9090:9090"

    volumes:
      - ~/minio/data:/data
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: admin

Spark Airflow Dockerfile:

FROM apache/airflow:2.7.0b1-python3.11

# install Python dependencies
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt

# set user to root so we can install packages
USER root

ENV SPARK_VERSION=3.4.1 \
HADOOP_VERSION=3 \
SPARK_HOME=/opt/spark \
PYTHONHASHSEED=1


RUN apt update && \
    apt-get install -y openjdk-11-jdk && \
    apt-get install -y ant && \
    apt-get clean;
# Set JAVA_HOME
RUN mv /usr/lib/jvm /opt/jvm

ENV JAVA_HOME=/opt/jvm/java-11-openjdk-arm64

RUN apt-get install -y wget

# install Spark
RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz

RUN tar xvf spark-*

RUN rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz

RUN mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} $SPARK_HOME

ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$JAVA_HOME/bin

COPY spark/jars $SPARK_HOME/jars
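
As a quick sanity check inside the built image (a minimal sketch on my part, using only the Python standard library), I can confirm that the JVM and Spark install set up by the Dockerfile are visible to Python:

import os
import subprocess

# Confirm the environment variables baked into the image are visible
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))
print("SPARK_HOME =", os.environ.get("SPARK_HOME"))

# Confirm the java and spark-submit binaries are present and runnable
subprocess.run(["java", "-version"], check=True)
subprocess.run([os.path.join(os.environ["SPARK_HOME"], "bin", "spark-submit"), "--version"], check=True)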

And a separate Spark container that is trying to read from that bucket. I can connect to the bucket from within the Spark container (IPython, boto3), but for some reason this request hangs inside a PySpark SparkSession:

Curling the MinIO endpoint as a sanity check:

(airflow) curl --head http://minio:9000
HTTP/1.1 400 Bad Request
Accept-Ranges: bytes
Content-Length: 225
Content-Type: application/xml
Server: MinIO
Vary: Origin
Date: Sat, 05 Aug 2023 19:26:57 GMT

Getting data via the boto3 client:

import boto3
import json

# ACCESS_KEY / SECRET_KEY are defined elsewhere
my_session = boto3.session.Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

client = my_session.client("s3", endpoint_url="http://minio:9000")
s3_obj = client.get_object(Bucket="google-trends-raw-files", Key="43f6c3_2023-05-27-2023-06-03.json").get("Body")
json_data = json.loads(s3_obj.read())
print(json_data)

Out[29]: 
{'timeline_data': [{'date': 'May 27, 2023',
   'timestamp': '1685145600',
   'values': [{'query': '/m/0lwkh', 'value': '99', 'extracted_value': 99},
    {'query': '/m/026c12z', 'value': '55', 'extracted_value': 55},
    {'query': '/m/03gc5x', 'value': '18', 'extracted_value': 18},
    {'query': '/m/036rzw', 'value': '6', 'extracted_value': 6},
    {'query': '/m/033j5y', 'value': '4', 'extracted_value': 4}]},

...

However, when I start a SparkSession, e.g.:

from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("test reading s3") \
        .config("spark.pyspark.python", "python") \
        .config("fs.s3a.access.key", ACCESS_KEY) \
        .config("fs.s3a.secret.key", SECRET_KEY) \
        .config("fs.s3a.endpoint", "http://minio:9000").getOrCreate()

spark.sparkContext.setLogLevel("INFO")
spark.read.json("s3a://google-trends-raw-files/43f6c3_2023-05-27-2023-06-03.json")

23/08/06 16:14:16 INFO SparkContext: Running Spark version 3.4.1
23/08/06 16:14:16 INFO ResourceUtils: ==============================================================
23/08/06 16:14:16 INFO ResourceUtils: No custom resources configured for spark.driver.
23/08/06 16:14:16 INFO ResourceUtils: ==============================================================
23/08/06 16:14:16 INFO SparkContext: Submitted application: test reading s3
23/08/06 16:14:16 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/08/06 16:14:16 INFO ResourceProfile: Limiting resource is cpu
23/08/06 16:14:16 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/08/06 16:14:16 INFO SecurityManager: Changing view acls to: airflow
23/08/06 16:14:16 INFO SecurityManager: Changing modify acls to: airflow
23/08/06 16:14:16 INFO SecurityManager: Changing view acls groups to: 
23/08/06 16:14:16 INFO SecurityManager: Changing modify acls groups to: 
23/08/06 16:14:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: airflow; groups with view permissions: EMPTY; users with modify permissions: airflow; groups with modify permissions: EMPTY
23/08/06 16:14:16 INFO Utils: Successfully started service 'sparkDriver' on port 37915.
23/08/06 16:14:16 INFO SparkEnv: Registering MapOutputTracker
23/08/06 16:14:16 INFO SparkEnv: Registering BlockManagerMaster
23/08/06 16:14:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/08/06 16:14:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/08/06 16:14:16 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/08/06 16:14:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-a90c99b3-eeb2-42d7-9ddf-005f854211c5
23/08/06 16:14:16 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/08/06 16:14:16 INFO SparkEnv: Registering OutputCommitCoordinator
23/08/06 16:14:16 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
23/08/06 16:14:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/08/06 16:14:16 INFO Executor: Starting executor ID driver on host d14438f96c2a
23/08/06 16:14:16 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/08/06 16:14:17 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40691.
23/08/06 16:14:17 INFO NettyBlockTransferService: Server created on d14438f96c2a:40691
23/08/06 16:14:17 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/08/06 16:14:17 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, d14438f96c2a, 40691, None)
23/08/06 16:14:17 INFO BlockManagerMasterEndpoint: Registering block manager d14438f96c2a:40691 with 434.4 MiB RAM, BlockManagerId(driver, d14438f96c2a, 40691, None)
23/08/06 16:14:17 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, d14438f96c2a, 40691, None)
23/08/06 16:14:17 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, d14438f96c2a, 40691, None)
23/08/06 16:14:17 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/08/06 16:14:17 INFO SharedState: Warehouse path is 'file:/opt/airflow/spark-warehouse'.

This read operation hangs indefinitely.
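
To make this easier to diagnose (a sketch on my part, assuming the S3A connector is what is stalling on the connection), the S3A timeouts and retry counts can be lowered so the read fails fast with an exception instead of hanging:

from pyspark.sql import SparkSession

# Same session as above, but with aggressive S3A timeouts/retries so a
# connectivity problem surfaces as an exception instead of an endless hang.
# ACCESS_KEY / SECRET_KEY are defined elsewhere.
spark = SparkSession \
        .builder \
        .appName("test reading s3 fail fast") \
        .config("fs.s3a.access.key", ACCESS_KEY) \
        .config("fs.s3a.secret.key", SECRET_KEY) \
        .config("fs.s3a.endpoint", "http://minio:9000") \
        .config("fs.s3a.connection.establish.timeout", "5000") \
        .config("fs.s3a.connection.timeout", "10000") \
        .config("fs.s3a.attempts.maximum", "1") \
        .config("fs.s3a.retry.limit", "1") \
        .getOrCreate()

spark.read.json("s3a://google-trends-raw-files/43f6c3_2023-05-27-2023-06-03.json")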

Spark Jars:

hadoop-aws-3.3.4.jar
hadoop-client-api-3.3.4.jar
hadoop-client-runtime-3.3.4.jar
hadoop-yarn-server-web-proxy-3.3.4.jar
parquet-hadoop-1.12.3.jar
aws-java-sdk-bundle-1.12.262.jar
delta-core_2.12-2.4.0.jar
delta-storage-2.4.0.jar

Any ideas on why my Spark session is not able to read from the MinIO bucket?

This does, however, work when I run Spark on my local machine.
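
For reference, a quick DNS check from Python inside the same container (a small diagnostic sketch; the virtual-host-style name is my assumption about what the S3 client might be trying to reach):

import socket

# Check which hostnames resolve from inside the container: the compose service
# name should resolve, but a virtual-host-style bucket address (bucket name
# prepended to the endpoint host) typically will not.
for host in ("minio", "google-trends-raw-files.minio"):
    try:
        print(host, "->", socket.gethostbyname(host))
    except socket.gaierror as err:
        print(host, "-> does not resolve:", err)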


Solution

  • I have managed to find a way to get it to work: use the socket library to resolve the minio container's IP address from within the Spark container.

    Step 1. Amend the docker-compose.yml to include a hostname.

    services:
      minio:
        hostname: minio
        image: "quay.io/minio/minio"
        command: server --console-address ":9090" /data
        ports:
          - "9000:9000"
          - "9090:9090"
    
        volumes:
          - ~/minio/data:/data
        environment:
          MINIO_ROOT_USER: admin
          MINIO_ROOT_PASSWORD: admin
    
    

    Step 2. Use the socket.gethostbyname method to resolve the IP address of the minio container.

    import socket
    from pyspark.sql import SparkSession
    
    MINIO_IP_ADDRESS = socket.gethostbyname("minio")
    
    spark = SparkSession \
            .builder \
            .appName("test reading s3") \
            .config("spark.pyspark.python", "python") \
            .config("fs.s3a.access.key", ACCESS_KEY) \
            .config("fs.s3a.secret.key", SECRET_KEY) \
            .config("fs.s3a.endpoint", f"http://{MINIO_IP_ADDRESS}:9000").getOrCreate()
    
    spark.sparkContext.setLogLevel("INFO")
    spark.read.json("s3a://google-trends-raw-files/43f6c3_2023-05-27-2023-06-03.json")
    

    I would be interested to know if anyone has an answer as to why the SparkSession does not seem to be aware of Docker's DNS mappings.
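
    One possibly related setting (an assumption on my part, not verified here) is S3A's addressing style: with a hostname endpoint the client may attempt virtual-host-style addressing (bucket.minio), which container DNS cannot resolve, whereas an IP endpoint is always addressed path-style. A sketch that keeps the minio hostname but forces path-style access:

    from pyspark.sql import SparkSession

    # Sketch (untested assumption): keep the compose hostname as the endpoint but
    # force path-style addressing so the client never builds bucket.minio hostnames.
    # ACCESS_KEY / SECRET_KEY are defined elsewhere.
    spark = SparkSession \
            .builder \
            .appName("test reading s3 path style") \
            .config("spark.pyspark.python", "python") \
            .config("fs.s3a.access.key", ACCESS_KEY) \
            .config("fs.s3a.secret.key", SECRET_KEY) \
            .config("fs.s3a.endpoint", "http://minio:9000") \
            .config("fs.s3a.path.style.access", "true") \
            .config("fs.s3a.connection.ssl.enabled", "false") \
            .getOrCreate()

    spark.read.json("s3a://google-trends-raw-files/43f6c3_2023-05-27-2023-06-03.json")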