Below are two versions of distributed TensorFlow code that (try to) implement a global counter that is stored on one parameter server and incremented asynchronously by each of the workers.
Both versions seem to print the same thing, but I do not understand why. The only difference between the versions is the two lines marked with the comment # NEW.
When each worker runs Version 1, does the parameter server automatically store a separate local_counter tf.Variable for each worker?
In Version 2, I attempt to put each local_counter tf.Variable on the parameter server explicitly.
Does following Version 1 or Version 2 actually make a difference?
PS: I'm sure this is not the best way to manage a tf.Variable shared among all instances, so I'd gladly take any advice on improvements. Thanks!
# Standard distributed TensorFlow boilerplate
# ...
elif FLAGS.job_name == 'worker':
    TASK = FLAGS.task_index

    # Both counters are pinned to the parameter server so they are shared.
    with tf.device('/job:ps/task:0/cpu:0'):
        with tf.variable_scope('global'):
            global_counter = tf.Variable(0, name='global_counter',
                                         trainable=False)
            local_counter = tf.Variable(0, name='local_counter_{}'.format(TASK),
                                        trainable=False)

    init_op = tf.global_variables_initializer()

    # The increment ops belong to this worker's part of the graph.
    with tf.device('/job:worker/task:{}'.format(TASK)):
        with tf.variable_scope('local'):
            local_inc_op = local_counter.assign_add(1)
            global_inc_op = global_counter.assign_add(1)

    with tf.Session(server.target) as sess:
        sess.run(init_op)
        global_count = 0
        while global_count < 1000:
            sess.run([local_inc_op, global_inc_op])
            local_count, global_count = sess.run([local_counter, global_counter])
            print('Local {}, Global {}, worker-{}'.format(
                local_count, global_count, TASK))
# Standard distributed TensorFlow boilerplate
# ...
elif FLAGS.job_name == 'worker':
    NUM_WORKERS = len(worker_hosts)
    TASK = FLAGS.task_index

    # The global counter and one local counter per worker all go on the
    # parameter server.
    with tf.device('/job:ps/task:0/cpu:0'):
        with tf.variable_scope('global'):
            global_counter = tf.Variable(0, name='global_counter',
                                         trainable=False)
            local_counters = [tf.Variable(0, name='local_counter_{}'.format(i),
                                          trainable=False)
                              for i in range(NUM_WORKERS)]  # NEW

    init_op = tf.global_variables_initializer()

    # The increment ops belong to this worker's part of the graph.
    with tf.device('/job:worker/task:{}'.format(TASK)):
        with tf.variable_scope('local'):
            local_counter = local_counters[TASK]  # NEW
            local_inc_op = local_counter.assign_add(1)
            global_inc_op = global_counter.assign_add(1)

    with tf.Session(server.target) as sess:
        sess.run(init_op)
        global_count = 0
        while global_count < 1000:
            sess.run([local_inc_op, global_inc_op])
            local_count, global_count = sess.run([local_counter, global_counter])
            print('Local {}, Global {}, worker-{}'.format(
                local_count, global_count, TASK))
I'm not sure I see much practical difference. In either case the local counters are created inside the parameter server device scope, so they will live on the parameter server. In Version 1 each worker's graph contains only its own local counter, whereas in Version 2 each worker's graph contains all of the local counters (but each worker still only interacts with its own counter, and the variables themselves still live on the parameter server).
So to answer explicitly: yes, you can store variables on the parameter server even though they don't exist in the parameter server's own graph. Under the hood, the parameter server keeps a hash table (the ResourceMgr) which can store arbitrary variable names/values.
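As a minimal sketch of that (assuming the between-graph setup from your boilerplate, where server is the worker's tf.train.Server and shared_counter is a hypothetical name): if every worker builds this same snippet locally, the variable is pinned to the ps job and, because the op name matches across the workers' graphs, they all resolve to the same ResourceMgr entry and so share state.

import tensorflow as tf

# Each worker builds this graph; the Variable is placed on the parameter
# server, and the matching name ('shared_counter') means all workers read
# and write the same underlying storage.
with tf.device('/job:ps/task:0'):
    shared_counter = tf.Variable(0, name='shared_counter', trainable=False)

inc_op = shared_counter.assign_add(1)

with tf.Session(server.target) as sess:
    # In practice only one worker (the chief) should run the initializer.
    sess.run(tf.global_variables_initializer())
    print(sess.run(inc_op))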
To automatically put variables on parameter server(s), tf.train.replica_device_setter can help reduce boilerplate.
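A rough sketch of that approach (the cluster spec hosts and TASK value here are placeholders): ops created under the returned device function default to this worker's device, while Variable ops are assigned to the ps job automatically.

import tensorflow as tf

# Hypothetical cluster spec; replace the hosts with your own.
cluster = tf.train.ClusterSpec({
    'ps': ['ps0.example.com:2222'],
    'worker': ['worker0.example.com:2222', 'worker1.example.com:2222'],
})
TASK = 0  # normally FLAGS.task_index

with tf.device(tf.train.replica_device_setter(
        worker_device='/job:worker/task:{}'.format(TASK),
        cluster=cluster)):
    # Variables go to /job:ps (round-robin across ps tasks by default);
    # non-variable ops default to this worker's device.
    global_counter = tf.Variable(0, name='global_counter', trainable=False)
    local_counter = tf.Variable(0, name='local_counter_{}'.format(TASK),
                                trainable=False)
    local_inc_op = local_counter.assign_add(1)
    global_inc_op = global_counter.assign_add(1)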