I am trying to load a dataset from CSV and perform federated learning on the available data.
I managed to load a federated dataset from a given CSV file, including both the train and the test data.
My question now is how to build a working example of an iterative process that performs custom federated averaging on this data.
Here is my code, but it's not working:
import collections
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from absl import app
from tensorflow.keras import layers
from src.main import Parameters
# Module-level slot for the dataset element spec. It is assigned by
# create_tf_dataset_for_client_fn (via its own `global input_spec`) each time
# a client dataset is built. It is initialised explicitly because a bare
# `global` statement at module scope does not create the name.
input_spec = None
def main(args):
    """Load per-client CSV data and run federated averaging over it.

    Collects every ``.csv`` file below ``working_dir``, concatenates them into
    one DataFrame, builds a ``tff.simulation.ClientData`` whose clients are
    the distinct values of the ``counter`` column, and then runs
    ``Parameters.FLAGS.total_rounds`` rounds of the iterative process,
    printing the metrics returned by each round.

    Args:
        args: Argument supplied by ``app.run``; unused here.

    Raises:
        FileNotFoundError: If no ``.csv`` file exists below ``working_dir``.
    """
    working_dir = "D:/User/Documents/GitHub/TriaBaseMLBackup/input/fakehdfs/nms/ystr=2016/ymstr=1/ymdstr=26"
    client_id_colname = 'counter'
    SHUFFLE_BUFFER = 1000
    NUM_EPOCHS = 1
    features = ['time', 'meas_info', 'value']
    LABEL_COLUMN = 'counter'

    # Accumulate CSV paths across the whole walk. Resetting the list per
    # directory would keep only the files of the last directory visited.
    file_list = []
    for root, _dirs, files in os.walk(working_dir):
        for filename in files:
            if filename.endswith('.csv'):
                file_list.append(os.path.join(root, filename))
    if not file_list:
        raise FileNotFoundError(f"no .csv files found under {working_dir}")

    # Read every file and stack them, so the data is not limited to whichever
    # file happened to be read last.
    df_list = [
        pd.read_csv(path, delimiter="|", usecols=[1, 2, 6, 7], header=None,
                    na_values=["NIL"], na_filter=True,
                    names=["time", "meas_info", "counter", "value"])
        for path in file_list
    ]
    df = pd.concat(df_list, ignore_index=True)

    # The id column repeats the client id on every row. Reduce it to distinct,
    # non-missing ids so that np.random.choice(..., replace=False) below
    # cannot pick the same client twice.
    client_ids = df[client_id_colname].dropna().drop_duplicates()
    train_client_ids = client_ids.sample(frac=0.5).tolist()

    def create_tf_dataset_for_client_fn(client_id):
        """Return the shuffled, batched tf.data.Dataset for one client."""
        global input_spec
        # Select this client's rows by the same column the ids were drawn
        # from, and build the dataset from that selection only.
        client_data = df[df[client_id_colname] == client_id]
        dataset = tf.data.Dataset.from_tensor_slices(
            (collections.OrderedDict(client_data[features].to_dict('list')),
             client_data[LABEL_COLUMN].to_list())
        )
        # Record the un-batched element spec for inspection.
        input_spec = dataset.element_spec
        return dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)

    train_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=train_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )

    # This has to come after train_data is created; it also triggers the
    # first call of create_tf_dataset_for_client_fn, which sets input_spec.
    example_dataset = train_data.create_tf_dataset_for_client(
        train_data.client_ids[0]
    )
    print(input_spec)

    loss_builder = tf.keras.losses.SparseCategoricalCrossentropy
    metrics_builder = lambda: [tf.keras.metrics.SparseCategoricalAccuracy()]

    def retrieve_model():
        """Build the (uncompiled) Keras model that is trained on each client."""
        # NOTE(review): this Sequential model takes a single input of shape
        # (1, 2), while each dataset element carries three separate feature
        # tensors. Keras rejects that with "Layer sequential expects 1
        # inputs, but it received 3 input tensors". The model inputs or the
        # dataset layout still have to be reconciled.
        return tf.keras.models.Sequential([
            tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
            tf.keras.layers.Dense(256, activation=tf.nn.relu),
            tf.keras.layers.Activation(tf.nn.softmax),
        ])

    def tff_model_fn() -> tff.learning.Model:
        """Wrap a fresh Keras model for TFF using the example dataset's spec."""
        return tff.learning.from_keras_model(
            keras_model=retrieve_model(),
            input_spec=example_dataset.element_spec,
            loss=loss_builder(),
            metrics=metrics_builder())

    iterative_process = tff.learning.build_federated_averaging_process(
        tff_model_fn, Parameters.server_adam_optimizer_fn, Parameters.client_adam_optimizer_fn)
    server_state = iterative_process.initialize()

    for round_num in range(Parameters.FLAGS.total_rounds):
        # Draw this round's participants without replacement.
        sampled_clients = np.random.choice(
            train_data.client_ids,
            size=Parameters.FLAGS.train_clients_per_round,
            replace=False)
        sampled_train_data = [
            train_data.create_tf_dataset_for_client(client)
            for client in sampled_clients
        ]
        server_state, metrics = iterative_process.next(server_state, sampled_train_data)
        print(metrics)
# Script entry point: run main through absl only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    app.run(main)
def start():
    """Run ``main`` through ``app.run``; callable from importing code."""
    app.run(main)
Here is the input_spec output:
(OrderedDict([('time', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('meas_info', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('value', TensorSpec(shape=(), dtype=tf.int64, name=None))]), TensorSpec(shape=(), dtype=tf.float32, name=None))
Here is the error that I got:
ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. Inputs received: [<tf.Tensor 'batch_input:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_1:0' shape=() dtype=int32>, <tf.Tensor 'batch_input_2:0' shape=() dtype=int64>]
Can anyone help me to figure out the problem?
As the error message:
ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors.
says, the Keras model is defined with only a single input (the first layer in the list):
# Sequential model quoted from the question. The first layer in the list
# (the LSTM) defines the model's only input.
model = tf.keras.models.Sequential([
    tf.keras.layers.LSTM(2, input_shape=(1, 2), return_sequences=True),
    tf.keras.layers.Dense(256, activation=tf.nn.relu),
    tf.keras.layers.Activation(tf.nn.softmax),
])
Try inspecting model.input_spec
to see what objects the model is expecting to be fed as input.
>>> [InputSpec(shape=(None, None, 2), ndim=3)]
Whereas the dataset defines an OrderedDict
of 3 tensors for input features:
# Dataset construction quoted from the question: each element is a pair of
# (OrderedDict of the three feature columns, label).
features = ['time', 'meas_info', 'value']
LABEL_COLUMN = 'counter'
dataset = tf.data.Dataset.from_tensor_slices(
    (collections.OrderedDict(df[features].to_dict('list')),
     df[LABEL_COLUMN].to_list())
)
Try inspecting the value of dataset.element_spec
to see what objects the dataset will feed the model.
Making them compatible requires changing either the model definition or the dataset. I'll assume the three features in the dataset are desired, in which case we want to tell Keras that there are three input features coming from the OrderedDict.
To do that, we'll need to use the Functional model API from Keras.
SEQUENCE_LENGTH = 5
# One Input per feature column, keyed and named after that column.
input_dict = {
    f: tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, 1), name=f)
    for f in features
}
# Concatenate is called on a list of tensors, so materialise the dict view.
concatenated_inputs = tf.keras.layers.Concatenate()(list(input_dict.values()))
# No input_shape here: the LSTM's input is the concatenated tensor above,
# and the old (1, 2) hint contradicted it.
lstm_output = tf.keras.layers.LSTM(2, return_sequences=True)(concatenated_inputs)
logits = tf.keras.layers.Dense(256, activation=tf.nn.relu)(lstm_output)
predictions = tf.keras.layers.Activation(tf.nn.softmax)(logits)
model = tf.keras.models.Model(inputs=input_dict, outputs=predictions)
Note that for the LSTM layer, I needed to provide an extra SEQUENCE_LENGTH
variable and dimension. The shape=(SEQUENCE_LENGTH, 1)
will need to be modified to fit the shape of the features coming out of the dataset.
To quickly test whether the model and dataset are compatible (without all the other machinery), ensure that the following doesn't raise an error:
# Take the first dataset element, keep its features part (index 0 of the
# (features, label) pair), and call the model on it.
model(next(iter(dataset))[0])