
Combine queues for async I/O with auto-enqueue in TensorFlow


I have multiple CSV files which contain features. One feature is the filename of an image. I want to read the CSV files line by line and push the path to the corresponding image onto a new queue. Both queues should be processed in parallel.

Flow:

csv_queue = tf.FIFOQueue(10, tf.string)
csv_init = csv_queue.enqueue_many(['sample1.csv','sample2.csv','sample3.csv'])

path, label = read_label(csv_queue)

image_queue = tf.FIFOQueue(100, tf.string)
image_init = image_queue.enqueue(path)
_, image = read_image(image_queue)


with tf.Session() as sess:
    csv_init.run()
    image_init.run()

    print(sess.run([label, path])) # works
    print(sess.run(image)) # works

    print(sess.run([label, path])) # works
    print(sess.run(image)) # will deadlock unless I run image_init.run() again

The implementation of the helper functions (e.g. read_csv) can be found here.

Can I "hide" the call to image_init.run() behind sess.run(image) to avoid the deadlock and allow batching?
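Conceptually, what the question asks for is a producer that keeps the second queue topped up so the consumer never has to enqueue manually — which is exactly what TensorFlow's QueueRunner automates. As a plain-Python analogy (hypothetical names, standard library only), the pattern looks roughly like this:

```python
import queue
import threading

# Stage 2 queue: image paths waiting to be read (analogue of image_queue).
path_queue = queue.Queue(maxsize=100)

def producer(paths):
    """Background thread that auto-enqueues paths, like a QueueRunner thread."""
    for p in paths:
        path_queue.put(p)          # blocks when the queue is full
    path_queue.put(None)           # sentinel: no more work

paths = ['img1.png', 'img2.png', 'img3.png']
threading.Thread(target=producer, args=(paths,), daemon=True).start()

# Consumer: dequeues without ever calling an enqueue op itself.
images = []
while True:
    p = path_queue.get()
    if p is None:
        break
    images.append('decoded:' + p)  # stand-in for read_image()

print(images)
```

Because the producer thread blocks on `put` when the queue is full and the consumer blocks on `get` when it is empty, neither side needs explicit re-initialization — the same property the QueueRunner-based solution below provides.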


Solution

  • filenames = ['./cs_disp_train.txt', './cs_limg_train.txt']
    txt_queue = tf.train.string_input_producer(filenames)
    # txt_queue = tf.FIFOQueue(10, tf.string)
    # init_txt_queue = txt_queue.enqueue_many(filenames)
    
    enqueue_ops = []
    image_queues = tf.FIFOQueue(100, tf.string)
    
    num_reader = len(filenames)
    for i in range(num_reader):
        reader = tf.TextLineReader()
        _, buffer = reader.read(txt_queue)
        enqueue_ops.append(image_queues.enqueue(buffer))
    
    tf.train.queue_runner.add_queue_runner(
        tf.train.queue_runner.QueueRunner(image_queues, enqueue_ops))
    
    y = image_queues.dequeue()
    
    sess = tf.Session()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # sess.run(init_txt_queue)
    
    for _ in range(12):
        print(sess.run([y]))
    
    coord.request_stop()
    coord.join(threads)
    

    For example, I have two files, 'cs_disp_train.txt' and 'cs_limg_train.txt': one lists the file paths of the depth images, the other the paths of the corresponding color images. The code creates two queues: a filename queue (via tf.train.string_input_producer) holding the two text files, and a FIFOQueue into which the readers push every line they read.

    I am using tf.train.QueueRunner too, although I am not sure I fully understand it. I got the inspiration from here, although that example reads TFRecords files. Hope this helps.
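The question also asked about batching. Once a QueueRunner keeps the queue filled, batching is just dequeuing a fixed number of items at a time (in TensorFlow, `tf.train.batch` does this for you). A plain-Python sketch of collecting fixed-size batches from an auto-filled queue (hypothetical names, standard library only):

```python
import queue
import threading

item_queue = queue.Queue(maxsize=100)

def producer(n):
    # Stand-in for a QueueRunner thread that keeps the queue filled.
    for i in range(n):
        item_queue.put('img_%d.png' % i)
    item_queue.put(None)  # sentinel: end of data

threading.Thread(target=producer, args=(5,), daemon=True).start()

def next_batch(batch_size):
    """Dequeue up to batch_size items; returns a short batch at end of data."""
    batch = []
    while len(batch) < batch_size:
        item = item_queue.get()
        if item is None:
            item_queue.put(None)  # re-enqueue the sentinel for later callers
            break
        batch.append(item)
    return batch

batches = []
while True:
    b = next_batch(2)
    if not b:
        break
    batches.append(b)
print(batches)
```

Note the sentinel is put back after it is seen, so repeated `next_batch` calls terminate cleanly instead of blocking; `tf.train.batch` handles the equivalent end-of-input case with `allow_smaller_final_batch` and queue-close semantics.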