Tags: google-cloud-platform, google-cloud-dataflow, apache-beam

Executing Apache Beam's code in GCP Dataflow


I already wrote a whole pipeline that subscribes to Kafka topics, performs some operations with Apache Beam, and publishes the results back to Kafka. Here is a piece of my current code:

    public static void iot_topic_connection(String IP) {
        System.out.println( "Initiating connection with iot topic" );
        Pipeline pipeline = Pipeline.create();
        PCollection<KafkaRecord<String, String>> pCollectionA = pipeline.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(IP)
                .withTopic("iotA")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
        );
        PCollection<KafkaRecord<String, String>> pCollectionB = pipeline.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(IP)
                .withTopic("iotB")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
        );

        //mapping and windowing
        PCollection<KV<String, String>> wrdA = pCollectionA
                .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                        .via((KafkaRecord<String, String> record) -> KV.of(record.getKV().getKey(), splitValue(record.getKV().getValue(),0)))
                ).apply(
                        Window.<KV<String,String>>into(FixedWindows.of(Duration.standardSeconds(30)))
                                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                                .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes()
                );

        PCollection<KV<String, String>> wrdB = pCollectionB
                .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                        .via((KafkaRecord<String, String> record) -> KV.of(record.getKV().getKey(), splitValue(record.getKV().getValue(),0)))
                ).apply(
                        Window.<KV<String,String>>into(FixedWindows.of(Duration.standardSeconds(30)))
                                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                                .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes()
                );

        TupleTag<String> wrdA_tag = new TupleTag<>();
        TupleTag<String> wrdB_tag = new TupleTag<>();
        //CoGroupbyKey operation =~ join
        PCollection<KV<String, CoGbkResult>> results =
                KeyedPCollectionTuple.of(wrdA_tag, wrdA)
                        .and(wrdB_tag, wrdB)
                        .apply(CoGroupByKey.create());

        //here aggregation needs to be implemented
        PCollection<String> final_data =
                results.apply(
                        ParDo.of(
                                new DoFn<KV<String, CoGbkResult>, String>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext c) {
                                        float avg_temp = 0.0f;
                                        int array_length = 0;
                                        KV<String, CoGbkResult> e = c.element();
                                        String key = e.getKey();
                                        Iterable<String> wrdA_obj = e.getValue().getAll(wrdA_tag);
                                        Iterable<String> wrdB_obj = e.getValue().getAll(wrdB_tag);
                                        Iterator<String> wrdA_iter = wrdA_obj.iterator();
                                        Iterator<String> wrdB_iter = wrdB_obj.iterator();
                                        while( wrdA_iter.hasNext() && wrdB_iter.hasNext() ){
                                            // Accumulate the paired readings from both topics
                                            String v1 = wrdA_iter.next();
                                            String v2 = wrdB_iter.next();
                                            avg_temp += Float.parseFloat(v1) + Float.parseFloat(v2);
                                            array_length += 2;
                                        }
                                        if(array_length == 0){
                                            System.out.println("Unable to join event1 and event2");
                                        }else{
                                            avg_temp = avg_temp/array_length;
                                            c.output(String.valueOf(avg_temp));
                                        }

                                    }
                                }));
        //write to kafka topic
        final_data.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers(IP)
                .withTopic("iotOut")
                .withValueSerializer( StringSerializer.class).values());
        //Here we are starting the pipeline
        pipeline.run();
    }

This project works perfectly on its own, but now I want to deploy a Dataflow job to run it. I already tried following GCP documentation like this, but it doesn't create any Dataflow job; it just executes my code locally. Reading around, I found out that I might have to create a Flex Template, but I'm not sure. What's the next step to run my code as a Dataflow job? Is creating a Flex Template enough, or do I have to change something in my code?

Here is my current pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>beam-app</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>beam-app</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>
  <profiles>
    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>2.41.0</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>2.41.0</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>2.41.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-extensions-join-library -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-extensions-join-library</artifactId>
      <version>2.41.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>2.41.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>2.41.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>2.41.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.2.2</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

The mvn command I'm currently running in GCP's Cloud Shell:

mvn compile exec:java \
  -Dexec.mainClass=org.example.App \
  -Dexec.args=" \
  --project=round...2 \
  --runner=DataflowRunner \
  --jobName=my-first-job \
  --region=southamerica-west1 \
  --streaming=true \
  --zone=southamerica-west1-a \
  --tempLocation=gs://beam-testing-app/dataflow/temp \
  --gcpTempLocation=gs://b..p/dataflow/temp \
  --stagingLocation=gs://b...p/dataflow/staging \
  " \
  -Pdataflow-runner

Console output:

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------< org.example:beam-app >------------------------
[INFO] Building beam-app 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-api/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-api/maven-metadata.xml (3.0 kB at 6.0 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-core/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-core/maven-metadata.xml (4.6 kB at 89 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-netty-shaded/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-netty-shaded/maven-metadata.xml (3.7 kB at 89 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-alts/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-alts/maven-metadata.xml (3.5 kB at 77 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-xds/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-xds/maven-metadata.xml (2.5 kB at 50 kB/s)
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ beam-app ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/user/beam-app/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ beam-app ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/user/beam-app/target/classes
[INFO]
[INFO] --- exec-maven-plugin:3.1.0:java (default-cli) @ beam-app ---
Initiating connection with iot topic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Solution

  • I found out what I was missing: I had to create a PipelineOptions object and pass the Dataflow parameters to it:

            PipelineOptions options =
                PipelineOptionsFactory.fromArgs(params).withValidation().create();
    

    Then, pass this variable to the pipeline:

    Pipeline pipeline = Pipeline.create(options);
    

    Here, params is the String[] argument received by main, i.e. the values supplied through exec.args in the mvn command above.
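
    To tie it together, here is a minimal sketch of how the entry point could look with that change. It assumes the main class org.example.App from the mvn command and keeps the method from the question; the broker address is only a placeholder:

        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.options.PipelineOptions;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;

        public class App {
            public static void main(String[] args) {
                // args holds everything supplied through -Dexec.args (--project,
                // --runner, --region, temp/staging locations, ...); withValidation()
                // fails fast on malformed options.
                PipelineOptions options =
                        PipelineOptionsFactory.fromArgs(args).withValidation().create();

                // Placeholder broker address; this could also be exposed as a
                // custom pipeline option instead of being hard-coded.
                iot_topic_connection(options, "10.128.0.2:9092");
            }

            public static void iot_topic_connection(PipelineOptions options, String ip) {
                // Passing the options is what makes --runner=DataflowRunner take
                // effect; Pipeline.create() with no arguments defaults to the
                // DirectRunner, which is why the job only ran locally.
                Pipeline pipeline = Pipeline.create(options);
                // ... KafkaIO reads, windowing, CoGroupByKey join, KafkaIO write as above ...
                pipeline.run();
            }
        }

    With the options object actually passed to Pipeline.create, the same mvn command with --runner=DataflowRunner should submit a streaming job that appears in the Dataflow console instead of running locally.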