Search code examples
apache-sparkspark-streamingdruid

Cannot write to Druid through SparkStreaming and Tranquility


I am trying to write results from Spark Streaming job to Druid datasource. Spark successfully completes its jobs and hands to Druid. Druid starts indexing but does not write anything.

My code and logs are as follows:

import org.apache.spark._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
impor org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.parsing.json._
import com.metamx.tranquility.spark.BeamRDD._
import org.joda.time.{DateTime, DateTimeZone}


object MyDirectStreamDriver {
  def main(args:Array[String]) {

    val sc = new SparkContext()

    val ssc = new StreamingContext(sc, Minutes(5))

     val kafkaParams = Map[String, Object](
              "bootstrap.servers" -> "[$hadoopURL]:6667",
              "key.deserializer" -> classOf[StringDeserializer],
              "value.deserializer" -> classOf[StringDeserializer],
              "group.id" -> "use_a_separate_group_id_for_each_stream",
              "auto.offset.reset" -> "latest",
              "enable.auto.commit" -> (false: java.lang.Boolean)

    )


    val eventStream = KafkaUtils.createDirectStream[String, String](
                         ssc,
                         PreferConsistent,
                         Subscribe[String, String](Array("events_test"), kafkaParams)) 


    val t = eventStream.map(record => record.value).flatMap(_.split("(?<=\\}),(?=\\{)")).
                           map(JSON.parseRaw(_).getOrElse(new JSONObject(Map(""-> ""))).asInstanceOf[JSONObject]).                  
                           map( new DateTime(), x => (x.obj.getOrElse("OID", "").asInstanceOf[String], x.obj.getOrElse("STATUS", "").asInstanceOf[Double].toInt)).
                           map(x => MyEvent(x._1, x._2, x._3))
    t.saveAsTextFiles("/user/username/result", "txt")
    t.foreachRDD(rdd => rdd.propagate(new MyEventBeamFactory)) 


    ssc.start
    ssc.awaitTermination
  }
}

case class MyEvent (time: DateTime,oid: String,  status: Int)
{

  @JsonValue
  def toMap: Map[String, Any] = Map(
    "timestamp" -> (time.getMillis / 1000),
    "oid" -> oid,
    "status" -> status
  )
}  
object MyEvent {
    implicit val MyEventTimestamper = new Timestamper[MyEvent] {
    def timestamp(a: MyEvent) = a.time
  }

    val Columns = Seq("time", "oid",  "status")

    def fromMap(d: Dict): MyEvent = {
    MyEvent(
       new DateTime(long(d("timestamp")) * 1000), 
      str(d("oid")),          
      int(d("status"))
    )  
  }
}

    import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import io.druid.granularity._
import io.druid.query.aggregation.LongSumAggregatorFactory
import com.metamx.common.Granularity
import org.joda.time.Period

class MyEventBeamFactory extends BeamFactory[MyEvent]
{
  // Return a singleton, so the same connection is shared across all tasks in the same JVM.
  def makeBeam: Beam[MyEvent] = MyEventBeamFactory.BeamInstance

  object MyEventBeamFactory {
  val BeamInstance: Beam[MyEvent] = {
    // Tranquility uses ZooKeeper (through Curator framework) for coordination.
    val curator = CuratorFrameworkFactory.newClient(
      "{IP_2}:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    val indexService = DruidEnvironment("druid/overlord") // Your overlord's druid.service, with slashes replaced by colons.
    val discoveryPath = "/druid/discovery"     // Your overlord's druid.discovery.curator.path
    val dataSource = "events_druid"
    val dimensions = IndexedSeq("oid")
    val aggregators = Seq(new LongSumAggregatorFactory("status", "status"))

    // Expects simpleEvent.timestamp to return a Joda DateTime object.
    DruidBeams
      .builder((event: MyEvent) => event.time)
      .curator(curator)
      .discoveryPath(discoveryPath)
      .location(DruidLocation(indexService, dataSource))
      .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE))
      .tuning(
        ClusteredBeamTuning(
          segmentGranularity = Granularity.HOUR,
          windowPeriod = new Period("PT10M"),
          partitions = 1,
          replicants = 1
        )
      )
      .buildBeam()
  }   
}
}

This is the druid indexing task log: (index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0)

    2017-12-28T13:05:19,299 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Running with task: {
  "type" : "index_realtime",
  "id" : "index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0",
  "resource" : {
    "availabilityGroup" : "events_druid-2017-12-28T13:00:00.000Z-0000",
    "requiredCapacity" : 1
  },
  "spec" : {
    "dataSchema" : {
      "dataSource" : "events_druid",
      "parser" : {
        "type" : "map",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "iso",
            "missingValue" : null
          },
          "dimensionsSpec" : {
            "dimensions" : [ "oid" ],
            "spatialDimensions" : [ ]
          }
        }
      },
      "metricsSpec" : [ {
        "type" : "longSum",
        "name" : "status",
        "fieldName" : "status",
        "expression" : null
      } ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : {
          "type" : "duration",
          "duration" : 60000,
          "origin" : "1970-01-01T00:00:00.000Z"
        },
        "rollup" : true,
        "intervals" : null
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose" : {
        "type" : "clipped",
        "delegate" : {
          "type" : "timed",
          "delegate" : {
            "type" : "receiver",
            "serviceName" : "firehose:druid:overlord:events_druid-013-0000-0000",
            "bufferSize" : 100000
          },
          "shutoffTime" : "2017-12-28T14:15:00.000Z"
        },
        "interval" : "2017-12-28T13:00:00.000Z/2017-12-28T14:00:00.000Z"
      },
      "firehoseV2" : null
    },
    "tuningConfig" : {
      "type" : "realtime",
      "maxRowsInMemory" : 75000,
      "intermediatePersistPeriod" : "PT10M",
      "windowPeriod" : "PT10M",
      "basePersistDirectory" : "/tmp/1514466313873-0",
      "versioningPolicy" : {
        "type" : "intervalStart"
      },
      "rejectionPolicy" : {
        "type" : "none"
      },
      "maxPendingPersists" : 0,
      "shardSpec" : {
        "type" : "linear",
        "partitionNum" : 0
      },
      "indexSpec" : {
        "bitmap" : {
          "type" : "concise"
        },
        "dimensionCompression" : "lz4",
        "metricCompression" : "lz4",
        "longEncoding" : "longs"
      },
      "buildV9Directly" : true,
      "persistThreadPriority" : 0,
      "mergeThreadPriority" : 0,
      "reportParseExceptions" : false,
      "handoffConditionTimeout" : 0,
      "alertTimeout" : 0
    }
  },
  "context" : null,
  "groupId" : "index_realtime_events_druid",
  "dataSource" : "events_druid"
}
2017-12-28T13:05:19,312 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Attempting to lock file[/apps/druid/tasks/index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0/lock].
2017-12-28T13:05:19,313 INFO [main] io.druid.indexing.worker.executor.ExecutorLifecycle - Acquired lock file[/apps/druid/tasks/index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0/lock] in 1ms.
2017-12-28T13:05:19,317 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Running task: index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0
2017-12-28T13:05:19,323 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0] location changed to [TaskLocation{host='hadooptest9.{host}', port=8100}].
2017-12-28T13:05:19,323 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0] status changed to [RUNNING].
2017-12-28T13:05:19,327 INFO [main] org.eclipse.jetty.server.Server - jetty-9.3.19.v20170502
2017-12-28T13:05:19,350 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Creating plumber using rejectionPolicy[io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory$1@7925d517]
2017-12-28T13:05:19,351 INFO [task-runner-0-priority-0] io.druid.server.coordination.CuratorDataSegmentServerAnnouncer - Announcing self[DruidServerMetadata{name='hadooptest9.{host}:8100', host='hadooptest9.{host}:8100', maxSize=0, tier='_default_tier', type='realtime', priority='0'}] at [/druid/announcements/hadooptest9.{host}:8100]
2017-12-28T13:05:19,382 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Expect to run at [2017-12-28T14:10:00.000Z]
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Starting merge and push.
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] segments. Attempting to hand off segments that start before [1970-01-01T00:00:00.000Z].
2017-12-28T13:05:19,392 INFO [task-runner-0-priority-0] io.druid.segment.realtime.plumber.RealtimePlumber - Found [0] sinks to persist and merge
2017-12-28T13:05:19,451 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Connecting firehose: firehose:druid:overlord:events_druid-013-0000-0000
2017-12-28T13:05:19,453 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory - Found chathandler of class[io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider]
2017-12-28T13:05:19,453 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[firehose:druid:overlord:events_druid-013-0000-0000]
2017-12-28T13:05:19,454 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='firehose:druid:overlord:events_druid-013-0000-0000', host='hadooptest9.{host}', port=8100}]
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider as a provider class
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider as a provider class
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering io.druid.server.initialization.jetty.CustomExceptionMapper as a provider class
2017-12-28T13:05:19,502 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Registering io.druid.server.StatusResource as a root resource class
2017-12-28T13:05:19,505 INFO [main] com.sun.jersey.server.impl.application.WebApplicationImpl - Initiating Jersey application, version 'Jersey: 1.19.3 10/24/2016 03:43 PM'
2017-12-28T13:05:19,515 INFO [task-runner-0-priority-0] io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider - Registering Eventhandler[events_druid-013-0000-0000]
2017-12-28T13:05:19,515 INFO [task-runner-0-priority-0] io.druid.curator.discovery.CuratorServiceAnnouncer - Announcing service[DruidNode{serviceName='events_druid-013-0000-0000', host='hadooptest9.{host}', port=8100}]
2017-12-28T13:05:19,529 WARN [task-runner-0-priority-0] org.apache.curator.utils.ZKPaths - The version of ZooKeeper being used doesn't support Container nodes. CreateMode.PERSISTENT will be used instead.
2017-12-28T13:05:19,535 INFO [task-runner-0-priority-0] io.druid.server.metrics.EventReceiverFirehoseRegister - Registering EventReceiverFirehoseMetric for service [firehose:druid:overlord:events_druid-013-0000-0000]
2017-12-28T13:05:19,536 INFO [task-runner-0-priority-0] io.druid.data.input.FirehoseFactory - Firehose created, will shut down at: 2017-12-28T14:15:00.000Z
2017-12-28T13:05:19,574 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.initialization.jetty.CustomExceptionMapper to GuiceManagedComponentProvider with the scope "Singleton"
2017-12-28T13:05:19,576 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider to GuiceManagedComponentProvider with the scope "Singleton"
2017-12-28T13:05:19,583 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding com.fasterxml.jackson.jaxrs.smile.JacksonSmileProvider to GuiceManagedComponentProvider with the scope "Singleton"
2017-12-28T13:05:19,845 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.http.security.StateResourceFilter to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,863 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.http.SegmentListerResource to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,874 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.QueryResource to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,876 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,880 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.query.lookup.LookupListeningResource to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,882 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.query.lookup.LookupIntrospectionResource to GuiceInstantiatedComponentProvider
2017-12-28T13:05:19,883 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding io.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2017-12-28T13:05:19,896 WARN [main] com.sun.jersey.spi.inject.Errors - The following warnings have been detected with resource and/or provider classes:
  WARNING: A HTTP GET method, public void io.druid.server.http.SegmentListerResource.getSegments(long,long,long,javax.servlet.http.HttpServletRequest) throws java.io.IOException, MUST return a non-void type.
2017-12-28T13:05:19,905 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@2fba0dac{/,null,AVAILABLE}
2017-12-28T13:05:19,914 INFO [main] org.eclipse.jetty.server.AbstractConnector - Started ServerConnector@25218a4d{HTTP/1.1,[http/1.1]}{0.0.0.0:8100}
2017-12-28T13:05:19,914 INFO [main] org.eclipse.jetty.server.Server - Started @6014ms
2017-12-28T13:05:19,915 INFO [main] io.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler - Invoking start method[public void io.druid.server.listener.announcer.ListenerResourceAnnouncer.start()] on object[io.druid.query.lookup.LookupResourceListenerAnnouncer@426710f0].
2017-12-28T13:05:19,919 INFO [main] io.druid.server.listener.announcer.ListenerResourceAnnouncer - Announcing start time on [/druid/listeners/lookups/__default/hadooptest9.{host}:8100]
2017-12-28T13:05:20,517 WARN [task-runner-0-priority-0] io.druid.segment.realtime.firehose.PredicateFirehose - [0] InputRow(s) ignored as they do not satisfy the predicate

This is index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0 payload:

{
"task":"index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0","payload":{
"id":"index_realtime_events_druid_2017-12-28T13:00:00.000Z_0_0","resource":{
"availabilityGroup":"events_druid-2017-12-28T13:00:00.000Z-0000","requiredCapacity":1},"spec":{
"dataSchema":{
"dataSource":"events_druid","parser":{
"type":"map","parseSpec":{
"format":"json","timestampSpec":{
"column":"timestamp","format":"iso","missingValue":null},"dimensionsSpec":{
"dimensions":["oid"],"spatialDimensions":[]}}},"metricsSpec":[{
"type":"longSum","name":"status","fieldName":"status","expression":null}],"granularitySpec":{
"type":"uniform","segmentGranularity":"HOUR","queryGranularity":{
"type":"duration","duration":60000,"origin":"1970-01-01T00:00:00.000Z"},"rollup":true,"intervals":null}},"ioConfig":{
"type":"realtime","firehose":{
"type":"clipped","delegate":{
"type":"timed","delegate":{
"type":"receiver","serviceName":"firehose:druid:overlord:events_druid-013-0000-0000","bufferSize":100000},"shutoffTime":"2017-12-28T14:15:00.000Z"},"interval":"2017-12-28T13:00:00.000Z/2017-12-28T14:00:00.000Z"},"firehoseV2":null},"tuningConfig":{
"type":"realtime","maxRowsInMemory":75000,"intermediatePersistPeriod":"PT10M","windowPeriod":"PT10M","basePersistDirectory":"/tmp/1514466313873-0","versioningPolicy":{
"type":"intervalStart"},"rejectionPolicy":{
"type":"none"},"maxPendingPersists":0,"shardSpec":{
"type":"linear","partitionNum":0},"indexSpec":{
"bitmap":{
"type":"concise"},"dimensionCompression":"lz4","metricCompression":"lz4","longEncoding":"longs"},"buildV9Directly":true,"persistThreadPriority":0,"mergeThreadPriority":0,"reportParseExceptions":false,"handoffConditionTimeout":0,"alertTimeout":0}},"context":null,"groupId":"index_realtime_events_druid","dataSource":"events_druid"}}

This is end of spark job stderr

        50:09 INFO ZooKeeper: Client environment:os.version=3.10.0-514.10.2.el7.x86_64
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.name=yarn
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.home=/home/yarn
    17/12/28 14:50:09 INFO ZooKeeper: Client environment:user.dir=/data1/hadoop/yarn/local/usercache/hdfs/appcache/application_1512485869804_6924/container_e58_1512485869804_6924_01_000002
    17/12/28 14:50:09 INFO ZooKeeper: Initiating client connection, connectString={IP2}:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@5967905
    17/12/28 14:50:09 INFO ClientCnxn: Opening socket connection to server {IP2}/{IP2}:2181. Will not attempt to authenticate using SASL (unknown error)
    17/12/28 14:50:09 INFO ClientCnxn: Socket connection established, initiating session, client: /{IP6}:42704, server: {IP2}/{IP2}:2181
    17/12/28 14:50:09 INFO ClientCnxn: Session establishment complete on server {IP2}/{IP2}:2181, sessionid = 0x25fa4ea15980119, negotiated timeout = 40000
    17/12/28 14:50:10 INFO ConnectionStateManager: State change: CONNECTED
    17/12/28 14:50:10 INFO Version: HV000001: Hibernate Validator 5.1.3.Final
    17/12/28 14:50:10 INFO JsonConfigurator: Loaded class[class io.druid.guice.ExtensionsConfig] from props[druid.extensions.] as [ExtensionsConfig{searchCurrentClassloader=true, directory='extensions', hadoopDependenciesDir='hadoop-dependencies', hadoopContainerDruidClasspath='null', loadList=null}]
    17/12/28 14:50:10 INFO LoggingEmitter: Start: started [true]
    17/12/28 14:50:11 INFO FinagleRegistry: Adding resolver for scheme[disco].
    17/12/28 14:50:11 INFO CachedKafkaConsumer: Initial fetch for spark-executor-use_a_separate_group_id_for_each_stream events_test 0 6658
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None.
    17/12/28 14:50:12 WARN MapPartitioner: Cannot partition object of class[class MyEvent] by time and dimensions. Consider implementing a Partitioner.
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None.
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None.
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None.
    17/12/28 14:50:12 INFO ClusteredBeam: Global latestCloseTime[2017-12-28T12:00:00.000Z] for identifier[druid:overlord/events_druid] has moved past timestamp[2017-12-28T12:00:00.000Z], not creating merged beam
    17/12/28 14:50:12 INFO ClusteredBeam: Turns out we decided not to actually make beams for identifier[druid:overlord/events_druid] timestamp[2017-12-28T12:00:00.000Z]. Returning None.
    17/12/28 14:50:16 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1541 bytes result sent to driver

I have also written result to a text file to make sure data is coming and formatted. Here are a few lines of text file:

MyEvent(2017-12-28T16:10:00.387+03:00,0010,1)
MyEvent(2017-12-28T16:10:00.406+03:00,0030,1)
MyEvent(2017-12-28T16:10:00.417+03:00,0010,1)
MyEvent(2017-12-28T16:10:00.431+03:00,0010,1)
MyEvent(2017-12-28T16:10:00.448+03:00,0010,1)
MyEvent(2017-12-28T16:10:00.464+03:00,0030,1)    

Help is much appreciated. Thanks.


Solution

  • This problem was solved by adding timestampSpec to DruidBeams as such:

    DruidBeams
          .builder((event: MyEvent) => event.time)
          .curator(curator)
          .discoveryPath(discoveryPath)
          .location(DruidLocation(indexService, dataSource))
          .rollup(DruidRollup(SpecificDruidDimensions(dimensions), aggregators, QueryGranularities.MINUTE))
          .tuning(
            ClusteredBeamTuning(
              segmentGranularity = Granularity.HOUR,
              windowPeriod = new Period("PT10M"),
              partitions = 1,
              replicants = 1
            )
          )
          .timestampSpec(new TimestampSpec("timestamp", "posix", null))
          .buildBeam()