Search code examples
elasticsearchweb-crawlerapache-stormstormcrawler

Stormcrawler not fetching/indexing pages for elasticsearch


I am using the Stormcrawler with the Elasticsearch example and no pages are shown with the FETCHED status in Kibana while crawling the webpage http://books.toscrape.com/

Still on the console the webpages appear to be fetched and parsed

48239 [Thread-26-fetcher-executor[3 3]] INFO  c.d.s.b.FetcherBolt - [Fetcher #3] Threads : 0    queues : 1      in_queues : 1
48341 [FetcherThread #7] INFO  c.d.s.b.FetcherBolt - [Fetcher #3] Fetched http://books.toscrape.com/catalogue/category/books_1/index.html with status 200 in msec 86
48346 [Thread-46-parse-executor[5 5]] INFO  c.d.s.b.JSoupParserBolt - Parsing : starting http://books.toscrape.com/catalogue/category/books_1/index.html
48362 [Thread-46-parse-executor[5 5]] INFO  c.d.s.b.JSoupParserBolt - Parsed http://books.toscrape.com/catalogue/category/books_1/index.html in 13 msec

Also it seems that the index of Elasticsearch gets some items even though these have no title

Screenshot of Kibana

I expanded the com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt to also store the metadata of a web page in a local file and it seems like it does not get any tuples at all. Since the IndexerBolt also marks the status of a url as FETCHED that would explain the mentioned observation in Kibana.

Is there any explanation for this behaviour? I already reverted the crawler configuration to the standard, except the index bolt in crawler.flux to run my class.

The Topology Configuration:

name: "crawler"

includes:
    - resource: true
      file: "/crawler-default.yaml"
      override: false



    - resource: false
      file: "es-conf.yaml"
      override: true

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
    parallelism: 10

bolts:
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 1
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 1
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 1
  - id: "index"
    className: "de.hpi.bpStormcrawler.IndexerBolt"
    parallelism: 1
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 1
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 1

streams:
  - from: "spout"
    to: "partitioner"
    grouping:
      type: SHUFFLE

  - from: "spout"
    to: "status_metrics"
    grouping:
      type: SHUFFLE

  - from: "partitioner"
    to: "fetcher"
    grouping:
      type: FIELDS
      args: ["url"]

  - from: "fetcher"
    to: "sitemap"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "sitemap"
    to: "parse"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "parse"
    to: "index"
    grouping:
      type: LOCAL_OR_SHUFFLE

  - from: "fetcher"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "sitemap"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "parse"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

  - from: "index"
    to: "status"
    grouping:
      type: FIELDS
      args: ["url"]
      streamId: "status"

The reconfigured IndexerBolt

package de.hpi.bpStormcrawler;

/**
 * Licensed to DigitalPebble Ltd under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * DigitalPebble licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import static com.digitalpebble.stormcrawler.Constants.StatusStreamName;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.*;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
import com.digitalpebble.stormcrawler.indexing.AbstractIndexerBolt;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;

/**
 * Sends documents to ElasticSearch. Indexes all the fields from the tuples or a
 * Map <String,Object> from a named field.
 */
@SuppressWarnings("serial")
public class IndexerBolt extends AbstractIndexerBolt {

    private static final Logger LOG = LoggerFactory
            .getLogger(IndexerBolt.class);

    private static final String ESBoltType = "indexer";

    static final String ESIndexNameParamName = "es.indexer.index.name";
    static final String ESDocTypeParamName = "es.indexer.doc.type";
    private static final String ESCreateParamName = "es.indexer.create";

    private OutputCollector _collector;

    private String indexName;
    private String docType;

    // whether the document will be created only if it does not exist or
    // overwritten
    private boolean create = false;
    File indexFile;

    private MultiCountMetric eventCounter;

    private ElasticSearchConnection connection;

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void prepare(Map conf, TopologyContext context,
                        OutputCollector collector) {
        super.prepare(conf, context, collector);
        _collector = collector;

        indexName = ConfUtils.getString(conf, IndexerBolt.ESIndexNameParamName,
                "fetcher");
        docType = ConfUtils.getString(conf, IndexerBolt.ESDocTypeParamName,
                "doc");
        create = ConfUtils.getBoolean(conf, IndexerBolt.ESCreateParamName,
                false);

        try {
            connection = ElasticSearchConnection
                    .getConnection(conf, ESBoltType);
        } catch (Exception e1) {
            LOG.error("Can't connect to ElasticSearch", e1);
            throw new RuntimeException(e1);
        }

        this.eventCounter = context.registerMetric("ElasticSearchIndexer",
                new MultiCountMetric(), 10);

        indexFile = new File("/Users/jonaspohlmann/code/HPI/BP/stormCrawlerSpike/spikeStormCrawler2/index.log");
    }

    @Override
    public void cleanup() {
        if (connection != null)
            connection.close();
    }

    @Override
    public void execute(Tuple tuple) {

        String url = tuple.getStringByField("url");

        // Distinguish the value used for indexing
        // from the one used for the status
        String normalisedurl = valueForURL(tuple);

        Metadata metadata = (Metadata) tuple.getValueByField("metadata");
        String text = tuple.getStringByField("text");


        //BP: added Content Field
        String content = new String(tuple.getBinaryByField("content"));

        boolean keep = filterDocument(metadata);
        if (!keep) {
            eventCounter.scope("Filtered").incrBy(1);
            // treat it as successfully processed even if
            // we do not index it
            _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                    Status.FETCHED));
            _collector.ack(tuple);
            return;
        }

        try {
            XContentBuilder builder = jsonBuilder().startObject();

            // display text of the document?
            if (fieldNameForText() != null) {
                builder.field(fieldNameForText(), trimText(text));
            }

            // send URL as field?
            if (fieldNameForURL() != null) {
                builder.field(fieldNameForURL(), normalisedurl);
            }


            // which metadata to display?
            Map<String, String[]> keyVals = filterMetadata(metadata);

            Iterator<String> iterator = keyVals.keySet().iterator();
            while (iterator.hasNext()) {
                String fieldName = iterator.next();
                String[] values = keyVals.get(fieldName);
                if (values.length == 1) {
                    builder.field(fieldName, values[0]);
                    try {
                        saveStringToFile(indexFile, fieldName + "\t" + values[0]);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                } else if (values.length > 1) {
                    builder.array(fieldName, values);
                }
            }

            builder.endObject();

            String sha256hex = org.apache.commons.codec.digest.DigestUtils
                    .sha256Hex(normalisedurl);

            IndexRequest indexRequest = new IndexRequest(indexName, docType,
                    sha256hex).source(builder);

            DocWriteRequest.OpType optype = DocWriteRequest.OpType.INDEX;

            if (create) {
                optype = DocWriteRequest.OpType.CREATE;
            }

            indexRequest.opType(optype);

            connection.getProcessor().add(indexRequest);

            eventCounter.scope("Indexed").incrBy(1);

            _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                    Status.FETCHED));
            _collector.ack(tuple);

        } catch (IOException e) {
            LOG.error("Error sending log tuple to ES", e);
            // do not send to status stream so that it gets replayed
            _collector.fail(tuple);
        }
    }
    private void saveStringToFile(File file, String stringToWrite) throws IOException {
        String pathName = file.getPath();
        File folder = file.getParentFile();

        if (!folder.exists() && !folder.mkdirs()) {
            throw new IOException("Couldn't create the storage folder: " + folder.getAbsolutePath() + " does it already exist ?");
        }

        try (PrintWriter out = new PrintWriter(new FileOutputStream(file, true))) {
            out.append(stringToWrite + '\n');

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

}

Solution

  • Have you merged all your configs i.e generic SC + specific ES one into a single es-conf.yaml? If not then your Flux file is probably missing

       - resource: false
         file: "crawler-conf.yaml"
         override: true
    

    where the indexer config typically looks like:

      indexer.url.fieldname: "url"
      indexer.text.fieldname: "content"
      indexer.canonical.name: "canonical"
      indexer.md.mapping:
      - parse.title=title
      - parse.keywords=keywords
      - parse.description=description
      - domain=domain
    

    Not having any md mappings defined would explain why your modified indexer does not write to the files and why the index contains urls but no additional fields.

    Please note that the 'index' index (excuse the terminology) does not contain the status of the URL. See https://stackoverflow.com/a/49316316/432844 for an explanation of status vs index.