Tags: java, scala, apache-spark

How to use Spark Connect interceptors?


I'm trying to use a custom interceptor, following the documentation here. I have a simple interceptor, shown below:

package interceptorserver;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class Interceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        System.out.println("Hello world");
        return next.startCall(call, headers);
    }
}

However, when I compile this code and pass it to Spark Connect with the following command:

./start-connect-server.sh \
    --packages org.apache.spark:spark-connect_2.12:3.4.1 \
    --jars Interceptor.jar \
    --conf spark.connect.grpc.interceptor.classes=interceptorserver.Interceptor

I get the following error:

23/07/29 01:17:00 ERROR SparkConnectServer: Error starting Spark Connect server
org.apache.spark.SparkException: [CONNECT.INTERCEPTOR_RUNTIME_ERROR] Generic Spark Connect error. Error instantiating GRPC interceptor: class interceptorserver.Interceptor cannot be cast to class org.sparkproject.connect.grpc.ServerInterceptor (interceptorserver.Interceptor and org.sparkproject.connect.grpc.ServerInterceptor are in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @a5272be)
    at org.apache.spark.sql.connect.service.SparkConnectInterceptorRegistry$.createInstance(SparkConnectInterceptorRegistry.scala:99)
    at org.apache.spark.sql.connect.service.SparkConnectInterceptorRegistry$.$anonfun$createConfiguredInterceptors$4(SparkConnectInterceptorRegistry.scala:67)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
...

First I thought that org.sparkproject.connect.grpc.ServerInterceptor was different from io.grpc.ServerInterceptor, but when I checked the code I saw that Spark is indeed using io.grpc.ServerInterceptor. Besides that, the documentation itself says to use io.grpc.ServerInterceptor. So my second thought was: "Does my class really implement the io.grpc.ServerInterceptor interface?" To check, I wrote the following dummy test:

/*
 * This Java source file was generated by the Gradle 'init' task.
 */
package interceptorserver;

import org.junit.Test;
import org.junit.Assert;

public class LibraryTest {
    @Test public void someLibraryMethodReturnsTrue() {
        Interceptor classUnderTest = new Interceptor();
        Assert.assertTrue(classUnderTest instanceof io.grpc.ServerInterceptor);
    }
}

And my test passed. So my question is: what am I doing wrong? Why can't my class be cast to the expected one?


Solution

  • I'm not sure if you ever made it work.

    I managed to get it working, thanks to the comment from hage, though due to my own lack of knowledge it took me a while to understand. I'll share it in case it helps someone.

    As hage mentioned, Spark shades the gRPC classes: its build relocates io.grpc.* to org.sparkproject.connect.grpc.*, so the ServerInterceptor my class implements and the one the server casts to are two different runtime types, even though they look identical in source.
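
    To see the mismatch directly, here is a small diagnostic sketch of my own (not part of Spark; it assumes the unshaded interceptor jar is on the classpath) that loads the class by name and prints the interfaces it implements at runtime:

    import java.util.Arrays;

    public class InterfaceCheck {
        public static void main(String[] args) throws Exception {
            // Load the interceptor class by name, much as the server's registry
            // does, and list the interfaces it implements at runtime.
            Class<?> clazz = Class.forName("interceptorserver.Interceptor");
            Arrays.stream(clazz.getInterfaces())
                  .map(Class::getName)
                  .forEach(System.out::println);
            // An unshaded jar prints "io.grpc.ServerInterceptor"; the server,
            // however, casts to the relocated
            // "org.sparkproject.connect.grpc.ServerInterceptor", which is why
            // the instanceof test in the question passes while the server
            // still fails at startup.
        }
    }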

    I copied the maven-shade-plugin configuration from the spark-connect-server project into my own pom.xml: https://github.com/apache/spark/blob/master/connector/connect/server/pom.xml#L284

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>interceptor</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <spark.shade.packageName>org.sparkproject</spark.shade.packageName>
            <scala.binary.version>2.12</scala.binary.version>
            <spark.version>3.5.0</spark.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-connect-common_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <shadedArtifactAttached>false</shadedArtifactAttached>
                        <artifactSet>
                            <includes>
                                <include>org.example:interceptor</include>
                                <include>com.google.guava:*</include>
                                <include>io.grpc:*</include>
                                <include>com.google.protobuf:*</include>
    
                                <!--
                                  The dependencies below are not added in SBT because SBT adds
                                  them all as part of the assembly build.
                                -->
                                <include>com.google.android:annotations</include>
                                <include>com.google.api.grpc:proto-google-common-protos</include>
                                <include>io.perfmark:perfmark-api</include>
                                <include>org.codehaus.mojo:animal-sniffer-annotations</include>
                                <include>com.google.errorprone:error_prone_annotations</include>
                                <include>com.google.j2objc:j2objc-annotations</include>
                                <include>org.checkerframework:checker-qual</include>
                                <include>com.google.code.gson:gson</include>
                                <include>org.apache.spark:spark-connect-common_${scala.binary.version}</include>
                            </includes>
                        </artifactSet>
                        <relocations>
                            <relocation>
                                <pattern>com.google.common</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.guava</shadedPattern>
                                <includes>
                                    <include>com.google.common.**</include>
                                </includes>
                            </relocation>
                            <relocation>
                                <pattern>com.google.thirdparty</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.guava</shadedPattern>
                                <includes>
                                    <include>com.google.thirdparty.**</include>
                                </includes>
                            </relocation>
                            <relocation>
                                <pattern>com.google.protobuf</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.protobuf</shadedPattern>
                                <includes>
                                    <include>com.google.protobuf.**</include>
                                </includes>
                            </relocation>
                            <relocation>
                                <pattern>io.grpc</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.grpc</shadedPattern>
                            </relocation>
    
                            <relocation>
                                <pattern>android.annotation</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.android_annotation</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>io.perfmark</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.io_perfmark</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.codehaus.mojo.animal_sniffer</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.animal_sniffer</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.j2objc.annotations</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.j2objc_annotations</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.errorprone.annotations</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.errorprone_annotations</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.checkerframework</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.checkerframework</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.gson</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.gson</shadedPattern>
                            </relocation>
    
                            <!--
                              For `com.google.api.grpc:proto-google-common-protos`, do not define the
                              pattern directly as `com.google`; otherwise the relocation result may be
                              uncertain due to rule ordering.
                            -->
                            <relocation>
                                <pattern>com.google.api</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.api</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.cloud</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.cloud</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.geo</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.geo</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.logging</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.logging</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.longrunning</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.longrunning</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.rpc</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.rpc</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.google.type</pattern>
                                <shadedPattern>${spark.shade.packageName}.connect.google_protos.type</shadedPattern>
                            </relocation>
                        </relocations>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    My interceptor is essentially the same as Matheus's:

    package org.example;
    
    import io.grpc.Metadata;
    import io.grpc.ServerCall;
    import io.grpc.ServerCallHandler;
    import io.grpc.ServerInterceptor;
    
    public class Interceptor implements ServerInterceptor {

        // Spark Connect instantiates the interceptor via reflection, so it
        // needs a public no-arg constructor.
        public Interceptor() {
        }
    
        @Override
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
            System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            System.out.println("HELLO WORLD");
            System.out.println("-------------------------------------------------------------------------------------------");
            return serverCallHandler.startCall(serverCall, metadata);
        }
    
    }
    

    Built this way, it has worked for me without any issues.
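
    For completeness, this is roughly how I build and start the server with the shaded jar. The jar path follows from the artifactId and version in the pom above, and the --packages version matches spark.version; adjust both to your setup:

    mvn package

    ./start-connect-server.sh \
        --packages org.apache.spark:spark-connect_2.12:3.5.0 \
        --jars target/interceptor-1.0-SNAPSHOT.jar \
        --conf spark.connect.grpc.interceptor.classes=org.example.Interceptor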