Search code examples
javaredisapache-stormjava-iobigdata

Storm java.io.NotSerializableException: when running topology


I think I finally have a topology that writes to a Redis database. I have one bolt that prints and one bolt that inserts into Redis. But when I try to launch the topology, it fails with this error:

...5333 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
5376 [main] INFO  b.s.d.supervisor - Starting supervisor with id 1917ef54-0f16-47b8-86ea-b6722aa33c68 at host amnor-A88XPLUS
5405 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:107) ~[storm-core-0.10.0.jar:0.10.0]
    at Storm.practice.Storm.Prova.ProvaTopology.main(ProvaTopology.java:383) ~[classes/:?]
Caused by: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_91]
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0]
    ... 2 more

I thought it might be the Spout, but I tried an example Spout from the Storm examples and the same thing happens. My code just appends smiley faces to the names it reads, for example (John :) :) ). I am just trying to actually store streams in a Redis database; it is only a small test topology that reads names from a file. Afterwards, I will build a serious topology for a big-data project at my university. Here is my code (there are many unused imports, but that is because I tried different ways of writing to a database):

package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;

/**
 * This is a basic example of a Storm topology.
 */
public class ProvaTopology {

  public static class ProvaBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "  :-)"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("Morts"));
    }


  }
  public class ProvaSpout extends BaseRichSpout {
      SpoutOutputCollector _collector;
      //Random _rand;
      private String fileName;
      //private SpoutOutputCollector _collector;
      private BufferedReader reader;
      private AtomicLong linesRead;

      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            fileName= (String)"/home/prova.txt";
            reader = new BufferedReader(new FileReader(fileName));
            // read and ignore the header if one exists
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
       // _rand = new Random();
      }

      public void nextTuple() {
        Utils.sleep(100);


      try {
            String line = reader.readLine();
            if (line != null) {
              long id = linesRead.incrementAndGet();
              System.out.println("Finished reading line, " + line);
              _collector.emit(new Values((String)line));
            } else {
              System.out.println("Finished reading file, " + linesRead.get() + " lines read");
              Thread.sleep(10000);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
      }

      public void ack(Object id) {
      }

      public void fail(Object id) {
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("Morts"));
      }

    }

  public class RedisBolt implements IRichBolt {

        protected String channel = "Somriures";
        //    protected String configChannel;
        protected OutputCollector collector;
        //    protected Tuple currentTuple;
        //    protected Logger log;
        protected JedisPool pool;
        //    protected ConfigListenerThread configListenerThread;

        public RedisBolt(){}
        public RedisBolt(String channel) {

        //  log = Logger.getLogger(getClass().getName());
        //  setupNonSerializableAttributes();
        }

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        this.collector = collector;
        pool = new JedisPool("localhost");
        }



        public void execute(Tuple tuple) {
        String current = tuple.getString(0);
        if(current != null) {
            //      for(Object obj: result) {
            publish(current);
            collector.emit(tuple, new Values(current));
            //      }
            collector.ack(tuple);
        }
        }

        public void cleanup() {
        if(pool != null) {
            pool.destroy();
        }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(channel));
        }

        public void publish(String msg) {
        Jedis jedis = pool.getResource();
        jedis.publish(channel, msg);
        pool.returnResource(jedis);
        }

        protected void setupNonSerializableAttributes() {

        }

        public Map getComponentConfiguration() {
        return null;
        }
    }



  public class PrinterBolt extends BaseBasicBolt {

      public void execute(Tuple tuple, BasicOutputCollector collector) {
          System.out.println(tuple);
      }

      public void declareOutputFields(OutputFieldsDeclarer ofd) {
      }

  }


    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
   ProvaTopology Pt = new ProvaTopology();
   JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost("127.0.0.1").setPort(666).build();



    builder.setSpout("Morts", Pt.new ProvaSpout(), 10);//emisorTestWordSpout
    builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts");// de on llig?
    builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
    builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal");// de on llig?
    builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
   // builder.setBolt("StoreM", (storeMapperS));
    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(5);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                                   //WithProgressBar
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

Thanks in advance


Solution

  • The exception here is pretty clear. If you just looked at the docs for java.io.NotSerializableException, you would see that the message being printed is the class that is not serializable. To fix, simply have your Topology class implement Serializable:

    public class ProvaTopology implements Serializable {
        ...
    }
    

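    This is needed so that Storm can serialize your Topology and send it to the Nimbus for execution. Since your Bolts and Spout extend or implement Storm provided classes or interfaces, you will not have to worry about marking them as serializable, as these parent classes and interfaces already do so. Note why the Topology class gets serialized at all: ProvaSpout, RedisBolt and PrinterBolt are declared as non-static inner classes, so every instance holds a hidden reference to the enclosing ProvaTopology object, and Java serialization follows that reference. Declaring those nested classes static (and creating them with new ProvaSpout() instead of Pt.new ProvaSpout()) removes the reference, in which case ProvaTopology no longer needs to be Serializable.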