The code snippet below is frequently used to train an EncoderDecoderModel from Huggingface's transformers library:
from transformers import EncoderDecoderModel
from transformers import PreTrainedTokenizerFast
multibert = EncoderDecoderModel.from_encoder_decoder_pretrained(
"bert-base-multilingual-uncased", "bert-base-multilingual-uncased"
)
tokenizer = PreTrainedTokenizerFast.from_pretrained("bert-base-multilingual-uncased")
...
The model is then trained on a pre-coded dataset, e.g. the wmt14 dataset:

import datasets
train_data = datasets.load_dataset("wmt14", "de-en", split="train")
val_data = datasets.load_dataset("wmt14", "de-en", split="validation[:10%]")
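For orientation, each wmt14 "de-en" example is a dict with a nested "translation" field keyed by language code, which is what the preprocessing function below indexes into:

# Quick look at one row; the exact sentence text is elided here.
print(train_data[0])
# {'translation': {'de': '...', 'en': '...'}}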
from functools import partial

def process_data_to_model_inputs(batch, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    # Tokenize the English side as encoder inputs and the German side as decoder targets.
    inputs = tokenizer([segment["en"] for segment in batch["translation"]],
                       padding="max_length", truncation=True, max_length=encoder_max_length)
    outputs = tokenizer([segment["de"] for segment in batch["translation"]],
                        padding="max_length", truncation=True, max_length=decoder_max_length)

    batch["input_ids"] = inputs.input_ids
    batch["attention_mask"] = inputs.attention_mask
    batch["decoder_input_ids"] = outputs.input_ids
    batch["decoder_attention_mask"] = outputs.attention_mask
    batch["labels"] = outputs.input_ids.copy()

    # Because BERT automatically shifts the labels, the labels correspond exactly to `decoder_input_ids`.
    # We have to make sure that the PAD token is ignored.
    batch["labels"] = [[-100 if token == tokenizer.pad_token_id else token for token in labels]
                       for labels in batch["labels"]]
    return batch
def munge_dataset_to_pacify_bert(dataset, encoder_max_length=512, decoder_max_length=512, batch_size=2):
    # Columns that the BERT2BERT EncoderDecoderModel expects as torch tensors.
    bert_wants_to_see = ["input_ids", "attention_mask", "decoder_input_ids",
                         "decoder_attention_mask", "labels"]

    _process_data_to_model_inputs = partial(process_data_to_model_inputs,
                                            encoder_max_length=encoder_max_length,
                                            decoder_max_length=decoder_max_length,
                                            batch_size=batch_size)
    dataset = dataset.map(_process_data_to_model_inputs,
                          batched=True,
                          batch_size=batch_size)
    dataset.set_format(type="torch", columns=bert_wants_to_see)
    return dataset
train_data = munge_dataset_to_pacify_bert(train_data)
val_data = munge_dataset_to_pacify_bert(val_data)
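A quick, hypothetical sanity check (using the names defined above) that the munging produced the tensor columns the model expects:

# After set_format, the five columns come back as torch tensors.
print(train_data[0]["input_ids"].shape)                # torch.Size([512])
print((train_data[0]["labels"] == -100).sum().item())  # count of ignored PAD positions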
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
# set training arguments - these params are not really tuned, feel free to change
training_args = Seq2SeqTrainingArguments(
    output_dir="./",
    evaluation_strategy="steps",
    ...
)
# Instantiate the trainer
trainer = Seq2SeqTrainer(
    model=multibert,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=train_data,
    eval_dataset=val_data,
)
trainer.train()
A working example can be found in something like: https://www.kaggle.com/code/alvations/neural-plasticity-bert2bert-on-wmt14
My data, however, comes in plain .txt or .tsv files, not a pre-coded dataset. Given a large .tsv file (e.g. 1 billion lines) that looks like:
hello world\tHallo Welt
how are you?\twie gehts?
...\t...
it can first be converted into a parquet file, e.g. with vaex:

import vaex  # using vaex to chunk-convert the large TSV into parquet

filename = "train.en-de.tsv"
df = vaex.from_csv(filename, sep="\t", header=None, names=["src", "trg"], convert=True, chunk_size=50_000_000)
df.export(f"{filename}.parquet")
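Optionally, a small hypothetical check that the conversion round-tripped with the expected columns and row count (vaex reads the parquet lazily):

df_check = vaex.open(f"{filename}.parquet")        # lazy, memory-mapped read
print(len(df_check), df_check.get_column_names())  # row count and ['src', 'trg']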
Then the parquet file can be converted into a datasets.Dataset object and fed through the munge_dataset_to_pacify_bert() function shown above, e.g.

from datasets import Dataset, load_from_disk
import pyarrow as pa
import pyarrow.compute
import pyarrow.parquet

# Read the parquet table, drop null rows and wrap the result in a HF Dataset.
_ds = Dataset(pa.compute.drop_null(pa.parquet.read_table('train.en-de.tsv.parquet')))
_ds.save_to_disk('train.en-de.tsv.parquet.hfdataset')

_ds = load_from_disk('train.en-de.tsv.parquet.hfdataset')
train_data = munge_dataset_to_pacify_bert(_ds)
# Save to a different directory than the one the dataset was loaded from,
# since a dataset cannot overwrite itself on disk.
train_data.save_to_disk('train.en-de.munged.hfdataset')
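As a possibly simpler route for the same conversion step (an assumption to verify against the installed datasets version), Dataset.from_parquet can read the parquet file directly, though it skips the drop_null pass:

# Assumes a datasets release that provides Dataset.from_parquet;
# null rows would then need a separate .filter(...) pass.
_ds = Dataset.from_parquet('train.en-de.tsv.parquet')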
But the .save_to_disk() function seems like it is running "forever" and the end is nowhere in sight. Breaking down the steps in munge_dataset_to_pacify_bert(), there are 2 sub-functions:
dataset.map(_process_data_to_model_inputs, batched=True, batch_size=batch_size)
dataset.set_format(type="torch", columns=bert_wants_to_see)
For the .map() step, it's possible to scale across parallel worker processes by specifying num_proc, e.g.

dataset.map(_process_data_to_model_inputs,
            batched=True, batch_size=100,
            num_proc=32  # number of parallel worker processes
            )
And when I tried to process with num_proc=32 and batch_size=100, the .map() function finished processing 500 million lines in about 18 hours of compute time on an Intel Xeon E5-2686 @ 2.3GHz with all 32 processor cores busy.
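As an aside, a hypothetical throughput probe like the one below (some_dataset standing in for whichever datasets.Dataset is about to be munged) gives a cheap runtime estimate before launching such a multi-day job:

from functools import partial
import time

# Map a small slice with the same settings, time it, and extrapolate linearly.
probe = some_dataset.select(range(100_000))   # select() is cheap, no data copy
start = time.time()
probe = probe.map(partial(process_data_to_model_inputs,
                          encoder_max_length=512, decoder_max_length=512),
                  batched=True, batch_size=100, num_proc=32)
elapsed = time.time() - start
print(f"{elapsed:.0f}s for 100k rows -> ~{elapsed * 10_000 / 3600:.1f}h for 1e9 rows")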
But somehow the .map() function created 32 temp .arrow files and 128 tmp... binary files, and the final save_to_disk call has seemingly been running for 10+ hours without finishing the job of combining those temp file parts into the final HF Dataset on disk.
Given the above context, my questions are:

1. Given the temp .arrow and tmp... files that .map() created, is there a way to read these individually instead of trying to save them into a final directory with the save_to_disk() function?
2. Why is the save_to_disk() function so slow after the mapping, and how can the mapped (processed) data be saved in a faster manner?
3. Should the .set_format() call be avoided after the .map() and instead be made part of the _process_data_to_model_inputs function?

(Answer credit goes to @lhoestq.)
If you have a TSV file that looks like this:
hello world\tHallo Welt
how are you?\twie gehts?
...\t...
load the dataset as such:
from datasets import load_dataset

# tatoeba-sentpairs.tsv is a pretty large file.
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
                  streaming=True, delimiter="\t", split="train")
A few notes on why the eager pipeline above stalls:

- The munge_dataset_to_pacify_bert step is also quite an expensive operation. If that is done for 1B lines, even parallelized across processes, it will take hours to days to complete.
- The output of dataset.set_format(type="torch") is massive: a ~50GB tsv with 1B lines will easily become TBs of binaries.

Instead, process the data lazily, on the fly. Huggingface datasets supports this with streaming=True when defining the dataset:
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
streaming=True, delimiter="\t", split="train")
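With streaming, the tokenization can then happen lazily as batches are pulled, instead of materializing TBs of tensors up front. A minimal sketch, assuming the TSV has no header row so column names are supplied explicitly (adjust "src"/"trg" to the actual layout), and noting that training on a streamed (iterable) dataset requires setting max_steps because its length is unknown:

from datasets import load_dataset

# Stream the TSV and name the two columns explicitly (assumed layout: src \t trg).
ds = load_dataset("csv", data_files="../input/tatoeba/tatoeba-sentpairs.tsv",
                  streaming=True, delimiter="\t", split="train",
                  column_names=["src", "trg"])

def tokenize_on_the_fly(batch):
    # Same preprocessing as process_data_to_model_inputs, applied lazily per batch.
    inputs = tokenizer(batch["src"], padding="max_length", truncation=True, max_length=512)
    outputs = tokenizer(batch["trg"], padding="max_length", truncation=True, max_length=512)
    labels = [[-100 if t == tokenizer.pad_token_id else t for t in seq]
              for seq in outputs.input_ids]
    return {"input_ids": inputs.input_ids, "attention_mask": inputs.attention_mask,
            "decoder_input_ids": outputs.input_ids,
            "decoder_attention_mask": outputs.attention_mask, "labels": labels}

train_data = ds.map(tokenize_on_the_fly, batched=True, remove_columns=["src", "trg"])
# Pass train_data to Seq2SeqTrainer as before, but set max_steps in the
# Seq2SeqTrainingArguments, since a streamed dataset has no known length.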