My workflow is a little complex but I hope someone would understand it from explanation or code below.
Basically, I am scraping a site/directory of company. When a query is passed it returns mini profile of companies i.e 50 companies per page. Using celery, I am trying to get all companies from total pages of search results using a group of tasks. The workflow is as follows:
Note i is a list containing each page result company_work processes this list as a group too by calling another group.
@shared_task
def company_worker(items):
return group(get_site.s(item) for item in items)()
@shared_task
def process_ali(url, query):
content = get_documment.s(url)()
doc = abs_url(content, url)
if doc is None:
return
companies = []
for sel in doc.xpath("//div[@class='item-main']"):
item = {'source': 'ebay'}
company = sel.xpath("div[@class='top']/div[@class='corp']/div[@class='item-title']/"
"h2[@class='title ellipsis']/a/text()")[0]
contact_url = sel.xpath("div[@class='top']/div[@class='corp']/"
"div[@class='company']/a[@class='cd']/@href")[0]
item['contact_url'] = contact_url
companies.append(item)
return companies
@shared_task
def get_site(item):
site = item.get('contact_url')
content = get_documment.s(site)() # this module handle requests and returns content of page as string
doc = abs_url(content, site) # make links in page absolute and return parseable element tree (lxml.html)
web_urls = doc.xpath("//div[@class='company-contact-information']/table/tr/td/a/@href") # more than one website possible
#validate each website
webs = []
for url in web_urls:
uri = None
if len(url) > 6 and url.startswith('http'):
uri = url
elif url.startswith('www'):
uri = 'http://' + url
if uri:
up = urlparse(uri)
site = up.scheme + "://" + up.netloc
webs.append(site)
# remove bad links e.g site.ebay.com, faceboo.com
item['website'] = list(filter(None, remove_blacklist_links(webs)))
if item['website']:
#store website and item['company'] in DB
...
return item['website']
def process_site(engine, query):
# do some stuff here with engine and also find total pages base_url... using requests and lxml
urls =[]
for x in range(1, total_pages+1):
start_url = page + "{}.html".format(x)
print(start_url)
urls.append(start_url)
# process bunch of urls that returns list containing list of dictionaries i.e
# [[{'url':'http://example.org'},{'url':'http://example.com'}], [{...},{...}]]
res = group(process_ali.s(url, query) for url in urls )()
all = group(company_worker.s(i) for i in res)() # process outer list above as a group
return all
This is how i call the tasks from the Python Interpreter.
>>> from b2b.tasks import *
>>> from pprint import pprint
>>> from celery import shared_task, group, task, chain, chord
>>> from celery.task.sets import subtask
>>> base_url = "http://ebay.com"
>>> query = "bag"
>>> res = process_site.s(base_url, query)()
http://www.ebay.com/company/bag/-50/1.html
http://www.ebay.com/company/bag/-50/2.html
http://www.ebay.com/company/bag/-50/3.html
http://www.ebay.com/company/bag/-50/4.html
http://www.ebay.com/company/bag/-50/5.html
...
Traceback i get immediately after the list of urls above ...
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/canvas.py", line 172, in __call__
return self.type(*args, **kwargs)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__
return self.run(*args, **kwargs)
File "/Users/Me/projects/django_stuff/scraper/b2b/tasks.py", line 224, in process_site
all = group(company_worker.s(i) for i in res)()
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/canvas.py", line 525, in __call__
return self.apply_async(partial_args, **options)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/canvas.py", line 504, in apply_async
add_to_parent=add_to_parent)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__
return self.run(*args, **kwargs)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/builtins.py", line 172, in run
add_to_parent=False) for stask in taskit]
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/canvas.py", line 251, in apply_async
return _apply(args, kwargs, **options)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/task.py", line 559, in apply_async
**dict(self._get_exec_options(), **options)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/base.py", line 353, in send_task
reply_to=reply_to or self.oid, **options
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
**kwargs
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/messaging.py", line 165, in publish
compression, headers)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/messaging.py", line 241, in _prepare
body) = dumps(body, serializer=serializer)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/serialization.py", line 164, in dumps
payload = encoder(data)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/serialization.py", line 59, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/serialization.py", line 55, in _reraise_errors
yield
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/kombu/serialization.py", line 164, in dumps
payload = encoder(data)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/anyjson/__init__.py", line 141, in dumps
return implementation.dumps(value)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/anyjson/__init__.py", line 87, in dumps
return self._encode(data)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/simplejson/__init__.py", line 380, in dumps
return _default_encoder.encode(obj)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/simplejson/encoder.py", line 275, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/simplejson/encoder.py", line 357, in iterencode
return _iterencode(o, 0)
File "/Users/Me/.virtualenvs/djangoscrape/lib/python2.7/site-packages/simplejson/encoder.py", line 252, in default
raise TypeError(repr(o) + " is not JSON serializable")
EncodeError: <AsyncResult: 7838a203-a853-4755-992b-cfd67207d398> is not JSON serializable
>>>
The arguments sent to a celery task must be JSON serializable (eg, string, list dictionary etc) so there's most likely one of the arguments to one of the tasks that are not.