I'm using StormCrawler to crawl 40k sites, with max_depth=2, and I want to do it as fast as possible. I have 5 Storm nodes (with different static IPs) and 3 Elasticsearch nodes. For now my best topology is:
spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.CollapsingSpout"
    parallelism: 10

bolts:
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 5
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 5
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 100
  - id: "index"
    className: "com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt"
    parallelism: 25
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 25
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 5
and crawler config:
config:
  topology.workers: 5
  topology.message.timeout.secs: 300
  topology.max.spout.pending: 250
  topology.debug: false
  fetcher.threads.number: 500
  worker.heap.memory.mb: 4096
Questions:
1) Should I use AggregationSpout or CollapsingSpout, and what is the difference? I tried AggregationSpout, but its performance was equal to that of 1 machine with the default configuration.
2) Is this configuration of parallelism correct?
3) I found that "FETCH ERROR" increased by ~20% and a lot of sites were not fetched properly when I jumped from a 1-node to a 5-node configuration. What could be the reason?
UPDATE:
es-conf.yaml:
# configuration for Elasticsearch resources
config:
  # ES indexer bolt
  # addresses can be specified as a full URL
  # if not we assume that the protocol is http and the port 9200
  es.indexer.addresses: "1.1.1.1"
  es.indexer.index.name: "index"
  es.indexer.doc.type: "doc"
  es.indexer.create: false
  es.indexer.settings:
    cluster.name: "webcrawler-cluster"

  # ES metricsConsumer
  es.metrics.addresses: "http://1.1.1.1:9200"
  es.metrics.index.name: "metrics"
  es.metrics.doc.type: "datapoint"
  es.metrics.settings:
    cluster.name: "webcrawler-cluster"

  # ES spout and persistence bolt
  es.status.addresses: "http://1.1.1.1:9200"
  es.status.index.name: "status"
  es.status.doc.type: "status"
  #es.status.user: "USERNAME"
  #es.status.password: "PASSWORD"
  # the routing is done on the value of 'partition.url.mode'
  es.status.routing: true
  # stores the value used for the routing as a separate field
  # needed by the spout implementations
  es.status.routing.fieldname: "metadata.hostname"
  es.status.bulkActions: 500
  es.status.flushInterval: "5s"
  es.status.concurrentRequests: 1
  es.status.settings:
    cluster.name: "webcrawler-cluster"

  ################
  # spout config #
  ################
  # positive or negative filter parsable by the Lucene Query Parser
  # es.status.filterQuery: "-(metadata.hostname:stormcrawler.net)"
  # time in secs for which the URLs will be considered for fetching after an ack or a fail
  es.status.ttl.purgatory: 30
  # min time (in msecs) to allow between 2 successive queries to ES
  es.status.min.delay.queries: 2000
  es.status.max.buckets: 50
  es.status.max.urls.per.bucket: 2
  # field to group the URLs into buckets
  es.status.bucket.field: "metadata.hostname"
  # field to sort the URLs within a bucket
  es.status.bucket.sort.field: "nextFetchDate"
  # field to sort the buckets
  es.status.global.sort.field: "nextFetchDate"
  # delay since previous query date (in secs) after which the nextFetchDate value will be reset
  es.status.reset.fetchdate.after: -1
  # CollapsingSpout: limits the deep paging by resetting the start offset for the ES query
  es.status.max.start.offset: 500
  # AggregationSpout: sampling improves the performance on large crawls
  es.status.sample: false
  # AggregationSpout (expert): adds this value in mins to the latest date returned in the
  # results and uses it as nextFetchDate
  es.status.recentDate.increase: -1
  es.status.recentDate.min.gap: -1

  topology.metrics.consumer.register:
    - class: "com.digitalpebble.stormcrawler.elasticsearch.metrics.MetricsConsumer"
      parallelism.hint: 1
      #whitelist:
      #  - "fetcher_counter"
      #  - "fetcher_average.bytes_fetched"
      #blacklist:
      #  - "__receive.*"
1) Should I use AggregationSpout or CollapsingSpout, and what is the difference? I tried AggregationSpout, but its performance was equal to that of 1 machine with the default configuration.
As the name suggests, AggregationSpout uses aggregations as a mechanism for grouping URLs by host (or domain or IP, depending on partition.url.mode), whereas CollapsingSpout uses collapsing. The latter is likely to be slower if you configured it to return more than 1 URL per bucket (es.status.max.urls.per.bucket), as it issues subqueries for each bucket. AggregationSpout should have good performance, especially if es.status.sample is set to true. CollapsingSpout is experimental at this stage.
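As a minimal sketch of that change against your posted files (only the spout className and the sampling flag differ; everything else stays as you have it):

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
    parallelism: 10

and in es-conf.yaml:

  # AggregationSpout: sampling improves the performance on large crawls
  es.status.sample: true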
2) Is this configuration of parallelism correct?
That is probably more JSoupParserBolt instances than needed. In practice, a ratio of 1:4 compared to the FetcherBolts is fine even with 500 fetching threads. The Storm UI is useful for spotting bottlenecks and seeing which components need scaling; everything else looks OK, but in practice you should use the Storm UI and the metrics to tune the topology to the best settings for your crawl, as sketched below.
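Applied to your numbers, the 1:4 ratio would look something like this (a sketch, not a tested setting: with 5 FetcherBolt instances, roughly 20 parser instances instead of 100):

bolts:
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 5
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    # ~4 parser instances per FetcherBolt instance is usually enough,
    # even with fetcher.threads.number: 500
    parallelism: 20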
3) I found that "FETCH ERROR" increased by ~20% and a lot of sites were not fetched properly when I jumped from a 1-node to a 5-node configuration. What could be the reason?
That could indicate that you are saturating your network connection, but that shouldn't be the case when using more nodes; quite the opposite. Maybe check with the Storm UI how the FetcherBolts are distributed across the nodes: is one worker running all the instances, or do they all get an equal number? Also look at the logs to see what happens, e.g. are there loads of timeout exceptions?
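If the logs do show lots of timeout exceptions, one knob worth knowing about is http.timeout, a standard StormCrawler setting for the HTTP client timeout in milliseconds (the value below is purely illustrative, not a recommendation):

config:
  # give slow sites more time before the fetch is reported as an error
  # (the default is 10000 ms)
  http.timeout: 20000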