python tensorflow2.0 tensorflow-serving tfx kubeflow-pipelines

TFX running multiple trainers simultaneously


I'm new to TFX and learning to put together a pipeline. I've successfully built a pipeline in Kubeflow on GCP. I'd like to know how I can run multiple trainers that generate different outputs/pushers in the same pipeline, all using the same CsvExampleGen, Transform, and SchemaGen. Has anyone done this before? Please advise, and thanks in advance.

    def create_pipeline(
        pipeline_name: Text,
        pipeline_root: Text,
        data_path: Text,
        preprocessing_fn: Text,
        run_fn: Text,
        train_args: trainer_pb2.TrainArgs,
        eval_args: trainer_pb2.EvalArgs,
        eval_accuracy_threshold: float,
        serving_model_dir: Text,
        metadata_connection_config: Optional[
            metadata_store_pb2.ConnectionConfig] = None,
    ) -> pipeline.Pipeline:

        # ... example_gen, schema_gen, transform, components, and
        # beam_pipeline_args are set up here (not shown in the post) ...

        trainer_args = {
            'run_fn': run_fn,
            'transformed_examples': transform.outputs['transformed_examples'],
            'schema': schema_gen.outputs['schema'],
            'transform_graph': transform.outputs['transform_graph'],
            'train_args': train_args,
            'eval_args': eval_args,
            'custom_executor_spec':
                executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
        }

        trainer = Trainer(**trainer_args)
        components.append(trainer)

        return pipeline.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=True,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args,
        )

Solution

  • To make each trainer unique so that Kubeflow can pick it up, I had to set instance_name on each Trainer.

    trainer2 = Trainer(
        run_fn=run_fn2,
        examples=transform.outputs['transformed_examples'],
        schema=schema_gen.outputs['schema'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=train_args2,
        eval_args=eval_args2,
        custom_executor_spec=executor_spec.ExecutorClassSpec(
            trainer_executor.GenericExecutor),
        instance_name='trainer2',
    )
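
If you end up with more than two trainers, one way to keep this manageable is to build the keyword arguments in a loop, reusing the shared upstream outputs and varying only what differs. The following is a minimal sketch of that idea, not something from the original answer: build_trainer_kwargs is a hypothetical helper, and the placeholder strings stand in for the real transform/schema_gen channels so that the snippet runs without TFX installed.

```python
from typing import Any, Dict, List


def build_trainer_kwargs(
    shared_args: Dict[str, Any],
    variants: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return one kwargs dict per trainer, each with a unique instance_name.

    Hypothetical helper, not part of TFX. `shared_args` holds the inputs every
    trainer reuses (examples, schema, transform_graph, ...); `variants` maps a
    unique trainer name to the arguments that differ (run_fn, train_args, ...).
    """
    all_kwargs = []
    for name, overrides in variants.items():
        kwargs = dict(shared_args)      # copy so trainers do not share one dict
        kwargs.update(overrides)        # per-trainer settings win over shared ones
        kwargs['instance_name'] = name  # unique id, as in the solution above
        all_kwargs.append(kwargs)
    return all_kwargs


# Placeholder strings stand in for the real channels and proto objects.
shared = {
    'examples': 'transform.outputs[transformed_examples]',
    'schema': 'schema_gen.outputs[schema]',
    'transform_graph': 'transform.outputs[transform_graph]',
}
variants = {
    'trainer1': {'run_fn': 'run_fn1'},
    'trainer2': {'run_fn': 'run_fn2'},
}
trainer_kwargs = build_trainer_kwargs(shared, variants)

# In the real pipeline each dict would be expanded into a component:
#   for kwargs in trainer_kwargs:
#       components.append(Trainer(**kwargs))
```

Because the names are dictionary keys, two trainers cannot accidentally get the same instance_name. Note that instance_name belongs to older TFX releases; as far as I know, newer versions (1.0 and later) drop that constructor argument in favour of calling .with_id('trainer2') on the component, so check which one your installed version expects.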