Tags: apache-spark, apache-flink, google-cloud-dataflow, apache-beam, dataflow

How to deal with (Apache Beam) high IO bottlenecks?


To take a simple example, suppose I have a very simple Beam pipeline that just reads from a file and dumps the data into an output file. Now suppose the input file is huge (several GB, the kind of file you typically can't open in a text editor). Since the direct-runner implementation is quite simple (it reads the whole input set into memory), it won't be able to read and output such huge files (unless you assign an impractically large amount of memory to the Java VM process). So my question is: how do production runners like Flink, Spark, or Cloud Dataflow deal with this "huge dataset problem", assuming they don't just try to put the whole file(s)/dataset into memory?

I'd expect a production runner's implementation to work "in parts or batches" (reading, processing, and outputting in parts) to avoid trying to fit a huge dataset into memory at any specific point in time. Can somebody please share how production runners deal with this "huge data" situation?

More generally, note that this applies to other input/output mechanisms too. For example, if my input is a PCollection coming from a huge database table (huge in both row size and row count), does the production runner's internal implementation somehow divide the given SQL statement into many internally generated sub-statements, each taking a smaller subset (for example, by internally generating a count(*) statement followed by N statements, each taking count(*)/N elements)? The direct runner won't do this and will just pass the given query 1:1 to the DB. Or is it my responsibility as a developer to "iterate in batches" and divide the problem? If so, what are the best practices here: one pipeline or many? If only one, should I somehow parametrise the pipeline to read/write in batches, or should I iterate over a simple pipeline and manage the necessary metadata externally to the pipeline?

Thanks in advance, any feedback would be greatly appreciated!


EDIT (reflecting David's feedback):

David, your feedback is highly valuable and definitely touches the point I'm interested in. Having a work-discovery phase that splits a source and a read phase that reads the resulting splits concurrently is exactly what I was hoping to hear about, so thanks for pointing me in the right direction. I have a couple of small follow-up questions, if you don't mind:

1 - The article points out, under the section "Generic enumerator-reader communication mechanism", the following:

"The SplitEnumerator and SourceReader are both user implemented class. It is not rare that the implementation require some communication between these two components. In order to facilitate such use cases [....]"

So my question here is: is that "splitting + reading" behaviour triggered by some user-provided (i.e. developer-provided) implementation, specifically of SplitEnumerator and SourceReader, or can I benefit from it out of the box without any custom code?

2 - Probably just delving deeper into the question above: if I have a batch/bounded workload (let's say I'm using Apache Flink) and I'm interested in processing a "huge file" as described in the original post, will the pipeline work out of the box (doing the behind-the-scenes "work preparation phase" splits and the parallel reads), or does that require custom code implemented by the developer?

Thanks in advance for all your valuable feedback!


Solution

  • Just to provide some closure: the motivation for this question was to find out whether Apache Beam, when coupled with a production runner (like Flink, Spark, or Google Cloud Dataflow), offers out-of-the-box mechanisms for splitting work, i.e. reading, writing, and manipulating huge files (or data sources in general). The comment provided by David Anderson above proved of great value in hinting at how Apache Flink deals with these workflows.

    At this point I've implemented solutions using huge files (to test for possible IO bottlenecks) with a "Beam on Flink" pipeline, and I can confirm that Flink creates an execution plan that includes splitting sources and dividing the work in such a way that no memory problem arises. There can of course be conditions under which stability or IO performance is compromised, but I can at least confirm that the workflows carried out behind the pipeline abstraction use the filesystem when carrying out tasks, which avoids fitting all the data in memory and thus avoids trivial memory errors. Conclusion: yes, "Beam on Flink" (and likely Spark and Dataflow too) does offer proper work preparation, work splitting, and filesystem usage, so that the available volatile memory is used efficiently.
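To make the "split the source, then read each split" idea concrete, here is a minimal, stdlib-only Python sketch. It is not Beam or Flink code, and the function names (`make_splits`, `read_split`, `demo_file_splits`) are made up for illustration. It assumes newline-delimited records and uses a common convention for byte-range splits: a line belongs to the split in which it starts.

```python
import os
import tempfile


def make_splits(size, split_size):
    """Work-discovery phase: cut [0, size) into contiguous byte ranges."""
    return [(start, min(start + split_size, size))
            for start in range(0, size, split_size)]


def read_split(path, start, end):
    """Read phase: yield the lines that begin inside [start, end).

    Lines are streamed one at a time, so memory use is bounded by the
    longest line rather than by the file size.
    """
    with open(path, "rb") as f:
        if start > 0:
            # Step back one byte and discard through the next newline. This
            # skips a line that began in the previous split, but keeps a
            # line that begins exactly at `start`.
            f.seek(start - 1)
            f.readline()
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            yield line


def demo_file_splits():
    """Write a small file, split it, and read the splits back in order."""
    lines = [f"record-{i}\n".encode() for i in range(1000)]
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.writelines(lines)
        path = tmp.name
    try:
        splits = make_splits(os.path.getsize(path), split_size=1024)
        out = []
        for start, end in splits:  # a runner would hand these to workers
            out.extend(read_split(path, start, end))
        return lines, out, splits
    finally:
        os.remove(path)
```

Here the splits are read one after another in a single process; a distributed runner would instead assign them to parallel workers, but the per-split memory footprint is the point being illustrated.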

    Update about data sources: regarding DBs as data sources, Flink won't (and can't, since it is not trivial) optimize, split, or distribute work related to DB data sources in the same way it optimizes reading from the filesystem. There are still approaches for reading huge amounts of data (records) from a DB, but the implementation details need to be addressed by the developer rather than being the responsibility of the framework. I found this article (https://nl.devoteam.com/expert-view/querying-jdbc-database-in-parallel-with-google-dataflow-apache-beam/) very helpful on how to read massive numbers of records from a DB in Beam by splitting queries and distributing the processing (the article uses the Cloud Dataflow runner, but I used Flink and it worked just fine).
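As a rough illustration of what "splitting queries" can look like when the developer does it by hand, here is a stdlib-only Python sketch using an in-memory SQLite database. It is not the linked article's code and not a Beam API: the table `events`, its columns, and the helper names are hypothetical, and it assumes a numeric key whose values are spread reasonably evenly.

```python
import sqlite3


def partition_ranges(lo, hi, n):
    """Cut the inclusive key range [lo, hi] into at most n half-open ranges."""
    step = -(-(hi - lo + 1) // n)  # ceiling division
    return [(start, min(start + step, hi + 1))
            for start in range(lo, hi + 1, step)]


def read_partition(conn, lo, hi):
    """Run one bounded sub-query; rows are pulled lazily from the cursor."""
    cur = conn.execute(
        "SELECT id, payload FROM events WHERE id >= ? AND id < ? ORDER BY id",
        (lo, hi),
    )
    yield from cur


def demo_partitioned_read(n_partitions=4):
    """Build a toy table, derive key ranges, and read them one by one."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)",
        [(i, f"row-{i}") for i in range(1, 1001)],
    )
    # One cheap query discovers the key bounds; the ranges derive from it.
    lo, hi = conn.execute("SELECT MIN(id), MAX(id) FROM events").fetchone()
    ranges = partition_ranges(lo, hi, n_partitions)
    rows = [row for r in ranges for row in read_partition(conn, *r)]
    conn.close()
    return ranges, rows
```

In a pipeline, each `(lo, hi)` range would typically become one input element, so that the bounded sub-queries can be executed by different workers in parallel; the sketch runs them sequentially only to stay self-contained.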