
How to find if two objects have been close for a certain amount of time


I'm new to ksqlDB and trying to get my head around the differences between the types of windowing. Currently I'm trying to populate a table for the following condition:

Two objects have been closer than 1 km to each other, with a speed of less than 2, for longer than one hour.

I have two streams, vessels and reefers. Both streams are continuously updated with new positional data.

Currently my query looks like this:

SELECT VESSELS.ID, LATEST_BY_OFFSET(VESSELS.NAME)
  FROM vessels
  INNER JOIN REEFERS
  WITHIN 3 HOURS
  ON reefers.id_2 = vessels.id_2
  WINDOW HOPPING (size 1 HOURS, advance by 15 MINUTES)
  WHERE GEO_DISTANCE(reefers.LAT, reefers.LON, vessels.LAT, vessels.LON, 'KM') < 1 AND vessels.speed < 2.0 AND vessels.id != reefers.id
  GROUP BY VESSELS.ID

Source data for vessels:

|ID           |NAME        |LAT      |LON       |SPEED  |ID_2   |
|257184340    |KRYSSHOLM   |70.6807  |21.6932   |0.8    |1      |

The reefers stream has the same structure.

As the final result, I'm only interested in a list of the vessels that fulfil the criteria. As of now, this query gives me a long list with every data point (positional update).

Regards Lars


Solution

    -- Source streams & topics
    CREATE OR REPLACE STREAM reefers_v00 (ID BIGINT, NAME VARCHAR, LAT DOUBLE, LON DOUBLE, SPEED double, DUMMY INT) WITH (KAFKA_TOPIC='reefers_v00',partitions=4,replicas=1,value_format='protobuf');
    CREATE OR REPLACE STREAM vessels_v00 (ID BIGINT, NAME VARCHAR, LAT DOUBLE, LON DOUBLE, SPEED double, DUMMY INT) WITH (KAFKA_TOPIC='vessels_v00',partitions=4,replicas=1,value_format='protobuf');
    
    -- Test data
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 12:33:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),28,'TEST1',-56.72016, 83.43720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-29 13:01:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),21,'TEST2',-56.72016, 83.44720, 1.8,1);
    
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 12:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.72016, 83.43720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 13:03:05', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.43720, 0.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 13:04:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.61016, 83.45720, 0.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:03', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.61016, 83.45720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:02:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.43720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.44720, 0.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:05:10', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-56.64016, 83.44720, 0.8,1);
    INSERT INTO vessels_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:09:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),9,'wibble',-57.64016, 83.44720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 14:10:00', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.44720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 15:15:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),11,'foo',-56.62016, 83.45720, 0.8,1);
    INSERT INTO reefers_v00 (ROWTIME, ID, NAME, LAT, LON,SPEED,DUMMY) VALUES (STRINGTOTIMESTAMP('2021-01-28 19:03:04', 'yyyy-MM-dd HH:mm:ss', 'Europe/London'),22,'bar',-56.68016, 83.44720, 0.8,1);
    
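    -- Join the streams
    -- The join key is the constant DUMMY column, so every vessel report is paired with every reefer report inside the join window
    -- Assumes that movement reports within a minute of each other can be counted as the same timestamp
    -- Creates a new field to indicate whether the two objects meet the criteria (range & speed)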
    SET 'auto.offset.reset' = 'earliest';
    
    CREATE STREAM MOVEMENTS_v00 AS
    SELECT TIMESTAMPTOSTRING(v.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_v,
      v.*,
      TIMESTAMPTOSTRING(r.ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') AS TS_r,
      r.*,
      geo_distance(v.lat, v.lon, r.lat, r.lon, 'KM') as distance,
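      -- Flag rows where both objects are slow and close together.
      -- This example uses a 3 km threshold; the question asks for 1 km, so adjust as needed.
      case
        when geo_distance(v.lat, v.lon, r.lat, r.lon, 'KM') < 3
         and (r.speed < 2 and v.speed < 2) then 1
        else 0
      end as in_range_and_speed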
    FROM vessels_v00 v
      inner join reefers_v00 r 
      WITHIN 1 minute on R.DUMMY = V.DUMMY
      ;
    
    -- Query the data for debug purposes
    SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS, V_ID, R_ID, V_SPEED, R_SPEED, DISTANCE, IN_RANGE_AND_SPEED 
      FROM MOVEMENTS_v00 
      EMIT CHANGES;
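    
    -- A quick check, added here as a sketch rather than as part of the original answer:
    -- the join prefixes each column with its source alias (V_ or R_), which is why the
    -- queries in this answer refer to V_ID, R_ID, V_SPEED and so on.
    -- DESCRIBE lists the column names and types of the joined stream.
    DESCRIBE MOVEMENTS_v00;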
    
    -- Select object pairs using a session window based on the criteria flag
    -- and a predicate on the duration of the session
    SELECT V_ID,
      V_NAME,
      R_ID,
      R_NAME,
      MIN(DISTANCE) AS CLOSEST_DISTANCE_KM,
      MAX(DISTANCE) AS FURTHEST_DISTANCE_KM,
      COLLECT_LIST(DISTANCE) AS DISTANCE_POINTS_KM,
      TIMESTAMPTOSTRING( MIN(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London' ) AS FIRST_TS,
      TIMESTAMPTOSTRING( MAX(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'Europe/London' ) AS LAST_TS,
      (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC
    FROM MOVEMENTS_v00 WINDOW SESSION (1 HOUR)
    where IN_RANGE_AND_SPEED = 1
    GROUP BY V_ID,
      V_NAME,
      R_ID,
      R_NAME,
      IN_RANGE_AND_SPEED 
    HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 3600  
      EMIT CHANGES;
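
Since the goal in the question is to populate a table, the final query could probably be persisted with CREATE TABLE ... AS SELECT. The following is a minimal, untested sketch under a few assumptions, not part of the original answer: the names CLOSE_PAIRS_v00 and PAIR_ID are hypothetical, and the vessel and reefer IDs are combined into one string so that the table has a single key column, which avoids depending on multi-column key support in a particular ksqlDB version. The names are taken with LATEST_BY_OFFSET because they are no longer part of the grouping key.

```sql
-- Sketch only: CLOSE_PAIRS_v00 and PAIR_ID are hypothetical names.
-- The session window, filter and duration predicate mirror the query above.
CREATE TABLE CLOSE_PAIRS_v00 AS
SELECT CAST(V_ID AS VARCHAR) + '-' + CAST(R_ID AS VARCHAR) AS PAIR_ID,
  LATEST_BY_OFFSET(V_NAME) AS V_NAME,
  LATEST_BY_OFFSET(R_NAME) AS R_NAME,
  MIN(DISTANCE) AS CLOSEST_DISTANCE_KM,
  MAX(DISTANCE) AS FURTHEST_DISTANCE_KM,
  (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC
FROM MOVEMENTS_v00 WINDOW SESSION (1 HOUR)
WHERE IN_RANGE_AND_SPEED = 1
GROUP BY CAST(V_ID AS VARCHAR) + '-' + CAST(R_ID AS VARCHAR)
HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 3600
EMIT CHANGES;

-- Watch the table for pairs that meet the criteria
SELECT * FROM CLOSE_PAIRS_v00 EMIT CHANGES;
```

With this shape, a pair should only appear once its session of in-range, low-speed reports spans more than an hour, and a gap of more than one hour between qualifying reports starts a new session for that pair.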