Tags: python, tensorflow, distributed-computing

Distributed TF: Non-chief worker sees updates from chief worker but not vice versa


I'm trying to get up and running with distributed TensorFlow but encountering a lot of confusing behavior. Currently I am running one ps server and two worker servers: one worker on the same computer as the ps server and one on another computer. I wanted to start with a simple example, so I wrote a script in which each worker increments a shared variable by a constant. What I am observing is that the chief worker does not see the increments performed by the non-chief worker, even though the non-chief worker does see the increments performed by the chief worker.

Here's my script (you'll notice it's an extension of the script from my last question: Distributed tensorflow monopolizes GPUs after running server.__init__):

import os
import argparse

import tensorflow as tf

# Argument parsing was not shown in the original snippet; this is
# reconstructed from the args.* references below.
parser = argparse.ArgumentParser()
parser.add_argument("--job_name", type=str)
parser.add_argument("--task_idx", type=int)
parser.add_argument("--dir_name", type=str)
args = parser.parse_args()

JOB_NAME   = args.job_name
TASK_INDEX = args.task_idx
DIR_NAME   = args.dir_name
CHECKPOINT_DIR = "/tmp/TF_%s" % (DIR_NAME)

ps_hosts     = ["computerB-i9:2222"]
worker_hosts = ["computerA-i7:2222", "computerB-i9:2223"]

cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server  = tf.train.Server(cluster, job_name = JOB_NAME, task_index = TASK_INDEX)

if JOB_NAME == "ps":
    # The ps task just hosts the variables; it blocks here and serves requests.
    if not os.path.exists(CHECKPOINT_DIR):
        os.makedirs(CHECKPOINT_DIR)

    server.join()

elif JOB_NAME == "worker":
    # replica_device_setter places the variables below on the ps job and
    # the ops on this worker's device.
    with tf.device(tf.train.replica_device_setter(
            worker_device = "/job:worker/task:%d" % TASK_INDEX, cluster = cluster)):

        global_step = tf.train.get_or_create_global_step()
        a = tf.get_variable("a", [1], initializer = tf.constant_initializer(8))
        b = tf.get_variable("b", [1], initializer = tf.constant_initializer(5))
        c = tf.assign_add(a, b)  # each run adds b to a and returns the new value

    hooks = [tf.train.StopAtStepHook(last_step = 1000000)]

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=True,
        # wait only on the ps tasks and this worker, not on the other worker
        device_filters=["/job:ps", "/job:worker/task:%d" % TASK_INDEX])

    # The chief (task 0) is responsible for initializing/restoring the
    # variables and for writing checkpoints to CHECKPOINT_DIR.
    with tf.train.MonitoredTrainingSession(master   = server.target,
                                           is_chief = (TASK_INDEX == 0),
                                           checkpoint_dir = CHECKPOINT_DIR,
                                           hooks    = hooks,
                                           config   = sess_config) as sess:

        val = sess.run([c])
        print(val)
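
For reference, I launch the three processes roughly like this (the script name trainer.py and the dir_name value are placeholders; the flags correspond to the argument parsing at the top):

    # on computerB-i9 (ps):
    python trainer.py --job_name=ps --task_idx=0 --dir_name=test
    # on computerA-i7 (chief worker, task 0):
    python trainer.py --job_name=worker --task_idx=0 --dir_name=test
    # on computerB-i9 (non-chief worker, task 1):
    python trainer.py --job_name=worker --task_idx=1 --dir_name=test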

The behavior I am seeing is that when I run the script on the non-chief worker alone or on the chief worker alone, I see 8, 13, 18, 23, and so on. However, if I run it on both the chief and non-chief workers, I see a pattern indicating that the non-chief worker is aware of and uses the chief worker's updates, but the chief worker is not aware of and does not use the non-chief worker's updates. The chief worker keeps incrementing its own value, while the non-chief worker increments either its own last value or the chief's last value, whichever is more recent. Here, for example, is a sample pattern:

run chief: 8
run chief: 13
run chief: 18
run non-chief: 23
run non-chief: 28
run non-chief: 33 (so both seem to be incrementing normally... BUT then...)
run chief: 23 (as though non-chief did not run)
run non-chief: 28 (sees chief's update and runs off that)
run non-chief: 33 (continuing off 'the latest')
run chief: 28 (again chief sees only its own)

I also notice that if I look at the timestamps in CHECKPOINT_DIR, I see files updated when the chief runs but not when the non-chief runs.
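
One way I check this (a minimal sketch that just lists the files in CHECKPOINT_DIR with their modification times):

    import os, time

    for fname in sorted(os.listdir(CHECKPOINT_DIR)):
        path = os.path.join(CHECKPOINT_DIR, fname)
        print(fname, time.ctime(os.path.getmtime(path)))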

I have tried a few things:

  • The file-saving behavior is slightly different depending on whether the chief worker is on the same computer as the ps server: if the chief worker is on a different computer, it saves its checkpoint files locally on whatever computer it is on. The behavior described above otherwise remains the same.
  • I tried changing which worker runs first, the chief or the non-chief, but it doesn't affect the behavior described above.

I have the sense that the ps server holds some state in memory (i.e., the values of the variables) in addition to there being checkpoint files, but I am not clear on how these relate to one another, or what might be going wrong as far as what information lives on the ps server vs. what goes to a file.
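
To try to separate the two, the checkpoint files can be read directly, independently of whatever the ps holds in memory (a minimal sketch; it assumes a checkpoint already exists in CHECKPOINT_DIR):

    import tensorflow as tf

    # read variable "a" from the chief's most recent checkpoint
    ckpt = tf.train.latest_checkpoint(CHECKPOINT_DIR)
    reader = tf.train.NewCheckpointReader(ckpt)
    print(reader.get_tensor("a"))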

I would welcome advice about what is wrong with my code, or more general troubleshooting ideas. Thank you.


Solution

  • It appears to be the case (I would appreciate it if someone could point this out in an example or in the docs) that the workers all have to be running at the same time. My example was such a short toy example that this wasn't true, and I suppose basic assumptions of the framework break down. My guess, which I have not confirmed in the docs, is that each time the chief starts a fresh MonitoredTrainingSession it restores the variables from its own latest checkpoint, discarding any in-memory updates the non-chief has made on the ps since the chief's last save; that would explain both the pattern above and the checkpoint file timestamps.

    Modifying the end of the script to the following made everything work as expected, because both workers were then running at the same time:

      while True:
          val = sess.run([c])
          print(val)
          time.sleep(15)  # requires "import time" at the top of the script
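
    As a possible refinement (an untested sketch, not part of the original fix): if global_step is also advanced on each iteration, the StopAtStepHook can actually end the loop instead of it running forever:

      # assumes increment_step = tf.assign_add(global_step, 1) was created
      # alongside a, b, and c, before the session was started
      while not sess.should_stop():
          val, step = sess.run([c, increment_step])
          print(val, step)
          time.sleep(15)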