Search code examples
pythonmachine-learningpytorchdataloaderpytorch-datapipe

How to handle Pytorch Dataset with transform function that returns >1 output per row of data?


Given a myfile.csv file that looks like:

imagefile,label
train/0/16585.png,0
train/0/56789.png,0

The goal is to create a PyTorch DataLoader that, when iterated over, returns twice as many data points as there are rows in the CSV file, e.g.:

>>> dp = MyDataPipe(csvfile)
>>> for row in dp.train_dataloader:
...     print(row)
...
(tensor([1.23, 4.56, 7.89]), 0)
(tensor([9.87, 6.54, 3.21]), 1)
(tensor([9.99, 8.88, 7.77]), 0)
(tensor([1.11, 2.22, 9.87]), 1)

I've written a DataLoader that works when it only needs to return the same number of data points as there are rows in the input file:

from typing import Callable, Optional

import pytorch_lightning as pl
import torch
from torch.utils.data import DataLoader2
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper


# Sample CSV: a header line followed by two image/label rows.
content = (
    "imagefile,label\n"
    "train/0/16585.png,0\n"
    "train/0/56789.png,0"
)

# Write it to disk so the datapipe below has a file to read.
with open('myfile.csv', 'w') as fout:
    fout.write(content)


def optimus_prime(row):
    """Turn one CSV row into a single ``(vector, label)`` data point.

    The vector is currently a placeholder drawn with ``torch.rand``; in the
    real pipeline it would be computed from the image at ``row['imagefile']``.

    Args:
        row: Mapping with at least a ``'label'`` key, e.g. one dict produced
            by ``parse_csv_as_dict``.

    Returns:
        Tuple ``(vector, label)`` where ``vector`` is a length-3 tensor and
        ``label`` is ``row['label']`` unchanged.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> vector, label = optimus_prime(row)
    >>> vector.shape, label
    (torch.Size([3]), 0)
    """
    # Placeholder: replace with the real png -> vector conversion.
    vector1 = torch.rand(3)
    return vector1, row['label']
    

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module that streams rows from CSV files.

    Each CSV row is parsed into a dict (header -> value) and, if given,
    passed through ``tranform_func`` via ``.map()``, so exactly one data
    point is produced per row.
    """

    def __init__(
        self,
        csv_files: list[str],
        skip_lines: int = 0,
        tranform_func: Optional[Callable] = None
    ):
        """
        Args:
            csv_files: Paths of the CSV files to read. A single path string is
                also accepted and treated as a one-element list.
            skip_lines: Number of leading lines to skip in each file.
            tranform_func: Optional callable applied to every parsed row; it
                must return exactly one data point per row.
        """
        super().__init__()
        # IterableWrapper iterates its argument, so a bare string would be
        # split into single characters, each opened as a separate "file".
        if isinstance(csv_files, str):
            csv_files = [csv_files]
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe: file paths -> open streams -> row dicts.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )

        if tranform_func is not None:
            self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the (optionally transformed) rows."""
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

# Pass a list of paths (the datapipe iterates over it) and *call*
# train_dataloader(); iterating the bound method itself raises TypeError.
dp = MyDataPipe(['myfile.csv'], tranform_func=optimus_prime)

for row in dp.train_dataloader():
    print(row)

If the optimus_prime function returns 2 data points, how do I set up the DataLoader so that it collates the 2 data points accordingly?

How should I write the collate function, or otherwise tell the DataLoader that each .map(tranform_func) output contains 2 data points? E.g. if I change the function to:

def optimus_prime(row):
    """Yield two ``(vector, label)`` data points for one CSV row.

    Because this is a generator, it fits ``.flatmap()``, which emits every
    yielded item as its own data point; ``.map()`` would instead emit one
    generator object per row.

    Args:
        row: Mapping with at least a ``'label'`` key. Labels read from the
            CSV are strings such as ``'0'``.

    Yields:
        ``(vector1, row['label'])`` and then ``(vector2, flipped)``, where
        ``flipped`` is ``1`` if ``int(row['label'])`` is 0 and ``0`` otherwise.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': '0'}
    >>> [label for _, label in optimus_prime(row)]
    ['0', 1]
    """
    # Placeholders: replace with the real png -> vector conversions.
    vector1 = torch.rand(3)
    vector2 = torch.rand(3)  # second, independent sample vector
    yield vector1, row['label']
    # `not '0'` is False for every non-empty string, so convert to int first.
    yield vector2, int(not int(row['label']))

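I've also tried the following, which splits optimus_prime into two functions so that each row is transformed twice, but the second .map(tranform_func) throws `TypeError: tuple indices must be integers or slices, not str`. (Chained .map() calls feed the second function the output of the first, not the original row dict.)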


def optimus_prime_1(row):
    """Yield a single ``(vector, label)`` data point for ``row``.

    The vector is a ``torch.rand(3)`` placeholder for the real
    png-to-vector conversion; the label is passed through unchanged.
    """
    sample = (torch.rand(3), row['label'])
    yield sample

def optimus_prime_2(row):
    """Yield one ``(vector, flipped_label)`` data point for ``row``.

    ``flipped_label`` is ``1`` when ``int(row['label'])`` is 0 and ``0``
    otherwise. Labels parsed from the CSV are strings (e.g. ``'0'``), and
    ``not '0'`` is always ``False``, so the label is converted to ``int``
    before negating.
    """
    # Placeholder: replace with the real png -> vector conversion.
    vector2 = torch.rand(3)
    yield vector2, int(not int(row['label']))
    

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module that fans each CSV row out to several transforms.

    Every transform in ``tranform_funcs`` is applied to the *same* parsed row
    dict, and every data point they produce is emitted individually. Chaining
    one ``.map()`` per transform would instead feed each transform the
    previous transform's output rather than the row.
    """

    def __init__(
        self,
        csv_files: list[str],
        skip_lines: int = 0,
        tranform_funcs: Optional[list[Callable]] = None
    ):
        """
        Args:
            csv_files: Paths of the CSV files to read. A single path string is
                also accepted and treated as a one-element list.
            skip_lines: Number of leading lines to skip in each file.
            tranform_funcs: Callables that each take one row dict and return
                (or yield) an iterable of data points, like
                ``optimus_prime_1`` / ``optimus_prime_2``. If empty or
                ``None``, the raw row dicts are emitted.
        """
        from functools import partial

        super().__init__()
        # IterableWrapper iterates its argument, so a bare string would be
        # split into single characters, each opened as a separate "file".
        if isinstance(csv_files, str):
            csv_files = [csv_files]
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe: file paths -> open streams -> row dicts.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )

        if tranform_funcs:
            # One flatmap over the row: every transform sees the original
            # row, and each item it yields becomes a separate data point.
            # A partial over a class-level function (not a closure) keeps
            # the callable picklable.
            self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(
                partial(MyDataPipe._fan_out, tuple(tranform_funcs))
            )

    @staticmethod
    def _fan_out(transforms, row):
        """Yield every data point produced by each transform for ``row``."""
        for transform in transforms:
            yield from transform(row)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the emitted data points."""
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

# Pass a list of paths (the datapipe iterates over it) and *call*
# train_dataloader(); iterating the bound method itself raises TypeError.
dp = MyDataPipe(['myfile.csv'], tranform_funcs=[optimus_prime_1, optimus_prime_2])

for row in dp.train_dataloader():
    print(row)

Solution

  • From https://discuss.pytorch.org/t/how-to-handle-pytorch-dataset-with-transform-function-that-returns-1-output-per-row-of-data/162160, the suggestion is to use .flatmap() instead of .map():

    First, change the transformation function to return a list of N data points for each row of the CSV file, e.g.:

    def optimus_prime(row):
        """Return two ``(vector, label)`` data points for one CSV row.

        Meant to be used with ``.flatmap()``, which emits each element of the
        returned list as a separate data point.

        Args:
            row: Mapping with at least a ``'label'`` key, e.g. a dict from
                ``parse_csv_as_dict`` (labels are strings such as ``'0'``).

        Returns:
            ``[(vector1, row['label']), (vector2, row['label'])]`` where each
            vector is a length-3 tensor.

        >>> row = {'imagefile': 'train/0/16585.png', 'label': '0'}
        >>> [label for _, label in optimus_prime(row)]
        ['0', '0']
        """
        # Placeholders: replace with the real png -> vector conversions.
        vector1 = torch.rand(3)
        vector2 = torch.rand(3)
        return [(vector1, row['label']), (vector2, row['label'])]
        
    

    Then use .flatmap() instead of .map(); it emits each element of the returned list as a separate data point:

    
    class MyDataPipe(pl.LightningDataModule):
        """Lightning data module yielding several data points per CSV row.

        Rows are parsed into dicts and passed through ``transform_func`` via
        ``.flatmap()``, so every element of the iterable it returns becomes a
        separate data point.
        """

        def __init__(
            self,
            csv_files,
            skip_lines=0,
            transform_func=None
        ):
            """
            Args:
                csv_files: Paths of the CSV files to read. A single path
                    string is also accepted and treated as a one-element list.
                skip_lines: Number of leading lines to skip in each file.
                transform_func: Callable mapping a row dict to an iterable of
                    data points. Defaults to ``optimus_prime``.
            """
            super().__init__()
            # IterableWrapper iterates its argument, so a bare string would
            # be split into characters, each opened as a separate "file".
            if isinstance(csv_files, str):
                csv_files = [csv_files]
            if transform_func is None:
                transform_func = optimus_prime
            self.csv_files: list[str] = csv_files
            self.skip_lines: int = skip_lines

            # Initialize a datapipe: file paths -> open streams -> row dicts.
            self.dp_chained_datapipe: IterDataPipe = (
                IterableWrapper(iterable=self.csv_files)
                .open_files()
                .parse_csv_as_dict(skip_lines=self.skip_lines)
            )

            # flatmap (unlike map) emits each returned item individually.
            self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(transform_func)

        def train_dataloader(self, batch_size=1) -> DataLoader2:
            """Return a ``DataLoader2`` over the flattened data points."""
            return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
    

    Full working example:

    import torch 
    
    from torch.utils.data import DataLoader2
    import pytorch_lightning as pl
    from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
    
    # Build the CSV with explicit "\n" escapes: a triple-quoted string inside
    # indented code would carry that indentation into every data row.
    content = (
        "imagefile,label\n"
        "train/0/16585.png,0\n"
        "train/0/56789.png,0"
    )

    with open('myfile.csv', 'w') as fout:
        fout.write(content)
    
    def optimus_prime(row):
        """Return two ``(vector, label)`` data points for one CSV row.

        Meant to be used with ``.flatmap()``, which emits each element of the
        returned list as a separate data point.

        Args:
            row: Mapping with at least a ``'label'`` key, e.g. a dict from
                ``parse_csv_as_dict`` (labels are strings such as ``'0'``).

        Returns:
            ``[(vector1, row['label']), (vector2, row['label'])]`` where each
            vector is a length-3 tensor.

        >>> row = {'imagefile': 'train/0/16585.png', 'label': '0'}
        >>> [label for _, label in optimus_prime(row)]
        ['0', '0']
        """
        # Placeholders: replace with the real png -> vector conversions.
        vector1 = torch.rand(3)
        vector2 = torch.rand(3)
        return [(vector1, row['label']), (vector2, row['label'])]
        
    
    class MyDataPipe(pl.LightningDataModule):
        """Lightning data module yielding several data points per CSV row.

        Rows are parsed into dicts and passed through ``transform_func`` via
        ``.flatmap()``, so every element of the iterable it returns becomes a
        separate data point.
        """

        def __init__(
            self,
            csv_files,
            skip_lines=0,
            transform_func=None
        ):
            """
            Args:
                csv_files: Paths of the CSV files to read. A single path
                    string is also accepted and treated as a one-element list.
                skip_lines: Number of leading lines to skip in each file.
                transform_func: Callable mapping a row dict to an iterable of
                    data points. Defaults to ``optimus_prime``.
            """
            super().__init__()
            # IterableWrapper iterates its argument, so a bare string would
            # be split into characters, each opened as a separate "file".
            if isinstance(csv_files, str):
                csv_files = [csv_files]
            if transform_func is None:
                transform_func = optimus_prime
            self.csv_files: list[str] = csv_files
            self.skip_lines: int = skip_lines

            # Initialize a datapipe: file paths -> open streams -> row dicts.
            self.dp_chained_datapipe: IterDataPipe = (
                IterableWrapper(iterable=self.csv_files)
                .open_files()
                .parse_csv_as_dict(skip_lines=self.skip_lines)
            )

            # flatmap (unlike map) emits each returned item individually.
            self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(transform_func)

        def train_dataloader(self, batch_size=1) -> DataLoader2:
            """Return a ``DataLoader2`` over the flattened data points."""
            return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
    
    dp = MyDataPipe(['myfile.csv'])
    train_loader = dp.train_dataloader()

    # Each printed item is one collated batch (batch_size defaults to 1).
    for row in train_loader:
        print(row)
    

    [out]:

    [tensor([[0.6003, 0.1200, 0.5175]]), ('0',)]
    [tensor([[0.0628, 0.7004, 0.3169]]), ('0',)]
    [tensor([[0.0623, 0.4608, 0.7456]]), ('0',)]
    [tensor([[0.7454, 0.5326, 0.7459]]), ('0',)]