Tags: python, tensorflow, keras, tensorflow-federated

ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors


I am following the TFF tutorials to build my FL model. My data is contained in different CSV files, each of which is treated as a different client. Following this tutorial, I built the dataset functions and the Keras model function as follows:

@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
  return tf.data.experimental.CsvDataset(dataset_path,
                                         record_defaults=record_defaults,
                                         header=True)

@tf.function
def add_parsing(dataset):
  def parse_dataset(*x):
    return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
  return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)

source = tff.simulation.datasets.FilePerUserClientData(
    dataset_paths, create_tf_dataset_for_client_fn)

client_ids = sorted(source.client_ids)

# Make sure the client ids are tensor strings when splitting data.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids] 
source = source.preprocess(add_parsing)

train, test = source.train_test_client_split(source, 1)

train_client_ids = train.client_ids

train_data = train.create_tf_dataset_for_client(train_client_ids[0])

def create_keras_model():
  initializer = tf.keras.initializers.GlorotNormal(seed=0)
  return tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(32,)),
      tf.keras.layers.Dense(10, kernel_initializer=initializer),
      tf.keras.layers.Softmax(),
  ])
def model_fn():
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=train_data.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Then I followed the instructions and ran the other @tff.tf_computation functions from the tutorial, such as server_init(), initialize_fn(), client_update(), and server_update(). But when I ran client_update_fn() I got this error:

ValueError: in user code:

   File "<ipython-input-14-cada45ffae0f>", line 12, in client_update  *
       for batch in dataset:
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 455, in forward_pass  *
       return self._forward_pass(batch_input, training=training)
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 408, in _forward_pass  *
       predictions = self.predict_on_batch(inputs, training)
   File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 398, in predict_on_batch  *
       return self._keras_model(x, training=training)
   File "/usr/local/lib/python3.7/dist-packages/keras/engine/base_layer_v1.py", line 740, in __call__  **
       self.name)
   File "/usr/local/lib/python3.7/dist-packages/keras/engine/input_spec.py", line 200, in assert_input_compatibility
       raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'

ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors. Inputs received: [<tf.Tensor 'x:0' shape=() dtype=int32>, <tf.Tensor 'x_1:0' shape=() dtype=int32>, <tf.Tensor 'x_2:0' shape=() dtype=int32>, <tf.Tensor 'x_3:0' shape=() dtype=float32>, <tf.Tensor 'x_4:0' shape=() dtype=float32>, <tf.Tensor 'x_5:0' shape=() dtype=float32>, <tf.Tensor 'x_6:0' shape=() dtype=float32>, <tf.Tensor 'x_7:0' shape=() dtype=float32>, <tf.Tensor 'x_8:0' shape=() dtype=float32>, <tf.Tensor 'x_9:0' shape=() dtype=int32>]

Notes:

  • Each CSV file has 10 columns as features (input) and one column as the label (output).
  • I set shape=(32,) arbitrarily; I don't actually know what the shape of the data in each column is.

So, the question is: how do I feed the data to the Keras model and overcome this error?

Thanks in advance


Solution

  • A couple of problems: your data has ten separate features, which means you would actually need 10 separate inputs for your model. Alternatively, you can stack the features into a single tensor and use one input with the shape (10,), as shown below. Here is a working example; note, however, that it uses dummy data and therefore may not make much sense in practice.
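
    To see what the fix does in isolation, here is a minimal standalone sketch (not part of the pipeline below): parse_dataset yields 'x' as a tuple of 10 scalar tensors, and tf.stack turns that tuple into a single tensor of shape (10,), which a single Input(shape=(10,)) layer can consume.

    import tensorflow as tf

    # A stand-in for the 'x' tuple produced by parse_dataset:
    # ten scalar tensors, one per CSV feature column.
    features = tuple(tf.constant(float(i)) for i in range(10))

    # Stacking turns the tuple into one rank-1 tensor of shape (10,).
    stacked = tf.stack(features)
    print(stacked.shape)  # (10,)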

    Create dummy data:

    import tensorflow as tf
    import tensorflow_federated as tff
    import pandas as pd
    from collections import OrderedDict
    import nest_asyncio
    nest_asyncio.apply()
    
    # Dummy data
    samples = 5
    data = [[tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
            tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
            tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
            tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.normal((samples,)).numpy().tolist(),
            tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
            tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()]]
    df = pd.DataFrame(data)
    df = df.explode(list(df.columns))
    df.to_csv('client1.csv', index=False)
    df.to_csv('client2.csv', index=False)
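
    As an optional sanity check (not part of the original steps), you can read one of the generated files back and confirm its shape: 5 rows and 12 columns, where the first column is dropped by the parser, the middle 10 are the features, and the last is the label.

    import pandas as pd

    # Verify the dummy CSV layout before wiring it into TFF.
    df_check = pd.read_csv('client1.csv')
    print(df_check.shape)  # (5, 12): 1 dropped column, 10 features, 1 label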
    

    Load and process dataset:

    import tensorflow as tf
    
    # 12 CSV columns: the first is dropped by the parser, the middle 10
    # are the features, and the last is the label.
    record_defaults = [int(), int(), int(), int(), float(), float(), float(), float(), float(), float(), int(), int()]
    
    @tf.function
    def create_tf_dataset_for_client_fn(dataset_path):
      return tf.data.experimental.CsvDataset(dataset_path,
                                             record_defaults=record_defaults,
                                             header=True)

    @tf.function
    def add_parsing(dataset):
      def parse_dataset(*x):
        return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
      return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    
    dataset_paths = {'client1': '/content/client1.csv', 'client2': '/content/client2.csv'}
    
    source = tff.simulation.datasets.FilePerUserClientData(
      dataset_paths, create_tf_dataset_for_client_fn) 
    
    client_ids = sorted(source.client_ids)
    
    # Make sure the client ids are tensor strings when splitting data.
    source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids] 
    source = source.preprocess(add_parsing)
    
    train, test = source.train_test_client_split(source, 1)
    
    train_client_ids = train.client_ids
    
    def reshape_data(d):
      # Stack the tuple of 10 scalar features into one (10,) float tensor,
      # matching the model's single Input(shape=(10,)).
      d['x'] = tf.stack([tf.cast(x, dtype=tf.float32) for x in d['x']])
      return d
    
    train_data = [train.create_tf_dataset_for_client(c).map(reshape_data).batch(1) for c in train_client_ids]
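
    If you want to verify the fix before building the model, printing the element spec of one client dataset should now show a single batched (1, 10) float feature tensor rather than ten scalars (the expected output below is approximate):

    print(train_data[0].element_spec)
    # Expected, roughly:
    # OrderedDict([('y', TensorSpec(shape=(1,), dtype=tf.int32)),
    #              ('x', TensorSpec(shape=(1, 10), dtype=tf.float32))])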
    

    Create and run model:

    def create_keras_model():
      initializer = tf.keras.initializers.GlorotNormal(seed=0)
      return tf.keras.models.Sequential([
          tf.keras.layers.Input(shape=(10,)),
          tf.keras.layers.Dense(75, kernel_initializer=initializer),
          tf.keras.layers.Dense(50, kernel_initializer=initializer),
          tf.keras.layers.Softmax(),
      ])
    def model_fn():
      keras_model = create_keras_model()
      return tff.learning.from_keras_model(
          keras_model,
          input_spec=train_data[0].element_spec,
          loss=tf.keras.losses.SparseCategoricalCrossentropy(),
          metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    
    # NOTE: this plain initialize_fn mirrors the tutorial flow, but it is
    # redefined below as a @tff.federated_computation; the later definition
    # is the one the iterative process actually uses.
    def initialize_fn():
      model = model_fn()
      return model.trainable_variables
    
    @tf.function
    def client_update(model, dataset, server_weights, client_optimizer):
      """Performs training (using the server model weights) on the client's dataset."""
      client_weights = model.trainable_variables
      tf.nest.map_structure(lambda x, y: x.assign(y),
                            client_weights, server_weights)
    
      for batch in dataset:
        with tf.GradientTape() as tape:
          outputs = model.forward_pass(batch)
    
        grads = tape.gradient(outputs.loss, client_weights)
        grads_and_vars = zip(grads, client_weights)
        client_optimizer.apply_gradients(grads_and_vars)
    
      return client_weights
    
    @tf.function
    def server_update(model, mean_client_weights):
      """Updates the server model weights as the average of the client model weights."""
      model_weights = model.trainable_variables
      tf.nest.map_structure(lambda x, y: x.assign(y),
                            model_weights, mean_client_weights)
      return model_weights
    
    # Sanity check from the TFF tutorial: a trivial federated computation
    # that averages a float placed at the clients. It is unrelated to the
    # model itself and only verifies that TFF works in this environment.
    federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)

    @tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
    def get_average_temperature(client_temperatures):
      return tff.federated_mean(client_temperatures)
    str(get_average_temperature.type_signature)
    get_average_temperature([68.5, 70.3, 69.8])
    
    @tff.tf_computation
    def server_init():
      model = model_fn()
      return model.trainable_variables
    
    @tff.federated_computation
    def initialize_fn():
      return tff.federated_value(server_init(), tff.SERVER)
    
    # A throwaway ("whimsy") model is built only to read off the dataset and
    # model-weight type signatures needed by the tff.tf_computation decorators.
    whimsy_model = model_fn()
    tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
    model_weights_type = server_init.type_signature.result
    
    @tff.tf_computation(tf_dataset_type, model_weights_type)
    def client_update_fn(tf_dataset, server_weights):
      model = model_fn()
      client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
      return client_update(model, tf_dataset, server_weights, client_optimizer)
    
    @tff.tf_computation(model_weights_type)
    def server_update_fn(mean_client_weights):
      model = model_fn()
      return server_update(model, mean_client_weights)
    
    federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
    federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
    
    @tff.federated_computation(federated_server_type, federated_dataset_type)
    def next_fn(server_weights, federated_dataset):
      # Broadcast the server weights to the clients.
      server_weights_at_client = tff.federated_broadcast(server_weights)
      # Each client trains locally on its own data.
      client_weights = tff.federated_map(
          client_update_fn, (federated_dataset, server_weights_at_client))
      # The server averages the client updates...
      mean_client_weights = tff.federated_mean(client_weights)

      # ...and overwrites its model with the averaged weights.
      server_weights = tff.federated_map(server_update_fn, mean_client_weights)
      return server_weights
    
    federated_algorithm = tff.templates.IterativeProcess(
        initialize_fn=initialize_fn,
        next_fn=next_fn
    )
    
    server_state = federated_algorithm.initialize()
    for round_num in range(15):  # round_num avoids shadowing the built-in round()
      server_state = federated_algorithm.next(server_state, train_data)
    

    Regarding the line tf.keras.layers.Dense(50, kernel_initializer=initializer) in the model: I am using 50 output nodes because the dummy labels can vary between 0 and 49, and the number of output nodes must match the number of classes when using the SparseCategoricalCrossentropy loss function.
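
    As an optional follow-up (a sketch in the spirit of the tutorial's evaluation step, not part of the answer above), you can check the trained weights on the held-out test client by assigning the final server state to a fresh compiled Keras model. Note that Keras expects (features, label) tuples rather than the OrderedDict produced by parse_dataset, hence the extra map:

    def evaluate(server_state):
      keras_model = create_keras_model()
      keras_model.compile(
          loss=tf.keras.losses.SparseCategoricalCrossentropy(),
          metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
      # Copy the trained server weights into the Keras model.
      keras_model.set_weights(server_state)
      # Re-map the OrderedDict elements to (x, y) tuples for Keras.
      test_data = (test.create_tf_dataset_for_client(test.client_ids[0])
                   .map(reshape_data)
                   .map(lambda d: (d['x'], d['y']))
                   .batch(1))
      keras_model.evaluate(test_data)

    evaluate(server_state)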