I am using StormCrawler with the Elasticsearch example, and no pages are shown with the FETCHED
status in Kibana while crawling the web page http://books.toscrape.com/.
On the console, however, the web pages appear to be fetched and parsed:
48239 [Thread-26-fetcher-executor[3 3]] INFO c.d.s.b.FetcherBolt - [Fetcher #3] Threads : 0 queues : 1 in_queues : 1
48341 [FetcherThread #7] INFO c.d.s.b.FetcherBolt - [Fetcher #3] Fetched http://books.toscrape.com/catalogue/category/books_1/index.html with status 200 in msec 86
48346 [Thread-46-parse-executor[5 5]] INFO c.d.s.b.JSoupParserBolt - Parsing : starting http://books.toscrape.com/catalogue/category/books_1/index.html
48362 [Thread-46-parse-executor[5 5]] INFO c.d.s.b.JSoupParserBolt - Parsed http://books.toscrape.com/catalogue/category/books_1/index.html in 13 msec
The Elasticsearch index also receives some items, even though these have no title.
I extended com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt
to also store the metadata of a web page in a local file, and it seems it does not receive any tuples at all. Since the IndexerBolt is also what marks the status of a URL as FETCHED,
that would explain the observation in Kibana.
Is there any explanation for this behaviour? I have already reverted the crawler configuration to the defaults, except for the index bolt in crawler.flux, which runs my class.
The Topology Configuration:
name: "crawler"
includes:
- resource: true
file: "/crawler-default.yaml"
override: false
- resource: false
file: "es-conf.yaml"
override: true
spouts:
- id: "spout"
className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
parallelism: 10
bolts:
- id: "partitioner"
className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
parallelism: 1
- id: "fetcher"
className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
parallelism: 1
- id: "sitemap"
className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
parallelism: 1
- id: "parse"
className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
parallelism: 1
- id: "index"
className: "de.hpi.bpStormcrawler.IndexerBolt"
parallelism: 1
- id: "status"
className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
parallelism: 1
- id: "status_metrics"
className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
parallelism: 1
streams:
- from: "spout"
to: "partitioner"
grouping:
type: SHUFFLE
- from: "spout"
to: "status_metrics"
grouping:
type: SHUFFLE
- from: "partitioner"
to: "fetcher"
grouping:
type: FIELDS
args: ["url"]
- from: "fetcher"
to: "sitemap"
grouping:
type: LOCAL_OR_SHUFFLE
- from: "sitemap"
to: "parse"
grouping:
type: LOCAL_OR_SHUFFLE
- from: "parse"
to: "index"
grouping:
type: LOCAL_OR_SHUFFLE
- from: "fetcher"
to: "status"
grouping:
type: FIELDS
args: ["url"]
streamId: "status"
- from: "sitemap"
to: "status"
grouping:
type: FIELDS
args: ["url"]
streamId: "status"
- from: "parse"
to: "status"
grouping:
type: FIELDS
args: ["url"]
streamId: "status"
- from: "index"
to: "status"
grouping:
type: FIELDS
args: ["url"]
streamId: "status"
The modified IndexerBolt:
package de.hpi.bpStormcrawler;

/**
 * Licensed to DigitalPebble Ltd under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * DigitalPebble licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static com.digitalpebble.stormcrawler.Constants.StatusStreamName;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.digitalpebble.stormcrawler.Metadata;
import com.digitalpebble.stormcrawler.elasticsearch.ElasticSearchConnection;
import com.digitalpebble.stormcrawler.indexing.AbstractIndexerBolt;
import com.digitalpebble.stormcrawler.persistence.Status;
import com.digitalpebble.stormcrawler.util.ConfUtils;

/**
 * Sends documents to ElasticSearch. Indexes all the fields from the tuples or a
 * Map<String, Object> from a named field.
 */
@SuppressWarnings("serial")
public class IndexerBolt extends AbstractIndexerBolt {

    private static final Logger LOG = LoggerFactory
            .getLogger(IndexerBolt.class);

    private static final String ESBoltType = "indexer";

    static final String ESIndexNameParamName = "es.indexer.index.name";
    static final String ESDocTypeParamName = "es.indexer.doc.type";
    private static final String ESCreateParamName = "es.indexer.create";

    private OutputCollector _collector;

    private String indexName;
    private String docType;

    // whether the document will be created only if it does not exist or
    // overwritten
    private boolean create = false;

    private File indexFile;

    private MultiCountMetric eventCounter;

    private ElasticSearchConnection connection;

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        super.prepare(conf, context, collector);
        _collector = collector;

        indexName = ConfUtils.getString(conf, IndexerBolt.ESIndexNameParamName,
                "fetcher");
        docType = ConfUtils.getString(conf, IndexerBolt.ESDocTypeParamName,
                "doc");
        create = ConfUtils.getBoolean(conf, IndexerBolt.ESCreateParamName,
                false);

        try {
            connection = ElasticSearchConnection
                    .getConnection(conf, ESBoltType);
        } catch (Exception e1) {
            LOG.error("Can't connect to ElasticSearch", e1);
            throw new RuntimeException(e1);
        }

        this.eventCounter = context.registerMetric("ElasticSearchIndexer",
                new MultiCountMetric(), 10);

        indexFile = new File(
                "/Users/jonaspohlmann/code/HPI/BP/stormCrawlerSpike/spikeStormCrawler2/index.log");
    }

    @Override
    public void cleanup() {
        if (connection != null)
            connection.close();
    }

    @Override
    public void execute(Tuple tuple) {
        String url = tuple.getStringByField("url");

        // Distinguish the value used for indexing
        // from the one used for the status
        String normalisedurl = valueForURL(tuple);

        Metadata metadata = (Metadata) tuple.getValueByField("metadata");
        String text = tuple.getStringByField("text");

        // BP: added content field; decode the bytes explicitly instead of
        // relying on the platform default charset
        String content = new String(tuple.getBinaryByField("content"),
                StandardCharsets.UTF_8);

        boolean keep = filterDocument(metadata);
        if (!keep) {
            eventCounter.scope("Filtered").incrBy(1);
            // treat it as successfully processed even if
            // we do not index it
            _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                    Status.FETCHED));
            _collector.ack(tuple);
            return;
        }

        try {
            XContentBuilder builder = jsonBuilder().startObject();

            // display text of the document?
            if (fieldNameForText() != null) {
                builder.field(fieldNameForText(), trimText(text));
            }

            // send URL as field?
            if (fieldNameForURL() != null) {
                builder.field(fieldNameForURL(), normalisedurl);
            }

            // which metadata to display?
            Map<String, String[]> keyVals = filterMetadata(metadata);

            Iterator<String> iterator = keyVals.keySet().iterator();
            while (iterator.hasNext()) {
                String fieldName = iterator.next();
                String[] values = keyVals.get(fieldName);
                if (values.length == 1) {
                    builder.field(fieldName, values[0]);
                    try {
                        saveStringToFile(indexFile,
                                fieldName + "\t" + values[0]);
                    } catch (IOException e) {
                        LOG.error("Error writing metadata to local file", e);
                    }
                } else if (values.length > 1) {
                    builder.array(fieldName, values);
                }
            }

            builder.endObject();

            String sha256hex = org.apache.commons.codec.digest.DigestUtils
                    .sha256Hex(normalisedurl);

            IndexRequest indexRequest = new IndexRequest(indexName, docType,
                    sha256hex).source(builder);

            DocWriteRequest.OpType optype = DocWriteRequest.OpType.INDEX;
            if (create) {
                optype = DocWriteRequest.OpType.CREATE;
            }
            indexRequest.opType(optype);

            connection.getProcessor().add(indexRequest);

            eventCounter.scope("Indexed").incrBy(1);

            _collector.emit(StatusStreamName, tuple, new Values(url, metadata,
                    Status.FETCHED));
            _collector.ack(tuple);

        } catch (IOException e) {
            LOG.error("Error sending log tuple to ES", e);
            // do not send to status stream so that it gets replayed
            _collector.fail(tuple);
        }
    }

    private void saveStringToFile(File file, String stringToWrite)
            throws IOException {
        File folder = file.getParentFile();
        if (!folder.exists() && !folder.mkdirs()) {
            throw new IOException("Couldn't create the storage folder: "
                    + folder.getAbsolutePath() + " does it already exist?");
        }
        try (PrintWriter out = new PrintWriter(
                new FileOutputStream(file, true))) {
            out.append(stringToWrite).append('\n');
        }
    }
}
Have you merged all your configs, i.e. the generic StormCrawler one plus the ES-specific one, into a single es-conf.yaml? If not, then your Flux file is probably missing:
  - resource: false
    file: "crawler-conf.yaml"
    override: true
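For reference, here is a sketch of what the complete includes section could look like with all three files, assuming you keep crawler-conf.yaml as a separate file alongside es-conf.yaml rather than merging them:

includes:
  - resource: true
    file: "/crawler-default.yaml"
    override: false

  # hypothetical addition: the generic StormCrawler user config
  - resource: false
    file: "crawler-conf.yaml"
    override: true

  - resource: false
    file: "es-conf.yaml"
    override: true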
where the indexer config typically looks like:
indexer.url.fieldname: "url"
indexer.text.fieldname: "content"
indexer.canonical.name: "canonical"
indexer.md.mapping:
  - parse.title=title
  - parse.keywords=keywords
  - parse.description=description
  - domain=domain
Not having any md mappings defined would explain why your modified indexer does not write anything to the file, and why the index contains URLs but no additional fields.
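Alternatively, if you do merge everything into a single es-conf.yaml, the indexer keys go under the same top-level config element as the ES settings. A minimal sketch, assuming your file already starts with a config: section:

config:
  # ES connection settings from the example project stay here
  # ...
  # indexer settings merged in from crawler-conf.yaml
  indexer.url.fieldname: "url"
  indexer.text.fieldname: "content"
  indexer.canonical.name: "canonical"
  indexer.md.mapping:
    - parse.title=title
    - parse.keywords=keywords
    - parse.description=description
    - domain=domain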
Please note that the 'index' index (excuse the terminology) does not contain the status of the URLs; those live in the separate 'status' index maintained by the StatusUpdaterBolt. See https://stackoverflow.com/a/49316316/432844 for an explanation of status vs index.
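For context, a rough sketch of the two kinds of documents (field names illustrative, not authoritative):

# 'status' index: one document per known URL, written by the StatusUpdaterBolt
url: "http://books.toscrape.com/"
status: "FETCHED"
nextFetchDate: "..."

# 'index' index: one document per indexed page, written by the IndexerBolt
url: "http://books.toscrape.com/"
title: "..."
content: "..."

So the FETCHED status you are looking for in Kibana comes from the status index, while your custom bolt only writes to the content index and, via the status stream, triggers the status updates.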