I'm trying to get up and running with distributed TensorFlow but am running into a lot of confusing behavior. Currently I am running one ps server and two worker servers: one worker on the same computer as the ps server and one on another computer. I wanted to start with a simple example, so I wrote something that adds a constant to increment a shared variable from each worker. What I am observing is that the chief worker does not see the increments performed by the non-chief worker, even though the non-chief worker does see the increments performed by the chief worker.
Here's my script (you'll notice it's an extension of the script from my last question: Distributed tensorflow monopolizes GPUs after running server.__init__):
import os
import tensorflow as tf

# JOB_NAME, TASK_INDEX and DIR_NAME come from command-line arguments
# (parsing omitted here; see the sketch below).
JOB_NAME = args.job_name
TASK_INDEX = args.task_idx
DIR_NAME = args.dir_name

CHECKPOINT_DIR = "/tmp/TF_%s" % (DIR_NAME)

ps_hosts = ["computerB-i9:2222"]
worker_hosts = ["computerA-i7:2222", "computerB-i9:2223"]

cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster, job_name=JOB_NAME, task_index=TASK_INDEX)

if JOB_NAME == "ps":
    # The parameter server just makes sure the checkpoint directory exists and blocks.
    if not os.path.exists(CHECKPOINT_DIR):
        os.makedirs(CHECKPOINT_DIR)
    server.join()
elif JOB_NAME == "worker":
    # Variables get placed on the ps job; the ops stay on this worker.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % TASK_INDEX, cluster=cluster)):
        global_step = tf.train.get_or_create_global_step()
        a = tf.get_variable("a", [1], initializer=tf.constant_initializer(8))
        b = tf.get_variable("b", [1], initializer=tf.constant_initializer(5))
        c = tf.assign_add(a, b)

    hooks = [tf.train.StopAtStepHook(last_step=1000000)]

    sess_config = tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=True,
        device_filters=["/job:ps", "/job:worker/task:%d" % TASK_INDEX])

    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(TASK_INDEX == 0),
                                           checkpoint_dir=CHECKPOINT_DIR,
                                           hooks=hooks,
                                           config=sess_config) as sess:
        val = sess.run([c])
        print(val)
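For reference, args comes from a standard argparse block, roughly along these lines (the flag names here are a sketch, not necessarily the exact ones I used):

import argparse

# Minimal argument-parsing sketch; the flag names are assumptions.
parser = argparse.ArgumentParser()
parser.add_argument("--job_name", help="'ps' or 'worker'")
parser.add_argument("--task_idx", type=int, help="index of this task within its job")
parser.add_argument("--dir_name", help="suffix for the checkpoint directory under /tmp")
args = parser.parse_args()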
The behavior I am seeing is that when I run the script on the non-chief worker server alone or on the chief worker server alone, I see: 8, 13, 18, 23, ...
and so on. However, if I run on both the chief and non-chief workers, I see a pattern indicating that the non-chief worker is aware of, and uses, the chief worker's updates, but that the chief worker is not aware of, and does not use, the non-chief worker's updates. The chief worker keeps incrementing its own value, while the non-chief worker uses either its own last value or the chief's last value, whichever is later. Here, for example, is a sample pattern:
run chief: 8
run chief: 13
run chief: 18
run non-chief: 23
run non-chief: 28
run non-chief: 33 (so both seem to be incrementing normally... BUT then...)
run chief: 23 (as though non-chief did not run)
run non-chief: 28 (sees chief's update and runs off that)
run non-chief: 33 (continuing off 'the latest')
run chief: 28 (again chief sees only its own)
I also notice that if I look at the timestamps in CHECKPOINT_DIR, I see files updated when the chief runs but not when the non-chief runs.
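Checking this is just a matter of listing modification times in the checkpoint directory, something like the sketch below (the directory name is a placeholder for whatever CHECKPOINT_DIR expands to):

import glob
import os
import time

CHECKPOINT_DIR = "/tmp/TF_example"  # placeholder; use the actual checkpoint directory

# Print each file in the checkpoint directory with its last-modification time,
# to see which worker runs actually touch the files.
for path in sorted(glob.glob(os.path.join(CHECKPOINT_DIR, "*"))):
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(os.path.getmtime(path)))
    print(stamp, path)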
I have tried a few things:

- I have tried swapping which worker is the chief. It only seems to matter whether the chief worker is on the same computer as the ps server, and only in that, if the chief worker is not on the same computer, it saves its checkpoint files locally on whatever computer it is on. The behavior described above otherwise remains the same.
- I have tried various combinations of running as chief or non-chief, but it doesn't affect the behavior described above.
- I have the sense that the ps server preserves some state (i.e. the values of the variables) in addition to there being checkpoint files, but I am not clear on how these relate to one another, or what might be going wrong in terms of what information is preserved on the ps server vs. what is written to a file.
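For what it's worth, the checkpoint files themselves can be inspected directly, which at least separates what is on disk from whatever the ps server holds in memory. A minimal sketch, assuming the same checkpoint directory and the variable name "a" from the script above:

import tensorflow as tf

CHECKPOINT_DIR = "/tmp/TF_example"  # placeholder; use the actual checkpoint directory

# Load the most recent checkpoint (written by the chief) and print the
# value it stores for the shared variable "a".
ckpt = tf.train.latest_checkpoint(CHECKPOINT_DIR)
if ckpt is None:
    print("no checkpoint found in", CHECKPOINT_DIR)
else:
    reader = tf.train.load_checkpoint(ckpt)
    print(reader.get_variable_to_shape_map())  # all variable names in the checkpoint
    print(reader.get_tensor("a"))              # saved value of "a"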
I would welcome advice about what is wrong with my code, or more general troubleshooting ideas. Thank you.
It appears to be the case (I would appreciate it if someone could point this out in an example or in the docs) that the workers have to be running, and running at the same time. My example was such a short toy example that this wasn't true, so I suppose basic assumptions of the framework broke down.
Modifying the end of the script to the following, inside the with ... as sess: block (and with import time added at the top), made everything work as expected, because both workers were then running at the same time:

while True:
    val = sess.run([c])
    print(val)
    time.sleep(15)  # keep the worker alive between increments