I have registered two JDBC tables in my Flink application and want to join them and convert the result into a regular DataStream. But when I join the tables, I get the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
Code
val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
  .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
  .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
  .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
  .option(JdbcConnectorOptions.TABLE_NAME, "asset")
  .schema(Schema.newBuilder()
    .column("owner_id", DataTypes.STRING)
    .column("id", DataTypes.STRING)
    .column("poi_id", DataTypes.STRING)
    .column("gateway_id", DataTypes.STRING)
    .column("internal_status", DataTypes.STRING)
    .build())
  .build()
val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
  .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
  .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
  .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
  .option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
  .schema(Schema.newBuilder()
    .column("owner_id", DataTypes.STRING)
    .column("asset_id", DataTypes.STRING)
    .column("tag_id", DataTypes.STRING)
    .column("role", DataTypes.STRING)
    .build())
  .build()
tableEnv.createTemporaryTable("asset", assetTableDescriptor)
tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)
val assetTable: Table = tableEnv.from(assetTableDescriptor)
  .select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")
val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
  .select($"owner_id", $"asset_id", $"tag_id", $"role")
val assetAssociationTable = assetTable
  .leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
  .select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")
val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
  .toDataStream(assetAssociationTable, classOf[JdbcAssetState])
  .flatMap(new JdbcAssetStateDataMapper)
In BATCH mode this works fine, but I need to join assetTableStream
with another stream in my app in STREAMING mode.
Based on what I found in the Flink docs, it looks like I need to use a Lookup Join,
but I cannot figure out how to do that with the Table API (not SQL).
A small example of joining two JDBC tables and converting the result into a DataStream would be fantastic.
Two hints:
Use the toChangelogStream method (not toDataStream, as in your example).
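To illustrate the toChangelogStream hint, here is a minimal sketch, assuming Flink 1.13 or later with the Scala Table API bridge on the classpath. The two fromValues tables are hypothetical in-memory stand-ins for the JDBC tables "asset" and "asset_tag", trimmed to a few columns, and the object name ChangelogJoinSketch is made up for the example. A regular left outer join in STREAMING mode can retract rows it emitted earlier, so the result is read as a changelog of Row values and each row's RowKind says whether it is an insert, an update or a delete.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.{Row, RowKind}

object ChangelogJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Hypothetical stand-ins for the JDBC tables from the question.
    val assetTable = tableEnv
      .fromValues(row("o1", "a1"), row("o1", "a2"))
      .as("asset_owner_id", "id")
    val assetTagTable = tableEnv
      .fromValues(row("o1", "a1", "admin"))
      .as("owner_id", "asset_id", "role")

    // Same join shape as in the question, on fewer columns.
    val joined = assetTable
      .leftOuterJoin(assetTagTable, $"id" === $"asset_id" && $"asset_owner_id" === $"owner_id")
      .select($"asset_owner_id", $"id", $"role")

    // toChangelogStream accepts updating tables; toDataStream is insert-only.
    val changelog: DataStream[Row] = tableEnv.toChangelogStream(joined)

    // Keep only the rows that add or replace a result; drop the retractions.
    changelog
      .filter(r => r.getKind == RowKind.INSERT || r.getKind == RowKind.UPDATE_AFTER)
      .print()

    env.execute("changelog-join-sketch")
  }
}
```

In your application, the resulting DataStream[Row] would replace the toDataStream call, and a mapper like your JdbcAssetStateDataMapper would need to read fields from Row (for example with getField) and decide how to treat retraction rows instead of receiving a JdbcAssetState directly.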