Tags: scala, apache-flink, flink-sql

leftOuterJoin throws TableException: Unsupported join type 'LEFT'


I'm trying to run a left outer join on two tables and convert the results to a DataStream.

All the joins I've done before using Flink have been inner joins, and I have always followed the join with a .toRetractStream[MyCaseClass](someQueryConfig). However, because the left join introduces null values, my understanding from the Flink docs is that I can no longer use case classes, since they don't support null values when converting a table to a DataStream.

So, I'm trying to accomplish this using a POJO. Here is my code:

class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}

val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)

val updatedTasksUpsertTable = enrichedTasksUpsertTable
  .leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
  .select(
    'enrichedTaskId,
    'enrichedTaskJobId,
    'enrichedTaskJobDate,
    'enrichedTaskJobMetadata,
    'enrichedTaskStartedAt,
    'enrichedTaskTaskMetadata,
    'taskUpdateMetadata
  )

val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
  .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
  .map(toEnrichedTask(_))
  .map(encodeTask(_))
  .keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))

This compiles just fine, but when I try to run it, I get org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported. However, according to these docs, it seems like I should be able to run a left join. It also seems worth noting that the error is thrown from the .toAppendStream[EnrichedTaskUpdateJoin](qConfig) call. I thought the "non-window" part of the error might mean that my idle state retention time was the problem, so I removed the query config, but I got the same error.

Hopefully this has enough context, but if I need to add anything else, please let me know. I'm running Flink 1.5-SNAPSHOT and using Circe for JSON parsing. I'm also quite new to Scala, so it's very possible that this is just some dumb syntax error.


Solution

  • Non-windowed outer joins are not supported in Flink 1.5-SNAPSHOT. As you can see in the link that you posted, there is no "Streaming" tag next to "Outer Joins". Only time-windowed joins (which work on time attributes) are supported in 1.5.

    Flink 1.6 will provide LEFT, RIGHT, and FULL outer joins (see also FLINK-5878).

    By the way, make sure that EnrichedTaskUpdateJoin is really a POJO: POJOs need a default (no-argument) constructor and, I think, also var instead of val.
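    To illustrate the POJO remark, here is a minimal sketch of how the class could look with mutable fields and a no-argument constructor. It is only an illustration under assumptions: the Json fields are replaced by String so that the snippet has no external dependencies, and whether Flink accepts the class as a POJO should still be verified against the Flink version in use.

```scala
// Hypothetical POJO-style variant of EnrichedTaskUpdateJoin.
// Assumption: the Json-typed fields are shown as String to keep the sketch dependency-free.
class EnrichedTaskUpdateJoin(
    var enrichedTaskId: String,
    var enrichedTaskJobId: String,
    var enrichedTaskJobDate: String,
    var enrichedTaskJobMetadata: String,
    var enrichedTaskStartedAt: String,
    var enrichedTaskTaskMetadata: String,
    var taskUpdateMetadata: String // stays null when the left join finds no match
) {
  // Public no-argument constructor; every field starts out as null
  // and can be filled in later because the fields are declared as var.
  def this() = this(null, null, null, null, null, null, null)
}
```

    The field names are kept identical to the column names in the select(...) call, on the assumption that fields are matched to columns by name. A default argument such as = Json.Null does not give the class a no-argument constructor, which is why the auxiliary constructor is spelled out.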