Tags: scala, apache-spark, cassandra, timeuuid

Casting a Cassandra timestamp column as timeuuid


I'm consuming events from Kafka and storing them in Cassandra. I parse JSON that contains the fields eventID, sessionID, timestamp and userID, and use them to build the columns of a Cassandra table that looks like this:

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")
   ... );

and in code:

case class cassFormat(
                       eventID: String, 
                       sessionID: String,
                       timeuuid: UUID, // timestamp as timeuuid
                       userID: String,
                       event_date: LocalDate, // YYYY-MM-dd format
                       fullJson: String // full json from Kafka
                     )

I need to store the timestamp column as a timeuuid. Since I'm parsing JSON, I extract all values from the header and build the rows like this:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp")).
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

This part:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

throws this error:

java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

I also tried java.util.UUID.fromString(x._1("timestamp").toString), which fails as well. How do I properly convert the timestamp to a timeuuid and insert it into Cassandra from a Spark job?
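Both attempts fail for the same underlying reason: the value is an ISO-8601 style date string, not a number of epoch milliseconds and not the textual form of a UUID. A minimal sketch that reproduces this outside Spark (the object name WhyParsingFails is hypothetical and only used for illustration):

```scala
import java.util.UUID
import scala.util.Try

// Hypothetical helper object, used only to reproduce the two failures.
object WhyParsingFails {
  // The timestamp exactly as it appears in the error message.
  val raw = "2019-05-09T09:00:52.553+0000"

  // toLong expects only digits (with an optional sign), so this is a
  // Failure wrapping a NumberFormatException.
  val asLong: Try[Long] = Try(raw.toLong)

  // fromString expects five hex groups separated by dashes, so a date
  // string is rejected with an IllegalArgumentException.
  val asUuid: Try[UUID] = Try(UUID.fromString(raw))
}
```

The string therefore has to be parsed as a date first, and only the resulting epoch milliseconds can be turned into a timeuuid.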


Solution

  • I managed to do it by parsing the timestamp string into a ZonedDateTime, converting that to epoch milliseconds, and then generating the UUID from the millis:

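    import java.time.ZonedDateTime
    import java.time.format.DateTimeFormatter
    import java.util.UUID
    import com.datastax.driver.core.utils.UUIDs

    // Assumed definition: the original snippet uses `random` without showing where it comes from.
    val random = new scala.util.Random()

    // Pattern for timestamps such as 2019-05-09T09:00:52.553+0000
    val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)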
    
    val allJson = rdd.
                  map(x => {
                    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
                    //use serialization default to format a Map to JSON
                    (x, Serialization.write(x))
                  }).
                  filter(x => x._1 isDefinedAt "header").
                  map(x => (x._1("header"), x._2)).
                  filter(x => (x._1 isDefinedAt "userID") &&
                    (x._1 isDefinedAt "eventID") &&
                    (x._1 isDefinedAt "sessionID") &&
                    (x._1 isDefinedAt "timestamp")).
                  map(x => {
                    var millis: Long  = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
                    try {
                      val dateStr: String = x._1("timestamp").asInstanceOf[String]
                      // timestamp from event json
                      // create DateTime from Timestamp string
                      val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
                      // create millis from DateTime
                      millis = dateTime.toInstant.toEpochMilli
                    } catch {
                      case e: Exception =>
                        e.printStackTrace()
                    }
                    // generate timeuuid
                    val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, random.nextLong)
                    // generate eventDate
                    val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
                    cassFormat(x._1("eventID").toString,
                      x._1("sessionID").toString,
                      uuid,
                      x._1("userID").toString,
                      eventDate,
                      x._2)
                  })
    // The enclosing block (not shown in the original snippet) ends after this call.
    allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
    

    The timestamp column in Cassandra now looks like: 58976340-7313-11e9-910d-60dce7513b94
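    For anyone who wants to check the conversion without a Spark or Cassandra setup, here is a minimal JDK-only sketch of the same steps (parse the string, take epoch millis, build a version 1 UUID). It does not use the driver's UUIDs.startOf; it lays out the version 1 timestamp bits by hand as a stand-in, and it also pins the variant bits, which the answer's code leaves to the random value. The names TimeUuidSketch, toEpochMillis, timeUuid and nodeBits are hypothetical.

```scala
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID

// Hypothetical helper object; a sketch, not the driver's implementation.
object TimeUuidSketch {
  // Same pattern as in the answer; 'Z' matches offsets written like +0000.
  private val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

  // Number of 100 ns intervals between the UUID epoch (1582-10-15)
  // and the Unix epoch (1970-01-01).
  private val UuidEpochOffset: Long = 0x01B21DD213814000L

  // Parse the event timestamp string and return epoch milliseconds.
  def toEpochMillis(timestamp: String): Long =
    ZonedDateTime.parse(timestamp, formatter).toInstant.toEpochMilli

  // Build a version 1 (time-based) UUID for the given millis.
  // nodeBits fills the clock-sequence and node fields, e.g. a random Long.
  def timeUuid(millis: Long, nodeBits: Long): UUID = {
    val ticks    = millis * 10000L + UuidEpochOffset   // 100 ns units
    val timeLow  = ticks & 0xFFFFFFFFL                 // low 32 bits
    val timeMid  = (ticks >>> 32) & 0xFFFFL            // middle 16 bits
    val timeHigh = (ticks >>> 48) & 0x0FFFL            // high 12 bits
    // 0x1000 sets the version field to 1.
    val msb = (timeLow << 32) | (timeMid << 16) | 0x1000L | timeHigh
    // Force the two variant bits to binary 10, keep the rest of nodeBits.
    val lsb = (nodeBits & 0x3FFFFFFFFFFFFFFFL) | Long.MinValue
    new UUID(msb, lsb)
  }
}
```

    Usage would look like TimeUuidSketch.timeUuid(TimeUuidSketch.toEpochMillis("2019-05-09T09:00:52.553+0000"), scala.util.Random.nextLong()). Calling timestamp() on the result returns the 100 ns tick count, which is a quick way to confirm that the UUID really encodes the event time.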