Search code examples
elasticsearchlogstashnetflow

Logstash Netflow Module listening, but not reading packets


Logstash 6.2.4 with the netflow module

Elasticsearch Version: 6.2.4

Ubuntu 16.04 LTS

I have an issue where logstash is listening on the correct port, but does not seem to be collecting the netflow data and passing it to elasticsearch.

The routers in our network are sending their netflow data to Server A and nfcap is listening on port 9995 so trying to run logstash with the netflow module on Server A results in an address in use error. So, I am using iptables to duplicate the packets and forwarding them to a different server, Server B, like this.

iptables -t mangle -A PREROUTING -p udp --dport 9995 -j TEE --gateway <Server B ip address>

Checking with tcpdump, I can see the duplicated packets being received by Server B, with Server A's IP address. Output is below, but for security reasons I've redacted the IP addresses.

tcpdump -i eno1 -n dst port 9995
12:49:49.130772 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.131067 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133504 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133527 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1392
12:49:49.133533 IP <Router 1 ip address>.10005 > <Server A ip address>.9995: UDP, length 1260
12:49:49.391871 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1452
12:49:49.391894 IP <Router 2 ip address>.62500 > <Server A ip address>.9995: UDP, length 1368

So, I know that Server B is receiving the packets on port 9995. Checking with netstat also shows this.

netstat -an | grep 9995
udp        0      0 0.0.0.0:9995            0.0.0.0:*

logstash.yml is as follows

node.name: server-b
path.data: /var/lib/logstash
http.host: "0.0.0.0"

modules:
  - name: netflow
    var.input.udp.port: 9995 # Inbound connections
    var.elasticsearch.hosts: "<ip address>:9200"
    var.kibana.host: "<ip address>:5601"

path.logs: /var/log/logstash   

Checking /var/log/logstash/logstash-plain.log, the only warning I see is about the version of elasticsearch being greater than version 6 so type won't be used to determine the document type.

[2018-07-06T12:58:13,771][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2018-07-06T12:58:13,817][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-06T12:58:17,599][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"module-netflow", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-07-06T12:58:17,733][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://<ip address>:9200/]}}
[2018-07-06T12:58:17,734][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://<ip address>:9200/, :path=>"/"}
[2018-07-06T12:58:17,784][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://<ip address>:9200/"}
[2018-07-06T12:58:17,810][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-07-06T12:58:17,810][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2018-07-06T12:58:17,811][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//<ip address>:9200"]}
[2018-07-06T12:58:18,088][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,101][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,102][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,103][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-07-06T12:58:18,104][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-ASN.mmdb"}
[2018-07-06T12:58:18,120][INFO ][logstash.inputs.udp      ] Starting UDP listener {:address=>"0.0.0.0:9995"}
[2018-07-06T12:58:18,126][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"module-netflow", :thread=>"#<Thread:0x16700849@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:247 sleep>"}
[2018-07-06T12:58:18,131][INFO ][logstash.inputs.udp      ] UDP listener started {:address=>"0.0.0.0:9995", :receive_buffer_bytes=>"212992", :queue_size=>"2000"}
[2018-07-06T12:58:18,135][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["module-netflow"]}

ElasticSearch is running and is receiving data from packetbeat and filebeat, there is nothing in /var/log/elasticsearch/elasticsearch.log to sugguest any error with elasticsearch. However, elasticsearch does not have an index pattern for netflow. Kibana on the other hand, does.

So, logstash on Server B is listening on 0.0.0.0:9995, port 9995 is open and receiving packets from Server A, but logstash is not recognizing these packets. My assumption is that Server B is ignoring them because the destination IP address is that of Server A. Does that sound right? If so, is there away round this?

Is there a better way to forward the duplicated packets from Server A to Server B and have logstash read them?

Unfortunately, adding another netflow exporter destination to the router configs is not possible.


Solution

  • I'll answer my own question.

    Server B was indeed ignoring the netflow data because it did not recognise the IP address. I added server A's ip address as a loopback interface and it works as expected.

    That's probably not the best solution and one to avoid in a production environment, but for testing purposes it should be fine.