Search code examples
scalaakkaapache-kafkaconsumer

Akka - Measure time of consumer


I'm developing a system that pulls messages from a JMS(the consumer) and push it to a Kafka Topic(the producer).

Since my consumer stays alive waiting for new messages arriving in the JMS queue and push it to Kafka, how can I effectively measure how many messages I can pull by second?

Here is my code:

My Consumer:

class ActiveMqConsumerActor extends Consumer {

  var startTime: Long = _

  val log = Logging(context.system, this)

  val producerActor = context.actorOf(Props[KafkaProducerActor])

  override def autoAck = false
  override def endpointUri: String = "activemq:KafkaTest"

  override def receive: Receive = LoggingReceive {
    case msg: CamelMessage =>
      val camelMsg = msg.bodyAs[String]
      producerActor ! Message(camelMsg.getBytes)
      sender() ! Ack

    case ex: Exception => sender() ! Failure(ex)

    case _ =>
      log.error("Got a message that I don't understand")
      sender() ! Failure(new Exception("Got a message that I don't understand"))
  }
}

The main:

object ActiveMqConsumerTest extends App {

  val system = ActorSystem("KafkaSystem")
  val camel = CamelExtension(system)
  val camelContext = camel.context

  camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("tcp://0.0.0.0:61616"))

  val consumer = system.actorOf(Props[ActiveMqConsumerActor].withRouter(FromConfig), "consumer")
  val producer = system.actorOf(Props[KafkaProducerActor].withRouter(FromConfig), "producer")

}

Thanks


Solution

  • You can try using something like "Metrics". https://dropwizard.github.io/metrics/3.1.0/manual/ You can define precise metrics, including time and use that inside of your actor.