Search code examples
javaapacheflumespark-streamingmapr

Apache Spark Streaming and Apache Flume integration


I am trying to integrate Apache Spark Streaming and Apache Flume by following this guide. I am trying to set this up in a MapR Sandbox.

When I submit the example: JavaFlumeEventCount, everything works fine and it counts all the events. I use one terminal to start the Spark job and another terminal to start Flume.

When I try to use the example code in my own project and create a jar it runs fine, but events are not being counted and it generates the following exception in the Flume log:

19 Mar 2015 08:32:46,397 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 8002 }: Failed to send batch
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:311)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
    ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 8002 }: Exception thrown from remote handler
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:393)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:370)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:299)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent
    at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:385)
    ... 6 more
Caused by: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent
    at org.apache.avro.ipc.specific.SpecificRequestor.readError(SpecificRequestor.java:126)
    at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:554)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:517)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:499)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

My own project has the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>SparkStreamingExample</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.spark:spark-streaming_2.10</exclude>
                                    <exclude>org.apache.spark:spark-core_2.10</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Why doesn't it work?


Solution

  • I managed to fix this by adding the following dependencies to the pom.xml. My project was using an older version. (1.7.3)

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-ipc</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.7</version>
    </dependency>