I'm using ksqlDB version 0.11 (I cannot upgrade to a newer version at the moment), and I want to replicate a TABLE's data into MySQL using the JDBC Sink connector. ksqlDB v0.11 does not support multiple TABLE key columns, and my data needs to be grouped by multiple GROUP BY expressions.
I create the table using this statement:
CREATE TABLE estads AS SELECT
STID AS stid,
ASIG AS asig,
COUNT(*) AS np,
MIN(NOTA) AS min,
MAX(NOTA) AS max,
AVG(NOTA) AS med,
LATEST_BY_OFFSET(FECHREG) AS fechreg
FROM estads_stm GROUP BY stid, asig EMIT CHANGES;
The resulting table has the following schema:
Name : ESTADS
Field | Type
---------------------------------------------
KSQL_COL_0 | VARCHAR(STRING) (primary key)
NP | BIGINT
MIN | DOUBLE
MAX | DOUBLE
MED | DOUBLE
FECHREG | VARCHAR(STRING)
As you can see, the two key columns (stid and asig) have been merged into a single field called KSQL_COL_0, which is the expected behavior for version 0.11. The problem is that I need to use the JDBC Sink connector to replicate the data into a MySQL table with the following schema:
+---------+--------------+------+-----+-------------------+-----------------------------+
| Field | Type | Null | Key | Default | Extra |
+---------+--------------+------+-----+-------------------+-----------------------------+
| stid | varchar(15) | NO | PRI | NULL | |
| asig | varchar(10) | NO | PRI | NULL | |
| np | smallint(6) | YES | | NULL | |
| min | decimal(5,2) | YES | | NULL | |
| max | decimal(5,2) | YES | | NULL | |
| med | decimal(5,2) | YES | | NULL | |
| fechreg | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------+--------------+------+-----+-------------------+-----------------------------+
I don't know how to "unmerge" the automatically generated KSQL_COL_0 in order to tell the JDBC Sink connector that both stid and asig are primary keys in the MySQL table. Any ideas on how to manage this? I know that since ksqlDB version 0.15 this is no longer a problem, as ksqlDB tables support multiple key columns, but as I said, upgrading is not an option in my case.
Thanks!
I figured it out.
Basically, you need to use the AS_VALUE() function in the table creation query. This way you copy the values of both key columns into new value columns, while the newly created primary key still lives in its own column. Then simply configure the JDBC Sink connector to take the values of all the columns except the newly created primary key.
CREATE TABLE estads AS SELECT
STID AS k1,
ASIG AS k2,
AS_VALUE(STID) AS stid,
AS_VALUE(ASIG) AS asig,
COUNT(*) AS np,
MIN(NOTA) AS min,
MAX(NOTA) AS max,
AVG(NOTA) AS med,
LATEST_BY_OFFSET(FECHREG) AS fechreg
FROM estads_stm GROUP BY k1, k2 EMIT CHANGES;
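For the connector side, here is a minimal sketch of what the sink could look like when created from ksqlDB. I'm assuming a few things that are not in the question: the connector name, the connection URL and the credentials are hypothetical placeholders, the table's topic is assumed to have the default name ESTADS, and the values are assumed to be in a format that carries a schema (for example Avro), which the JDBC sink needs. The key is read as a plain string and otherwise ignored, and the primary key is taken from the value columns.

```sql
-- Sketch only: connector name, URL, user and password are hypothetical placeholders.
-- Assumes the table's topic is ESTADS and the value format carries a schema (e.g. Avro).
-- pk.mode = record_value takes the MySQL primary key from the value columns,
-- so the merged key (KSQL_COL_0) is never written to MySQL.
CREATE SINK CONNECTOR estads_mysql_sink WITH (
  'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'      = 'jdbc:mysql://localhost:3306/mydb',
  'connection.user'     = 'myuser',
  'connection.password' = 'mypassword',
  'topics'              = 'ESTADS',
  'table.name.format'   = 'estads',
  'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
  'insert.mode'         = 'upsert',
  'pk.mode'             = 'record_value',
  'pk.fields'           = 'STID,ASIG',
  'fields.whitelist'    = 'STID,ASIG,NP,MIN,MAX,MED,FECHREG'
);
```

The field names are written in upper case because ksqlDB upper-cases unquoted identifiers. With insert.mode set to upsert, each update to a (stid, asig) group should overwrite the matching MySQL row instead of inserting a new one.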