Search code examples
google-cloud-dataflowapache-beamapache-beam-io

Apache Beam Dataflow Bigquery Streming insertions out of memory error


I'm intermittently getting out-of-memory issues on the dataflow job when inserting the data into Bigauqery using Apache Beam SDK for Java 2.29.0.

Here is the stack trace

    Error message from worker: java.lang.RuntimeException: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:982)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:1022)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.flushRows(BatchedStreamingWrite.java:375)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.access$800(BatchedStreamingWrite.java:69)
        org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite$BatchAndInsertElements.finishBundle(BatchedStreamingWrite.java:271)
Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
        java.base/java.lang.Thread.start0(Native Method)
        java.base/java.lang.Thread.start(Thread.java:803)
        java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
        java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1343)
        java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:129)
        java.base/java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:724)
        com.google.api.client.http.javanet.NetHttpRequest.writeContentToOutputStream(NetHttpRequest.java:188)
        com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:117)
        com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
        com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
        com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.lambda$insertAll$1(BigQueryServicesImpl.java:906)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$BoundedExecutorService$SemaphoreCallable.call(BigQueryServicesImpl.java:1492)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:834)

I tried increasing the worker node size still seeing the same issue.


Solution

  • I really recommend you to upgrade your Beam version to 2.42.0 (latest).

    Also check if you have some aggregation like groupBy or groupByKey that are costly in memory inside a worker.

    You can also use Dataflow prime, that is the last execution engine for Dataflow and allows to prevent errors like outOfMemory in a worker with vertical autoscaling :

    dataflow prime

    Dataflow prime can be enabled with a program argument, example for Beam Java :

    --dataflowServiceOptions=enable_prime
    

    Dataflow prime helps in this case, but you have to check and optimize your job if needed and avoid costly operations if it's possible (memory leaks, useless aggregation, costly serialization...)