apache-flink, flink-streaming, flink-sql

Flink Table API: join tables in streaming mode


I have registered two JDBC tables in my Flink application and want to join them and convert the result into a regular DataStream. But when I join the tables, I get the following error:

    Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
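If I read the error correctly, the unsupported sink is the toDataStream conversion at the end of the code below, because the join produces update/delete changes. As far as I understand, a changelog-based conversion would accept them (sketch only; joinedTable stands for my join result), but then I get Row changelog entries rather than my case class:

    import org.apache.flink.types.Row

    // Sketch: toChangelogStream accepts updating results, but emits changelog Rows
    // (inserts/updates/deletes) instead of a plain append-only stream.
    val changelog: DataStream[Row] = tableEnv.toChangelogStream(joinedTable)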

Code

    val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("id", DataTypes.STRING)
        .column("poi_id", DataTypes.STRING)
        .column("gateway_id", DataTypes.STRING)
        .column("internal_status", DataTypes.STRING)
        .build())
      .build()

    val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("asset_id", DataTypes.STRING)
        .column("tag_id", DataTypes.STRING)
        .column("role", DataTypes.STRING)
        .build())
      .build()

    tableEnv.createTemporaryTable("asset", assetTableDescriptor)
    tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)


    val assetTable: Table = tableEnv.from(assetTableDescriptor)
      .select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")

    val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
      .select($"owner_id", $"asset_id", $"tag_id", $"role")

    val assetAssociationTable = assetTable
      .leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
      .select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")

    val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
      .toDataStream(assetAssociationTable, classOf[JdbcAssetState])
      .flatMap(new JdbcAssetStateDataMapper)

In BATCH mode this works fine, but I need to join assetTableStream with another stream in my app in STREAMING mode.
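For context, this is roughly how the environment is set up (simplified sketch, not my full setup):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The same pipeline works when the environment runs in batch mode;
    // the error above appears once it runs in streaming mode.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)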

Based on what I found in the Flink docs, it looks like I need to use a lookup join, but I cannot figure out how to do that with the Table API (not SQL).
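From the docs, the lookup join is expressed in SQL roughly like this (adapted to my table names; proctime is a processing-time attribute I would still have to add to the driving table), but I don't see how to express the same thing with the Table API:

    // SQL form of the lookup join from the docs, using the tables registered above.
    // 'a.proctime' is a processing-time attribute the driving table must expose.
    val lookupJoined = tableEnv.sqlQuery(
      """
        |SELECT a.owner_id, a.id, a.poi_id, a.gateway_id, t.tag_id, t.role
        |FROM asset AS a
        |LEFT JOIN asset_tag FOR SYSTEM_TIME AS OF a.proctime AS t
        |  ON a.id = t.asset_id AND a.owner_id = t.owner_id
        |""".stripMargin)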

Any small example of joining two JDBC tables and converting the result into a DataStream would be fantastic.


Solution

  • Two hints: