Search code examples
pythonmultithreadingurllib2http-range

trying to split the file download buffer to into separate threads


I am trying to download the buffer of file into 5 threads but it seems like it's getting garbled.

from numpy import arange
import requests
from threading import Thread
import urllib2

url = 'http://pymotw.com/2/urllib/index.html'
sizeInBytes = r = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers['content-length']

splitBy = 5

splits = arange(splitBy + 1) * (float(sizeInBytes)/splitBy)

dataLst = []

def bufferSplit(url, idx, splits):
    req = urllib2.Request(url,  headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
    print {'bytes=%d-%d' % (splits[idx], splits[idx+1])}
    dataLst.append(urllib2.urlopen(req).read())


for idx in range(splitBy):
    dlth = Thread(target=bufferSplit, args=(url, idx, splits))
    dlth.start()


print dataLst

with open('page.html', 'w') as fh:
    fh.write(''.join(dataLst))

Update: So I worked over and got little but progress, however if I download a jpg it seems to be corrupted;

from numpy import arange
import os
import requests
import threading
import urllib2

# url ='http://s1.fans.ge/mp3/201109/08/John_Legend_So_High_Remix(fans_ge).mp3'
url = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
# url = 'http://pymotw.com/2/urllib/index.html'
sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)


splitBy = 5

dataLst = []


class ThreadedFetch(threading.Thread):
    """ docstring for ThreadedFetch
    """
    def __init__(self, url, fileName, splitBy=5):
        super(ThreadedFetch, self).__init__()
        self.__url = url
        self.__spl = splitBy
        self.__dataLst = []
        self.__fileName = fileName

    def run(self):
        if not sizeInBytes:
            print "Size cannot be determined."
            return
        splits = arange(self.__spl + 1) * (float(sizeInBytes)/self.__spl)
        for idx in range(self.__spl):
            req = urllib2.Request(self.__url,  headers={'Range': 'bytes=%d-%d' % (splits[idx], splits[idx+1])})
            self.__dataLst.append(urllib2.urlopen(req).read())


    def getFileData(self):
        return ''.join(self.__dataLst)


fileName = url.split('/')[-1]

dl = ThreadedFetch(url, fileName)
dl.start()
dl.join()
content = dl.getFileData()
if content:
    with open(fileName, 'w') as fh:
        fh.write(content)
    print "Finished Writing file %s" % fileName

Below is how the image after getting downloaded.

corrupted image


Solution

  • Here's another version of the project. Differences:

    • thread code is a single small function

    • each thread downloads a chunk, then stores it in a global threadsafe dictionary

    • threads are started, then join()ed -- they're all running at once

    • when all done, data is reassembled in correct order then written to disk

    • extra printing, to verify everything's correct

    • output file size is calculated, for an extra comparison

    source

    import os, requests
    import threading
    import urllib2
    import time
    
    URL = "http://www.nasa.gov/images/content/607800main_kepler1200_1600-1200.jpg"
    
    def buildRange(value, numsplits):
        lst = []
        for i in range(numsplits):
            if i == 0:
                lst.append('%s-%s' % (i, int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
            else:
                lst.append('%s-%s' % (int(round(1 + i * value/(numsplits*1.0),0)), int(round(1 + i * value/(numsplits*1.0) + value/(numsplits*1.0)-1, 0))))
        return lst
    
    def main(url=None, splitBy=3):
        start_time = time.time()
        if not url:
            print "Please Enter some url to begin download."
            return
    
        fileName = url.split('/')[-1]
        sizeInBytes = requests.head(url, headers={'Accept-Encoding': 'identity'}).headers.get('content-length', None)
        print "%s bytes to download." % sizeInBytes
        if not sizeInBytes:
            print "Size cannot be determined."
            return
    
        dataDict = {}
    
        # split total num bytes into ranges
        ranges = buildRange(int(sizeInBytes), splitBy)
    
        def downloadChunk(idx, irange):
            req = urllib2.Request(url)
            req.headers['Range'] = 'bytes={}'.format(irange)
            dataDict[idx] = urllib2.urlopen(req).read()
    
        # create one downloading thread per chunk
        downloaders = [
            threading.Thread(
                target=downloadChunk, 
                args=(idx, irange),
            )
            for idx,irange in enumerate(ranges)
            ]
    
        # start threads, let run in parallel, wait for all to finish
        for th in downloaders:
            th.start()
        for th in downloaders:
            th.join()
    
        print 'done: got {} chunks, total {} bytes'.format(
            len(dataDict), sum( (
                len(chunk) for chunk in dataDict.values()
            ) )
        )
    
        print "--- %s seconds ---" % str(time.time() - start_time)
    
        if os.path.exists(fileName):
            os.remove(fileName)
        # reassemble file in correct order
        with open(fileName, 'w') as fh:
            for _idx,chunk in sorted(dataDict.iteritems()):
                fh.write(chunk)
    
        print "Finished Writing file %s" % fileName
        print 'file size {} bytes'.format(os.path.getsize(fileName))
    
    if __name__ == '__main__':
        main(URL)
    

    output

    102331 bytes to download.
    done: got 3 chunks, total 102331 bytes
    --- 0.380599021912 seconds ---
    Finished Writing file 607800main_kepler1200_1600-1200.jpg
    file size 102331 bytes