Tags: apache-flink, flink-streaming

Stuck at: Could not find a suitable table factory


While playing around with Flink, I have been trying to upsert data into Elasticsearch. I'm getting this error on STDOUT:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...

Here is what I have in my scala Flink code:

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row) per Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
        .index("transfers-sum")
        .keyNullLiteral("n/a"))
      .withFormat(new Json().jsonSchema("{      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.FLOAT())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I am creating a fat jar and uploading it to my remote Flink instance. Here are my build.gradle dependencies:

compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'

Here is how the fatJar task is defined in Gradle:

jar {
    from {
        (configurations.compile).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
    manifest {
        attributes("Main-Class": "main" )
    }
}
task fatJar(type: Jar) {
    zip64 true
    manifest {
        attributes 'Main-Class': "flinkNamePull.Demo"
    }
    baseName = "${rootProject.name}"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

Could anybody please help me see what I am missing? I'm fairly new to Flink and data streaming in general.

Thank you in advance!


Solution

  • Is the list under "The following factories have been considered:" complete? Does it contain Elasticsearch6UpsertTableSinkFactory? If not, as far as I can tell, there is a problem with the service discovery dependencies.

    How do you submit your job? Can you check if you have a file META-INF/services/org.apache.flink.table.factories.TableFactory in the uber jar with an entry for Elasticsearch6UpsertTableSinkFactory?
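    A quick way to verify is to inspect your uber jar for that service file (the jar path in the comment is a placeholder). This sketch recreates the file content you should expect to see; the commented `unzip` line is how you would check the real artifact:

    ```shell
    # With your real uber jar you would run (path is a placeholder):
    #   unzip -p your-fat.jar META-INF/services/org.apache.flink.table.factories.TableFactory
    # and the output must list the Elasticsearch factory. Simulated here:
    SERVICE=META-INF/services/org.apache.flink.table.factories.TableFactory
    mkdir -p "$(dirname "$SERVICE")"
    # This entry is contributed by flink-connector-elasticsearch6
    echo 'org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSinkFactory' > "$SERVICE"
    grep Elasticsearch6UpsertTableSinkFactory "$SERVICE"
    ```

    If the factory entry is missing from the merged file, the fat-jar step overwrote it instead of merging.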

    When using Maven, you have to add a transformer to properly merge the service files:

    <!-- The service transformer is needed to merge META-INF/services files -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    

    I don't know how you do it in Gradle.


    EDIT: Thanks to Arvid Heise. In Gradle, when using the shadowJar plugin, you can merge service files via:

    // Merging Service Files
    shadowJar {
      mergeServiceFiles()
    }
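
    Putting it together, a minimal sketch of the relevant build.gradle pieces with the Shadow plugin replacing the hand-rolled fatJar task (the plugin version here is an assumption; check the current release):

    ```groovy
    plugins {
        id 'scala'
        // Shadow builds the uber jar; the version is an assumption
        id 'com.github.johnrengelman.shadow' version '5.2.0'
    }

    shadowJar {
        zip64 true
        manifest {
            attributes 'Main-Class': 'flinkNamePull.Demo'
        }
        // Concatenates META-INF/services files from all dependencies,
        // so entries like Elasticsearch6UpsertTableSinkFactory survive
        mergeServiceFiles()
    }
    ```

    The plain jar task simply copies files, so when several dependencies ship the same META-INF/services/org.apache.flink.table.factories.TableFactory file, only one copy wins; mergeServiceFiles() concatenates them instead.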