I am trying to run an Apache Beam pipeline using a Google Cloud Dataflow Flex Template, but the template I created keeps failing with the following error message:
java.lang.IllegalArgumentException: Transform ReadingFileFromSamples-Read-Impulse is not a composite transform but does not have a specified URN.
outputs {
  key: "org.apache.beam.sdk.values.PCollection.<init>:397#bb20b45fd4d95138"
  value: "ReadingFileFromSamples/Read/Impulse.out"
}
The code is a very simple Kotlin version of WordCount using Apache Beam v2.51.0:
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.slf4j.LoggerFactory
import kotlin.system.exitProcess

// IAppOptions, WordCounterTransform and PassthroughTransform are defined elsewhere in the project.

fun main(args: Array<String>) {
    val logger = LoggerFactory.getLogger("app.fp8.sample.kbeam.MainTK")
    val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(IAppOptions::class.java)
    // options.gcpCredential = AppHelper.getGCloudCredentials(options.serviceAccountPath)

    // Define pipeline
    val pipe = Pipeline.create(options)
    pipe.apply(
        "ReadingFileFromSamples",
        TextIO.read().from(options.inputFilePattern)
    ).apply(
        "CountWords",
        WordCounterTransform(options.outputCountGreaterThan)
    ).apply(
        "OutputLogs",
        PassthroughTransform<String> {
            println("Result -> $it")
        }
    )

    // Run the pipeline
    try {
        logger.info("Start running pipeline reading from {}", options.inputFilePattern)
        pipe.run()
    } catch (e: RuntimeException) {
        logger.error("Job failed due to runtime exception: ${e.message}", e)
        exitProcess(1)
    } catch (e: Exception) {
        logger.error("Unexpected Exception: ${e.message}", e)
        exitProcess(1)
    }
}
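IAppOptions itself is not shown above. For anyone trying to reproduce the problem, a minimal hypothetical version consistent with how main uses it (options.inputFilePattern and options.outputCountGreaterThan) could look like the sketch below. The descriptions, the Int type of outputCountGreaterThan and its default of 0 are assumptions, not the original definition.

```kotlin
import org.apache.beam.sdk.options.Default
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.Validation

// Hypothetical options interface; Beam needs the annotations on the getters,
// hence the @get: use-site targets on the Kotlin properties.
interface IAppOptions : PipelineOptions {
    @get:Description("File pattern to read, e.g. gs://apache-beam-samples/shakespeare/*")
    @get:Validation.Required
    var inputFilePattern: String

    // Assumed to be an Int threshold with a default of 0.
    @get:Description("Only emit words whose count is greater than this value")
    @get:Default.Integer(0)
    var outputCountGreaterThan: Int
}
```

With this interface, withValidation() rejects a launch that omits --inputFilePattern, while --outputCountGreaterThan falls back to 0.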
The error clearly occurs in the TextIO.read() step, where inputFilePattern is set to gs://apache-beam-samples/shakespeare/*. The error seems to imply that I am attempting to read an Impulse.out file, which is not the case.
Any idea what that error means?
After a lot of research, I came across the issue below, which is similar to the problem I raised:
https://github.com/apache/beam/issues/28034
I am also building a fat jar, and as soon as I replaced the single fat jar with all the required jars, passed individually to the --jar option of gcloud dataflow flex-template build, I finally managed to run the flex template in Dataflow.
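One plausible explanation, which is an assumption and not something the stack trace proves: Impulse is the primitive transform that TextIO.read() expands into, and Beam looks up the URN of each primitive through translators registered with java.util.ServiceLoader in META-INF/services files. If the fat-jar build overwrites those files instead of merging them, the Impulse translator is never registered and translation would fail with a message like the one above. Passing the jars separately keeps each module's services file intact, which fits the observation that this works. The sketch below lists every org.apache.beam* entry under META-INF/services in a jar together with its provider classes, so you can check whether they survived.

```kotlin
import java.io.File
import java.util.jar.JarFile

/**
 * Returns every META-INF/services entry in [jar], mapped to the provider
 * class names it lists (blank lines and '#' comments are dropped).
 */
fun serviceEntries(jar: File): Map<String, List<String>> =
    JarFile(jar).use { jf ->
        jf.entries().asSequence()
            .filter { !it.isDirectory && it.name.startsWith("META-INF/services/") }
            .associate { entry ->
                val providers = jf.getInputStream(entry).bufferedReader().useLines { lines ->
                    lines.map { it.substringBefore('#').trim() }
                        .filter { it.isNotEmpty() }
                        .toList()
                }
                entry.name.removePrefix("META-INF/services/") to providers
            }
    }

fun main(args: Array<String>) {
    // Pass the path of the fat jar, e.g. build/libs/app-all.jar (hypothetical name).
    val jar = File(args.firstOrNull() ?: error("usage: <path-to-jar>"))
    serviceEntries(jar)
        .filterKeys { it.startsWith("org.apache.beam") }
        .toSortedMap()
        .forEach { (service, providers) ->
            println("$service (${providers.size} providers)")
            providers.forEach { println("  $it") }
        }
}
```

If Beam service files are missing, or list providers from only one module, merging service files when building the fat jar (for example the Gradle Shadow plugin's mergeServiceFiles() or the Maven Shade plugin's ServicesResourceTransformer) is the usual remedy; this has not been verified against the setup in this question.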
This made me wonder why Google doesn't allow passing a wildcard to the --jar option. The lack of a wildcard option is what led me to create a fat jar in the first place.
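Until --jar supports wildcards, the expansion can be done before calling gcloud. The sketch below assumes, as the gcloud reference describes, that --jar accepts a comma-separated list. It collects every .jar in a directory (for example one your build copies the runtime dependencies into; the build/deps default and the optional application-jar argument are hypothetical) and prints a ready-to-paste --jar=... argument.

```kotlin
import java.io.File

/**
 * Builds a --jar=a.jar,b.jar,... argument from all jars in [dir],
 * standing in for the wildcard that the option does not accept.
 * [appJar], if given, is placed first and not repeated.
 */
fun jarFlag(dir: File, appJar: File? = null): String {
    val deps = dir.listFiles()
        ?.filter { it.isFile && it.extension == "jar" }
        ?.sortedBy { it.name }
        ?: error("not a directory: $dir")
    val all = listOfNotNull(appJar) + deps.filter { it != appJar }
    require(all.isNotEmpty()) { "no jars found in $dir" }
    return "--jar=" + all.joinToString(",") { it.path }
}

fun main(args: Array<String>) {
    // Hypothetical layout: args[0] = directory with dependency jars,
    // args[1] (optional) = the application jar itself.
    val dir = File(args.getOrElse(0) { "build/deps" })
    val appJar = args.getOrNull(1)?.let(::File)
    println(jarFlag(dir, appJar))
}
```

Sorting by file name keeps the generated argument stable between builds, which makes the resulting gcloud command easier to diff.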