Instructions for feeding netflow data into ELK
, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow.${YYYY.MM.DD}
.
curl -XPUT http://localhost:9200/_template/logstash-netflow9 -d '{ "template" : "logstash-netflow9-*", "order": 10, "settings": { "index.cache.field.type": "soft", "index.store.compress.stored": true }, "mappings" : { "_default_" : { "_all" : {"enabled" : false}, "properties" : { "@message": { "index": "analyzed", "type": "string" }, "@source": { "index": "not_analyzed", "type": "string" }, "@source_host": { "index": "not_analyzed", "type": "string" }, "@source_path": { "index": "not_analyzed", "type": "string" }, "@tags": { "index": "not_analyzed", "type": "string" }, "@timestamp": { "index": "not_analyzed", "type": "date" }, "@type": { "index": "not_analyzed", "type": "string" }, "netflow": { "dynamic": true, "properties": { "version": { "index": "analyzed", "type": "integer" }, "first_switched": { "index": "not_analyzed", "type": "date" }, "last_switched": { "index": "not_analyzed", "type": "date" }, "direction": { "index": "not_analyzed", "type": "integer" }, "flowset_id": { "index": "not_analyzed", "type": "integer" }, "flow_sampler_id": { "index": "not_analyzed", "type": "integer" }, "flow_seq_num": { "index": "not_analyzed", "type": "long" }, "src_tos": { "index": "not_analyzed", "type": "integer" }, "tcp_flags": { "index": "not_analyzed", "type": "integer" }, "protocol": { "index": "not_analyzed", "type": "integer" }, "ipv4_next_hop": { "index": "analyzed", "type": "ip" }, "in_bytes": { "index": "not_analyzed", "type": "long" }, "in_pkts": { "index": "not_analyzed", "type": "long" }, "out_bytes": { "index": "not_analyzed", "type": "long" }, "out_pkts": { "index": "not_analyzed", "type": "long" }, "input_snmp": { "index": "not_analyzed", "type": "long" }, "output_snmp": { "index": "not_analyzed", "type": "long" }, "ipv4_dst_addr": { "index": "analyzed", "type": "ip" }, "ipv4_src_addr": { "index": "analyzed", "type": "ip" }, "dst_mask": { "index": "analyzed", "type": "integer" }, "src_mask": { "index": "analyzed", "type": "integer" }, "dst_as": { "index": "analyzed", "type": "integer" }, "src_as": { "index": "analyzed", "type": "integer" }, "l4_dst_port": { "index": "not_analyzed", "type": "long" }, "l4_src_port": { "index": "not_analyzed", "type": "long" }, "ipv4_dst_addr_postnat": { "index": "analyzed", "type": "ip" }, "ipv4_src_addr_postnat": { "index": "analyzed", "type": "ip" }, "l4_dst_port_postnat": { "index": "not_analyzed", "type": "long" }, "l4_src_port_postnat": { "index": "not_analyzed", "type": "long" } }, "type": "object" } } } } }'
netflow.yaml
definitions from /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-netflow-2.0.2/lib/logstash/codecs/netflow.rb
to /etc/logstash/mikrotik.netflow9.yaml
and patch the following: # Changed from 2->4 10: - 4 - :input_snmp 14: - 4 - :output_snmp # Changed from uint24->uint32 31: - :uint32 - :ipv6_flow_label # Add these entries: 225: - :ip4_addr - :ipv4_src_addr_postnat 226: - :ip4_addr - :ipv4_dst_addr_postnat 227: - :uint16 - :l4_src_port_postnat 228: - :uint16 - :l4_dst_port_postnat
input { udp { port => 9996 codec => netflow { versions => [9] definitions => "/etc/logstash/mikrotik.netflow9.yaml" } type => "netflow9" } } output { elasticsearch { hosts => ["localhost:9200"] codec => "json" index => "logstash-%{type}-%{+YYYY.MM.dd}" } }
logstash-netflow9-*
(if this pattern already exists and the mapping has been changed since, remember to hit the refresh
button to ensure the changed datatypes are picked up. Verify all fields have the right type.IP→Traffic Flow
on the desired interfaces, and add the logstash host as a netflow v9 target*
against the logstash-netflow9-*
index patternThe following warnings will show up briefly when logstash first starts. This is because the templates needed to understand the netflow messages are published in-band on a regular basis, and when logstash first starts up it might not have seen a copy of the templates before flow data is received. Once the template message is received (defaulting to 20 packets on the mikrtoik boards), these messages will cease:
Jan 28 22:40:08 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:08.238000+0000", :message=>"No matching template for flow id 257", :level=>:warn} Jan 28 22:40:09 silverhold logstash[9496]: {:timestamp=>"2016-01-28T22:40:09.266000+0000", :message=>"No matching template for flow id 256", :level=>:warn}