Tags: apache-flink, google-cloud-dataproc

Consume GCS files based on pattern from Flink


Flink supports the Hadoop FileSystem abstraction, and there is a GCS connector, a library that implements it on top of Google Cloud Storage.

How do I create a Flink file source using the code in this repo?


Solution

  • To achieve this, you need to:

    1. Install and configure the GCS connector on your Flink cluster, for example:
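
      A minimal sketch of the Hadoop configuration the connector needs, assuming a core-site.xml on the cluster's Hadoop configuration path (the project ID and keyfile path below are placeholders, not values from the original answer):

      <!-- core-site.xml: registers the gs:// scheme with Hadoop -->
      <configuration>
          <property>
              <name>fs.gs.impl</name>
              <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
          </property>
          <property>
              <name>fs.AbstractFileSystem.gs.impl</name>
              <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
          </property>
          <property>
              <name>fs.gs.project.id</name>
              <value>your-project-id</value>
          </property>
          <property>
              <name>google.cloud.auth.service.account.enable</name>
              <value>true</value>
          </property>
          <property>
              <name>google.cloud.auth.service.account.json.keyfile</name>
              <value>/path/to/service-account-key.json</value>
          </property>
      </configuration>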
    2. Add the Hadoop and Flink dependencies (including the HDFS connector) to your project:
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-filesystem_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-hadoop-compatibility_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <scope>provided</scope>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
          <version>${hadoop.version}</version>
          <scope>provided</scope>
      </dependency>
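
      The ${flink.version} and ${hadoop.version} properties referenced above would be defined in the pom, for example (the versions here are illustrative; use ones that match your cluster):

      <properties>
          <flink.version>1.8.0</flink.version>
          <hadoop.version>2.8.5</hadoop.version>
      </properties>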
      
    3. Use it to create a data source with a GCS path:

      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.hadoopcompatibility.HadoopInputs;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapred.TextInputFormat;
      
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      
      // TextInputFormat yields (byte offset, line) pairs; the gs:// path is
      // resolved by the GCS connector and may contain a glob pattern.
      DataSet<Tuple2<LongWritable, Text>> input =
          env.createInput(
              HadoopInputs.readHadoopFile(
                  new TextInputFormat(), LongWritable.class, Text.class,
                  "gs://bucket/path/some*pattern/"));
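
      To sanity-check the source, the (offset, line) pairs can be reduced to plain strings and printed. This is a sketch against the standard DataSet API; the anonymous MapFunction keeps Flink's type information explicit:

      import org.apache.flink.api.common.functions.MapFunction;
      
      // Drop the byte offsets and keep only the text lines.
      DataSet<String> lines = input.map(
          new MapFunction<Tuple2<LongWritable, Text>, String>() {
              @Override
              public String map(Tuple2<LongWritable, Text> record) {
                  return record.f1.toString();
              }
          });
      
      // print() triggers execution and emits the results.
      lines.print();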