While going through the Google Cloud Dataflow WordCount Pipeline Example and creating a Scala app to run the pipeline locally, I'm getting the following exception:
Exception in thread "main" java.lang.NullPointerException
at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:89)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Bound.<init>(ParDo.java:700)
at com.google.cloud.dataflow.sdk.transforms.ParDo$Unbound.of(ParDo.java:661)
at com.google.cloud.dataflow.sdk.transforms.ParDo.of(ParDo.java:551)
at apps.MiniDataFlowApp$.delayedEndpoint$apps$MiniDataFlowApp$1(MiniDataFlowApp.scala:32)
at apps.MiniDataFlowApp$delayedInit$body.apply(MiniDataFlowApp.scala:17)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at apps.MiniDataFlowApp$.main(MiniDataFlowApp.scala:17)
at apps.MiniDataFlowApp.main(MiniDataFlowApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
where MiniDataFlowApp.scala:32 corresponds to .apply(ParDo.of(extractWords)) within the following snippet that creates the pipeline:
val p: Pipeline = Pipeline.create(options)
p.apply(TextIO.Read.from("some.input.txt"))
  .apply(ParDo.of(extractWords))
  .apply(Count.perElement[String]())
  .apply(ParDo.of(formatOutput))
  .apply(TextIO.Write.to("some.output.txt"))
extractWords implements DoFn as follows:
val extractWords = new DoFn[String, String]() {
  override def processElement(c: DoFn[String, String]#ProcessContext): Unit = {
    // Emit each non-empty token; foreach runs c.output for its side effect.
    c.element.split("[^a-zA-Z']+").filter(_.nonEmpty).foreach(word => c.output(word))
  }
}
The issue described in this StackOverflow question seems similar. However, I don't think I have an unserializable class like the one causing the problem in that question, so I can't see why I would be having a serialization problem, if that is the issue at all.
Thank you for taking the time to read my question and any insights!
This is an initialization-order problem. In Scala, vals in a class or object body are initialized in declaration order (an object is a singleton instance of a corresponding class).
This means that when p is initialized, extractWords and formatOutput have not been initialized yet and are still null, so ParDo.of(extractWords) receives null, which is consistent with the NullPointerException in the stack trace. (I've seen the rest of the OP's code; those members are vals that come after val p.)
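The effect can be reproduced without Dataflow. The following is only a minimal sketch: InitOrderDemo, usesGreeting, and greeting are hypothetical names standing in for the OP's object, val p, and extractWords. Depending on the compiler version, it may produce a warning about the forward reference.

```scala
// Hypothetical stand-in for the OP's app; no Dataflow dependency is needed.
object InitOrderDemo {
  // Initialized first, so it reads `greeting` before `greeting` has been assigned.
  val usesGreeting: String = "seen during init: " + greeting

  // Initialized second; until this line runs, the field holds its default value, null.
  val greeting: String = "hello"
}
```

Reading InitOrderDemo.usesGreeting afterwards shows that it captured null, even though InitOrderDemo.greeting itself is "hello" once initialization has finished.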
There are at least 3 solutions:
1) Change the order of the vals so that the dependencies (extractWords, formatOutput) come first.
2) Make extractWords and formatOutput lazy vals. They will then be initialized on first access, and are guaranteed to be initialized only once.
3) Make extractWords and formatOutput defs. They will then be re-computed on each access (a new instance every time), which may or may not be acceptable.
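The three options can be sketched on a stripped-down example. This is an illustration under assumptions, not the OP's code: the object names and the greeting/usesGreeting members are hypothetical stand-ins for extractWords/formatOutput and val p.

```scala
// Option 1: declare the dependency before the val that uses it.
object Reordered {
  val greeting: String = "hello"
  val usesGreeting: String = "seen: " + greeting
}

// Option 2: a lazy val is initialized on first access and then cached.
object WithLazyVal {
  val usesGreeting: String = "seen: " + greeting
  lazy val greeting: String = new String("hello")
}

// Option 3: a def is evaluated again on every access.
object WithDef {
  val usesGreeting: String = "seen: " + greeting
  def greeting: String = new String("hello")
}
```

In all three objects usesGreeting sees "hello" rather than null. The difference between options 2 and 3 is identity: WithLazyVal.greeting returns the same instance on every access, while WithDef.greeting builds a new one each time.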