regex, parsing, apache-kafka, fluentd, rsyslog

Rsyslog unable to send multiline logs


I'm unable to forward the multiline logs below via rsyslog. rsyslog forwards only one line of the log.

Kafka-server logs:

[2022-07-25 11:43:45,091] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = INTERNAL://0.0.0.0:9092,BROKER://0.0.0.0:9091,CLIENT://0.0.0.0:9093
    advertised.port = null
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    client.quota.callback.class = null
    compression.type = producer
    connection.failed.authentication.delay.ms = 100
    connections.max.idle.ms = 600000
    connections.max.reauth.ms = 0
    control.plane.listener.name = null
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    create.topic.policy.class.name = null
    default.replication.factor = 1
    delegation.token.expiry.check.interval.ms = 3600000
    delegation.token.expiry.time.ms = 86400000
    delegation.token.master.key = null
    delegation.token.max.lifetime.ms = 604800000
    delete.records.purgatory.purge.interval.requests = 1
    delete.topic.enable = true
    fetch.purgatory.purge.interval.requests = 1000
    group.initial.rebalance.delay.ms = 3000
    group.max.session.timeout.ms = 1800000
    group.max.size = 2147483647
    group.min.session.timeout.ms = 6000
    host.name =
    inter.broker.listener.name = BROKER
    inter.broker.protocol.version = 2.3-IV1
    kafka.metrics.polling.interval.secs = 10
    kafka.metrics.reporters = []
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = INTERNAL:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:PLAINTEXT
    listeners = INTERNAL://:9092,BROKER://:9091,CLIENT://:9093
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms = 9223372036854775807
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /var/lib/kafka
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.flush.start.offset.checkpoint.interval.ms = 60000
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.downconversion.enable = true
    log.message.format.version = 2.3-IV1
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 120
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections = 2147483647
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides =
    max.incremental.fetch.session.cache.slots = 1000
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 1
    num.recovery.threads.per.data.dir = 1
    num.replica.alter.log.dirs.threads = null
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 10080
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 1
    offsets.topic.segment.bytes = 104857600
    password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
    password.encoder.iterations = 4096
    password.encoder.key.length = 128
    password.encoder.keyfactory.algorithm = null
    password.encoder.old.secret = null
    password.encoder.secret = null
    port = 9092
    principal.builder.class = null
    producer.purgatory.purge.interval.requests = 1000
    queued.max.request.bytes = -1
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.client.callback.handler.class = null
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism.inter.broker.protocol = GSSAPI
    sasl.server.callback.handler.class = null
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = []
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.principal.mapping.rules = [DEFAULT]
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
    transaction.max.timeout.ms = 900000
    transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
    transaction.state.log.load.buffer.size = 5242880
    transaction.state.log.min.isr = 2
    transaction.state.log.num.partitions = 50
    transaction.state.log.replication.factor = 3
    transaction.state.log.segment.bytes = 104857600
    transactional.id.expiration.ms = 604800000
    unclean.leader.election.enable = false
    zookeeper.connect = 0.0.0.0:2181
    zookeeper.connection.timeout.ms = 18000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2022-07-25 11:43:45,145] ERROR Fatal error during SupportedServerStartable startup. Prepare to shutdown (io.confluent.support.metrics.SupportedKafka)
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:224)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1492)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1460)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1114)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1094)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1091)
    at kafka.server.KafkaConfig.fromProps(KafkaConfig.scala)
    at io.confluent.support.metrics.SupportedServerStartable.<init>(SupportedServerStartable.java:52)
    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)

rsysconf.d/10kafka.conf


$InputFilePollInterval 1
input(type="imfile"
  File="/var/log/kafka/server.log"
  Tag="app-error"
  Severity="error"
  startmsg.regex="^[[:digit:]]{4}-[[:digit:]]{2}"
)

*.* @Fluentdvmip:5142  
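
One thing worth checking in the config above: startmsg.regex expects each new message to begin with four digits, whereas the Kafka lines shown earlier begin with "[" before the date, so the pattern may never match a start line. The Python sketch below only emulates start-of-message grouping to illustrate the difference. It is not rsyslog's implementation, the sample lines are abridged from the log above, and the adjusted pattern is an assumption rather than a tested rsyslog setting.

```python
import re

# Original pattern, translated to Python syntax: a record must start with "YYYY-MM".
ORIGINAL = re.compile(r"^\d{4}-\d{2}")
# Adjusted pattern (assumption): allow for the leading "[" that the Kafka log lines carry.
ADJUSTED = re.compile(r"^\[\d{4}-\d{2}")

# Abridged sample taken from the Kafka server log in the question.
SAMPLE = [
    "[2022-07-25 11:43:45,091] INFO KafkaConfig values:",
    "    advertised.host.name = null",
    " (kafka.server.KafkaConfig)",
    "[2022-07-25 11:43:45,145] ERROR Fatal error during SupportedServerStartable startup.",
    "java.lang.IllegalArgumentException: requirement failed",
    "    at scala.Predef$.require(Predef.scala:224)",
]


def group_records(lines, start):
    """Fold continuation lines into the record opened by the last line matching `start`."""
    records, current = [], None
    for line in lines:
        if start.search(line):
            # A new start line closes the record collected so far.
            if current is not None:
                records.append(current)
            current = [line]
        elif current is not None:
            current.append(line)
        # Lines seen before any start match are dropped in this sketch.
    if current is not None:
        records.append(current)
    return ["\n".join(record) for record in records]


print(len(group_records(SAMPLE, ORIGINAL)))  # 0: no line starts with a digit
print(len(group_records(SAMPLE, ADJUSTED)))  # 2: one record per bracketed timestamp
```

In POSIX syntax, as used by startmsg.regex, a literal "[" can be written as the bracket expression "[[]", so an equivalent start pattern might look like ^[[][[:digit:]]{4}-[[:digit:]]{2}. Treat that as something to try, not a confirmed fix.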

Can someone please advise how to send the complete logs from rsyslog to Fluentd? Below is the regex I'll be using in the Fluentd configuration.

https://regex101.com/r/NaNVcr/1

Or do we need to modify the Kafka log4j properties so that the logs are written in a more suitable format?


Solution

  • Instead of sending the raw Kafka server logs to rsyslog, I first converted the Kafka logs to JSON format, using a JAR built from this link

    Steps Followed

    1. Cloned the repository from the above link on the VM where Kafka is installed

    2. Installed the latest version of Maven

    3. Ran mvn package inside the newly cloned directory

    4. This created the JAR "log4j-json-layout-1.0-SNAPSHOT.jar" inside the target directory

    5. Copied this JAR to /usr/share/java/kafka/

    6. Added the configuration below to /etc/kafka/log4j.properties and commented out the old kafkaAppender settings

      log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
      log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
      log4j.appender.kafkaAppender.layout=de.thmshmm.log4j.JsonLayout
      log4j.appender.kafkaAppender.layout.DatePattern=yyyy-MM-dd HH:mm:ss

    7. Restarted the confluent-kafka service

    Before logs:

    (screenshot not preserved)

    After logs:

    (screenshot not preserved)
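
To illustrate why a JSON layout sidesteps the multiline problem: a JSON encoder escapes embedded newlines, so an event that carries a stack trace still occupies a single physical line in the file. Below is a minimal Python sketch, assuming hypothetical field names (timestamp, level, message). The actual keys emitted by the JsonLayout class are not shown in this post.

```python
import json

# Hypothetical event: the field names are illustrative only and are not
# taken from the JsonLayout output. The message is abridged from the log above.
event = {
    "timestamp": "2022-07-25 11:43:45",
    "level": "ERROR",
    "message": (
        "Fatal error during SupportedServerStartable startup.\n"
        "java.lang.IllegalArgumentException: requirement failed\n"
        "    at scala.Predef$.require(Predef.scala:224)"
    ),
}

# Encoding escapes each newline as the two characters "\" and "n".
line = json.dumps(event)
print(len(line.splitlines()))          # 1: the whole event is one physical line

# Decoding restores the original multi-line message.
decoded = json.loads(line)
print(decoded["message"].count("\n"))  # 2: both line breaks survive the round trip
```

With one event per line, a line-oriented forwarder no longer needs a start-of-message regex to reassemble records.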