elk
Netflow for Mikrotik Routers
Instructions for feeding NetFlow data into ELK, assuming the stack is already up and running. NetFlow data is written to separate daily indexes named logstash-netflow-YYYY.MM.dd.
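As a rough illustration of the daily naming, the sketch below mimics the index setting used in the Logstash output further down (logstash-%{type}-%{+YYYY.MM.dd}) and checks the result against the template pattern logstash-netflow-*. The helper name daily_index and the sample date are made up for the example; the date is taken in UTC, which is how Logstash formats it from the event timestamp.

```python
from datetime import datetime, timezone
from fnmatch import fnmatchcase

# Pattern taken from the template definition on this page.
TEMPLATE_PATTERN = "logstash-netflow-*"


def daily_index(ts, event_type="netflow"):
    """Hypothetical helper mimicking logstash-%{type}-%{+YYYY.MM.dd}."""
    day = ts.astimezone(timezone.utc).strftime("%Y.%m.%d")
    return "logstash-{}-{}".format(event_type, day)


# Sample timestamp chosen only for illustration.
name = daily_index(datetime(2016, 1, 27, 20, 22, tzinfo=timezone.utc))
print(name)                                   # logstash-netflow-2016.01.27
print(fnmatchcase(name, TEMPLATE_PATTERN))    # True
```

An index whose name does not match the pattern would not pick up the template mapping, so it is worth checking the two agree before sending data.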
- Define a template mapping for the netflow data, mapping the fields to the correct datatypes:
curl -XPUT http://localhost:9200/_template/logstash-netflow -d '{
  "template": "logstash-netflow-*",
  "order": 10,
  "settings": {
    "index.cache.field.type": "soft",
    "index.store.compress.stored": true
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "properties": {
        "@message":     { "index": "analyzed",     "type": "string" },
        "@source":      { "index": "not_analyzed", "type": "string" },
        "@source_host": { "index": "not_analyzed", "type": "string" },
        "@source_path": { "index": "not_analyzed", "type": "string" },
        "@tags":        { "index": "not_analyzed", "type": "string" },
        "@timestamp":   { "index": "not_analyzed", "type": "date" },
        "@type":        { "index": "not_analyzed", "type": "string" },
        "netflow": {
          "dynamic": true,
          "properties": {
            "version":         { "index": "analyzed",     "type": "integer" },
            "first_switched":  { "index": "not_analyzed", "type": "date" },
            "last_switched":   { "index": "not_analyzed", "type": "date" },
            "direction":       { "index": "not_analyzed", "type": "integer" },
            "flowset_id":      { "index": "not_analyzed", "type": "integer" },
            "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
            "flow_seq_num":    { "index": "not_analyzed", "type": "long" },
            "src_tos":         { "index": "not_analyzed", "type": "integer" },
            "tcp_flags":       { "index": "not_analyzed", "type": "integer" },
            "protocol":        { "index": "not_analyzed", "type": "integer" },
            "ipv4_next_hop":   { "index": "analyzed",     "type": "ip" },
            "in_bytes":        { "index": "not_analyzed", "type": "long" },
            "in_pkts":         { "index": "not_analyzed", "type": "long" },
            "out_bytes":       { "index": "not_analyzed", "type": "long" },
            "out_pkts":        { "index": "not_analyzed", "type": "long" },
            "input_snmp":      { "index": "not_analyzed", "type": "long" },
            "output_snmp":     { "index": "not_analyzed", "type": "long" },
            "ipv4_dst_addr":   { "index": "analyzed",     "type": "ip" },
            "ipv4_src_addr":   { "index": "analyzed",     "type": "ip" },
            "dst_mask":        { "index": "analyzed",     "type": "integer" },
            "src_mask":        { "index": "analyzed",     "type": "integer" },
            "dst_as":          { "index": "analyzed",     "type": "integer" },
            "src_as":          { "index": "analyzed",     "type": "integer" },
            "l4_dst_port":     { "index": "not_analyzed", "type": "long" },
            "l4_src_port":     { "index": "not_analyzed", "type": "long" }
          },
          "type": "object"
        }
      }
    }
  }
}'
- Set up a UDP listener in Logstash to receive the NetFlow data and feed it into the netflow indexes in Elasticsearch:
- /etc/logstash/conf.d/50-netflow.conf
input {
  udp {
    port => 9995
    codec => netflow {
      versions => [5]
    }
    type => "netflow"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "json"
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
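Before pointing a router at the listener, it can help to push a single hand-built flow through the pipeline. The following is a minimal sketch, not part of the original setup: it packs one NetFlow v5 datagram (24-byte header plus one 48-byte record) and sends it to UDP port 9995 on localhost. The function names, the documentation-range addresses, and all counter values are made up for the test.

```python
import socket
import struct
import time

# NetFlow v5 layout: 24-byte header followed by 48-byte flow records,
# all fields in network byte order.
HEADER = struct.Struct("!HHIIIIBBH")
RECORD = struct.Struct("!4s4s4sHHIIIIHHBBBBHHBBH")


def build_v5_packet(src="192.0.2.10", dst="198.51.100.20", uptime_ms=60000):
    """Return one NetFlow v5 datagram carrying a single made-up TCP flow."""
    now = time.time()
    header = HEADER.pack(
        5,                       # version
        1,                       # number of records in this datagram
        uptime_ms,               # sys_uptime (ms since exporter boot)
        int(now),                # unix_secs
        int((now % 1) * 1e9),    # unix_nsecs
        0,                       # flow_sequence
        0, 0,                    # engine_type, engine_id
        0,                       # sampling_interval
    )
    record = RECORD.pack(
        socket.inet_aton(src),           # source address
        socket.inet_aton(dst),           # destination address
        socket.inet_aton("0.0.0.0"),     # next hop
        1, 2,                            # input / output interface index
        10, 1500,                        # packets, bytes
        uptime_ms - 5000,                # first (uptime at flow start)
        uptime_ms - 1000,                # last (uptime at flow end)
        54321, 443,                      # source / destination port
        0,                               # pad1
        0x18,                            # tcp_flags (PSH + ACK)
        6,                               # protocol (TCP)
        0,                               # tos
        0, 0,                            # src_as, dst_as
        24, 24,                          # src_mask, dst_mask
        0,                               # pad2
    )
    return header + record


def send_test_flow(host="127.0.0.1", port=9995):
    """Send the test datagram to the Logstash UDP input; returns bytes sent."""
    packet = build_v5_packet()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(packet, (host, port))
```

Calling send_test_flow() with Logstash running should, if the pipeline is wired up as described, produce one document in the current day's netflow index. UDP gives no delivery confirmation, so a successful send only means the datagram left the socket.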
elk.1453926174.txt.gz · Last modified: by ben
