Tags: java, apache-kafka, apache-storm

Where is the field "value" declared?


I'm new to Apache Storm and Kafka and am trying to learn these concepts through courses provided by OpenClassroom. The principle is simple: messages are sent by a Python program to a Kafka server and are retrieved by a Kafka spout defined in the main class of a Storm topology. The problem is that I don't understand how the bolt retrieves the messages. From what I understand, this happens in the ParsingBolt class with the following line of code: JSONObject obj = (JSONObject)jsonParser.parse(input.getStringByField("value"));. The only thing I don't understand is how we know that the messages are contained in the "value" field. Below you can find the main class and the parsing bolt class. (The whole project is available here)

The main class:

package velos;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;

public class App {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();

        KafkaSpoutConfig.Builder<String, String> spoutConfigBuilder = KafkaSpoutConfig.builder("localhost:9092",
                "velib-stations");
        spoutConfigBuilder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
        KafkaSpoutConfig<String, String> spoutConfig = spoutConfigBuilder.build();
        builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));

        builder.setBolt("station-parsing", new StationParsingBolt()).shuffleGrouping("stations");

        builder.setBolt("city-stats",
                new CityStatsBolt().withTumblingWindow(BaseWindowedBolt.Duration.of(1000 * 60 * 5)))
                .fieldsGrouping("station-parsing", new Fields("city"));

        builder.setBolt("save-results", new SaveResultsBolt()).fieldsGrouping("city-stats", new Fields("city"));

        StormTopology topology = builder.createTopology();

        Config config = new Config();
        config.setMessageTimeoutSecs(60 * 30);
        String topologyName = "Velos";
        if (args.length > 0 && args[0].equals("remote")) {
            StormSubmitter.submitTopology(topologyName, config, topology);
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topology);
        }
    }
}

The ParsingBolt:

package velos;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.shade.org.json.simple.JSONObject;
import org.apache.storm.shade.org.json.simple.parser.JSONParser;
import org.apache.storm.shade.org.json.simple.parser.ParseException;

public class StationParsingBolt extends BaseRichBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            process(input);
        } catch (ParseException e) {
            e.printStackTrace();
            outputCollector.fail(input);
        }
    }
    
    public void process(Tuple input) throws ParseException {
        JSONParser jsonParser = new JSONParser();
        JSONObject obj = (JSONObject)jsonParser.parse(input.getStringByField("value"));
        String contract = (String)obj.get("contract_name");
        Long availableStands = (Long)obj.get("available_bike_stands");
        Long stationNumber = (Long)obj.get("number");
        
        outputCollector.emit(new Values(contract, stationNumber, availableStands));
        outputCollector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("city", "station_id", "available_stands"));
    }
}

Solution

  • By default, the KafkaSpout emits the fields "topic", "partition", "offset", "key", and "value" to the "default" stream, so the payload of each Kafka message reaches downstream bolts under the field name "value".

    https://storm.apache.org/releases/2.4.0/storm-kafka-client.html

    Use a RecordTranslator to change this, as in the sketch below.
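
    A minimal sketch of overriding the defaults with the setRecordTranslator(Func, Fields) overload of the spout config builder; the lambda and the field name "station-json" are illustrative and not part of the course project:

    // In App.main(), before spoutConfigBuilder.build().
    // Requires: import org.apache.storm.tuple.Values;
    // By default the spout emits ("topic", "partition", "offset", "key", "value");
    // this translator emits only the record's value, under a custom field name.
    spoutConfigBuilder.setRecordTranslator(
            record -> new Values(record.value()),   // record is a ConsumerRecord<String, String>
            new Fields("station-json"));

    With that in place, the parsing bolt would call input.getStringByField("station-json") instead of input.getStringByField("value"). With the default configuration nothing needs to change: the default translator shipped with storm-kafka-client is what declares the "value" field, which is why StationParsingBolt can look it up by that name even though it is never declared anywhere in the project.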