Tags: python, sockets, tcp, python-requests, urllib3

Python Requests - ephemeral port exhaustion


Is there anything I can do to the code below (I thought sessions would solve this?) to stop a new TCP connection being created for every GET request? I am sending around 1,000 requests a second, and after roughly 10,000 requests I run out of sockets (a minimal sketch of one way to reuse connections follows the error message further down):

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

def ReqOsrm(url_input):
    ul, qid = url_input
    # A fresh pool per call means a fresh TCP connection per request
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, ul))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, ul))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

HTTPConnectionPool(host='127.0.0.1', port=5005): Max retries exceeded with url: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x...>: Failed to establish a new connection: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted',))
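
For context, connection reuse here means creating the pool once per worker process (e.g. via a multiprocessing.Pool initializer) and sharing it across all of that process's requests, rather than building a new pool on every call. A minimal sketch, assuming that approach (init_worker and _conn_pool are illustrative names, not part of the original code):

import json
from multiprocessing import Pool, cpu_count
from urllib3 import HTTPConnectionPool

_conn_pool = None  # one pool per worker process, reused across requests

def init_worker():
    # Runs once in each worker process; block=True caps open sockets at maxsize
    global _conn_pool
    _conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005,
                                    maxsize=1, block=True)

def ReqOsrm(url_input):
    ul, qid = url_input
    response = _conn_pool.request('GET', ul)  # reuses the kept-alive socket
    return qid, json.loads(response.data.decode('utf-8'))

if __name__ == '__main__':
    url_routes = [("/viaroute?loc=44.77,4.26&loc=44.64,4.28", 1)]  # illustrative
    pool = Pool(cpu_count(), initializer=init_worker)
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()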


Eric - thanks a lot for the response; I think it's exactly what I need. However, I can't quite modify it correctly. The code correctly returns 10,000 responses for the first few cycles, but then it seems to break and returns fewer than 10,000, which leads me to think I implemented the Queue incorrectly? (A possible fix for collecting the results is sketched right after the code below.)


import os
import csv
import json
import time
import threading
from multiprocessing import Process, Queue, JoinableQueue, cpu_count

import pandas as pd
from urllib3 import HTTPConnectionPool

directory_loc = '.'  # placeholder: folder containing gh_input.csv / gh_output.csv
ghost = 'localhost'
gport = 8989

def CreateUrls(routes, ghost, gport):
    return [
        ["http://{0}:{1}/route?point={2}%2C{3}&point={4}%2C{5}&vehicle=car&calc_points=false&instructions=false".format(
            ghost, gport, alat, alon, blat, blon),
            qid] for qid, alat, alon, blat, blon in routes]


def LoadRouteCSV(csv_loc):
    if not os.path.isfile(csv_loc):
        raise Exception("Could not find CSV with addresses at: %s" % csv_loc)
    else:
        return pd.read_csv(csv_loc, sep=',', header=None, iterator=True, chunksize=1000 * 10)

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threadsafe connection pool
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=10)

        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                while True:
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    self.__qout.put(out)
                    self.__qin.task_done()

        num_threads = 10
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]

if __name__ == '__main__':
    done_count = 0
    try:
        with open(os.path.join(directory_loc, 'gh_output.csv'), 'w') as outfile:
            wr = csv.writer(outfile, delimiter=',', lineterminator='\n')
            for x in LoadRouteCSV(csv_loc=os.path.join(directory_loc, 'gh_input.csv')):
                routes = x.values.tolist()
                url_routes = CreateUrls(routes, ghost, gport)
                del routes

                stime = time.time()

                qout = Queue()
                qin = JoinableQueue()
                [qin.put(url_q) for url_q in url_routes]
                [Worker(qin, qout).start() for _ in range(cpu_count())]
                # Block until all urls in qin are processed
                qin.join()
                calc_routes = []
                while not qout.empty():
                    calc_routes.append(qout.get())

                # Time diagnostics
                dur = time.time() - stime
                print("Calculated %d distances in %.2f seconds: %.0f per second" % (len(calc_routes),
                                                                                    dur,
                                                                                    len(calc_routes) / dur))
                del url_routes
                wr.writerows(calc_routes)
                done_count += len(calc_routes)
                # Continually update progress
                print("Saved %d calculations" % done_count)
    except Exception as err:
        print(err)

Solution

  • I was thinking something more like this. The idea is to spawn a process per core and a pool of threads per process. Each process has a separate connection pool, which is shared among the threads in that process. I don't think you can get a more performant solution without some kind of threading.

    from multiprocessing import Pool, cpu_count
    import queue
    import threading

    from urllib3 import HTTPConnectionPool


    def ReqOsrm(url_input):
        # One connection pool per process, shared by all threads in it
        conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1000)

        # Create consumer thread class
        class Consumer(threading.Thread):
            def __init__(self, queue):
                threading.Thread.__init__(self)
                self._queue = queue

            def run(self):
                while True:
                    msg = self._queue.get()
                    try:
                        response = conn_pool.request('GET', msg)
                        print(response)
                    except Exception as err:
                        print(err)
                    self._queue.task_done()

        # Create work queue and a pool of workers
        work_queue = queue.Queue()
        num_threads = 20
        workers = []
        for _ in range(num_threads):
            worker = Consumer(work_queue)
            worker.daemon = True  # don't keep the process alive after join()
            worker.start()
            workers.append(worker)

        for url in url_input:
            work_queue.put(url)

        # Block until every queued URL has been processed
        work_queue.join()
    
    url_routes = [
        ["/proc1-0", "/proc1-1"],
        ["/proc2-0", "/proc2-1"],
        ["/proc3-0", "/proc3-1"],
        ["/proc4-0", "/proc4-1"],
        ["/proc5-0", "/proc5-1"],
        ["/proc6-0", "/proc6-1"],
        ["/proc7-0", "/proc7-1"],
        ["/proc8-0", "/proc8-1"],
        ["/proc9-0", "/proc9-1"],
    ]
    
    pool = Pool(int(cpu_count()))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()
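
One caveat worth noting with the sketch above: urllib3's HTTPConnectionPool(maxsize=N) only caps how many connections are kept alive, not how many get opened; with the default block=False, 20 busy threads can still open (and then discard) extra sockets. Assuming urllib3's documented block flag, pinning the socket count to the thread count would look like this:

    from urllib3 import HTTPConnectionPool

    # block=True makes a thread wait for a free connection instead of
    # opening a new one, so at most maxsize sockets exist per process
    num_threads = 20
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005,
                                   maxsize=num_threads, block=True)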