I am following TFF tutorials to build my FL model My data is contained in different CSV files which are considered as different clients. Following this tutorial, and build the Keras model function as following
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
return tf.data.experimental.CsvDataset(dataset_path,
record_defaults=record_defaults,
header=True)
@tf.function
def add_parsing(dataset):
def parse_dataset(*x):
return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
source = tff.simulation.datasets.FilePerUserClientData(
dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
source = source.preprocess(add_parsing)
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
train_data = train.create_tf_dataset_for_client(train_client_ids[0])
def create_keras_model():
initializer = tf.keras.initializers.GlorotNormal(seed=0)
return tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(32,)),
tf.keras.layers.Dense(10, kernel_initializer=initializer),
tf.keras.layers.Softmax(),
])
def model_fn():
keras_model = create_keras_model()
return tff.learning.from_keras_model(
keras_model,
input_spec=train_data.element_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
Then I followed instructions and run other @tff.tf_computation
functions as the tutorial, like def server_init()
, def initialize_fn()
, def client_update()
and def server_update()
. But when I run the def client_update_fn()
I got this error
ValueError: in user code:
File "<ipython-input-14-cada45ffae0f>", line 12, in client_update *
for batch in dataset:
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 455, in forward_pass *
return self._forward_pass(batch_input, training=training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 408, in _forward_pass *
predictions = self.predict_on_batch(inputs, training)
File "/usr/local/lib/python3.7/dist-packages/tensorflow_federated/python/learning/keras_utils.py", line 398, in predict_on_batch *
return self._keras_model(x, training=training)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/base_layer_v1.py", line 740, in __call__ **
self.name)
File "/usr/local/lib/python3.7/dist-packages/keras/engine/input_spec.py", line 200, in assert_input_compatibility
raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'
ValueError: Layer "sequential" expects 1 input(s), but it received 10 input tensors. Inputs received: [<tf.Tensor 'x:0' shape=() dtype=int32>, <tf.Tensor 'x_1:0' shape=() dtype=int32>, <tf.Tensor 'x_2:0' shape=() dtype=int32>, <tf.Tensor 'x_3:0' shape=() dtype=float32>, <tf.Tensor 'x_4:0' shape=() dtype=float32>, <tf.Tensor 'x_5:0' shape=() dtype=float32>, <tf.Tensor 'x_6:0' shape=() dtype=float32>, <tf.Tensor 'x_7:0' shape=() dtype=float32>, <tf.Tensor 'x_8:0' shape=() dtype=float32>, <tf.Tensor 'x_9:0' shape=() dtype=int32>]
Notes:
shape=(32,)
arbitrary, I don't really know what are the shape of the data is in each column?So, the question is, how to feed the data to the keras model and overcome this error
Thanks in advance
A couple problems: Your data has ten separate features, which means you actually need 10 separate inputs for your model. However, you can also stack the features into a tensor and then use a single input with the shape (10,)
. Here is a working example, but please note that it uses dummy data and therefore may not make much sense in reality.
Create dummy data:
import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd
from collections import OrderedDict
import nest_asyncio
nest_asyncio.apply()
# Dummy data
samples = 5
data = [[tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.normal((samples,)).numpy().tolist(),
tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist(),
tf.random.uniform((samples,), maxval=50, dtype=tf.int32).numpy().tolist()]]
df = pd.DataFrame(data)
df = df.explode(list(df.columns))
df.to_csv('client1.csv', index= False)
df.to_csv('client2.csv', index= False)
Load and process dataset:
import tensorflow as tf
record_defaults = [int(), int(), int(), int(), float(),float(),float(),float(),float(),float(), int(), int()]
@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
return tf.data.experimental.CsvDataset(dataset_path,
record_defaults=record_defaults,
header=True)
@tf.function
def add_parsing(dataset):
def parse_dataset(*x):
return OrderedDict([('y', x[-1]), ('x', x[1:-1])])
return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
dataset_paths = {'client1': '/content/client1.csv', 'client2': '/content/client2.csv'}
source = tff.simulation.datasets.FilePerUserClientData(
dataset_paths, create_tf_dataset_for_client_fn)
client_ids = sorted(source.client_ids)
# Make sure the client ids are tensor strings when splitting data.
source._client_ids = [tf.cast(c, tf.string) for c in source.client_ids]
source = source.preprocess(add_parsing)
train, test = source.train_test_client_split(source, 1)
train_client_ids = train.client_ids
def reshape_data(d):
d['x'] = tf.stack([tf.cast(x, dtype=tf.float32) for x in d['x']])
return d
train_data = [train.create_tf_dataset_for_client(c).map(reshape_data).batch(1) for c in train_client_ids]
Create and run model:
def create_keras_model():
initializer = tf.keras.initializers.GlorotNormal(seed=0)
return tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(10,)),
tf.keras.layers.Dense(75, kernel_initializer=initializer),
tf.keras.layers.Dense(50, kernel_initializer=initializer),
tf.keras.layers.Softmax(),
])
def model_fn():
keras_model = create_keras_model()
return tff.learning.from_keras_model(
keras_model,
input_spec=train_data[0].element_spec,
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
def initialize_fn():
model = model_fn()
return model.trainable_variables
@tf.function
def client_update(model, dataset, server_weights, client_optimizer):
"""Performs training (using the server model weights) on the client's dataset."""
client_weights = model.trainable_variables
tf.nest.map_structure(lambda x, y: x.assign(y),
client_weights, server_weights)
for batch in dataset:
with tf.GradientTape() as tape:
outputs = model.forward_pass(batch)
grads = tape.gradient(outputs.loss, client_weights)
grads_and_vars = zip(grads, client_weights)
client_optimizer.apply_gradients(grads_and_vars)
return client_weights
@tf.function
def server_update(model, mean_client_weights):
"""Updates the server model weights as the average of the client model weights."""
model_weights = model.trainable_variables
tf.nest.map_structure(lambda x, y: x.assign(y),
model_weights, mean_client_weights)
return model_weights
federated_float_on_clients = tff.FederatedType(tf.float32, tff.CLIENTS)
@tff.federated_computation(tff.FederatedType(tf.float32, tff.CLIENTS))
def get_average_temperature(client_temperatures):
return tff.federated_mean(client_temperatures)
str(get_average_temperature.type_signature)
get_average_temperature([68.5, 70.3, 69.8])
@tff.tf_computation
def server_init():
model = model_fn()
return model.trainable_variables
@tff.federated_computation
def initialize_fn():
return tff.federated_value(server_init(), tff.SERVER)
whimsy_model = model_fn()
tf_dataset_type = tff.SequenceType(whimsy_model.input_spec)
model_weights_type = server_init.type_signature.result
@tff.tf_computation(tf_dataset_type, model_weights_type)
def client_update_fn(tf_dataset, server_weights):
model = model_fn()
client_optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
return client_update(model, tf_dataset, server_weights, client_optimizer)
@tff.tf_computation(model_weights_type)
def server_update_fn(mean_client_weights):
model = model_fn()
return server_update(model, mean_client_weights)
federated_server_type = tff.FederatedType(model_weights_type, tff.SERVER)
federated_dataset_type = tff.FederatedType(tf_dataset_type, tff.CLIENTS)
@tff.federated_computation(federated_server_type, federated_dataset_type)
def next_fn(server_weights, federated_dataset):
server_weights_at_client = tff.federated_broadcast(server_weights)
client_weights = tff.federated_map(
client_update_fn, (federated_dataset, server_weights_at_client))
mean_client_weights = tff.federated_mean(client_weights)
server_weights = tff.federated_map(server_update_fn, mean_client_weights)
return server_weights
federated_algorithm = tff.templates.IterativeProcess(
initialize_fn=initialize_fn,
next_fn=next_fn
)
server_state = federated_algorithm.initialize()
for round in range(15):
server_state = federated_algorithm.next(server_state, train_data)
Regarding this line in the model: tf.keras.layers.Dense(50, kernel_initializer=initializer)
, I am using 50 output nodes, since I created dummy labels that can vary between 0 and 49. This is necessary when using the SparseCategoricalCrossentropy
loss function.