Tags: tensorflow, keras, google-cloud-platform, google-cloud-dataproc, tony

Using Distributed TensorFlow with a Keras model on GCP Dataproc


I am completely new to cloud computing on GCP Dataproc. I installed TonY (TensorFlow on YARN) when I was creating my cluster so that I could run TensorFlow on it.

I am stuck on the part where I create the tf.train.ClusterSpec in order to run distributed TensorFlow with my Keras model. It seems that as long as I create a ClusterSpec, then create a server with tf.train.Server and a session with tf.Session, I can just set that session for my Keras model using K.set_session(). I wanted to confirm whether this is correct. Also, what are the worker and ps nodes, and how do they map to the master and worker nodes of the cluster I created in GCP Dataproc? Finally, when creating the session, is the parameter passed to tf.Session just server.target?

# Keras Core
from keras.layers.convolutional import MaxPooling2D, Convolution2D, AveragePooling2D
from keras.layers import Input, Dropout, Dense, Flatten, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.merge import concatenate
from keras import regularizers
from keras import initializers
from keras.models import Model
# Backend
from keras import backend as K
# Utils
from keras.utils.layer_utils import convert_all_kernels_in_model
from keras.utils.data_utils import get_file
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers
from keras.preprocessing.image import img_to_array, load_img

from keras import backend as K
import numpy as np
import os
import inspect
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint
import tensorflow as tf
from PIL import Image

# Does "worker" refer to the worker nodes in my cluster, and "ps"
# to the master node in my cluster?
#Do I put the external addresses of the nodes into their respective lists?
cluster = tf.train.ClusterSpec({"worker": ["35.236.62.93:2222",
                                           "35.236.30.154:2222",
                                           "35.235.127.146:2222"],
                                "ps": ["5.235.95.74:2222"]})
#Is my job name correct as well?
server = tf.train.Server(cluster, job_name="ps")

#Does tf.Session take in server.target as its parameter?
sess = tf.Session(server.target)
K.set_session(sess)

Solution

  • In order to access your cluster configuration, read the CLUSTER_SPEC environment variable from your TensorFlow code. You can follow this working example:

        import json
        import os

        cluster_spec_str = os.environ["CLUSTER_SPEC"]
        cluster_spec = json.loads(cluster_spec_str)
        ps_hosts = cluster_spec['ps']
        worker_hosts = cluster_spec['worker']
    

    When you launch a TonY job on Cloud Dataproc, TonY sets the CLUSTER_SPEC environment variable inside your YARN containers, and you can read it directly as shown above.

    You can also access the job name through the JOB_NAME environment variable:

      job_name = os.environ["JOB_NAME"]
    

    You should be able to use the TonY MNIST example as a reference. Please let us know whether this works for you.
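
    To tie those two environment variables back to the code in your question, here is a minimal sketch in the TF 1.x style. TASK_INDEX is an assumption on my side, based on how the MNIST example reads its task index, so please double-check it there:

        import json
        import os

        import tensorflow as tf
        from keras import backend as K

        # CLUSTER_SPEC and JOB_NAME are set by TonY inside each YARN container.
        # TASK_INDEX is assumed to be set the same way (as in the MNIST example).
        cluster = tf.train.ClusterSpec(json.loads(os.environ["CLUSTER_SPEC"]))
        job_name = os.environ["JOB_NAME"]                # "ps" or "worker"
        task_index = int(os.environ.get("TASK_INDEX", 0))

        server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

        if job_name == "ps":
            # Parameter servers only host variables; they never run the Keras model.
            server.join()
        else:
            # Workers attach a session to this task's in-process server.
            sess = tf.Session(target=server.target)
            K.set_session(sess)
            # ...build and train the Keras model here...

    This also answers your last question: each worker's session should point at its own server.target rather than at a hard-coded address.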

    In Cloud Dataproc we have 2 concepts:

    • Master
    • Workers

    In Hadoop terms, these correspond to the Resource Manager (master) and the Node Managers (workers), respectively. In this example we have a Cloud Dataproc cluster with 1 master and 4 workers:

    [screenshot: Cloud Dataproc cluster overview showing 1 master and 4 workers]

    [screenshot: VM instances view showing all VMs in the cluster]

    From a TensorFlow perspective, you can do distributed machine learning using 3 main strategies (see the sketch after this list):

    • MirroredStrategy: Multiple GPUs, Single Node
    • CollectiveAllReduceStrategy: Multiple GPUs, Multiple Nodes All-Reduce
    • ParameterServerStrategy: Multiple GPUs, Multiple Nodes, Parameter+Worker Nodes
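
    Here is that sketch: a rough illustration of how these strategies surface in the tf.distribute API (module paths assume TF 1.14 or newer and have moved between releases, so treat them as indicative rather than exact):

        import tensorflow as tf

        # Single node, multiple GPUs: synchronous replication across local GPUs.
        strategy = tf.distribute.MirroredStrategy()

        # The multi-node variants live under tf.distribute.experimental in this
        # era: MultiWorkerMirroredStrategy (the newer name for
        # CollectiveAllReduceStrategy) and ParameterServerStrategy. They read
        # the cluster layout from the TF_CONFIG environment variable.

        # A Keras model built inside a strategy's scope is replicated accordingly.
        with strategy.scope():
            model = tf.keras.Sequential(
                [tf.keras.layers.Dense(10, input_shape=(4,))])
            model.compile(optimizer="adam", loss="mse")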

    In your case, when you launch a TonY job, you seem to be referring to the latter, so you will define an .xml file, in this case tony.xml, where you specify the number of parameter servers and workers.

    <configuration>
     <property>
      <name>tony.application.security.enabled</name>
      <value>false</value>
     </property>
     <property>
      <name>tony.worker.instances</name>
      <value>${worker_instances}</value>
     </property>
     <property>
      <name>tony.worker.memory</name>
      <value>${worker_memory}</value>
     </property>
     <property>
      <name>tony.ps.instances</name>
      <value>${ps_instances}</value>
     </property>
     <property>
      <name>tony.ps.memory</name>
      <value>${ps_memory}</value>
     </property>
    </configuration>
    

    When the TonY client sends this request to Cloud Dataproc, Dataproc will by default allocate containers on any of the Dataproc workers (the Dataproc master is not used for processing). Example:

    <configuration>
     <property>
      <name>tony.application.security.enabled</name>
      <value>false</value>
     </property>
     <property>
      <name>tony.worker.instances</name>
      <value>2</value>
     </property>
     <property>
      <name>tony.worker.memory</name>
      <value>4g</value>
     </property>
     <property>
      <name>tony.ps.instances</name>
      <value>1</value>
     </property>
     <property>
      <name>tony.ps.memory</name>
      <value>2g</value>
     </property> 
    </configuration>
    

    This will request 4 containers (a sketch of the resulting CLUSTER_SPEC follows the list):

    • 1 Application master
    • 1 Parameter Server
    • 2 Worker Servers
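
    With this example, the CLUSTER_SPEC that each of those ps/worker containers reads would look roughly as follows. The host names and ports here are placeholders of my own; YARN assigns the real ones when the containers start:

        import json
        import os

        # Illustrative only: run inside a TonY container, where CLUSTER_SPEC is set.
        print(json.loads(os.environ["CLUSTER_SPEC"]))
        # e.g. {"ps": ["dataproc-w-0:32001"],
        #       "worker": ["dataproc-w-1:32002", "dataproc-w-2:32003"]}

    In other words, you do not hard-code external IP addresses as in your question; each container rebuilds its ClusterSpec from this variable.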

    The allocation depends on the Resource Manager's scheduler. By default Dataproc uses the DefaultResourceCalculator and will try to find resources on any of the cluster's active workers.

    Please take a look at the current sample for MNIST and Cloud Dataproc:

    https://github.com/linkedin/TonY/tree/master/tony-examples/tony-in-gcp