ksqlDB: Group By Concat Equivalent


I have a stream such as the following:

ksql> select * from customerstream;
+--------+-------------------------------+
|EVENT   |CONTENT                        |
+--------+-------------------------------+
|create  |{name=bob, location=NY, id=1}  |
|update  |{location=AM}                  |
|update  |{location=BER}                 |
|update  |{name=bob_new}                 |
|delete  |{id=1}                         |

Now I would like to group the events by id and ignore customers that have already been deleted.

I am looking for something like this:

select content['id'] from customerstream group by content['id'] HAVING 'delete' not in collect_set(event);

Solution

  • I found it!

    SELECT
      content['id'],
      LATEST_BY_OFFSET(content['location']),
      COLLECT_SET(event)
    FROM customerstream
    GROUP BY content['id']
    HAVING NOT ARRAY_CONTAINS(COLLECT_SET(event), 'delete')
    EMIT CHANGES;
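If the result should be kept around rather than watched in the CLI, the same query can probably be wrapped in a persistent table. This is only a sketch: the table name `active_customers` and the column aliases `id`, `location`, and `events` are made up for illustration, and the exact behaviour may depend on the ksqlDB version. Note also that in the sample data only the create and delete rows carry an `id`, so the update rows would not end up under the key `1` unless the id is added to them upstream.

```sql
-- Hypothetical table name and aliases; not part of the original question.
CREATE TABLE active_customers AS
  SELECT
    content['id'] AS id,                                  -- grouping key
    LATEST_BY_OFFSET(content['location']) AS location,    -- most recent location seen
    COLLECT_SET(event) AS events                          -- distinct event types per id
  FROM customerstream
  GROUP BY content['id']
  -- keep only ids whose event set does not contain 'delete'
  HAVING NOT ARRAY_CONTAINS(COLLECT_SET(event), 'delete')
  EMIT CHANGES;
```

The aliases are there mainly so the columns do not get generated names such as `KSQL_COL_0`.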