
Log more data from Kafka Source Connector


I installed the Kinesis Connector plugin in AWS to connect my Kinesis stream with my MSK cluster. It is working, but the logging I see in CloudWatch is not very helpful:

[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)

How can I enhance the logging that I get? For example, to see the number of records that are being written into the cluster per minute.

I looked at the documentation at https://docs.confluent.io/platform/current/connect/logging.html and I think I am seeing the default logs that are written to stdout, but I am not sure how to change the log level of the Kinesis connector so that it displays more information.
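From those docs I understand that on a self-managed Connect worker the level of a single logger can be raised either in connect-log4j.properties or at runtime through the worker's REST API (KIP-495). A rough sketch of the REST call I would try, assuming a reachable worker on port 8083 and that the Kinesis connector logs under its package name:

# untested sketch: raise the Kinesis connector package to DEBUG on a
# self-managed Connect worker; "connect-worker:8083" is a placeholder host
curl -s -X PUT -H "Content-Type: application/json" \
  -d '{"level": "DEBUG"}' \
  http://connect-worker:8083/admin/loggers/io.confluent.connect.kinesis

But I am not sure whether MSK Connect exposes this endpoint (or the log4j file) at all.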

This is my current configuration:

name=msk-connector-kinesis
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=3

kafka.topic=my-topic
kinesis.region=eu-central-1
kinesis.stream=kinesis_stream_eu-central-1

confluent.topic.bootstrap.servers=<server1>:9098,<server2>:9098,<server3>:9098
confluent.topic.replication.factor=3

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.producer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter


errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all



Solution

  • The metric the OP is asking for,

    "number of records that are being written into the cluster per minute"

    can be found in CloudWatch as part of the metrics emitted by MSK Connect. This applies because you are running your connector via the MSK Connect feature (see the OP's comments under the question).

    Since this is a Source Connector (it pushes data from a source into MSK), the metrics you are looking for are SourceRecordPollRate and SourceRecordWriteRate.

    An example of how these metrics can be queried in the CloudWatch console is shown in the AWS Big Data Blog; a hedged CLI sketch for pulling the same metric is included after the log excerpt at the end of this answer.

    I don't think the original lead the OP posted (increasing the log level to obtain metrics such as the connector's producer rates) is valid, but it is nevertheless worth mentioning that MSK Connect pushes the connector's log records at severity levels INFO, WARN, ERROR and FATAL to either CloudWatch Logs, Amazon S3, or a Kinesis Data Firehose delivery stream.

    Judging by the log excerpt the OP provided, this part is already working as expected:

    [Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
    [Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
    [Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
    [Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
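
    If you want to pull the same numbers outside the CloudWatch console, here is a minimal AWS CLI sketch, assuming the MSK Connect metrics are published in the AWS/KafkaConnect namespace under a ConnectorName dimension (verify the exact namespace and dimension names shown for your connector in CloudWatch before relying on this):

        # hedged sketch: per-minute average write rate of the connector;
        # namespace, metric and dimension names are assumptions, check them in CloudWatch
        aws cloudwatch get-metric-statistics \
            --namespace AWS/KafkaConnect \
            --metric-name SourceRecordWriteRate \
            --dimensions Name=ConnectorName,Value=msk-connector-kinesis \
            --start-time 2023-08-09T07:00:00Z \
            --end-time 2023-08-09T08:00:00Z \
            --period 60 \
            --statistics Average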