Tags: python, distributed-computing, sungridengine, supercomputers

Python: IOError 110 Connection timed out when reading from disk


I'm running a Python script on a Sun Grid Engine supercomputing cluster. The script reads in a list of file ids, sends each to a worker process for analysis, and writes one output per input file to disk.

The trouble is I'm getting IOError(110, 'Connection timed out') somewhere inside the worker function, and I'm not sure why. I've received this error in the past when making network requests that were severely delayed, but in this case the worker is only trying to read data from disk.

My question is: what would cause a Connection timed out error when reading from disk, and how can one resolve it? Any help others can offer would be much appreciated.

Full script (the IOError crops up in minhash_text()):

from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config

cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4

def minhash_text(args):
  '''Return a list of hashband strings for an input doc'''
  try:
    file_id, path = args
    with codecs.open(path, 'r', 'utf8') as f:
      text = f.read()  # keep the file handle and its contents under separate names
    all_hashbands = []
    for window_idx, window in enumerate(ngrams(text.split(), window_len)):
      window_hashbands = []
      if window_idx % step != 0:
        continue
      minhash = MinHash(num_perm=permutations, seed=1)
      for ngram in set(ngrams(' '.join(window), 3)):
        minhash.update( ''.join(ngram).encode('utf8') )
      hashband_vals = []
      for i in minhash.hashvalues:
        hashband_vals.append(i)
        if len(hashband_vals) == hashband_len:
          window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
          hashband_vals = []
      all_hashbands.append(window_hashbands)
    return {'file_id': file_id, 'hashbands': all_hashbands}
  except Exception as exc:
    print(' ! error occurred while processing', file_id, exc)
    return {'file_id': file_id, 'hashbands': []}

if __name__ == '__main__':

  with open('file_ids.json') as f:
    file_ids = json.load(f)
  file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]

  worker_id = int(sys.argv[1])
  worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]

  hashband_to_ids = defaultdict(list)
  pool = Pool(cores)

  for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
    print(' * processed', idx, 'results')
    file_id = result['file_id']
    hashbands = result['hashbands']
    for window_idx, window_hashbands in enumerate(hashbands):
      for hashband in window_hashbands:
        hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))

  with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
    json.dump(dict(hashband_to_ids), out)

Solution

  • It turned out I was hammering the filesystem too hard, making too many concurrent read requests for files on the same server. That server could only allow a fixed number of reads in a given period, so any requests over that limit received a Connection Timed Out response.

    The solution was to wrap each file read in a while loop. Inside the loop, try to read the file from disk. If the Connection timed out error is raised, sleep for a second and try again. Break out of the loop only once the file has been read.
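
The retry loop described above might look roughly like the sketch below. This is not the exact code from the original script: the helper name `read_with_retry` and its `delay` and `max_attempts` parameters are made up for illustration, and it assumes the timeout surfaces as an `IOError`/`OSError` whose `errno` is `errno.ETIMEDOUT` (110 on Linux). Any other error is re-raised rather than retried.

```python
import codecs
import errno
import time

def read_with_retry(path, delay=1.0, max_attempts=None):
  '''Read a UTF-8 text file, retrying for as long as the read times out.

  Hypothetical helper: `delay` is the number of seconds to sleep between
  attempts, and `max_attempts=None` retries forever, as in the answer.
  '''
  attempt = 0
  while True:
    attempt += 1
    try:
      with codecs.open(path, 'r', 'utf8') as f:
        return f.read()  # a successful read ends the loop
    except (IOError, OSError) as exc:
      # Retry only on "Connection timed out"; anything else
      # (missing file, permissions, ...) is raised immediately.
      if exc.errno != errno.ETIMEDOUT:
        raise
      # Optional safety valve so a dead server can't hang the job forever.
      if max_attempts is not None and attempt >= max_attempts:
        raise
      time.sleep(delay)
```

In `minhash_text()`, a call such as `read_with_retry(path)` would then take the place of the `codecs.open(...)` read. Setting `max_attempts` is optional, but it may be worth considering so that a file server that never recovers does not stall a worker indefinitely.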