Search code examples
pythonmultithreadingpycurl

Still can't stop the Thread


I want to stop the thread with simple CTR+C, but data is keep coming and coming, so I have to close the window and start it again. Help me please to fix the issue. In order to see what I'm saying, just type your twitter username and password in user and pass

import threading
import random
import time
import Queue
import urllib2
import sys
import simplejson, pycurl
import sys, signal

queue = Queue.Queue()
keep_running = True
user = "" # username
pswd = "" # password
headers = [ "Content-Type: text/xml; charset: UTF-8; "]

class MyThread(threading.Thread):


    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.buffer = ""
        self.streaming_url = "https://stream.twitter.com/1/statuses/filter.json?track=justinbieber"

    def start_crawl(self, user, pswd):
        self.conn = pycurl.Curl()
        self.conn.setopt(pycurl.USERPWD, "%s:%s" % (user, pswd))
        self.conn.setopt(pycurl.URL, self.streaming_url)
        #self.conn.setopt(pycurl.POST, 1)
        self.conn.setopt(pycurl.HTTPHEADER, headers)
        self.conn.setopt(pycurl.READFUNCTION, self.storage)
        self.conn.setopt(pycurl.VERBOSE, 1)
        self.conn.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
        self.conn.perform()


    def storage(self, data):
        self.buffer += str(data)
        if isinstance(self.buffer, str):
            if self.buffer.endswith("\r\n") and self.buffer.strip():
                content = json.loads(self.buffer)
                self.buffer = ""
                self.queue.put(content)
                self.queue.task_done()
                #if "text" in content:
                    #print u"{0[user][name]}: {0[text]}".format(content)
        else:
            return data

    def run(self):
        while keep_running:
            self.start_crawl(user,pswd)
            if signal.signal(signal.SIGINT, signal.SIG_DFL):
                sys.exit()
            #line = self.names[random.randint(0,len(self.names)-1)]
            #queue.put(line)


class Starter():
    def __init__(self):
        self.queue = queue
        t = MyThread(self.queue)
        t.start()
        self.next()

    def next(self):
        while True:
            time.sleep(0.1)
            if not self.queue.empty():
                line = self.queue.get(timeout=0.2)
                print line, self.queue.qsize()
            else:
                print 'waiting for queue'

def main():  
    try:
        Starter()   
        queue.join()  
    except KeyboardInterrupt, e:
        print 'Stopping'
        global keep_running
        keep_running = False
        #Join all existing threads to main thread.
        for thread in threading.enumerate():
            if thread is not threading.currentThread():
                thread.join()
        sys.exit(1)

main()

Solution

  • Set the thread as daemon and it will be killed with your program

    import threading
    import random
    import time
    import Queue
    import urllib2
    import sys
    import simplejson, pycurl
    import sys, signal
    
    queue = Queue.Queue()
    keep_running = True
    user = "" # username
    pswd = "" # password
    headers = [ "Content-Type: text/xml; charset: UTF-8; "]
    
    class MyThread(threading.Thread):
    
    
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
            self.buffer = ""
            self.streaming_url = "https://stream.twitter.com/1/statuses/filter.json?track=justinbieber"
    
        def start_crawl(self, user, pswd):
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.USERPWD, "%s:%s" % (user, pswd))
            self.conn.setopt(pycurl.URL, self.streaming_url)
            #self.conn.setopt(pycurl.POST, 1)
            self.conn.setopt(pycurl.HTTPHEADER, headers)
            self.conn.setopt(pycurl.READFUNCTION, self.storage)
            self.conn.setopt(pycurl.VERBOSE, 1)
            self.conn.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            self.conn.perform()
    
    
        def storage(self, data):
            self.buffer += str(data)
            if isinstance(self.buffer, str):
                if self.buffer.endswith("\r\n") and self.buffer.strip():
                    content = json.loads(self.buffer)
                    self.buffer = ""
                    self.queue.put(content)
                    self.queue.task_done()
                    #if "text" in content:
                        #print u"{0[user][name]}: {0[text]}".format(content)
            else:
                return data
    
        def run(self):
            while keep_running:
                self.start_crawl(user,pswd)
                if signal.signal(signal.SIGINT, signal.SIG_DFL):
                    sys.exit()
                #line = self.names[random.randint(0,len(self.names)-1)]
                #queue.put(line)
    
    
    class Starter():
        def __init__(self):
            self.queue = queue
            t = MyThread(self.queue)
            t.daemon = True
            t.start()
            self.next()
    
        def next(self):
            while True:
                time.sleep(0.1)
                if not self.queue.empty():
                    line = self.queue.get(timeout=0.2)
                    print line, self.queue.qsize()
                else:
                    print 'waiting for queue'
    
    def main():  
        try:
            Starter()   
            queue.join()  
        except KeyboardInterrupt, e:
            print 'Stopping'
            raise
    
    main()