I'm trying to use a custom interceptor, following the documentation present here. I just have a simple interceptor, shown below:
package interceptorserver;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
public class Interceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
        System.out.println("Hello world");
        return next.startCall(call, headers);
    }
}
However, when I compile this code and pass it to Spark Connect with the following command:
./start-connect-server.sh \
--packages org.apache.spark:spark-connect_2.12:3.4.1 \
--jars Interceptor.jar \
--conf spark.connect.grpc.interceptor.classes=interceptorserver.Interceptor
I get the following error:
23/07/29 01:17:00 ERROR SparkConnectServer: Error starting Spark Connect server
org.apache.spark.SparkException: [CONNECT.INTERCEPTOR_RUNTIME_ERROR] Generic Spark Connect error. Error instantiating GRPC interceptor: class interceptorserver.Interceptor cannot be cast to class org.sparkproject.connect.grpc.ServerInterceptor (interceptorserver.Interceptor and org.sparkproject.connect.grpc.ServerInterceptor are in unnamed module of loader org.apache.spark.util.MutableURLClassLoader @a5272be)
at org.apache.spark.sql.connect.service.SparkConnectInterceptorRegistry$.createInstance(SparkConnectInterceptorRegistry.scala:99)
at org.apache.spark.sql.connect.service.SparkConnectInterceptorRegistry$.$anonfun$createConfiguredInterceptors$4(SparkConnectInterceptorRegistry.scala:67)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
...
First I thought that org.sparkproject.connect.grpc.ServerInterceptor was different from io.grpc.ServerInterceptor, but when I checked the code I saw that Spark is indeed using io.grpc.ServerInterceptor; besides that, the documentation itself says to use io.grpc.ServerInterceptor. So my second thought was: "Does my class really implement the io.grpc.ServerInterceptor interface?", and I ran the following dummy test:
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package interceptorserver;
import org.junit.Test;
import org.junit.Assert;
public class LibraryTest {

    @Test
    public void someLibraryMethodReturnsTrue() {
        Interceptor classUnderTest = new Interceptor();
        Assert.assertTrue(classUnderTest instanceof io.grpc.ServerInterceptor);
    }
}
And my test passed. So my question is: what am I doing wrong? Why can't my class be cast to the required one?
I'm not sure if you got it to work. I succeeded, thanks to the comment from hage, although, due to my own lack of knowledge, it took me a while to understand it. I will share it in case it helps someone.
As hage mentioned, Spark shades the gRPC classes: the spark-connect server jar relocates io.grpc into org.sparkproject.connect.grpc, so the ServerInterceptor the server checks your class against is the relocated copy, not the io.grpc one your class was compiled against.
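You can see the relocation directly in the spark-connect jar itself. A quick check (the jar path is illustrative; adjust it to wherever the spark-connect jar lives on your machine):

# List the relocated interface inside the spark-connect jar
jar tf spark-connect_2.12-3.5.0.jar | grep 'ServerInterceptor\.class'
# -> org/sparkproject/connect/grpc/ServerInterceptor.class
#    i.e. the relocated io.grpc.ServerInterceptor from the error message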
I copied the configuration of the maven-shade-plugin from the spark-connect-server project into my own pom.xml (the link points at master, so the exact line may have drifted; check the pom for the Spark version you run):
https://github.com/apache/spark/blob/master/connector/connect/server/pom.xml#L284
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>interceptor</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.shade.packageName>org.sparkproject</spark.shade.packageName>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.5.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-connect-common_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <shadedArtifactAttached>false</shadedArtifactAttached>
          <artifactSet>
            <includes>
              <include>org.example:interceptor</include>
              <include>com.google.guava:*</include>
              <include>io.grpc:*</include>
              <include>com.google.protobuf:*</include>
              <!--
                The dependencies below are not added in SBT because SBT adds them all
                in the assembly build.
              -->
              <include>com.google.android:annotations</include>
              <include>com.google.api.grpc:proto-google-common-protos</include>
              <include>io.perfmark:perfmark-api</include>
              <include>org.codehaus.mojo:animal-sniffer-annotations</include>
              <include>com.google.errorprone:error_prone_annotations</include>
              <include>com.google.j2objc:j2objc-annotations</include>
              <include>org.checkerframework:checker-qual</include>
              <include>com.google.code.gson:gson</include>
              <include>org.apache.spark:spark-connect-common_${scala.binary.version}</include>
            </includes>
          </artifactSet>
          <relocations>
            <relocation>
              <pattern>com.google.common</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.guava</shadedPattern>
              <includes>
                <include>com.google.common.**</include>
              </includes>
            </relocation>
            <relocation>
              <pattern>com.google.thirdparty</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.guava</shadedPattern>
              <includes>
                <include>com.google.thirdparty.**</include>
              </includes>
            </relocation>
            <relocation>
              <pattern>com.google.protobuf</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.protobuf</shadedPattern>
              <includes>
                <include>com.google.protobuf.**</include>
              </includes>
            </relocation>
            <!--
              This is the relocation that matters for the interceptor: io.grpc
              is rewritten to org.sparkproject.connect.grpc, the package named
              in the error message.
            -->
            <relocation>
              <pattern>io.grpc</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.grpc</shadedPattern>
            </relocation>
            <relocation>
              <pattern>android.annotation</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.android_annotation</shadedPattern>
            </relocation>
            <relocation>
              <pattern>io.perfmark</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.io_perfmark</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.codehaus.mojo.animal_sniffer</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.animal_sniffer</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.j2objc.annotations</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.j2objc_annotations</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.errorprone.annotations</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.errorprone_annotations</shadedPattern>
            </relocation>
            <relocation>
              <pattern>org.checkerframework</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.checkerframework</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.gson</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.gson</shadedPattern>
            </relocation>
            <!--
              For `com.google.api.grpc:proto-google-common-protos`, do not directly define
              the pattern as `com.google`, otherwise the relocation result may be uncertain
              due to the change of rule order.
            -->
            <relocation>
              <pattern>com.google.api</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.api</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.cloud</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.cloud</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.geo</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.geo</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.logging</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.logging</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.longrunning</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.longrunning</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.rpc</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.rpc</shadedPattern>
            </relocation>
            <relocation>
              <pattern>com.google.type</pattern>
              <shadedPattern>${spark.shade.packageName}.connect.google_protos.type</shadedPattern>
            </relocation>
          </relocations>
          <transformers>
            <transformer
              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          </transformers>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
My interceptor is like Matheus's.
package org.example;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
public class Interceptor implements ServerInterceptor {

    public Interceptor() {
    }

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        System.out.println("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
        System.out.println("HELLO WORLD");
        System.out.println("-------------------------------------------------------------------------------------------");
        return serverCallHandler.startCall(serverCall, metadata);
    }
}
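For completeness, this is roughly how I build it and start the server (a sketch; the jar path follows from the artifactId and version in the pom above, and the config value keeps the original org.example.Interceptor name, since only the listed dependencies are relocated, not my own package):

mvn package

# Optional sanity check: because shadedArtifactAttached is false, the shaded
# classes replace the main artifact, so the target jar should contain both
# my class and the relocated gRPC classes
jar tf target/interceptor-1.0-SNAPSHOT.jar | grep Interceptor
# -> org/example/Interceptor.class
# -> org/sparkproject/connect/grpc/ServerInterceptor.class (among others)

./start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.0 \
  --jars target/interceptor-1.0-SNAPSHOT.jar \
  --conf spark.connect.grpc.interceptor.classes=org.example.Interceptor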
This way, it worked for me without any issues.