From bdfd72792ed023f67ea10df5b92ec1a0761eed8e Mon Sep 17 00:00:00 2001 From: modev2301 Date: Mon, 14 Jul 2025 00:56:45 -0500 Subject: [PATCH 01/63] feat(sources): add NetFlow/IPFIX/sFlow source - Add comprehensive NetFlow v5, v9, IPFIX, and sFlow parsing - Implement template caching for template-based protocols - Add defensive parsing to prevent infinite loops on malformed packets - Include extensive test coverage with 15+ test cases - Add configuration examples and documentation - Support multicast groups and configurable protocols - Add raw data inclusion for debugging This source supports all major flow protocols used in network monitoring and observability, with robust error handling and performance optimizations. --- Cargo.toml | 4 +- config/examples/netflow.yaml | 91 + docs/sources/netflow.md | 223 ++ src/sources/mod.rs | 2 + src/sources/netflow.rs | 2423 ++++++++++++++++++++++ tests/data/netflow/netflow_v5_sample.bin | 3 + tests/integration/netflow.rs | 164 ++ 7 files changed, 2909 insertions(+), 1 deletion(-) create mode 100644 config/examples/netflow.yaml create mode 100644 docs/sources/netflow.md create mode 100644 src/sources/netflow.rs create mode 100644 tests/data/netflow/netflow_v5_sample.bin create mode 100644 tests/integration/netflow.rs diff --git a/Cargo.toml b/Cargo.toml index c598761dd75ad..50a756636feb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -591,7 +591,8 @@ sources-logs = [ "sources-file_descriptor", "sources-redis", "sources-socket", - "sources-splunk_hec", +"sources-netflow", +"sources-splunk_hec", "sources-stdin", "sources-syslog", "sources-vector", @@ -652,6 +653,7 @@ sources-prometheus-pushgateway = ["sinks-prometheus", "sources-utils-http", "vec sources-pulsar = ["dep:apache-avro", "dep:pulsar"] sources-redis = ["dep:redis"] sources-socket = ["sources-utils-net", "tokio-util/net"] +sources-netflow = ["sources-utils-net-udp", "tokio-util/net", "dep:base64"] sources-splunk_hec = ["dep:roaring"] sources-statsd = ["sources-utils-net", "tokio-util/net"] sources-stdin = ["tokio-util/io"] diff --git a/config/examples/netflow.yaml b/config/examples/netflow.yaml new file mode 100644 index 0000000000000..c8a32a86b6c32 --- /dev/null +++ b/config/examples/netflow.yaml @@ -0,0 +1,91 @@ +# NetFlow source configuration example +# This source collects network flow data from NetFlow/IPFIX/sFlow exporters + +sources: + netflow: + type: netflow + address: "0.0.0.0:2055" # Default NetFlow port + max_length: 65536 # Maximum packet size + protocols: + - "netflow_v5" + - "netflow_v9" + - "ipfix" + - "sflow" + include_raw_data: false # Set to true for debugging + max_templates: 1000 # Maximum templates to cache per observation domain + template_timeout_secs: 3600 # Template cache timeout (1 hour) + multicast_groups: # Optional: join multicast groups + - "224.0.0.2" + - "224.0.0.4" + receive_buffer_bytes: 262144 # Optional: set socket receive buffer + +# Example transforms to process the flow data +transforms: + netflow_enrich: + type: remap + inputs: ["netflow"] + source: | + # Add timestamp if not present + if !exists(.timestamp) { + .timestamp = now() + } + + # Add source information + .source = "netflow" + + # Parse IP addresses for better visualization + if exists(.src_addr) { + .src_ip = .src_addr + } + if exists(.dst_addr) { + .dst_ip = .dst_addr + } + + # Add protocol names + if exists(.protocol) { + .protocol_name = match(.protocol) { + 1 => "ICMP", + 6 => "TCP", + 17 => "UDP", + _ => "Unknown" + } + } + +# Example sinks to send the data +sinks: + console: + type: console + inputs: ["netflow_enrich"] + encoding: + codec: json + + elasticsearch: + type: elasticsearch + inputs: ["netflow_enrich"] + endpoints: ["http://localhost:9200"] + index: "netflow-%Y.%m.%d" + compression: gzip + + prometheus: + type: prometheus_remote_write + inputs: ["netflow_enrich"] + endpoint: "http://localhost:9090/api/v1/write" + default_namespace: "netflow" + metrics: + - name: "netflow_packets_total" + type: "counter" + inputs: ["netflow_enrich"] + field: "packets" + tags: + src_ip: ".src_ip" + dst_ip: ".dst_ip" + protocol: ".protocol_name" + + - name: "netflow_bytes_total" + type: "counter" + inputs: ["netflow_enrich"] + field: "octets" + tags: + src_ip: ".src_ip" + dst_ip: ".dst_ip" + protocol: ".protocol_name" \ No newline at end of file diff --git a/docs/sources/netflow.md b/docs/sources/netflow.md new file mode 100644 index 0000000000000..ab193bc654944 --- /dev/null +++ b/docs/sources/netflow.md @@ -0,0 +1,223 @@ +# NetFlow Source + +The NetFlow source collects network flow data from NetFlow/IPFIX/sFlow exporters. It supports multiple flow protocols and can handle template-based protocols like NetFlow v9 and IPFIX. + +## Configuration + +```yaml +sources: + netflow: + type: netflow + address: "0.0.0.0:2055" # Default NetFlow port + max_length: 65536 # Maximum packet size + protocols: + - "netflow_v5" + - "netflow_v9" + - "ipfix" + - "sflow" + include_raw_data: false # Set to true for debugging + max_templates: 1000 # Maximum templates to cache per observation domain + template_timeout_secs: 3600 # Template cache timeout (1 hour) + multicast_groups: # Optional: join multicast groups + - "224.0.0.2" + - "224.0.0.4" + receive_buffer_bytes: 262144 # Optional: set socket receive buffer +``` + +## Options + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `address` | `string` | `"0.0.0.0:2055"` | The address to bind to for receiving NetFlow packets | +| `max_length` | `integer` | `65536` | Maximum size of incoming packets | +| `protocols` | `array` | `["netflow_v5", "netflow_v9", "ipfix", "sflow"]` | List of supported flow protocols | +| `include_raw_data` | `boolean` | `false` | Whether to include raw packet data in events for debugging | +| `max_templates` | `integer` | `1000` | Maximum number of templates to cache per observation domain | +| `template_timeout_secs` | `integer` | `3600` | Template cache timeout in seconds | +| `multicast_groups` | `array` | `[]` | List of IPv4 multicast groups to join | +| `receive_buffer_bytes` | `integer` | `null` | Size of the receive buffer for the listening socket | + +## Supported Protocols + +### NetFlow v5 +- Fixed format flow records +- No templates required +- Most common NetFlow version + +### NetFlow v9 +- Template-based format +- Supports variable-length fields +- Templates are cached automatically + +### IPFIX +- Modern standard (RFC 7011) +- Template-based format +- Supports enterprise-specific fields +- Templates are cached automatically + +### sFlow +- Sampled flow data +- Different header format +- Includes counter samples + +## Output Events + +The source generates log events with the following structure: + +### NetFlow v5 Events +```json +{ + "flow_type": "netflow_v5", + "version": 5, + "sys_uptime": 123456, + "unix_secs": 1640995200, + "flow_sequence": 1, + "engine_type": 0, + "engine_id": 0, + "sampling_interval": 1000, + "src_addr": "192.168.1.1", + "dst_addr": "10.0.0.1", + "src_port": 80, + "dst_port": 443, + "protocol": 6, + "protocol_name": "TCP", + "packets": 100, + "octets": 1024, + "tcp_flags": 2, + "tos": 0, + "src_as": 65000, + "dst_as": 65001, + "input": 1, + "output": 2, + "first": 1000, + "last": 2000, + "flow_duration_ms": 1000 +} +``` + +### NetFlow v9/IPFIX Events +```json +{ + "flow_type": "netflow_v9_data", + "template_id": 256, + "source_id": 1, + "in_bytes": 1024, + "in_packets": 100, + "ipv4_src_addr": "192.168.1.1", + "ipv4_dst_addr": "10.0.0.1", + "l4_src_port": 80, + "l4_dst_port": 443, + "protocol": 6, + "tcp_flags": 2 +} +``` + +### sFlow Events +```json +{ + "flow_type": "sflow", + "version": 5, + "agent_address": "192.168.1.1", + "sub_agent_id": 0, + "sequence_number": 1, + "sys_uptime": 123456, + "num_samples": 1 +} +``` + +## Template Caching + +For template-based protocols (NetFlow v9 and IPFIX), the source automatically caches templates received from exporters. Templates are keyed by: + +- Exporter address (peer_addr) +- Observation domain ID +- Template ID + +Templates are automatically cleaned up after the configured timeout period. + +## Multicast Support + +The source can join multicast groups to receive NetFlow traffic from multiple sources. When using multicast: + +1. Set the listening address to `0.0.0.0` (not a specific interface) +2. Configure the `multicast_groups` option with the desired multicast addresses +3. Ensure your network infrastructure supports the multicast groups + +## Performance Considerations + +- **Template Cache Size**: Monitor memory usage if you have many exporters with different templates +- **Packet Size**: Adjust `max_length` based on your network's MTU +- **Receive Buffer**: Increase `receive_buffer_bytes` if you experience packet drops +- **Protocol Filtering**: Only enable the protocols you need to reduce processing overhead + +## Troubleshooting + +### Enable Raw Data +Set `include_raw_data: true` to include base64-encoded raw packet data in events for debugging. + +### Check Template Cache +Monitor template cache size and cleanup frequency. If templates are expiring too quickly, increase `template_timeout_secs`. + +### Multicast Issues +- Ensure the listening address is `0.0.0.0` +- Check that multicast groups are valid IPv4 addresses +- Verify network infrastructure supports the multicast groups + +### Packet Drops +- Increase `receive_buffer_bytes` +- Check system UDP buffer limits +- Monitor network interface statistics + +## Examples + +### Basic NetFlow v5 Collection +```yaml +sources: + netflow: + type: netflow + address: "0.0.0.0:2055" + protocols: + - "netflow_v5" +``` + +### IPFIX with Template Caching +```yaml +sources: + ipfix: + type: netflow + address: "0.0.0.0:4739" + protocols: + - "ipfix" + max_templates: 2000 + template_timeout_secs: 7200 +``` + +### Multicast NetFlow Collection +```yaml +sources: + netflow_multicast: + type: netflow + address: "0.0.0.0:2055" + multicast_groups: + - "224.0.0.2" + - "224.0.0.4" + protocols: + - "netflow_v5" + - "netflow_v9" +``` + +### Debug Configuration +```yaml +sources: + netflow_debug: + type: netflow + address: "0.0.0.0:2055" + protocols: + - "netflow_v5" + - "netflow_v9" + - "ipfix" + - "sflow" + include_raw_data: true + max_templates: 100 + template_timeout_secs: 1800 +``` \ No newline at end of file diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 91e6c333d3705..de618c8a61f10 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -80,6 +80,8 @@ pub mod pulsar; pub mod redis; #[cfg(feature = "sources-socket")] pub mod socket; +#[cfg(feature = "sources-netflow")] +pub mod netflow; #[cfg(feature = "sources-splunk_hec")] pub mod splunk_hec; #[cfg(feature = "sources-static_metrics")] diff --git a/src/sources/netflow.rs b/src/sources/netflow.rs new file mode 100644 index 0000000000000..b36ae0ebc36be --- /dev/null +++ b/src/sources/netflow.rs @@ -0,0 +1,2423 @@ +use std::collections::HashMap; +use std::net::{Ipv4Addr, SocketAddr}; + +use base64::Engine; +use bytes::BytesMut; +use serde::{Deserialize, Serialize}; +use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig}; +use vector_lib::configurable::configurable_component; +use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path}; +use vector_lib::config::LogNamespace; + +use crate::{ + codecs::{Decoder, DecodingConfig}, + config::{SourceConfig, SourceContext, SourceOutput}, + event::Event, + internal_events::{ + SocketBindError, SocketEventsReceived, SocketMode, SocketMulticastGroupJoinError, + SocketReceiveError, StreamClosedError, + }, + serde::default_decoding, + shutdown::ShutdownSignal, + sources::util::net::{try_bind_udp_socket, SocketListenAddr}, + SourceSender, +}; + +/// Configuration for the `netflow` source. +#[configurable_component(source("netflow", "Collect network flow data from NetFlow/IPFIX/sFlow exporters."))] +#[derive(Clone, Debug)] +pub struct NetflowConfig { + #[configurable(derived)] + address: SocketListenAddr, + + /// List of IPv4 multicast groups to join on socket's binding process. + /// + /// In order to read multicast packets, this source's listening address should be set to `0.0.0.0`. + /// If any other address is used (such as `127.0.0.1` or an specific interface address), the + /// listening interface will filter out all multicast packets received, + /// as their target IP would be the one of the multicast group + /// and it will not match the socket's bound IP. + /// + /// Note that this setting will only work if the source's address + /// is an IPv4 address (IPv6 and systemd file descriptor as source's address are not supported + /// with multicast groups). + #[serde(default)] + #[configurable(metadata(docs::examples = "['224.0.0.2', '224.0.0.4']"))] + pub(super) multicast_groups: Vec, + + /// The maximum buffer size of incoming messages. + /// + /// Messages larger than this are truncated. + #[serde(default = "default_max_length")] + #[configurable(metadata(docs::type_unit = "bytes"))] + pub(super) max_length: usize, + + /// Overrides the name of the log field used to add the peer host to each event. + /// + /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`. + /// + /// By default, the [global `log_schema.host_key` option][global_host_key] is used. + /// + /// Set to `""` to suppress this key. + /// + /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key + host_key: Option, + + /// Overrides the name of the log field used to add the peer host's port to each event. + /// + /// The value will be the peer host's port i.e. `9000`. + /// + /// By default, `"port"` is used. + /// + /// Set to `""` to suppress this key. + #[serde(default = "default_port_key")] + port_key: OptionalValuePath, + + /// The size of the receive buffer used for the listening socket. + #[configurable(metadata(docs::type_unit = "bytes"))] + receive_buffer_bytes: Option, + + /// Supported flow protocols to parse. + #[serde(default = "default_protocols")] + pub(super) protocols: Vec, + + /// Whether to include raw packet data in events for debugging. + #[serde(default = "crate::serde::default_false")] + pub(super) include_raw_data: bool, + + /// Maximum number of templates to cache per observation domain. + #[serde(default = "default_max_templates")] + pub(super) max_templates: usize, + + /// Template cache timeout in seconds. + #[serde(default = "default_template_timeout")] + pub(super) template_timeout_secs: u64, + + #[configurable(derived)] + pub(super) framing: Option, + + #[configurable(derived)] + #[serde(default = "default_decoding")] + pub(super) decoding: DeserializerConfig, + + /// The namespace to use for logs. This overrides the global setting. + #[serde(default)] + #[configurable(metadata(docs::hidden))] + pub log_namespace: Option, +} + +/// Supported flow protocols. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum FlowProtocol { + /// NetFlow version 5. + NetflowV5, + /// NetFlow version 9. + NetflowV9, + /// IPFIX (Internet Protocol Flow Information Export). + IPFIX, + /// sFlow (sampled flow). + SFlow, +} + +#[derive(Debug, Clone)] +struct TemplateField { + field_type: u16, + field_length: u16, + enterprise_number: Option, +} + +#[derive(Debug, Clone)] +struct Template { + #[allow(dead_code)] + template_id: u16, + fields: Vec, +} + +type TemplateKey = (std::net::SocketAddr, u32, u16); // (exporter, observation_domain, template_id) +type TemplateCache = HashMap; + +#[allow(dead_code)] +fn parse_netflow_v9_templates(data: &[u8]) -> Vec