I am writing a DoFn in which the instance variable elements (i.e., a shared resource) can be mutated in the @ProcessElement method:
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
    private final int batchSize;
    // Pending elements; transient because the buffer is rebuilt on the worker in @StartBundle
    private transient List<String> elements;

    public DemoDoFn(int batchSize) {
        this.batchSize = batchSize;
    }

    @StartBundle
    public void startBundle() {
        elements = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(@Element String element, ProcessContext context) {
        elements.add(element); // <-------- mutated
        if (elements.size() >= batchSize) {
            flushBatch();
        }
    }

    @FinishBundle
    public void finishBundle() {
        flushBatch();
    }

    private void flushBatch() {
        // Nothing to send, e.g. the bundle ended right after a full batch was flushed
        if (elements.isEmpty()) {
            return;
        }
        // Flush all elements, e.g., send all elements in a single API call to a server
        // Initialize a new array list for next batch
        elements = new ArrayList<>(); // <-------- mutated
    }
}
```
Question 1: Do I need to add the synchronized keyword to the @ProcessElement method in order to avoid a race condition?
According to Apache Beam Thread-compatibility: "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization."
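To illustrate the last sentence of that quote, here is a minimal plain-Java sketch (no Beam dependency) of the case where synchronization is required: the user code starts its own background thread that flushes the same buffer the caller appends to. SharedBuffer and run are hypothetical names, and the calling thread merely stands in for @ProcessElement. Both paths guard the buffer with one lock, and the batch is swapped out under that lock so it can be "sent" without holding it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedBufferDemo {
    // Hypothetical buffer used by two threads: the caller (standing in for
    // @ProcessElement) and a background flusher thread created by user code.
    static class SharedBuffer {
        private final Object lock = new Object();
        private List<String> elements = new ArrayList<>();
        private long flushedCount = 0;

        void add(String element) {
            synchronized (lock) {
                elements.add(element);
            }
        }

        void flush() {
            List<String> batch;
            synchronized (lock) {
                // Swap the buffer under the lock so no element is lost or counted twice
                batch = elements;
                elements = new ArrayList<>();
            }
            // The batch is now owned by this thread alone; a real implementation
            // would send it to a server here. This sketch only counts it.
            int size = batch.size();
            synchronized (lock) {
                flushedCount += size;
            }
        }

        long totalSeen() {
            synchronized (lock) {
                return flushedCount + elements.size();
            }
        }
    }

    static long run(int n) {
        SharedBuffer buffer = new SharedBuffer();
        ExecutorService flusher = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            buffer.add("element-" + i);
            if (i % 100 == 0) {
                flusher.execute(buffer::flush); // runs concurrently with add()
            }
        }
        flusher.shutdown();
        try {
            if (!flusher.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("flusher did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        buffer.flush(); // final flush, playing the role of @FinishBundle
        return buffer.totalSeen();
    }

    public static void main(String[] args) {
        System.out.println(run(10_000)); // prints 10000
    }
}
```

Without the synchronized blocks, add() and flush() could interleave inside ArrayList and elements could be lost; this is exactly the "create your own threads" situation the documentation warns about.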
Question 2: Does "Each instance of your function object is accessed by a single thread at a time on a worker instance" indicate that Beam will synchronize @ProcessElement or the entire DoFn behind the scenes?
This IBM paper makes the following point, and I quote:
The paper seems to indicate that the entire DoFn invocation is synchronized.
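I know this is an old question, but since I was researching the same thing: no, you don't need synchronized on your processElement method, because, as you quoted, "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance". Beam does not provide this guarantee by locking @ProcessElement; instead, the runner creates separate DoFn instances (deserialized copies) for threads that run concurrently, so the fields of any one instance, including elements, are only touched by one thread at a time. State shared across instances, such as static fields, is a different matter and does need synchronization.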
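The "one instance per thread" model can be sketched in plain Java. This is an illustration under stated assumptions, not Beam's actual runner code: a ThreadLocal gives each worker thread its own BatchingFn, which that thread reuses for successive bundles, so the unsynchronized ArrayList inside it is safe. The sink shared by all instances is an AtomicLong because it is not confined to one thread. BatchingFn, run and the sink are hypothetical stand-ins for DemoDoFn and the external server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConfinementDemo {
    // Plain-Java stand-in for DemoDoFn: mutable instance state, no synchronized.
    static class BatchingFn {
        private final int batchSize;
        private final AtomicLong sink; // shared by all instances, so it must be thread-safe
        private List<String> elements;

        BatchingFn(int batchSize, AtomicLong sink) {
            this.batchSize = batchSize;
            this.sink = sink;
        }

        void startBundle() {
            elements = new ArrayList<>();
        }

        void processElement(String element) {
            elements.add(element);
            if (elements.size() >= batchSize) {
                flushBatch();
            }
        }

        void finishBundle() {
            flushBatch();
        }

        private void flushBatch() {
            if (elements.isEmpty()) {
                return;
            }
            sink.addAndGet(elements.size()); // stands in for one API call per batch
            elements = new ArrayList<>();
        }
    }

    // Each pool thread gets its own BatchingFn via the ThreadLocal and reuses it
    // for every bundle it runs, so no instance is ever touched by two threads at once.
    static long run(int bundles, int bundleSize, int batchSize) {
        AtomicLong sink = new AtomicLong();
        ThreadLocal<BatchingFn> perThread =
                ThreadLocal.withInitial(() -> new BatchingFn(batchSize, sink));
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int b = 0; b < bundles; b++) {
            final int bundleId = b;
            pool.execute(() -> {
                BatchingFn fn = perThread.get();
                fn.startBundle();
                for (int i = 0; i < bundleSize; i++) {
                    fn.processElement("bundle-" + bundleId + "-element-" + i);
                }
                fn.finishBundle();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("bundles did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1_000, 7)); // prints 8000
    }
}
```

Every element reaches the sink exactly once even though BatchingFn has no locks; the only thread-safe object is the one genuinely shared between instances.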
Here is an example from Beam's own ElasticsearchIO that mutates an instance variable in the same way: https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1369