apache-beam apache-calcite beam-sql

Apache Calcite: cast integer to datetime


I am using Beam SQL and trying to cast an integer field to a datetime field.

  Schema resultSchema =
    Schema.builder()
          .addInt64Field("detectedCount")
          .addStringField("sensor")
          .addInt64Field("timestamp")
          .build();

  PCollection<Row> sensorRawUnboundedTimestampedSubset = 
    sensorRowUnbounded.apply(
        SqlTransform.query(
          "select PCOLLECTION.payload.`value`.`count` detectedCount, \n"
          + "PCOLLECTION.payload.`value`.`id` sensor, \n"
          + "PCOLLECTION.`timestamp` `timestamp` \n"
          + "from PCOLLECTION "))
    .setRowSchema(resultSchema);

For some computations and windowing, I want to convert (cast) the timestamp field to a DateTime field. Please provide some pointers on how to convert timestamp in resultSchema to the DateTime data type.


Solution

  • There is no out-of-the-box way to do that in Beam (or in Calcite). In short, neither Calcite nor Beam has any way of knowing how the dates or timestamps are actually encoded in your integers. However, assuming they hold epoch milliseconds, adding ts times one millisecond to the Unix epoch should work:

    @Test
    public void testBlah() throws Exception {
      // input schema, has timestamps as epoch millis
      Schema schema = Schema.builder().addInt64Field("ts").addStringField("st").build();
    
      DateTime ts1 = new DateTime(2019, 8, 9, 10, 11, 12);
      DateTime ts2 = new DateTime(2019, 8, 9, 10, 11, 12);
    
      PCollection<Row> input =
        pipeline
          .apply(
              "createRows",
              Create.of(
                  Row.withSchema(schema).addValues(ts1.getMillis(), "two").build(),
                  Row.withSchema(schema).addValues(ts2.getMillis(), "twelve").build()))
          .setRowSchema(schema);
    
      PCollection<Row> result =
        input.apply(
          SqlTransform.query(
              "SELECT \n"
              + "(TIMESTAMP '1970-01-01 00:00:00' + ts * INTERVAL '0.001' SECOND) as ts, \n"
              + "st \n"
              + "FROM \n"
              + "PCOLLECTION"));
    
      // output schema, has timestamps as DateTime
      Schema outSchema = Schema.builder().addDateTimeField("ts").addStringField("st").build();
      PAssert.that(result)
        .containsInAnyOrder(
            Row.withSchema(outSchema).addValues(ts1, "two").build(),
            Row.withSchema(outSchema).addValues(ts2, "twelve").build());
      pipeline.run();
    }
    

    Alternatively, you can always do the conversion in Java rather than in SQL: apply a custom ParDo to the output of the SqlTransform. In that ParDo, extract the integer timestamp from the Row object, convert it to a DateTime, and then emit it, e.g. as part of another Row with a different schema.
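
    A rough sketch of that ParDo approach is below. It is not tested against a real pipeline. The class name EpochMillisToDateTimeFn, the OUT_SCHEMA constant and the convert helper are made up for illustration, and the code assumes the timestamp field holds epoch milliseconds in UTC and that the input rows have the three fields from the question's resultSchema.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/** Hypothetical DoFn: replaces the INT64 "timestamp" field with a DATETIME field. */
class EpochMillisToDateTimeFn extends DoFn<Row, Row> {

  // Same fields as resultSchema in the question, except "timestamp" is now a DATETIME.
  static final Schema OUT_SCHEMA =
      Schema.builder()
          .addInt64Field("detectedCount")
          .addStringField("sensor")
          .addDateTimeField("timestamp")
          .build();

  // The conversion itself, kept static so it can be checked without running a pipeline.
  static Row convert(Row in) {
    // ASSUMPTION: the integer is milliseconds since 1970-01-01T00:00:00 UTC.
    // If it holds seconds instead, multiply by 1000 first.
    long epochMillis = in.getInt64("timestamp");
    return Row.withSchema(OUT_SCHEMA)
        .addValues(
            in.getInt64("detectedCount"),
            in.getString("sensor"),
            new DateTime(epochMillis, DateTimeZone.UTC))
        .build();
  }

  @ProcessElement
  public void processElement(@Element Row in, OutputReceiver<Row> out) {
    out.output(convert(in));
  }
}
```

    To use it, apply ParDo.of(new EpochMillisToDateTimeFn()) to the output of the SqlTransform and then call setRowSchema(EpochMillisToDateTimeFn.OUT_SCHEMA) on the resulting PCollection, so that downstream transforms see the DateTime field.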