We ran into an issue with the Combine operation in the Apache Beam Go SDK (v2.28.0) when running a pipeline on Google Cloud Dataflow. I understand that the Go SDK is experimental, but it would be great if someone could help us understand whether there is anything wrong with our code, or whether there is a bug in the Go SDK or Dataflow. The issue only happens when running the pipeline on Dataflow with a large data set. We are trying to combine a PCollection<pairedVec>, with
type pairedVec struct {
	Vec1 [1048576]uint64
	Vec2 [1048576]uint64
}
There are 10,000,000 items in the PCollection.
Main func:
func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()
	pipeline := beam.NewPipeline()
	scope := pipeline.Root()
	records := textio.ReadSdf(scope, *inputFile)
	rRecords := beam.Reshuffle(scope, records)
	vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)
	histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, vecs)
	lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
	textio.Write(scope, *outputFile, lines)
	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %s", err)
	}
}
After reading the input file, Dataflow scheduled 1000 workers to generate the PCollection and started the combine. Then the number of workers dropped to almost 1 and stayed there for a very long time. Eventually the job failed with the following error log:
2021-03-02T06:13:40.438112597Z Workflow failed. Causes: S09:CombinePerKey/CoGBK'1/Read+CombinePerKey/main.combineVecFn+CombinePerKey/main.combineVecFn/Extract+beam.dropKeyFn+main.flattenVecFn+textio.Write/beam.addFixedKeyFn+textio.Write/CoGBK/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:
go-job-1-1614659244459204-03012027-u5s6-harness-q8tx
Root cause: The worker lost contact with the service.,
go-job-1-1614659244459204-03012027-u5s6-harness-44hk
Root cause: The worker lost contact with the service.,
go-job-1-1614659244459204-03012027-u5s6-harness-05nm
Root cause: The worker lost contact with the service.,
go-job-1-1614659244459204-03012027-u5s6-harness-l22w
Root cause: The worker lost contact with the service.
Edit
We tried adding a step to "pre-combine" the records into 100,000 keys (combineDomain=100000) before combining all of them together:
Main function:
func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()
	pipeline := beam.NewPipeline()
	scope := pipeline.Root()
	records := textio.ReadSdf(scope, *inputFile)
	rRecords := beam.Reshuffle(scope, records)
	vecs := beam.ParDo(scope, &genVecFn{LogN: *logN}, rRecords)
	keyVecs := beam.ParDo(scope, &addRandomKeyFn{Domain: *combineDomain}, vecs)
	combinedKeyVecs := beam.CombinePerKey(scope, &combineVecFn{LogN: *logN}, keyVecs)
	combinedVecs := beam.DropKey(scope, combinedKeyVecs)
	histogram := beam.Combine(scope, &combineVecFn{LogN: *logN}, combinedVecs)
	lines := beam.ParDo(scope, &flattenVecFn{}, histogram)
	textio.Write(scope, *outputFile, lines)
	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %s", err)
	}
}
But the job scheduled only one worker for it, and failed after a long time:
Workflow failed. Causes: S06:Reshuffle/e6_gbk/Read+Reshuffle/e6_gbk/GroupByWindow+Reshuffle/e6_unreify+main.genVecFn+main.addRandomKeyFn+CombinePerKey/CoGBK'2+CombinePerKey/main.combineVecFn/Partial+CombinePerKey/CoGBK'2/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:
go-job-1-1615178257414007-03072037-mrlo-harness-ppjj
Root cause: The worker lost contact with the service.,
go-job-1-1615178257414007-03072037-mrlo-harness-czng
Root cause: The worker lost contact with the service.,
go-job-1-1615178257414007-03072037-mrlo-harness-79n8
Root cause: The worker lost contact with the service.,
go-job-1-1615178257414007-03072037-mrlo-harness-mj6c
Root cause: The worker lost contact with the service.
After adding another reshuffle before CombinePerKey(), the pipeline scheduled 1000 workers to process it. But the job was extremely slow and used a large amount of shuffle data. After 1 hour, genVecFn was less than 10 percent complete and had produced 8.08TB of shuffle data. This is basically consistent with our production code, which eventually failed because it used up the 40TB shuffle data quota.
We tried another method to reduce workload on a single worker: segment the vector [1048576]uint64 into 32 pieces of [32768]uint64, and combine each of the pieces. Something like:
totalLength := uint64(1 << *logN)
segLength := uint64(1 << *segmentBits)
for i := uint64(0); i < totalLength/segLength; i++ {
	fileName := strings.ReplaceAll(*outputFile, path.Ext(*outputFile), fmt.Sprintf("-%d-%d%s", i+1, totalLength/segLength, path.Ext(*outputFile)))
	pHistogram := beam.Combine(scope, &combineVecRangeFn{StartIndex: i * segLength, Length: segLength}, vecs)
	flattened := beam.ParDo(scope, &flattenVecRangeFn{StartIndex: i * segLength}, pHistogram)
	textio.Write(scope, fileName, flattened)
}
The job succeeded eventually.
Given your pipeline code, the job downsizing to 1 worker is expected behavior for the Go SDK, since it lacks some of the optimizations of the Java and Python SDKs. This happens because you use beam.Combine, which is a global combine, meaning that every element in the PCollection is combined down to one value. In the Go SDK this means that all elements need to be localized to a single worker to be combined. For 10 million items, each of which is about 16 megabytes, this takes too long and the job most likely times out (you can probably confirm this by looking for a timeout message in the Dataflow logs).
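As a rough check of the sizes involved, the following sketch computes the raw size of one pairedVec and of the whole PCollection from the numbers given in the question. It counts only the in-memory size of the two arrays and ignores any encoding overhead or compression, so the real shuffle volume may differ; the constant and function names are made up for this example.

```go
package main

import "fmt"

const (
	vecLen       = 1 << 20  // 1048576 entries per vector, as in pairedVec
	bytesPerWord = 8        // size of a uint64 in bytes
	vecsPerElem  = 2        // Vec1 and Vec2
	numElements  = 10000000 // items in the PCollection
)

// elementBytes returns the raw size of one pairedVec value in bytes.
func elementBytes() uint64 {
	return vecsPerElem * vecLen * bytesPerWord
}

// totalBytes returns the raw size of the whole PCollection in bytes,
// ignoring any encoding overhead.
func totalBytes() uint64 {
	return elementBytes() * numElements
}

func main() {
	fmt.Printf("per element: %d bytes (%d MiB)\n", elementBytes(), elementBytes()>>20)
	fmt.Printf("whole collection: %d bytes (about %.1f TB)\n", totalBytes(), float64(totalBytes())/1e12)
}
```

Under these assumptions one element is 16 MiB and the full collection is on the order of 168 TB, which is far more than a single worker can pull in and also well above the 40TB shuffle quota mentioned in the question.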
Other SDKs have optimizations in place which split the input elements among workers to combine down, before consolidating to a single worker. For example in the Java SDK: "Combining can happen in parallel, with different subsets of the input PCollection being combined separately, and their intermediate results combined further, in an arbitrary tree reduction pattern, until a single result value is produced."
Fortunately, this solution is easy to implement manually in the Go SDK. Simply place your elements into N buckets (where N is greater than the number of workers you'd ideally want) by assigning random keys in the range [0, N). Then perform a CombinePerKey, so that only elements with matching keys need to be localized on a worker, which allows this Combine to be split across multiple workers. Then follow that up with DropKey and then the global Combine, and you should get the intended result.