Tags: apache-kafka, ksqldb, lenses

Need to filter out Kafka records based on a certain keyword


I have a Kafka topic with around 3 million records, and I want to pick out the single record that has a particular parameter value. I have been trying to query this using Lenses, but I am unable to form the correct query. Below are the contents of one message:

{
  "header": {
    "schemaVersionNo": "1"
  },
  "payload": {
    "modifiedDate": 1552334325212,
    "createdDate": 1552334325212,
    "createdBy": "A",
    "successful": true,
    "source_order_id": "1111111111111"
  }
}

I want to filter out the record with a particular source_order_id, but I cannot figure out the right way to do so. We have tried both Lenses and Kafka Tool.

Here is a sample query we tried in Lenses:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'

This query works. However, if we filter on source_order_id instead, as shown below, we get an error:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'



Error: "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING."

Consuming all 3 million records with a custom consumer and then iterating over them doesn't seem like an optimised approach to me, so I'm looking for any available solutions for this kind of use case.


Solution

  • Since you said you are open to other solutions, here is one built using KSQL.

    First, let's get some sample records into a source topic:

    $ kafkacat -P -b localhost:9092 -t TEST <<EOF
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
    { "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
    EOF
    

    Using KSQL we can inspect the topic with PRINT:

    ksql> PRINT 'TEST' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
    {"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}
    

    Then declare a schema on the topic, which enables us to run SQL against it:

    ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>, 
                              payload STRUCT<modifiedDate BIGINT, 
                                            createdDate BIGINT, 
                                            createdBy VARCHAR, 
                                            successful BOOLEAN, 
                                            source_order_id VARCHAR>) 
                              WITH (KAFKA_TOPIC='TEST', 
                                    VALUE_FORMAT='JSON');
    
    Message
    ----------------
    Stream created
    ----------------
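    If it helps to think of the STRUCT schema above in application terms, it corresponds to nested record types. Here is a minimal Python sketch, assuming messages arrive as JSON text shaped like the samples; the class names and `parse_record` are hypothetical illustrations, not part of KSQL:

```python
import json
from dataclasses import dataclass


@dataclass
class Header:
    # Mirrors: header STRUCT<schemaVersionNo VARCHAR>
    schemaVersionNo: str


@dataclass
class Payload:
    # Mirrors: payload STRUCT<modifiedDate BIGINT, createdDate BIGINT,
    #          createdBy VARCHAR, successful BOOLEAN, source_order_id VARCHAR>
    modifiedDate: int
    createdDate: int
    createdBy: str
    successful: bool
    source_order_id: str


@dataclass
class TestRecord:
    header: Header
    payload: Payload


def parse_record(raw: str) -> TestRecord:
    """Parse one JSON message from the TEST topic into typed, nested objects."""
    doc = json.loads(raw)
    return TestRecord(
        header=Header(**doc["header"]),
        payload=Payload(**doc["payload"]),
    )


sample = (
    '{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, '
    '"createdDate": 1552334325212, "createdBy": "A", "successful": true, '
    '"source_order_id": "3411976933214" } }'
)
record = parse_record(sample)
print(record.payload.source_order_id)  # 3411976933214
```

    `Payload(**...)` raises `TypeError` if a message has extra or missing keys, so this sketch is stricter than it needs to be; it only illustrates the shape of the data that the stream declaration describes.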
    

    Tell KSQL to read all the data in the topic, starting from the earliest offset rather than only from new messages:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
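    This setting matters because a new KSQL query starts without any committed offsets of its own, so `auto.offset.reset` decides where it begins reading: `latest` (the default) sees only messages produced after the query starts, while `earliest` begins at the oldest retained message. A simplified Python model of that rule for a single partition (the function name is hypothetical; Kafka also accepts `none`, which is not modelled here):

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    reset: str = "latest") -> int:
    """Simplified model of where a Kafka consumer starts reading one partition.

    committed: offset committed by the consumer group, or None if there is none.
    log_start: offset of the oldest message still retained in the partition.
    log_end:   offset that the next produced message will receive.
    """
    # A valid committed offset always wins; auto.offset.reset is not consulted.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if reset == "earliest":
        return log_start
    if reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto.offset.reset value: {reset!r}")


# A brand-new query has no committed offset for TEST, which holds offsets 0-2.
print(starting_offset(None, 0, 3, "latest"))    # 3 -> only messages produced from now on
print(starting_offset(None, 0, 3, "earliest"))  # 0 -> all three existing messages
```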
    

    And now we can select all the data:

    ksql> SELECT * FROM TEST;
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215}
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
    ^CQuery terminated
    

    Or we can query it selectively, using the -> operator to access nested fields in the schema:

    ksql> SELECT * FROM TEST 
            WHERE PAYLOAD->CREATEDBY='A';
    1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
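    `PAYLOAD->CREATEDBY` follows a path into the PAYLOAD struct before comparing. To compare with the custom-consumer approach from the question, here is the same predicate applied to raw JSON messages in Python; `filter_created_by` is a hypothetical helper and the sample messages are trimmed to the fields it uses:

```python
import json
from typing import Iterable, Iterator


def filter_created_by(messages: Iterable[str], created_by: str) -> Iterator[dict]:
    """Yield parsed records whose payload.createdBy equals created_by,
    mirroring: SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';"""
    for raw in messages:
        record = json.loads(raw)
        # Two-level lookup: first the payload object, then the field inside it,
        # the same path that PAYLOAD->CREATEDBY follows through the struct.
        if record.get("payload", {}).get("createdBy") == created_by:
            yield record


# Sample messages, trimmed to the fields used here (other payload fields omitted).
messages = [
    json.dumps({"header": {"schemaVersionNo": "1"},
                "payload": {"createdBy": who, "source_order_id": oid}})
    for who, oid in [("A", "3411976933214"), ("B", "3411976933215"), ("C", "3411976933216")]
]

matches = list(filter_created_by(messages, "A"))
print([m["payload"]["source_order_id"] for m in matches])  # ['3411976933214']
```

    Either way, every message is read: KSQL does not index fields inside the message value, so a query like this scans the topic from its starting offset. What it saves you is writing and running that consumer loop yourself.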
    

    As well as selecting all records, you can return just the fields of interest:

    ksql> SELECT payload FROM TEST 
            WHERE PAYLOAD->source_order_id='3411976933216';
    {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
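    Because source_order_id is declared as VARCHAR, the value is compared as a string, which is why the literal is quoted. A Python sketch of the same lookup (the `find_payload` helper is hypothetical) also shows the projection, returning only the payload, and what happens if the ID is passed as a number:

```python
import json
from typing import Iterable, Optional


def find_payload(messages: Iterable[str], order_id: str) -> Optional[dict]:
    """Return only the payload of the first message whose
    payload.source_order_id equals order_id, or None if nothing matches."""
    for raw in messages:
        payload = json.loads(raw).get("payload", {})
        # source_order_id is a JSON string (VARCHAR in the stream schema),
        # so it must be compared with a string, not an integer.
        if payload.get("source_order_id") == order_id:
            return payload  # projection: drop the header, keep the payload
    return None


# Sample messages, trimmed to the fields used here.
messages = [
    json.dumps({"header": {"schemaVersionNo": "1"},
                "payload": {"createdBy": who, "source_order_id": oid}})
    for who, oid in [("A", "3411976933214"), ("B", "3411976933215"), ("C", "3411976933216")]
]

print(find_payload(messages, "3411976933216"))  # {'createdBy': 'C', 'source_order_id': '3411976933216'}
print(find_payload(messages, 3411976933216))    # None: an int never equals the string
```

    Unlike the KSQL query, which keeps running and waiting for further matches until you cancel it, this sketch returns as soon as it finds the first match, which fits the question's single-record lookup.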
    

    With KSQL, you can write the results of any SELECT statement to a new topic. The new topic is populated with all existing messages from the source topic (because auto.offset.reset is set to earliest), followed by every new message that arrives on the source topic, each filtered and processed according to the declared SELECT statement:

    ksql> CREATE STREAM TEST_CREATED_BY_A AS
            SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
    
    Message
    ----------------------------
    Stream created and running
    ----------------------------
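    Conceptually, the statement above starts a persistent query: it remembers how far it has read in TEST and, whenever new messages arrive, appends the matching ones to the output topic. The following toy Python model illustrates that loop under simplifying assumptions (in-memory lists instead of topics, a single partition; `PersistentFilter` and `msg` are hypothetical names, not KSQL internals):

```python
import json
from typing import Callable, List


class PersistentFilter:
    """Toy, in-memory model of:
    CREATE STREAM TEST_CREATED_BY_A AS SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
    Lists stand in for Kafka topics; this is not how KSQL is implemented."""

    def __init__(self, predicate: Callable[[dict], bool]) -> None:
        self.predicate = predicate
        self.next_offset = 0          # starts at 0, like auto.offset.reset='earliest'
        self.sink: List[dict] = []    # stands in for the output topic

    def poll(self, source: List[str]) -> int:
        """Process every source message not seen yet; return how many were written."""
        written = 0
        for raw in source[self.next_offset:]:
            record = json.loads(raw)
            if self.predicate(record):
                self.sink.append(record)
                written += 1
        self.next_offset = len(source)  # remember progress, like a committed offset
        return written


def msg(created_by: str, order_id: str) -> str:
    # Trimmed sample message: only the fields this example needs.
    return json.dumps({"payload": {"createdBy": created_by, "source_order_id": order_id}})


topic = [msg("A", "3411976933214"), msg("B", "3411976933215"), msg("C", "3411976933216")]
query = PersistentFilter(lambda r: r["payload"]["createdBy"] == "A")

print(query.poll(topic))                 # 1: the existing message from A
topic.append(msg("A", "3411976933217"))  # hypothetical message produced later
print(query.poll(topic))                 # 1: only the new message is processed
print([r["payload"]["source_order_id"] for r in query.sink])  # ['3411976933214', '3411976933217']
```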
    

    List the topics on the Kafka cluster:

    ksql> SHOW TOPICS;
    
    Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
    ----------------------------------------------------------------------------------------------------
    orders                 | true       | 1          | 1                  | 1         | 1
    pageviews              | false      | 1          | 1                  | 0         | 0
    products               | true       | 1          | 1                  | 1         | 1
    TEST                   | true       | 1          | 1                  | 1         | 1
    TEST_CREATED_BY_A      | true       | 4          | 1                  | 0         | 0
    

    Print the contents of the new topic:

    ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}
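    Note that the field names in the new topic are upper case (HEADER, PAYLOAD, SOURCE_ORDER_ID), whereas the source topic used mixed case: KSQL treats unquoted identifiers as upper case and serialises each column under that name, as the output above shows. A consumer of TEST_CREATED_BY_A therefore needs to read PAYLOAD.SOURCE_ORDER_ID, not payload.source_order_id. This hypothetical helper reproduces that renaming in Python, which can be handy for testing consumer code against the output format:

```python
from typing import Any


def upper_keys(value: Any) -> Any:
    """Recursively upper-case dict keys (values are left unchanged), matching
    the field-name casing observed in the TEST_CREATED_BY_A output."""
    if isinstance(value, dict):
        return {key.upper(): upper_keys(inner) for key, inner in value.items()}
    if isinstance(value, list):
        return [upper_keys(item) for item in value]
    return value


source = {"header": {"schemaVersionNo": "1"},
          "payload": {"createdBy": "A", "source_order_id": "3411976933214"}}
sink = upper_keys(source)
print(sink["PAYLOAD"]["SOURCE_ORDER_ID"])  # 3411976933214
```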