Unable to perform Ignite SQL query over [CustomKey, CustomValue] cache in Scala.


I am trying to set up a distributed cache using Apache Ignite with Scala. After setting up the cache, I can put and get items by key, but SQL queries of any kind always return a cursor with a null iterator.

Here is how I set up my cache (please note that this is done before Ignition.start):

def setupTelemetryCache(): CacheConfiguration[TelemetryKey, TelemetryValue] = {

val dataRegionName = "persistent-region"
val cacheName = "telemetry-cache"

// This object is required to perform SQL queries over custom key object
val queryEntity = new QueryEntity("TelemetryKey", "TelemetryValue")
val fields: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]
fields.put("deviceId", classOf[String].getName)
fields.put("metricName", classOf[String].getName)
fields.put("timestamp", classOf[String].getName)
queryEntity.setFields(fields)
val keyFields: util.HashSet[String] = new util.HashSet[String]()
keyFields.add("deviceId")
keyFields.add("metricName")
keyFields.add("timestamp")
queryEntity.setKeyFields(keyFields)
queryEntity.setIndexes(Collections.emptyList[QueryIndex]())

new CacheConfiguration()
  .setName(cacheName)
  .setDataRegionName(dataRegionName)
  .setCacheMode(CacheMode.PARTITIONED) // Data is split among nodes
  .setBackups(1) // each partition has 1 backup
  .setIndexedTypes(classOf[String], classOf[TelemetryKey])  // Index by ID
  .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC) // Faster, clients do not wait for cache
  // synchronization. Consistency issues?
  .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) // Allows transactional query
  .setQueryEntities(Collections.singletonList(queryEntity)) 
}

And this is the code of my TelemetryKey:

case class TelemetryKey private (
                              @(AffinityKeyMapped @field)
                              @(QuerySqlField@field)(index = true)
                              deviceId: String,
                              @(QuerySqlField@field)(index = false)
                              metricName: String,
                              @(QuerySqlField@field)(index = true)
                              timestamp: String) extends Serializable 

And TelemetryValue:

class TelemetryValue private(valueType: ValueTypes.Value, doubleValue: Option[Double],
                             stringValue: Option[String],
                             longValue: Option[Long]) extends Serializable

A sample SQL query I need to run could be "Select * from CACHE where deviceId = 'dev1234'", and I expect to receive all the Cache.Entry[TelemetryKey, TelemetryValue] entries with that deviceId.

Here is how I perform the query:

 private def sqlQuery(query: SqlQuery[TelemetryKey, TelemetryValue]):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    cache.query(query)
  }

  def getEntries(ofDeviceId: String):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryKey], "deviceId = ?")
    sqlQuery(q.setArgs(ofDeviceId))
  }

Even when I change the body of the query, I receive an empty cursor object. I cannot even perform a "Select *" query.

Thanks for the help


Solution

  • There are two ways to configure indexes and queryable fields.

    1. Annotation-based configuration
      Your key and value classes need to be annotated with @QuerySqlField as follows.
    
        case class TelemetryKey private (
        @(AffinityKeyMapped @field)
        @(QuerySqlField@field)(index = true)
        deviceId: String,
        @(QuerySqlField@field)(index = false)
        metricName: String,
        @(QuerySqlField@field)(index = true)
        timestamp: String) extends Serializable
    

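    After indexed and queryable fields are defined, they have to be registered in the SQL engine along with the object types they belong to. setIndexedTypes takes the cache's key class followed by its value class, so pass classOf[TelemetryKey] and classOf[TelemetryValue] rather than classOf[String] and classOf[TelemetryKey].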

    
        new CacheConfiguration()
        .setName(cacheName)
        .setDataRegionName(dataRegionName)
        .setCacheMode(CacheMode.PARTITIONED)
        .setBackups(1)
        .setIndexedTypes(classOf[TelemetryKey], classOf[TelemetryValue])
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
    

    UPD: One more thing that should be fixed as well is your SqlQuery: its first argument must be the value class (TelemetryValue), not the key class.

      def getEntries(ofDeviceId: String):
      QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
        val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryValue], "deviceId = ?")
        sqlQuery(q.setArgs(ofDeviceId))
      }
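    If you only need some columns rather than whole cache entries, a SqlFieldsQuery may be an alternative (as far as I know, SqlQuery is marked deprecated in later Ignite 2.x releases). The following is only a sketch: the object and method names are made up for illustration, and it assumes the table is named after the value class (TelemetryValue), which is what setIndexedTypes should give you by default.

```scala
import org.apache.ignite.IgniteCache
import org.apache.ignite.cache.query.SqlFieldsQuery
import scala.collection.JavaConverters._

// Hypothetical helper, not part of the question's code.
object TelemetryQueries {

  // Returns the metric names stored for one device.
  // Assumption: the SQL table is called TelemetryValue and exposes the
  // annotated key fields (deviceId, metricName, timestamp) as columns.
  def metricNamesOf[K, V](cache: IgniteCache[K, V], deviceId: String): List[String] = {
    val query = new SqlFieldsQuery(
      "SELECT metricName FROM TelemetryValue WHERE deviceId = ?"
    ).setArgs(deviceId)

    val cursor = cache.query(query)
    try {
      // Each row is a java.util.List holding the selected columns in order.
      cursor.getAll.asScala.map(row => row.get(0).asInstanceOf[String]).toList
    } finally {
      cursor.close() // release the resources held by the cursor
    }
  }
}
```

    Calling TelemetryQueries.metricNamesOf(cache, "dev1234") on the telemetry cache would then give you plain Scala strings instead of Cache.Entry objects.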
    
    2. QueryEntity-based approach
    val queryEntity = new QueryEntity(classOf[TelemetryKey], classOf[TelemetryValue])
    
    
        new CacheConfiguration()
        .setName(cacheName)
        .setDataRegionName(dataRegionName)
        .setCacheMode(CacheMode.PARTITIONED)
        .setBackups(1)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
        .setQueryEntities(Collections.singletonList(queryEntity))
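
    The QueryEntity constructor that takes classes reads the @QuerySqlField annotations for you. If you would rather declare the fields by hand, as in the question, something along these lines might work. It is a sketch only: the object and method names are hypothetical, and it assumes the type names should be the fully qualified class names rather than the simple names "TelemetryKey" and "TelemetryValue" used in the question.

```scala
import java.util
import org.apache.ignite.cache.{QueryEntity, QueryIndex}

// Hypothetical helper, not part of the question's code.
object TelemetryQueryEntity {

  // keyType and valueType are assumed to be fully qualified class names,
  // e.g. classOf[TelemetryKey].getName and classOf[TelemetryValue].getName.
  def build(keyType: String, valueType: String): QueryEntity = {
    // Queryable columns and their Java types, in declaration order.
    val fields = new util.LinkedHashMap[String, String]()
    fields.put("deviceId", classOf[String].getName)
    fields.put("metricName", classOf[String].getName)
    fields.put("timestamp", classOf[String].getName)

    // Columns that live in the key object rather than in the value.
    val keyFields = new util.HashSet[String]()
    keyFields.add("deviceId")
    keyFields.add("metricName")
    keyFields.add("timestamp")

    val entity = new QueryEntity(keyType, valueType)
    entity.setFields(fields)
    entity.setKeyFields(keyFields)
    // Index the same columns the annotations mark with index = true.
    entity.setIndexes(util.Arrays.asList(new QueryIndex("deviceId"), new QueryIndex("timestamp")))
    entity
  }
}
```

    You would then pass TelemetryQueryEntity.build(classOf[TelemetryKey].getName, classOf[TelemetryValue].getName) to setQueryEntities and leave out setIndexedTypes, so the same types are not registered twice.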