Confluent Control Center not intercepting stream


I'm using Confluent Control Center (CCC) with a Kafka stream that is populated by Debezium's Postgres Connector.

I'm using the following docker-compose.yml:

version: '2'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    container_name: zookeeper-1
    volumes:
    - /path/to/something/zk1/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk1/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 22181:22181
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  zookeeper-2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-2
    container_name: zookeeper-2
    volumes:
    - /path/to/something/zk2/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk2/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 32181:32181
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  zookeeper-3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-3
    container_name: zookeeper-3
    volumes:
    - /path/to/something/zk3/zk-data:/var/lib/zookeeper/data
    - /path/to/something/zk3/zk-txn-logs:/var/lib/zookeeper/log
    ports:
    - 42181:42181
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 42181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper-1:22888:23888;zookeeper-2:32888:33888;zookeeper-3:42888:43888

  kafka-1:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-1
    container_name: kafka-1
    volumes:
    - /path/to/something/kafka1/kafka-data:/var/lib/kafka/data
    ports:
    - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  kafka-2:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-2
    container_name: kafka-2
    volumes:
    - /path/to/something/kafka2/kafka-data:/var/lib/kafka/data
    ports:
    - 19093:19093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19093
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  kafka-3:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: kafka-3
    container_name: kafka-3
    volumes:
    - /path/to/something/kafka3/kafka-data:/var/lib/kafka/data
    ports:
    - 19094:19094
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.71:19094
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_REPLICA_FETCH_MAX_BYTES: 3145728
      KAFKA_MESSAGE_MAX_BYTES: 3145728
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 3145728
      KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728
    depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema-registry
    container_name: schema-registry
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - schema-registry
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
    - "8083:8083"
    volumes:
    - /path/to/something/postgres-source-connector:/usr/share/java/postgres-source-connector
    - /path/to/something/mongodb-sink-connector:/usr/share/java/mongodb-sink-connector
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 3145728
      CONNECT_CONSUMER_MAX_PARTITION_FETCH_BYTES: 3145728

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - schema-registry
      - connect
      - ksql-server
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kafka-1
      - kafka-2
      - kafka-3
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper-1:22181,zookeeper-2:32181,zookeeper-3:42181
      CONTROL_CENTER_CONNECT_CLUSTER: 'http://connect:8083'
      CONTTROL_CENTER_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONTROL_CENTER_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_CONNECT_CLUSTER: "http://connect:8083"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "https://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_CUB_KAFKA_TIMEOUT: 300
      PORT: 9021

  ksql-server:
    image: confluentinc/cp-ksql-server:latest
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
    - connect
    ports:
    - "8088:8088"
    environment:
      KSQL_CUB_KAFKA_TIMEOUT: 300
      KSQL_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_COMMIT_INTERVAL_MS: 2000
      KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000
      KSQL_KSQL_AUTO_OFFSET_RESET: earliest

  ksql-cli:
    image: confluentinc/cp-ksql-cli:latest
    hostname: ksql-cli
    container_name: ksql-cli
    depends_on:
    - connect
    - ksql-server
    entrypoint: /bin/sh
    tty: true

  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
    - schema-registry
    ports:
    - 8082:8082
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka-1:19092,kafka-2:19093,kafka-3:19094
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'

  postgres:
    image: debezium/postgres
    hostname: postgres
    container_name: postgres
    volumes:
      - /path/to/something/postgres:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: some-db
    ports:
      - 5432:5432

I've mapped the Postgres Connector into Kafka Connect (via volumes in Compose), and can see it in CCC when creating a new Source Connector.

When I create a Source connector, I see a log message indicating that the topic for this connector was created, and the topic shows up in CCC's Connect area. I can also see that Connect is able to authenticate to Postgres via this Connector.

When I change the table specified in the Connector, I see the Kafka brokers (I have a cluster of 3) working out which of them will store the message. In other words, the Postgres transaction log produced a message on the appropriate topic in response to my change, so the DB, Connector and Kafka are working correctly.

However, no matter what I do, I cannot get this event to display in Data Streams or in System Health (neither under Topics nor under Brokers). (Edit: System Health works now. Data Streams still does not.)

I'm at a loss for what's going wrong. The only indication I get is the initial message saying

Double check to see if monitoring interceptors have been properly configured for any clients producing to or consuming from the cluster controlcenter.cluster

My impression is that this essentially means my Control Center container should be configured with the *_INTERCEPTOR_CLASSES settings, which are in the Compose file I pasted above. I followed the link in this message to the documentation site, which says to check the response of the web service that returns Kafka data. As the documentation warns can happen, I get a response of just {}, indicating that Kafka reports no data. But it definitely has data.

Is it trying to say that I also need these interceptors configured into the Connector somehow? I don't know what it means to have monitoring interceptors for any consumers/producers -- I don't have any raw Java consumers/producers (yet)... only Source connectors for now.

My connector configuration is as follows (created via the CCC UI) if it matters:

{
  "database.server.name": "my-namespace",
  "database.dbname": "my-database",
  "database.hostname": "my-hostname",
  "database.port": "5432",
  "database.user": "admin",
  "schema.whitelist": "public",
  "table.whitelist": "my-database.my-table",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "name": "my-connector",
  "database.password": "its correct"
}

When starting all of the services, I see the following in the corresponding logs which I suspect may be of interest (in no particular order below):

control-center     | 2018-09-17T20:45:02.748463792Z     interceptor.classes = []
kafka-2            | 2018-09-17T20:44:56.293701931Z     interceptor.classes = []
schema-registry    | 2018-09-17T20:45:34.658065846Z     interceptor.classes = []
connect            | 2018-09-17T20:48:52.628218936Z [2018-09-17 20:48:52,628] WARN The configuration 'producer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
connect            | 2018-09-17T20:48:52.628472218Z [2018-09-17 20:48:52,628] WARN The configuration 'consumer.interceptor.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)

Any help is appreciated. Thanks!


Solution

  • You are referencing the 5.1.0 JAR for the interceptors, which does not exist in the latest image. If you run docker-compose exec connect bash and go to the path defined in CLASSPATH, you'll see which version is there (currently 5.0.0 in latest). So change your Compose file to read:

    CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.0.0.jar
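
    The manual check described above can be scripted. This is only a minimal sketch: the function name find_interceptor_jar is made up for illustration, and the default directory is simply the one from the CLASSPATH line. It is meant to be run inside the connect container (for example after docker-compose exec connect bash).

```shell
#!/bin/sh
# Hypothetical helper: print the monitoring-interceptors JAR found in a
# directory, if any. The default directory is the one used in the
# CLASSPATH setting above; pass another directory as the first argument.
find_interceptor_jar() {
  dir="${1:-/usr/share/java/monitoring-interceptors}"
  for jar in "$dir"/monitoring-interceptors-*.jar; do
    # If the glob matched nothing, $jar is the literal pattern; skip it.
    [ -e "$jar" ] || continue
    echo "$jar"
    return 0
  done
  echo "no monitoring-interceptors JAR found in $dir" >&2
  return 1
}
```

    Calling find_interceptor_jar with no arguments prints the full path of the JAR that is actually present, which is the value CLASSPATH should point to. A non-zero exit status means no matching JAR exists in that directory.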
    

    Have a look at https://github.com/rmoff/ksql/blob/clickstream-c3/ksql-clickstream-demo/docker-compose.yml for an example of a working Docker Compose with Confluent Control Center and interceptors working with Kafka Connect (and also KSQL, if you're interested).

    For debugging further, check:

    1. Kafka Connect log file: if the interceptors are working, you should see

      [2018-03-02 11:39:38,594] INFO ConsumerConfig values:
      [...]
              interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor]
      
      [2018-03-02 11:39:38,806] INFO ProducerConfig values:
      [...]
              interceptor.classes = [io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor]
      
      [2018-03-02 11:39:39,455] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor:74)
      [2018-03-02 11:39:39,456] INFO creating interceptor (io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor:70)
      [2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
              confluent.monitoring.interceptor.publishMs = 15000
              confluent.monitoring.interceptor.topic = _confluent-monitoring
      (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
      [2018-03-02 11:39:39,486] INFO MonitoringInterceptorConfig values:
              confluent.monitoring.interceptor.publishMs = 15000
              confluent.monitoring.interceptor.topic = _confluent-monitoring
      (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig:223)
      
    2. See the Confluent Control Center troubleshooting doc for details of the control-center-console-consumer you can use for checking the actual interceptor data being received (or not, if things aren't set up correctly).
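
The log check in step 1 can be automated with a small filter. The sketch below is an illustration under stated assumptions, not part of any Confluent tooling: check_interceptors is a hypothetical name, and it only looks for the interceptor.classes lines shown in the log excerpt above.

```shell
#!/bin/sh
# Hypothetical helper: read Kafka Connect log text on stdin and report
# whether the Confluent monitoring interceptors appear in the logged
# ProducerConfig / ConsumerConfig values.
check_interceptors() {
  log=$(cat)
  status=0
  for kind in Producer Consumer; do
    # Look for a non-empty interceptor.classes list naming the
    # matching monitoring interceptor class.
    if printf '%s\n' "$log" |
      grep -q "interceptor.classes = \[.*Monitoring${kind}Interceptor"; then
      echo "$kind interceptor: loaded"
    else
      echo "$kind interceptor: NOT loaded"
      status=1
    fi
  done
  return $status
}
```

One way to use it is docker-compose logs connect | check_interceptors. A log that contains only interceptor.classes = [] lines, like the ones quoted in the question, is reported as NOT loaded for both the producer and the consumer.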