Tags: python, tensorflow, tensorflow-datasets, tensorflow-federated

Splitting the data into training and testing sets in federated learning


I am new to federated learning and am currently experimenting with a model by following the official TFF documentation. I am stuck on an issue and hope to find an explanation here.

I am using my own dataset. The data are distributed across multiple files, and each file represents a single client (which is how I plan to structure the model). The dependent and independent variables have already been defined.

Now, my question is: how can I split the data into training and testing sets within each client (file) in federated learning, as we normally do in centralized ML models? The following code is what I have implemented so far. Note that my code is inspired by the official documentation and by this post, which is very similar to my application. However, that post splits the clients themselves into training clients and testing clients, while my aim is to split the data inside each client.

import tensorflow as tf
import tensorflow_federated as tff
from collections import OrderedDict

dataset_paths = {
  'client_0': '/content/drive/MyDrive/Colab Notebooks/1.csv',
  'client_1': '/content/drive/MyDrive/Colab Notebooks/2.csv',
  'client_2': '/content/drive/MyDrive/Colab Notebooks/3.csv'
}
record_defaults = [int(), int(), int(), int(), float(),float(),float(),
                   float(),float(),float(), int(), int(),float(),float(),int()]

@tf.function
def create_tf_dataset_for_client_fn(dataset_path):
   return tf.data.experimental.CsvDataset(dataset_path,
                                          record_defaults=record_defaults,
                                          header=True)

@tf.function
def add_parsing(dataset):
  def parse_dataset(*x):
    ## 'x' holds the dependent variable (last column); 'y' holds the independent variables
    return OrderedDict([('x', x[-1]), ('y', x[1:-1])])
  return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)

source = tff.simulation.datasets.FilePerUserClientData(
  dataset_paths, create_tf_dataset_for_client_fn) 

source = source.preprocess(add_parsing)
## Create the dataset for one client from the client data
## (note: 0-2 evaluates to -2, so this selects the second-to-last client ID)
dataset_creation=source.create_tf_dataset_for_client(source.client_ids[0-2])
print(dataset_creation)
>>> _VariantDataset element_spec=OrderedDict([('x', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('y', (TensorSpec(shape=(), dtype=tf.int32, name=None), TensorSpec(shape=(), dtype=tf.int32, name=None), TensorSpec(shape=(), dtype=tf.int32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.int32, name=None)))])>
## Convert the first element to NumPy values (I think this is necessary for splitting into training and testing sets)
test= tf.nest.map_structure(lambda x: x.numpy(),next(iter(dataset_creation)))
print(test)
>>> OrderedDict([('x', 1), ('y', (0, 1, 9, 85.0, 7.75, 85.0, 95.0, 75.0, 50.0, 6))])

My understanding of supervised ML is that the data are split into training and testing sets as in the code below. I am not sure how to do this in federated learning, or whether it will work this way at all.

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.4, random_state = 42) 

I am looking for an explanation of this issue so that I can proceed to the training phase.


Solution

  • See this tutorial. You should be able to create two datasets (train and test) based on the clients and their data:

    import tensorflow as tf
    import tensorflow_federated as tff
    from collections import OrderedDict
    
    record_defaults = [int(), int(), int(), int(), float(),float(),float(),float(),float(),float(), int(), int()]
    
    @tf.function
    def create_tf_dataset_for_client_fn(dataset_path):
       return tf.data.experimental.CsvDataset(dataset_path, record_defaults=record_defaults, header=True)
       
    @tf.function
    def add_parsing(dataset):
      def parse_dataset(*x):
        # The last column is the label; the columns between the first and the last are the features.
        return OrderedDict([('label', x[-1]), ('features', x[1:-1])])
      return dataset.map(parse_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    
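    def split_train_test(client_ids):
      # Build one list of training examples and one list of test examples per client.
      train, test = [], []
      for client_id in client_ids:
        d = source.create_tf_dataset_for_client(client_id)
        d_length = int(d.reduce(0, lambda count, _: count + 1).numpy())
        # Keep the shuffled order fixed across iterations so that take() and
        # skip() see the same sequence and the two splits do not overlap.
        d = d.shuffle(d_length, reshuffle_each_iteration=False)
        n_train = int(d_length * .8)
        train.append(list(d.take(n_train)))  # first 80% of the shuffled rows
        test.append(list(d.skip(n_train)))   # remaining 20%
      return train, test
    
    dataset_paths = {'client1': '/content/client1.csv', 'client2': '/content/client2.csv', 
                     'client3': '/content/client2.csv', 'client4': '/content/client2.csv'}
    source = tff.simulation.datasets.FilePerUserClientData(
      dataset_paths, create_tf_dataset_for_client_fn) 
    
    client_ids = sorted(source.client_ids)
    
    federated_train_data, federated_test_data = split_train_test(client_ids)
    # Print the training examples of the first client.
    print(*federated_train_data[0], sep='\n')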
    
    (<tf.Tensor: shape=(), dtype=int32, numpy=24>, <tf.Tensor: shape=(), dtype=int32, numpy=17>, <tf.Tensor: shape=(), dtype=int32, numpy=27>, <tf.Tensor: shape=(), dtype=int32, numpy=4>, <tf.Tensor: shape=(), dtype=float32, numpy=0.17308392>, <tf.Tensor: shape=(), dtype=float32, numpy=1.889401>, <tf.Tensor: shape=(), dtype=float32, numpy=1.6235029>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.56010467>, <tf.Tensor: shape=(), dtype=float32, numpy=-1.0171211>, <tf.Tensor: shape=(), dtype=float32, numpy=0.43558818>, <tf.Tensor: shape=(), dtype=int32, numpy=40>, <tf.Tensor: shape=(), dtype=int32, numpy=14>)
    (<tf.Tensor: shape=(), dtype=int32, numpy=8>, <tf.Tensor: shape=(), dtype=int32, numpy=32>, <tf.Tensor: shape=(), dtype=int32, numpy=14>, <tf.Tensor: shape=(), dtype=int32, numpy=11>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.91828436>, <tf.Tensor: shape=(), dtype=float32, numpy=0.29887632>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.4598584>, <tf.Tensor: shape=(), dtype=float32, numpy=-1.1088414>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.4057387>, <tf.Tensor: shape=(), dtype=float32, numpy=-2.1537204>, <tf.Tensor: shape=(), dtype=int32, numpy=15>, <tf.Tensor: shape=(), dtype=int32, numpy=45>)
    (<tf.Tensor: shape=(), dtype=int32, numpy=11>, <tf.Tensor: shape=(), dtype=int32, numpy=17>, <tf.Tensor: shape=(), dtype=int32, numpy=17>, <tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=float32, numpy=0.93560874>, <tf.Tensor: shape=(), dtype=float32, numpy=-2.4382026>, <tf.Tensor: shape=(), dtype=float32, numpy=-1.7638668>, <tf.Tensor: shape=(), dtype=float32, numpy=0.65431964>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.7130539>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.96356>, <tf.Tensor: shape=(), dtype=int32, numpy=15>, <tf.Tensor: shape=(), dtype=int32, numpy=18>)
    (<tf.Tensor: shape=(), dtype=int32, numpy=42>, <tf.Tensor: shape=(), dtype=int32, numpy=27>, <tf.Tensor: shape=(), dtype=int32, numpy=34>, <tf.Tensor: shape=(), dtype=int32, numpy=8>, <tf.Tensor: shape=(), dtype=float32, numpy=0.3965425>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.2588629>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.84179455>, <tf.Tensor: shape=(), dtype=float32, numpy=0.114052325>, <tf.Tensor: shape=(), dtype=float32, numpy=-0.9591451>, <tf.Tensor: shape=(), dtype=float32, numpy=0.94621265>, <tf.Tensor: shape=(), dtype=int32, numpy=28>, <tf.Tensor: shape=(), dtype=int32, numpy=7>)
    

    If you follow the tutorial I linked, you should be able to train a model wrapped with tff.learning.from_keras_model on the split data.
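
For the splitting step on its own, here is a minimal sketch that uses only tf.data, so it can be tried without any CSV files or TFF. It rests on a few assumptions that are not part of the code above: the helper name `split_client_dataset`, the `clients` dictionary, and the in-memory `Dataset.range` data are all hypothetical stand-ins for the per-client files. The idea is to shuffle once with a fixed order (`reshuffle_each_iteration=False`), then cut at a single index, so that `take` and `skip` produce complementary pieces.

```python
import tensorflow as tf


def split_client_dataset(dataset, train_fraction=0.8, seed=42):
    """Split one client's dataset into (train, test) without overlap."""
    # Count the examples by iterating once; fine for small simulation data.
    n = int(dataset.reduce(tf.constant(0, tf.int64),
                           lambda count, _: count + 1).numpy())
    n_train = int(n * train_fraction)
    # A fixed seed and reshuffle_each_iteration=False keep the shuffled order
    # identical every time the dataset is iterated, so take() and skip()
    # below operate on the same sequence.
    shuffled = dataset.shuffle(n, seed=seed, reshuffle_each_iteration=False)
    return shuffled.take(n_train), shuffled.skip(n_train)


# Hypothetical in-memory clients standing in for the per-client CSV files.
clients = {
    "client_0": tf.data.Dataset.range(0, 10),
    "client_1": tf.data.Dataset.range(100, 105),
}

# One (train, test) pair per client.
splits = {cid: split_client_dataset(ds) for cid, ds in clients.items()}

train_0 = sorted(int(v) for v in splits["client_0"][0].as_numpy_iterator())
test_0 = sorted(int(v) for v in splits["client_0"][1].as_numpy_iterator())
train_1 = sorted(int(v) for v in splits["client_1"][0].as_numpy_iterator())
test_1 = sorted(int(v) for v in splits["client_1"][1].as_numpy_iterator())
```

Because the cut index is the same for `take` and `skip`, every example of a client lands in exactly one of the two splits. Each client keeps its own data, which matches the goal of splitting inside the clients rather than splitting the set of clients.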