Tags: elasticsearch, high-availability, fluentd

Fluentd high-availability setup and duplicates


I have the following problem:

We are running Fluentd in a high-availability setup: a few thousand forwarders send to per-geo-region aggregators, which write to Elasticsearch and S3 at the end using the copy plugin. We had a failure (logs were not going through for a couple of days), and since the recovery we have been getting tons of duplicated records from Fluentd in our ES cluster, including duplicates of data produced after the recovery. Are there any known issues with the @type copy plugin that could cause this kind of behaviour?

Our forwarders' config:

# TCP input
<source>
    @type forward
    port X
</source>

# Logs Forwarding
<match XXX>
    @type forward

    # forward to logs-aggregators
    <server>
        #...
    </server>

    # use tcp for heartbeat
    heartbeat_type tcp

    # use longer flush_interval to reduce CPU usage.
    # note that this is a trade-off against latency.
    flush_interval 10s

    # use file buffer to buffer events on disk.
    # max 4096 chunks of 4 MB each = 16 GB of buffer space
    buffer_type file
    buffer_path /var/log/td-agent/buffer/forward
    buffer_chunk_limit 4m
    buffer_queue_limit 4096

    # use multi-threading to send buffered data in parallel
    num_threads 8

    # expire DNS cache (required for cloud environments such as EC2)
    expire_dns_cache 600
</match>
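
For reference, the elided <server> section of the forward output in a high-availability layout would typically list a primary aggregator plus a standby, roughly as in the sketch below; the hostnames and port are placeholders, not the actual aggregators:

<server>
    host aggregator-1.example.com   # placeholder hostname
    port 24224                      # placeholder port
</server>
<server>
    host aggregator-2.example.com   # placeholder hostname
    port 24224
    standby                         # only used when the primary is unreachable
</server>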

Our aggregators' config:

# TCP input
<source>
    @type forward
    port X
</source>

# rsyslog
<source>
  @type syslog
  port X
  tag  rsyslog
</source>

# Logs storage
<match rsyslog.**>
  @type copy
  <store>
    @type elasticsearch
    hosts X
    logstash_format true
    logstash_prefix rsyslog
    logstash_dateformat %Y-%m-%d

    num_threads 8

    utc_index true
    reload_on_failure true
  </store>
</match>

# Bids storage
<match X>
  @type copy

  # push data to elasticsearch cluster
  <store>
    @type elasticsearch
    hosts X

    # save like logstash would
    logstash_format true
    logstash_prefix jita
    logstash_dateformat %Y-%m-%d

    # 64G of buffer data
    buffer_chunk_limit 16m
    buffer_queue_limit 4096
    flush_interval 5m

    num_threads 8

    # everything in UTC
    utc_index true

    #  quickly remove a dead node from the list of addresses
    reload_on_failure true
  </store>

  # additionally store data in s3 bucket
  <store>
    @type s3
    aws_key_id X
    aws_sec_key X
    s3_bucket X
    s3_region X
    s3_object_key_format %{path}/%{time_slice}_%{index}.%{file_extension}
    store_as gzip
    num_threads 8
    path logs
    buffer_path /var/log/td-agent/buffer/s3
    time_slice_format %Y-%m-%d/%H
    time_slice_wait 10m
    utc
  </store>
</match>

Solution

  • The solution for us was adding an identity field to every record and pointing the Elasticsearch plugin at it with id_key. Fluentd retries whole buffer chunks, so after a failure a partially indexed chunk gets resent in full; without an explicit id, Elasticsearch auto-generates a new _id for every resent record, which is where the duplicates came from. With id_key set, retried records simply overwrite the existing documents. Since then we have had no more problems, no matter how hard td-agent retries :)
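
  • A minimal sketch of what that looks like, assuming each record carries a unique field (here a hypothetical log_id; any stable unique field works), with buffer settings omitted for brevity:

<match X>
  @type copy
  <store>
    @type elasticsearch
    hosts X
    logstash_format true
    logstash_prefix jita
    logstash_dateformat %Y-%m-%d

    # use the record's own unique field as the Elasticsearch document _id;
    # a retried chunk then overwrites existing documents instead of creating duplicates
    id_key log_id

    utc_index true
    reload_on_failure true
  </store>
</match>

  • If the records have no natural unique field, one can be generated on the aggregator side, for example with the elasticsearch_genid filter that ships with fluent-plugin-elasticsearch; the hash field it adds can then be referenced by id_key.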