Netflow for Mikrotik Routers
Instructions for feeding netflow data into ELK, assuming the stack is already up and running. Netflow data will be placed into separate daily indexes, using the pattern logstash-netflow-YYYY.MM.dd.
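As a concrete illustration of the daily naming scheme (which mirrors the Logstash output pattern logstash-%{type}-%{+YYYY.MM.dd} with type "netflow"), the index name for a given day can be sketched with GNU date:

```shell
# Index that flows received on 2016-01-27 would be written to
date -u -d "2016-01-27" +"logstash-netflow-%Y.%m.%d"
# → logstash-netflow-2016.01.27
```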
- Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
curl -XPUT http://localhost:9200/_template/logstash-netflow -d '{
  "template": "logstash-netflow-*",
  "order": 10,
  "settings": {
    "index.cache.field.type": "soft",
    "index.store.compress.stored": true
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "properties": {
        "@message":     { "index": "analyzed",     "type": "string" },
        "@source":      { "index": "not_analyzed", "type": "string" },
        "@source_host": { "index": "not_analyzed", "type": "string" },
        "@source_path": { "index": "not_analyzed", "type": "string" },
        "@tags":        { "index": "not_analyzed", "type": "string" },
        "@timestamp":   { "index": "not_analyzed", "type": "date" },
        "@type":        { "index": "not_analyzed", "type": "string" },
        "netflow": {
          "dynamic": true,
          "type": "object",
          "properties": {
            "version":         { "index": "analyzed",     "type": "integer" },
            "first_switched":  { "index": "not_analyzed", "type": "date" },
            "last_switched":   { "index": "not_analyzed", "type": "date" },
            "direction":       { "index": "not_analyzed", "type": "integer" },
            "flowset_id":      { "index": "not_analyzed", "type": "integer" },
            "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
            "flow_seq_num":    { "index": "not_analyzed", "type": "long" },
            "src_tos":         { "index": "not_analyzed", "type": "integer" },
            "tcp_flags":       { "index": "not_analyzed", "type": "integer" },
            "protocol":        { "index": "not_analyzed", "type": "integer" },
            "ipv4_next_hop":   { "index": "analyzed",     "type": "ip" },
            "in_bytes":        { "index": "not_analyzed", "type": "long" },
            "in_pkts":         { "index": "not_analyzed", "type": "long" },
            "out_bytes":       { "index": "not_analyzed", "type": "long" },
            "out_pkts":        { "index": "not_analyzed", "type": "long" },
            "input_snmp":      { "index": "not_analyzed", "type": "long" },
            "output_snmp":     { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr":   { "index": "analyzed",     "type": "ip" },
            "ipv4_src_addr":   { "index": "analyzed",     "type": "ip" },
            "dst_mask":        { "index": "analyzed",     "type": "integer" },
            "src_mask":        { "index": "analyzed",     "type": "integer" },
            "dst_as":          { "index": "analyzed",     "type": "integer" },
            "src_as":          { "index": "analyzed",     "type": "integer" },
            "l4_dst_port":     { "index": "not_analyzed", "type": "long" },
            "l4_src_port":     { "index": "not_analyzed", "type": "long" }
          }
        }
      }
    }
  }
}'
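The stored template can be checked from the shell after the PUT (assuming, as above, Elasticsearch is reachable on localhost:9200):

```shell
# Fetch the stored template and confirm it targets logstash-netflow-* indexes
curl -s -XGET 'http://localhost:9200/_template/logstash-netflow?pretty'
```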
- Set up a listening UDP port to receive the netflow data, and feed it into the netflow indexes in Elasticsearch:
- /etc/logstash/conf.d/50-netflow.conf
input {
  udp {
    port => 9995
    codec => netflow {
      # Logstash doesn't support importing netflow v9 templates from the netflow device
      # and lacks built-in templates for id=256,257 leading to errors and no data
      versions => [5]
    }
    type => "netflow"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "json"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
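Before restarting Logstash, the new file can be syntax-checked. The exact flag and install path vary by Logstash version (`--configtest` on 1.x/2.x, `--config.test_and_exit` on later releases), so adjust to match the local install:

```shell
# Validate the pipeline configuration without actually starting it
/opt/logstash/bin/logstash --configtest -f /etc/logstash/conf.d/50-netflow.conf
```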
- In Kibana Settings, add a new index pattern for logstash-netflow-* (if this pattern already exists and the mapping has changed since, remember to hit the refresh button to ensure the changed datatypes are picked up). Verify all fields have the right type.
- Enable IP→Traffic Flow on the desired interfaces, and add the logstash host as a netflow v5 target.
- Verify data is being indexed by doing a search on * against the logstash-netflow-* index pattern.
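The same verification can be done from the shell instead of Kibana (again assuming Elasticsearch on localhost:9200); a non-zero count means flows are arriving and being indexed:

```shell
# Count documents across all daily netflow indexes
curl -s 'http://localhost:9200/logstash-netflow-*/_count?pretty'
```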
elk.1453928112.txt.gz · Last modified: by ben