(Long time user && First question && Nervous to ask) is True
I'm currently building a Python backend, deployed to a single AWS EC2 instance, with the following architecture:
|--- Data Sources ---|-- Temp Storage --|--- Data Processing ---|---- DB ----|
Web Crawler Data ----> *Save to S3* ==\
API Data ------------> *Save to S3* ===> Luigi Data Pipeline --> MongoDB
As shown above, we have different ways of fetching data (i.e. API requests, a Scrapy web crawler, etc.), but the tricky part is coming up with a simple, fault-tolerant way to connect the received data to the Luigi data pipeline.
Is there a way to integrate a web crawler's output into a Luigi data pipeline? If not, what is the best way to bridge the gap between the HTTP data fetchers and the Luigi tasks?
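For context, here is roughly what a task on the receiving end of the pipeline looks like. This is just a minimal sketch (the bucket name, keys, and task names are placeholders), where the S3 drop is modelled as an external task that the loading task requires:

import json

import luigi
from luigi.contrib.s3 import S3Target


class CrawlDrop(luigi.ExternalTask):
    """A file the crawler or API fetcher has already written to S3."""
    s3_key = luigi.Parameter()

    def output(self):
        return S3Target('s3://my-temp-bucket/%s' % self.s3_key)


class LoadToMongo(luigi.Task):
    """Parse the S3 drop and load it into MongoDB."""
    s3_key = luigi.Parameter()

    def requires(self):
        return CrawlDrop(self.s3_key)

    def output(self):
        # a local marker file is enough to tell Luigi this load already ran
        return luigi.LocalTarget('markers/%s.done' % self.s3_key)

    def run(self):
        with self.input().open('r') as fh:
            records = [json.loads(line) for line in fh]
        # ... insert `records` into MongoDB here (e.g. with pymongo) ...
        with self.output().open('w') as marker:
            marker.write('ok')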
Any advice, documents, or articles would be greatly appreciated! Also, if you need more details, I'll add them here as quickly as possible.
Thank you!
I have never used Luigi, but I do use Scrapy. I'm going to guess the real question is: how do you notify Luigi, in a reasonable way, that there is new data to be processed?
There is a similar question you could learn from here: When a new file arrives in S3, trigger luigi task. Perhaps you work at the same place :).
I would strongly suggest hosting your spider in Scrapyd and using scrapyd-client to drive it. There are all kinds of hairy things that pop up if you try to run Scrapy inside other tools that use the Twisted library (I'm not sure whether Luigi does). I would drive the spider with scrapyd-client and let the spider post to a trigger URL that tells Luigi to kick off the task.
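For illustration, scheduling a crawl boils down to a POST to Scrapyd's schedule.json endpoint, which is roughly what scrapyd-client does for you; the project and spider names below are placeholders:

import requests

# ask the Scrapyd daemon (default port 6800) to run a spider
resp = requests.post(
    'http://localhost:6800/schedule.json',
    data={'project': 'myproject', 'spider': 'my_spider'},
)
resp.raise_for_status()
jobid = resp.json()['jobid']   # keep this to match up the finished crawl later
print('scheduled crawl, jobid =', jobid)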
Again, as I haven't used Luigi, I don't know the details there... but you do not want to be busy-polling to find out when the job is done.
I have a Django web app: I kick off the spider, store the job ID returned by scrapyd-client, and get a JSON "tap on the shoulder" when it's done; then I use Celery and Solr to ingest the data.
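That "tap on the shoulder" can be as simple as the spider's closed() hook posting to a trigger URL. A minimal sketch, assuming the job id is passed to the spider as an argument when it is scheduled and that the trigger URL is a placeholder:

import json
import urllib.request

import scrapy


class CandidateSpider(scrapy.Spider):
    name = 'candidate_spider'

    def closed(self, reason):
        # Scrapy calls this when the crawl finishes; self.jobid is assumed to
        # have been passed in as a spider argument when the job was scheduled
        body = json.dumps({'jobid': self.jobid, 'reason': reason}).encode()
        req = urllib.request.Request(
            'http://localhost:8000/crawl-finished/',  # placeholder trigger URL
            data=body,
            headers={'Content-Type': 'application/json'},
            method='POST',
        )
        urllib.request.urlopen(req)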
Edit to include pipeline code from comments below:
# imports needed to make the snippet self-contained
import base64
import json
import os
import urllib.request
from http.cookiejar import CookieJar

# this runs inside the Scrapy pipeline's process_item(self, item, spider);
# rootdir, fname and lname are defined elsewhere in the pipeline
for fentry in item['files']:
    # open and read the file
    with open(os.path.join(rootdir, fentry['path']), 'rb') as fh:
        pdf = fh.read()

    # just in case we need cookies
    cj = CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))

    # set the content type
    headers = {
        'Content-Type': 'application/json'
    }

    # fill out the object
    json_body = json.dumps({
        'uid': 'indeed-' + item['indeed_uid'],
        'url': item['candidate_url'],
        'first_name': fname,
        'last_name': lname,
        'pdf': base64.b64encode(pdf).decode(),
        'jobid': spider.jobid
    }).encode()

    # send the POST and read the result
    request = urllib.request.Request(
        'http://localhost:8080/api/someapi/', data=json_body,
        headers=headers, method='POST')
    response = opener.open(request)