google-cloud-dataflow, apache-beam, apache-beam-io

How to save DeferredDataFrame to feather with DataflowRunner?


I'm trying to compute sentence embeddings with a sentence-transformers model for rows stored in BigQuery, and then save the resulting dataframe as a feather file in Google Cloud Storage.

However, I'm having trouble saving the actual dataframe: nothing gets written, either locally or to Google Cloud Storage, and no error is raised.

Here's a reproducible example I've come up with:

import apache_beam as beam
from apache_beam.ml.inference.base import (
    ModelHandler,
    PredictionResult,
    RunInference,
)
from sentence_transformers import SentenceTransformer
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Sequence, Optional, Any, Dict, Iterable
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.dataframe.convert import to_dataframe

ENCODING_MODEL_NAME = "distiluse-base-multilingual-cased-v1"


class EmbeddingModelHandler(
    ModelHandler[str, PredictionResult, SentenceTransformer]
):
    def __init__(self, model_name: str = ENCODING_MODEL_NAME):
        self._model_name = model_name

    def load_model(self) -> SentenceTransformer:
        from sentence_transformers import (
            SentenceTransformer,
        )  # <- These imports are needed, otherwise GCP complains
        import sentence_transformers

        return sentence_transformers.SentenceTransformer(self._model_name)

    def run_inference(
        self,
        batch: Sequence[str],
        model: SentenceTransformer,
        inference_args: Optional[Dict[str, Any]] = None,
    ) -> Iterable[PredictionResult]:
        from sentence_transformers import SentenceTransformer
        import sentence_transformers

        embedding_matrix = model.encode(
            batch, show_progress_bar=True, normalize_embeddings=True
        )

        return embedding_matrix


class GetFeatures(beam.DoFn):
    def process(self, element):
        # These keys must match the columns selected in the BigQuery query
        feature = element.get("text_to_embed", "")
        iid = element.get("identifier")
        return [(iid, feature)]


def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output",
        dest="output",
        required=True,
        help="Output file to write results to.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:

        embedding_dataframe = (
            pipeline
            | "Read BigQuery"
            >> beam.io.ReadFromBigQuery(
                query="""SELECT text_to_embed,
                                identifier
                                FROM  [gcp-project:gcp-dataset.gcp-table]
                                LIMIT 20
                                """,
                project="gcp-project",
                gcs_location="gs://ml-apache-beam/tmp/",
            )
            | "Get features" >> beam.ParDo(GetFeatures())
            | "Run inference"
            >> RunInference(
                KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
            )
            | "To Rows"
            >> beam.Map(
                lambda element: beam.Row(
                    biggint=int(element[0]), embedding=element[1].tolist()
                )
            )
        )

    df = to_dataframe(embedding_dataframe)
    df.to_feather(known_args.output)


if __name__ == "__main__":
    run()

And my requirements.txt:

sentence-transformers==2.2.2

With Python 3.8.14.

To run it locally, I use:

python beam_pipeline.py --requirements_file requirements.txt --output embedding_output.feather

This runs fine, but I see no embedding_output.feather in the directory.

And to run it on GCP:

python beam_pipeline.py --requirements_file requirements.txt --output "gs://my-bucket/embedding_output.feather" --runner DataflowRunner --project my-gcp-project --region us-central1

This also runs fine, but the gs://my-bucket/embedding_output.feather file is not there either.


Solution

  • Thanks @TheNeuralBit for the help here!

    The problem is that this part of the code:

        df = to_dataframe(embedding_dataframe)
        df.to_feather(known_args.output)
    

    was outside of the pipeline block:

        with beam.Pipeline(options=pipeline_options) as pipeline:

    Exiting the with block is what actually runs the pipeline, so any deferred operations built after it are never added to the pipeline and are silently dropped. That is why the job succeeds but the feather write never executes.
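    To make the scoping rule concrete, here is a minimal sketch with a toy pipeline (the data and file name are made up for illustration; this is not the question's actual pipeline):

        import apache_beam as beam
        from apache_beam.dataframe.convert import to_dataframe

        with beam.Pipeline() as pipeline:
            rows = pipeline | beam.Create([beam.Row(iid=1, value=0.5)])
            df = to_dataframe(rows)
            # Inside the block: the deferred write is staged now and runs
            # when the block exits and calls pipeline.run().
            df.to_feather("inside.feather")

        # Out here the pipeline has already run, so a deferred write
        # created at this point would never execute and no file appears.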

    I indented the to_dataframe part so that it sits inside the pipeline block, and it all worked great:

    # ... 
    def run(argv=None):
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--output",
            dest="output",
            required=True,
            help="Output file to write results to.",
        )
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_options = PipelineOptions(pipeline_args)
    
        with beam.Pipeline(options=pipeline_options) as pipeline:
    
            embedding_dataframe = (
                pipeline
                | "Read BigQuery"
                >> beam.io.ReadFromBigQuery(
                    query="""SELECT text_to_embed,
                                    identifier
                                    FROM  [gcp-project:gcp-dataset.gcp-table]
                                    LIMIT 20
                                    """,
                    project="gcp-project",
                    gcs_location="gs://ml-apache-beam/tmp/",
                )
                | "Get features" >> beam.ParDo(GetFeatures())
                | "Run inference"
                >> RunInference(
                    KeyedModelHandler(EmbeddingModelHandler(ENCODING_MODEL_NAME))
                )
                | "To Rows"
                >> beam.Map(
                lambda element: beam.Row(
                        biggint=int(element[0]), embedding=element[1].tolist()
                    )
                )
            )
    
            df = to_dataframe(embedding_dataframe)
            df.to_feather(known_args.output)
    
    
    if __name__ == "__main__":
        run()
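
    As a quick sanity check after the pipeline finishes, the output can be read back with pandas (a sketch; reading the gs:// path assumes gcsfs is installed so pandas can open GCS URLs, and the path is the one from the question):

        import pandas as pd

        # Verify the embeddings were written: one row per BigQuery record,
        # with the integer key and the embedding as a list of floats.
        df = pd.read_feather("gs://my-bucket/embedding_output.feather")
        print(df.head())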