Search code examples
Tags: apache-kafka, druid, pydruid

Protobuf ingestion in Druid stays in the RUNNING state, but no datasource is created


I have set up a simple Druid cluster using the quickstart Docker Compose file, and I want to ingest Protobuf messages from Kafka into Druid. I followed this link, but no matter which path I use for the descriptor file URL, it does not get picked up, even though the task shows as running.

This is my compose file:

# Docker Compose file for a single-host Druid 0.20.0 cluster: a PostgreSQL
# metadata store, ZooKeeper, and one container per Druid service, derived
# from the Druid quickstart compose file. Every Druid service reads its
# runtime configuration from the `environment` file, which is resolved
# relative to this compose file.
#
# NOTE(review): every Druid service maps kafka-1, kafka-2 and kafka-3 to
# 127.0.0.1 via extra_hosts. Inside a container, 127.0.0.1 is that container
# itself, not the Docker host. If the Kafka brokers advertise these
# hostnames, the Kafka consumers in the ingestion tasks may be unable to
# reach them. Confirm this mapping is intended.
version: "2.2"

volumes:
  metadata_data: {}
  middle_var: {}
  historical_var: {}
  broker_var: {}
  coordinator_var: {}
  router_var: {}


services:
  postgres:
    container_name: postgres
    # NOTE(review): the `latest` tag can move to a new PostgreSQL major
    # version on re-pull. A new major version typically will not start on a
    # data directory (metadata_data) created by an older one. Consider
    # pinning the version that metadata_data was initialised with.
    image: postgres:latest
    volumes:
      - metadata_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=FoolishPassword
      - POSTGRES_USER=druid
      - POSTGRES_DB=druid

  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.5
    environment:
      - ZOO_MY_ID=1

  coordinator:
    image: apache/druid:0.20.0
    container_name: coordinator
    volumes:
      # Host directory shared with historical and middlemanager.
      - /home/druid-data:/opt/data
      - coordinator_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
    ports:
      - "8081:8081"
    command:
      - coordinator
    env_file:
      - environment
    extra_hosts:
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  broker:
    image: apache/druid:0.20.0
    container_name: broker
    volumes:
      - broker_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8082:8082"
    command:
      - broker
    env_file:
      - environment
    extra_hosts:
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  historical:
    image: apache/druid:0.20.0
    container_name: historical
    volumes:
      # Same host directory that coordinator and middlemanager mount at
      # /opt/data. Assumes `environment` configures local deep storage under
      # /opt/data, as the Druid quickstart does; confirm this. If this path
      # differed from middlemanager's, the historical could not see segments
      # pushed by ingestion tasks.
      - /home/druid-data:/opt/data
      # Use the declared named volume, like every other Druid service,
      # instead of a bind mount nested inside the /opt/data tree.
      - historical_var:/opt/druid/var
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8083:8083"
    command:
      - historical
    env_file:
      - environment
    extra_hosts:
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  middlemanager:
    image: apache/druid:0.20.0
    container_name: middlemanager
    volumes:
      - /home/druid-data:/opt/data
      - middle_var:/opt/druid/var
      # Host directory with the Protobuf descriptor, visible in the
      # container as /home. For example, a host file
      # /home/druid-data/proto/metrics.desc is /home/metrics.desc in here,
      # so the supervisor spec would reference it as
      # file:///home/metrics.desc.
      - /home/druid-data/proto:/home
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8091:8091"
    command:
      - middleManager
    env_file:
      - environment
    extra_hosts:
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

  router:
    image: apache/druid:0.20.0
    container_name: router
    volumes:
      - router_var:/opt/druid/var
      # NOTE(review): presumably mounted so the router can also see the
      # Protobuf descriptor. The ingestion tasks run under middlemanager,
      # which has its own copy of this mount. Confirm the router needs it.
      - /home/druid-data/proto:/home
    depends_on:
      - zookeeper
      - postgres
      - coordinator
    ports:
      - "8888:8888"
    command:
      - router
    env_file:
      - environment
    extra_hosts:
      - kafka-1:127.0.0.1
      - kafka-2:127.0.0.1
      - kafka-3:127.0.0.1

I really don't understand why the Kafka supervisor is not picking up the descriptor file. I am using the same Kafka supervisor spec and the same .proto file as in the linked guide.

Task logs:

    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.0
2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 62abe01bee039651
2020-12-15T11:33:57,320 INFO [task-runner-0-priority-0] org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1608032037318
2020-12-15T11:33:57,322 INFO [task-runner-0-priority-0] org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer - Announcing self[DruidServerMetadata{name='192.168.16.7:8100', hostAndPort='192.168.16.7:8100', hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor, priority=0}] at [/druid/announcements<ip>:8100]
2020-12-15T11:33:57,329 INFO [task-runner-0-priority-0] org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Announced self [{"druidNode":{"service":"druid/middleManager","host":"192.168.16.7","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}}}].
2020-12-15T11:33:57,383 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Initialized sequences: SequenceMetadata{sequenceId=0, sequenceName='index_kafka_metrics-protobuf_8ea4c14da35ee9d_0', assignments=[0], startOffsets={0=4}, exclusiveStartPartitions=[], endOffsets={0=9223372036854775807}, sentinel=false, checkpointed=false}
2020-12-15T11:33:57,385 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Adding partition[0], start[4] -> end[9223372036854775807] to assignment.
2020-12-15T11:33:57,387 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Subscribed to partition(s): protobuff-druid-test-py-0
2020-12-15T11:33:57,393 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[0] to[4].
2020-12-15T11:33:57,393 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Seeking to offset 4 for partition protobuff-druid-test-py-0
2020-12-15T11:33:57,685 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.BasicSecurityResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,740 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.authentication.endpoint.BasicAuthenticatorResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,768 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.security.basic.authorization.endpoint.BasicAuthorizerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,770 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.security.StateResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,778 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.SegmentListerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,781 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.HistoricalResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,782 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.QueryResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,785 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.segment.realtime.firehose.ChatHandlerResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,787 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.http.security.ConfigResourceFilter to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,792 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.query.lookup.LookupListeningResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,795 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.query.lookup.LookupIntrospectionResource to GuiceInstantiatedComponentProvider
2020-12-15T11:33:57,797 INFO [main] com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory - Binding org.apache.druid.server.StatusResource to GuiceManagedComponentProvider with the scope "Undefined"
2020-12-15T11:33:57,834 INFO [main] org.eclipse.jetty.server.handler.ContextHandler - Started o.e.j.s.ServletContextHandler@6248cfab{/,null,AVAILABLE}
2020-12-15T11:33:57,848 INFO [main] org.eclipse.jetty.server.AbstractConnector - Started ServerConnector@4a833595{HTTP/1.1, (http/1.1)}{0.0.0.0:8100}
2020-12-15T11:33:57,848 INFO [main] org.eclipse.jetty.server.Server - Started @5737ms
2020-12-15T11:33:57,849 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Starting lifecycle [module] stage [ANNOUNCEMENTS]
2020-12-15T11:33:58,054 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Successfully started lifecycle [module]
2020-12-15T11:33:58,144 INFO [task-runner-0-priority-0] org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-kafka-supervisor-mhchilod-1, groupId=kafka-supervisor-mhchilod] Cluster ID: 1Ay9hrPhR0qKn-CCrYihgg
2020-12-15T11:33:58,622 DEBUG [qtp758201484-122] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/status HTTP/1.1
2020-12-15T11:33:58,638 DEBUG [qtp758201484-113] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/checkpoints HTTP/1.1
2020-12-15T11:33:58,645 DEBUG [qtp758201484-133] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/time/start HTTP/1.1
2020-12-15T11:34:23,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:34:53,731 DEBUG [qtp758201484-115] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:35:23,731 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:35:53,729 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:36:23,730 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:36:53,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:37:23,729 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:37:53,731 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:38:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:38:53,730 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:39:23,731 DEBUG [qtp758201484-142] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:39:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:40:23,730 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:40:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:41:23,729 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:41:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:42:23,731 DEBUG [qtp758201484-137] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:42:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:43:23,730 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:43:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:44:23,730 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:44:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:45:23,729 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:45:53,731 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:46:23,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:46:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:47:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:47:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:48:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:48:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:49:23,729 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:49:53,729 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:50:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:50:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:51:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:51:53,730 DEBUG [qtp758201484-127] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:52:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:52:53,731 DEBUG [qtp758201484-120] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:53:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:53:53,730 DEBUG [qtp758201484-121] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
2020-12-15T11:54:23,730 DEBUG [qtp758201484-132] org.apache.druid.jetty.RequestLog - <ip> GET /<ip>:8100/druid/worker/v1/chat/index_kafka_metrics-protobuf_8ea4c14da35ee9d_kmmggbig/offsets/current HTTP/1.1
    

Solution

  • It turned out that all the messages I produced to Kafka carried the same hard-coded timestamp (a silly mistake on my side), and with identical timestamps I could not see any new messages in Druid. After setting the timestamp to the real current time instead of hard-coded data, ingestion works like a charm.