scala, apache-spark, cassandra, spark-streaming, spark-cassandra-connector

Adding custom codec to CassandraConnector


Is there a way to register custom codecs on instantiation of CassandraConnector?

I am currently registering my codecs each time I call cassandraConnector.withSessionDo

val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
...
...
.mapPartitions(partition => {
  cassandraConnector.withSessionDo(session => {
    // register custom codecs once per partition so they aren't re-registered for every data point
    if (partition.nonEmpty) {
      session.getCluster.getConfiguration.getCodecRegistry
        .register(new TimestampLongCodec)
        .register(new SummaryStatsBlobCodec)
        .register(new JavaHistogramBlobCodec)
    }

Doing it this way feels like an anti-pattern. It also clogs up our logs: we have a Spark Streaming job that runs every 30 seconds, so the logs fill up with:

16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it collides with previously registered codec JavaHistogramBlobCodec [blob <-> Histogram]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec TimestampLongCodec [timestamp <-> java.lang.Long] because it collides with previously registered codec TimestampLongCodec [timestamp <-> java.lang.Long]
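
For reference, the codecs above are ordinary Java-driver TypeCodec implementations. The real ones aren't shown here, but a minimal sketch of what a TimestampLongCodec (timestamp <-> java.lang.Long) could look like against the Java driver 3.x TypeCodec API is:

import java.nio.ByteBuffer
import java.util.Date
import com.datastax.driver.core.{DataType, ProtocolVersion, TypeCodec}

// Hypothetical sketch: maps the CQL timestamp type to java.lang.Long (epoch millis)
// by delegating to the driver's built-in timestamp codec.
class TimestampLongCodec extends TypeCodec[java.lang.Long](DataType.timestamp(), classOf[java.lang.Long]) {
  private val inner = TypeCodec.timestamp()

  override def serialize(value: java.lang.Long, pv: ProtocolVersion): ByteBuffer =
    if (value == null) null else inner.serialize(new Date(value), pv)

  override def deserialize(bytes: ByteBuffer, pv: ProtocolVersion): java.lang.Long = {
    val date = inner.deserialize(bytes, pv)
    if (date == null) null else java.lang.Long.valueOf(date.getTime)
  }

  override def parse(value: String): java.lang.Long = {
    val date = inner.parse(value)
    if (date == null) null else java.lang.Long.valueOf(date.getTime)
  }

  override def format(value: java.lang.Long): String =
    if (value == null) "NULL" else inner.format(new Date(value))
}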

Edit:

I've tried registering them immediately like this:

val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
cassandraConnector.withClusterDo(cluster => {
  cluster.getConfiguration.getCodecRegistry
    .register(new TimestampLongCodec)
    .register(new SummaryStatsBlobCodec)
    .register(new JavaHistogramBlobCodec)
})

This works locally, but when deployed to our Mesos cluster it can't find the codecs. I'm assuming that's because the codecs only get registered on the driver's Cluster instance and never on the Cluster instances the executors create.


Solution

  • A better way is to override the Cassandra connection factory, something like this:

    import com.datastax.driver.core.Cluster
    import com.datastax.spark.connector.cql.{CassandraConnectionFactory, CassandraConnectorConf, DefaultConnectionFactory}

    object MyConnectionFactory extends CassandraConnectionFactory {
      override def createCluster(conf: CassandraConnectorConf): Cluster = {
        // build the Cluster exactly as the connector normally would...
        val cluster = DefaultConnectionFactory.createCluster(conf)
        // ...then register the custom codecs before the Cluster is used, so every
        // JVM that opens a connection (driver and executors) gets them automatically
        cluster.getConfiguration.getCodecRegistry
          .register(new TimestampLongCodec)
          .register(new SummaryStatsBlobCodec)
          .register(new JavaHistogramBlobCodec)
        cluster
      }
    }
    

    and set the spark.cassandra.connection.factory parameter to point to that class, e.g. via SparkConf or --conf on spark-submit, as sketched below.
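
    A minimal sketch of wiring that up (the package name com.example.cassandra and the app name are assumptions; use the fully qualified name of wherever MyConnectionFactory actually lives):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf()
      .setAppName("my-streaming-app")
      // point the connector at the custom connection factory
      .set("spark.cassandra.connection.factory", "com.example.cassandra.MyConnectionFactory")

    val ssc = new StreamingContext(sparkConf, Seconds(30))
    // CassandraConnector(ssc.sparkContext.getConf) now builds every Cluster,
    // on the driver and on each executor, through MyConnectionFactory, so the
    // codecs are registered everywhere without per-partition registration.

    The same setting can also be passed on spark-submit with --conf spark.cassandra.connection.factory=com.example.cassandra.MyConnectionFactory. Make sure the factory object and the codec classes are packaged in the application jar so they are on the executors' classpath as well.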