I already wrote a whole pipeline with Apache Beam that reads from and writes to Kafka topics and does some processing in between. Here is a piece of my current code:
public static void iot_topic_connection(String IP) {
System.out.println( "Initiating connection with iot topic" );
Pipeline pipeline = Pipeline.create();
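//read raw records from the two iot topics (iotA and iotB)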
PCollection<KafkaRecord<String, String>> pCollectionA = pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(IP)
.withTopic("iotA")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
);
PCollection<KafkaRecord<String, String>> pCollectionB = pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(IP)
.withTopic("iotB")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
);
//mapping and windowing
PCollection<KV<String, String>> wrdA = pCollectionA
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via((KafkaRecord<String, String> record) -> KV.of(record.getKV().getKey(), splitValue(record.getKV().getValue(),0)))
).apply(
Window.<KV<String,String>>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.ZERO).accumulatingFiredPanes()
);
PCollection<KV<String, String>> wrdB = pCollectionB
.apply(
MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via((KafkaRecord<String, String> record) -> KV.of(record.getKV().getKey(), splitValue(record.getKV().getValue(),0)))
).apply(
Window.<KV<String,String>>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.ZERO).accumulatingFiredPanes()
);
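//tags to distinguish the two inputs in the CoGroupByKey result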
TupleTag<String> wrdA_tag = new TupleTag<>();
TupleTag<String> wrdB_tag = new TupleTag<>();
//CoGroupbyKey operation =~ join
PCollection<KV<String, CoGbkResult>> results =
KeyedPCollectionTuple.of(wrdA_tag, wrdA)
.and(wrdB_tag, wrdB)
.apply(CoGroupByKey.create());
//aggregation: average the joined temperature values per key and window
PCollection<String> final_data =
results.apply(
ParDo.of(
new DoFn<KV<String, CoGbkResult>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Float avg_temp = 0.0f;
Integer value_count = 0;
KV<String, CoGbkResult> e = c.element();
//System.out.println(e);
String key = e.getKey();
Iterable<String> wrdA_obj = e.getValue().getAll(wrdA_tag);
Iterable<String> wrdB_obj = e.getValue().getAll(wrdB_tag);
Iterator<String> wrdA_iter = wrdA_obj.iterator();
Iterator<String> wrdB_iter = wrdB_obj.iterator();
while( wrdA_iter.hasNext() && wrdB_iter.hasNext() ){
//accumulate each joined pair of readings from iotA and iotB
String v1 = wrdA_iter.next();
String v2 = wrdB_iter.next();
avg_temp += Float.parseFloat(v1) + Float.parseFloat(v2);
value_count += 2;
}
if(value_count == 0){
System.out.println("Unable to join event1 and event2");
}else{
//average over every value joined for this key and window
avg_temp = avg_temp/value_count;
c.output(String.valueOf(avg_temp));
}
}
}));
//write to kafka topic
final_data.apply(KafkaIO.<Void, String>write()
.withBootstrapServers(IP)
.withTopic("iotOut")
.withValueSerializer( StringSerializer.class).values());
//Here we are starting the pipeline
pipeline.run();
}
This project works perfectly on its own, but now I want to deploy it as a Dataflow job. I already tried the GCP documentation like this, but it doesn't create any Dataflow job; it just executes my code locally. From what I've read it seems I have to create a Flex Template, but I'm not sure. What's the next step to run my code as a Dataflow job? Is creating a Flex Template enough, or do I have to change something in my code?
Here is my current pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>beam-app</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>beam-app</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<profiles>
<profile>
<id>dataflow-runner</id>
<!-- Makes the DataflowRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.41.0</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.41.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.41.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-extensions-join-library -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-join-library</artifactId>
<version>2.41.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.41.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.41.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.41.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
The mvn command I'm currently running in GCP's Cloud Shell:
mvn compile exec:java \
-Dexec.mainClass=org.example.App \
-Dexec.args=" \
--project=round...2 \
--runner=DataflowRunner \
--jobName=my-first-job \
--region=southamerica-west1 \
--streaming=true \
--zone=southamerica-west1-a \
--tempLocation=gs://beam-testing-app/dataflow/temp \
--gcpTempLocation=gs://b..p/dataflow/temp \
--stagingLocation=gs://b...p/dataflow/staging \
" \
-Pdataflow-runner
Console output:
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------< org.example:beam-app >------------------------
[INFO] Building beam-app 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-api/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-api/maven-metadata.xml (3.0 kB at 6.0 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-core/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-core/maven-metadata.xml (4.6 kB at 89 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-netty-shaded/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-netty-shaded/maven-metadata.xml (3.7 kB at 89 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-alts/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-alts/maven-metadata.xml (3.5 kB at 77 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-xds/maven-metadata.xml
Downloaded from central: https://repo.maven.apache.org/maven2/io/grpc/grpc-xds/maven-metadata.xml (2.5 kB at 50 kB/s)
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ beam-app ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/user/beam-app/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ beam-app ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/user/beam-app/target/classes
[INFO]
[INFO] --- exec-maven-plugin:3.1.0:java (default-cli) @ beam-app ---
Initiating connection with iot topic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
I found out what I was missing: I had to create a PipelineOptions object and pass the Dataflow parameters to it:
PipelineOptions options =
PipelineOptionsFactory.fromArgs(params).withValidation().create();
Then, pass this variable to the pipeline:
Pipeline pipeline = Pipeline.create(options);
Here, params is the exec.args string from the mvn command above.
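For reference, here is a minimal sketch of how the entry point can forward those arguments into the pipeline. It assumes the args arrive in main() exactly as given in exec.args, that iot_topic_connection is extended to accept them, and it uses a placeholder Kafka broker address (KAFKA_BROKER_IP:9092) that you would replace with your own:

package org.example;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {
    public static void main(String[] args) {
        //args carries --runner=DataflowRunner, --project, --region, --tempLocation, ...
        //exactly as passed through -Dexec.args in the mvn command
        iot_topic_connection("KAFKA_BROKER_IP:9092", args); //placeholder broker address
    }

    public static void iot_topic_connection(String IP, String[] params) {
        //without parsing these options, Pipeline.create() falls back to the defaults
        //and the pipeline runs locally on the DirectRunner instead of on Dataflow
        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(params).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);

        //... same Kafka reads, windowing, CoGroupByKey join and Kafka write as above ...

        pipeline.run();
    }
}

With this change the same mvn command submits the job, and it should show up under Dataflow jobs in the Cloud Console (or via gcloud dataflow jobs list --region=southamerica-west1). A Flex Template is not required just to submit the pipeline this way; it only becomes useful if you later want to launch the job without rebuilding the code.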