Search code examples
python-3.x scrapy prefect

How can I use Scrapy CrawlerProcess with Prefect 2


I have written a Scrapy spider that I run using CrawlerProcess, and I want to run it as a Prefect flow. This is my function that runs the CrawlerProcess:

from prefect import flow
from SpyingTools.spiders.bankWebsiteNews import BankNews
from scrapy.crawler import CrawlerProcess
@flow
def bank_website_news():
    """Prefect flow that runs the BankNews spider through a CrawlerProcess.

    The process is built from the project settings, the spider is scheduled
    on it, and the process is then started.
    """
    crawler_process = CrawlerProcess(get_project_settings())
    crawler_process.crawl(BankNews)
    # NOTE: per the traceback reported for this flow, this call is where the
    # reactor tries to register signal handlers.
    crawler_process.start()

Additional info: this is my BankNews class

class BankNews(scrapy.Spider):
    """Spider that requests one ACLEDA news URL per calendar day.

    ``start_requests`` asks ``SpiderHelper`` for the most recent stored news
    date, builds a ``YYYYMMDD`` string for every day after it up to today,
    and requests ``<base_url><YYYYMMDD>`` for each. ``parse`` extracts one
    ``NewsItem`` per page and schedules a follow-up request for the same URL
    with a ``_<index>`` suffix.
    """

    sph = SpiderHelper()
    name = 'BANKWEBSITE'
    latest_date = None
    bank = "ACLEDA"
    # Suffix used for the next follow-up request.
    # NOTE(review): this counter is shared by every response the spider
    # handles; if responses for different days interleave, the suffix may not
    # match the page being parsed -- confirm against the crawl order.
    index_news = 2
    image_base_url = 'https://www.acledabank.com.kh/kh'

    custom_settings = {
        'ITEM_PIPELINES': {
            'SpyingTools.pipelines.DataPipeline': 400,
        },
    }

    def generateDaterange(self, start_date=None):
        """Return every day from ``start_date`` through today as ``YYYYMMDD``.

        Args:
            start_date: First day of the range. A ``datetime`` is truncated
                to its date. Defaults to 2021-01-01 when ``None``.

        Returns:
            A list of ``YYYYMMDD`` strings; empty when ``start_date`` is
            after today.
        """
        if start_date is None:
            start_date = date(2021, 1, 1)
        elif isinstance(start_date, datetime):
            # Keep only the calendar day so the strings stay date-only.
            start_date = start_date.date()

        # Use a date (not a datetime) for the end of the range: subtracting
        # a date from a datetime raises TypeError.
        end_date = date.today()
        span_days = (end_date - start_date).days
        return [
            (start_date + timedelta(days=offset)).strftime("%Y%m%d")
            for offset in range(span_days + 1)
        ]

    def start_requests(self):
        """Yield one request per day after the latest stored news date."""
        base_url = "https://www.acledabank.com.kh/kh/eng/md_ln"
        rows = self.sph.get_latest_date_news(self.bank, self.bank).values
        latest = rows[0][0] if len(rows) else None

        # With no stored date, leave latest_date as None so the range starts
        # at the default start date instead of failing on the lookup.
        self.latest_date = None if latest is None else latest + timedelta(days=1)

        dates = self.generateDaterange(self.latest_date)
        self.logger.debug("Requesting news for dates: %s", dates)
        for date_ in dates:
            yield scrapy.Request(base_url + date_, self.parse)

    def parse(self, response):
        """Extract a news item from ``response`` and schedule the next page.

        Yields a follow-up ``scrapy.Request`` for the current URL suffixed
        with ``_<index_news>`` and, when the page has no Khmer-font title or
        body block, a populated ``NewsItem``.
        """
        # Everything after "_ln" is the date, optionally followed by
        # "_<index>" suffixes added by earlier follow-up requests.
        url_tail = str(response.request.url).split("_ln")[-1]
        if "_" in url_tail:
            self.index_news += 1
        else:
            self.index_news = 2

        containers = response.css("div.main")
        if not containers:
            # Same outcome as the previous IndexError: no item and no
            # follow-up request for this page.
            self.logger.warning("No div.main found in %s", response.request.url)
            return
        container = containers[0]

        div_khmer = container.css('div.font-khm').get()
        h1_khmer = container.css('h1.font-khm').get()

        title = container.css('h1::text').get()
        date_ = container.css('p.date::text').get()

        img = container.css("div.imgbox  img::attr(src)").get()
        if img is None:
            # NOTE(review): pages without an image used to fail here with an
            # AttributeError, which produced no item and no follow-up
            # request. That outcome is kept as an explicit skip -- confirm
            # whether such pages should be stored without an image instead.
            self.logger.warning("No image found in %s", response.request.url)
            return
        img_link = self.image_base_url + img.split('..')[-1]

        # One more leading text node is skipped when the date paragraph exists.
        skip = 7 if date_ else 6
        content = "\n".join(container.xpath('p//text()')[skip:].getall())

        yield scrapy.Request(
            response.request.url + "_" + str(self.index_news), self.parse
        )

        if div_khmer is None and h1_khmer is None:
            news = NewsItem()
            news['time'] = date_
            news['title'] = title
            news['content'] = content
            news['name'] = self.bank.upper()
            news['link'] = response.request.url
            news['image'] = img_link
            news["source"] = self.bank
            yield news

And this is my DataPipeline:

class DataPipeline(PipelineBase):
    """Item pipeline that appends each scraped ``NewsItem`` to the NEWS table."""

    def __init__(self) -> None:
        super().__init__()

    def process_item(self, item, spider):
        """Write a ``NewsItem`` to the ``NEWS`` table and pass the item on.

        Items of any other type are returned untouched.

        Args:
            item: The scraped item.
            spider: The spider that produced the item (unused).

        Returns:
            The item itself, so that Scrapy and any later pipeline stage
            receive an item object rather than a DataFrame.
        """
        if not isinstance(item, NewsItem):
            return item

        df = pd.DataFrame(
            [
                [
                    item['time'], item['title'], item['content'],
                    item["name"], item["link"], item["image"],
                ]
            ],
            columns=["DATE", "TITLE", "CONTENT", "NAME", "URL", "IMAGE"],
        )

        try:
            df["DATE"] = pd.to_datetime(df["DATE"])
        except (ValueError, TypeError, OverflowError):
            # Unparseable date text: fall back to the current timestamp.
            df["DATE"] = datetime.datetime.now()

        # Items without a source are attributed to the default website.
        df["WEBSITE"] = item.get("source") or 'khmertimeskh'
        df["DOWNLOAD_DATE"] = date.today()

        # self.engine is not set in this class; presumably PipelineBase
        # provides it -- confirm.
        df.to_sql(
            "NEWS",
            self.engine,
            index=False,
            if_exists='append',
            dtype={
                "TITLE": sqlalchemy.types.NVARCHAR(),
                "CONTENT": sqlalchemy.types.NVARCHAR(),
                'DATE': sqlalchemy.types.DATETIME(),
                'DOWNLOAD_DATE': sqlalchemy.types.DATE(),
            },
        )

        return item

BankNews is a spider that I wrote to scrape news.

And this is the error that I get when I try to run the bank_website_news() function as a Prefect flow:

  File "D:\Development\spyingtool\venv\Lib\site-packages\twisted\internet\base.py", line 1282, in _handleSignals
    signal.signal(signal.SIGTERM, reactorBaseSelf.sigTerm)
  File "C:\Users\seab.navin\AppData\Local\Programs\Python\Python311\Lib\signal.py", line 56, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: signal only works in main thread of the main interpreter

Does anyone know how to solve this problem and run Scrapy within a Prefect flow?


Solution

  • I tried multiple solutions, including the asyncio library (for a synchronous function) and CrawlerRunner, but they did not work well. So I decided to switch from CrawlerProcess to running the Scrapy command line instead, and it works well. This is my updated code:

    import subprocess
    from prefect import task,flow,get_run_logger
    
    
    @task
    def run_query():
        """Run the ``BANKWEBSITE`` spider through the Scrapy command line.

        The crawl runs in a child process and its output is captured.

        Raises:
            RuntimeError: If the command exits with a non-zero status; the
                message is the captured standard error.
            FileNotFoundError: If the ``scrapy`` executable cannot be found.
        """
        # An argument list avoids going through the shell.
        command = ['scrapy', 'crawl', 'BANKWEBSITE']
        result = subprocess.run(
            command,
            capture_output=True,
            encoding='utf-8',
            # A stray undecodable byte must not hide the real failure.
            errors='replace',
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr)
    @flow
    def run_all_task():
        """Prefect flow whose single step is the ``run_query`` task."""
        run_query()