From ede4364da52508a1838025079eca40aec0d24207 Mon Sep 17 00:00:00 2001 From: Lokesh Dhoundiyal Date: Tue, 17 Jun 2025 16:46:26 +1200 Subject: [PATCH 1/5] plugin: input and process plugin for unix domain socket Systems were ipfix traffic is metered/counted by the hardware support this new input plugin sock and the corresponding process plugin sockpktinfo can be useful to receive and process ipfix flow information in form of switch records via a unix domain socket which then can be formatted in ipfix data packet and exported to a collector via ipfixprobe. Add some new fields for IPv4 and IPv6 that could be useful. --- CMakeLists.txt | 1 + include/ipfixprobe/flowifc.hpp | 3 + include/ipfixprobe/ipfix-elements.hpp | 18 +- include/ipfixprobe/packet.hpp | 12 + init/config2args.py | 15 + init/link0.conf.example | 5 +- init/schema.json | 20 ++ pkg/rpm/CMakeLists.txt | 4 + pkg/rpm/ipfixprobe.spec.in | 19 +- src/plugins/input/CMakeLists.txt | 4 + src/plugins/input/sock/CMakeLists.txt | 31 ++ src/plugins/input/sock/README.md | 0 src/plugins/input/sock/src/sock.cpp | 278 ++++++++++++++++++ src/plugins/input/sock/src/sock.hpp | 141 +++++++++ src/plugins/process/CMakeLists.txt | 4 + .../process/sockpktinfo/CMakeLists.txt | 27 ++ src/plugins/process/sockpktinfo/README.md | 0 .../process/sockpktinfo/src/sockpktinfo.cpp | 62 ++++ .../process/sockpktinfo/src/sockpktinfo.hpp | 120 ++++++++ src/plugins/storage/cache/src/cache.cpp | 11 + 20 files changed, 769 insertions(+), 6 deletions(-) create mode 100644 src/plugins/input/sock/CMakeLists.txt create mode 100644 src/plugins/input/sock/README.md create mode 100644 src/plugins/input/sock/src/sock.cpp create mode 100644 src/plugins/input/sock/src/sock.hpp create mode 100644 src/plugins/process/sockpktinfo/CMakeLists.txt create mode 100644 src/plugins/process/sockpktinfo/README.md create mode 100644 src/plugins/process/sockpktinfo/src/sockpktinfo.cpp create mode 100644 src/plugins/process/sockpktinfo/src/sockpktinfo.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 784bb56ec..bc731597e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,6 +12,7 @@ include(cmake/installation.cmake) set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake/modules) +option(ENABLE_INPUT_SOCK "Enable build of input SOCK plugin" OFF) option(ENABLE_INPUT_PCAP "Enable build of input PCAP plugin" OFF) option(ENABLE_INPUT_DPDK "Enable build of input DPDK plugin" OFF) option(ENABLE_INPUT_NFB "Enable build of input NFB plugin" OFF) diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 4919df27f..cbe9b21d6 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -253,9 +253,12 @@ struct Flow : public Record { uint32_t dst_packets; uint8_t src_tcp_flags; uint8_t dst_tcp_flags; + uint32_t drop_packets; + uint32_t ing_phy_interface; uint8_t ip_version; + uint8_t ip_tos; uint8_t ip_proto; uint16_t src_port; uint16_t dst_port; diff --git a/include/ipfixprobe/ipfix-elements.hpp b/include/ipfixprobe/ipfix-elements.hpp index 0ed972a5f..35cd80b8b 100644 --- a/include/ipfixprobe/ipfix-elements.hpp +++ b/include/ipfixprobe/ipfix-elements.hpp @@ -89,6 +89,8 @@ namespace ipxp { #define OUTPUT_INTERFACE(F) F(0, 14, 2, nullptr) #define FLOW_END_REASON(F) F(0, 136, 1, &flow.end_reason) #define FLOW_ID(F) F(0, 148, 8, &flow.flow_hash) +#define DROPS(F) F(0, 133, 4, &flow.drop_packets) +#define ING_PHY_INTERFACE(F) F(0, 252, 4, &flow.ing_phy_interface) #define ETHERTYPE(F) F(0, 256, 2, nullptr) @@ -100,9 +102,10 @@ namespace ipxp { #define L3_PROTO(F) F(0, 60, 1, &flow.ip_version) #define L3_IPV4_ADDR_SRC(F) F(0, 8, 4, &flow.src_ip.v4) #define L3_IPV4_ADDR_DST(F) F(0, 12, 4, &flow.dst_ip.v4) -#define L3_IPV4_TOS(F) F(0, 5, 1, nullptr) +#define L3_IPV4_TOS(F) F(0, 5, 1, &flow.ip_tos) #define L3_IPV6_ADDR_SRC(F) F(0, 27, 16, &flow.src_ip.v6) #define L3_IPV6_ADDR_DST(F) F(0, 28, 16, &flow.dst_ip.v6) +#define L3_IPV6_TOS(F) F(0, 5, 1, &flow.ip_tos) #define L3_IPV4_IDENTIFICATION(F) F(0, 54, 2, nullptr) #define L3_IPV4_FRAGMENT(F) F(0, 88, 2, nullptr) #define L3_IPV4_TTL(F) F(0, 192, 1, nullptr) @@ -343,7 +346,8 @@ namespace ipxp { F(L3_IPV4_ADDR_SRC) \ F(L3_IPV4_ADDR_DST) \ F(L2_SRC_MAC) \ - F(L2_DST_MAC) + F(L2_DST_MAC) \ + F(L3_IPV4_TOS) #define BASIC_TMPLT_V6(F) \ F(FLOW_END_REASON) \ @@ -363,7 +367,8 @@ namespace ipxp { F(L3_IPV6_ADDR_SRC) \ F(L3_IPV6_ADDR_DST) \ F(L2_SRC_MAC) \ - F(L2_DST_MAC) + F(L2_DST_MAC) \ + F(L3_IPV6_TOS) #define IPFIX_HTTP_TEMPLATE(F) \ F(HTTP_USERAGENT) \ @@ -583,6 +588,10 @@ namespace ipxp { #define IPFIX_MPLS_TEMPLATE(F) F(MPLS_TOP_LABEL_STACK_SECTION) +#define IPFIX_SOCKPKTINFO_TEMPLATE(F) \ + F(ING_PHY_INTERFACE) \ + F(DROPS) + /** * List of all known templated. * @@ -616,7 +625,8 @@ namespace ipxp { IPFIX_ICMP_TEMPLATE(F) \ IPFIX_VLAN_TEMPLATE(F) \ IPFIX_NETTISA_TEMPLATE(F) \ - IPFIX_FLOW_HASH_TEMPLATE(F) + IPFIX_FLOW_HASH_TEMPLATE(F) \ + IPFIX_SOCKPKTINFO_TEMPLATE(F) /** * Helper macro, convert FIELD into its name as a C literal. diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index c84baa534..70a38b1ab 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -100,6 +100,12 @@ struct Packet : public Record { uint16_t buffer_size; /**< Size of buffer */ bool source_pkt; /**< Direction of packet from flow point of view */ + uint32_t pkt_cnt; /**< Number of packets on a flow - input plugin 'sock' */ + uint32_t drop_cnt; /**< Number of dropped packets on a flow - input plugin 'sock' */ + uint64_t byte_cnt; /**< Bytes received on a flow - input plugin 'sock' */ + uint32_t source_interface; /**< source interface for the flow - input plugin 'sock' */ + uint8_t end_reason; /**< Flow end reason - input plugin 'sock' */ + struct timeval end_ts; /**< Flow end time - input plugin 'sock' */ /** * \brief Constructor. @@ -142,6 +148,12 @@ struct Packet : public Record { , buffer(nullptr) , buffer_size(0) , source_pkt(true) + , pkt_cnt(0) + , drop_cnt(0) + , byte_cnt(0) + , source_interface(0) + , end_reason (0) + , end_ts({0, 0}) { } }; diff --git a/init/config2args.py b/init/config2args.py index fb90b91e1..246779e00 100755 --- a/init/config2args.py +++ b/init/config2args.py @@ -39,6 +39,8 @@ def process_input_plugin(config): return process_input_dpdk_plugin(settings) if plugin == "dpdk_ring": return process_input_dpdk_ring_plugin(settings) + if plugin == "sock": + return process_input_sock_plugin(settings) if plugin == "raw": return process_input_raw_plugin(settings) if plugin == "ndp": @@ -286,6 +288,19 @@ def process_input_raw_plugin(settings): return " ".join(params) +def process_input_sock_plugin(settings): + params = ['-i "sock'] + + if settings is None: + raise ValueError("Settings for sock plugin cannot be empty.") + + sock = settings.get("sock") + if sock is None: + raise ValueError("sock must be specified in the sock plugin configuration.") + + params.append(f"sock={sock}") + + return " ".join(params) def process_process_plugins(config): process_plugins = config.get("process_plugins", []) diff --git a/init/link0.conf.example b/init/link0.conf.example index 8e27a8e85..2056cde77 100644 --- a/init/link0.conf.example +++ b/init/link0.conf.example @@ -1,7 +1,7 @@ # Input plugin configuration (input_plugin) input_plugin: # IMPORTANT: Only one input plugin can be specified. Choose one of the following options: - # raw, pcap_file, pcap_live, ndp, dpdk_ring, or dpdk. + # raw, pcap_file, pcap_live, ndp, dpdk_ring, or dpdk, sock. raw: interface: eth0 # Network interface name to capture traffic from [required] @@ -35,6 +35,9 @@ input_plugin: eal_opts: null # EAL options (null = default options) mtu: null # Maximum Transmission Unit (defaults to RTE_ETHER_MAX_LEN) + sock: + sock: Unix domain socket path [required] + # Storage configuration (storage) storage: cache: diff --git a/init/schema.json b/init/schema.json index 6180fecc3..2ceeaab98 100644 --- a/init/schema.json +++ b/init/schema.json @@ -32,6 +32,26 @@ "raw" ] }, + { + "type": "object", + "properties": { + "sock": { + "type": "object", + "properties": { + "path": { + "type": "string" + } + }, + "required": [ + "path" + ], + "additionalProperties": false + } + }, + "required": [ + "sock" + ] + }, { "type": "object", "properties": { diff --git a/pkg/rpm/CMakeLists.txt b/pkg/rpm/CMakeLists.txt index cbdd25f48..5da5f9661 100644 --- a/pkg/rpm/CMakeLists.txt +++ b/pkg/rpm/CMakeLists.txt @@ -22,6 +22,10 @@ if (ENABLE_INPUT_PCAP) list(APPEND RPMBUILD_ARGS "--with" "input_pcap") endif() +if (ENABLE_INPUT_SOCK) + list(APPEND RPMBUILD_ARGS "--with" "input_sock") +endif() + if (ENABLE_INPUT_DPDK) list(APPEND RPMBUILD_ARGS "--with" "input_dpdk") endif() diff --git a/pkg/rpm/ipfixprobe.spec.in b/pkg/rpm/ipfixprobe.spec.in index 09e12b534..456eb1cf1 100644 --- a/pkg/rpm/ipfixprobe.spec.in +++ b/pkg/rpm/ipfixprobe.spec.in @@ -2,6 +2,7 @@ %bcond_with input_dpdk %bcond_with input_nfb %bcond_with process_experimental +%bcond_with input_sock %global _unitdir %{_prefix}/lib/systemd/system @@ -62,6 +63,16 @@ BuildRequires: libpcap-devel Input plugin for libpcap. %endif +%if %{with input_sock} +%package input-sock +Summary: Input plugin to read flow records from a unix domain socket using libsock. +Requires: libsock +BuildRequires: libsock-devel + +%description input-sock +Input plugin for libsock. +%endif + %if %{with input_dpdk} %package input-dpdk Summary: Input plugin to read packets from interfaces using dpdk. @@ -102,7 +113,7 @@ Experimental process plugins. %if 0%{?rhel} == 8 source /opt/rh/gcc-toolset-14/enable %endif -%cmake -DCMAKE_BUILD_TYPE=Release %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} +%cmake -DCMAKE_BUILD_TYPE=Release %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} %cmake_build %install @@ -155,6 +166,12 @@ source /opt/rh/gcc-toolset-14/enable %{_libdir}/ipfixprobe/input/libipfixprobe-input-pcap.so %endif +%if %{with input_sock} +%files input-sock +%{_libdir}/ipfixprobe/input/libipfixprobe-input-sock.so +%{_libdir}/ipfixprobe/process/libipfixprobe-process-sockpktinfo.so +%endif + %if %{with input_nfb} %files input-nfb %{_libdir}/ipfixprobe/input/libipfixprobe-input-nfb.so diff --git a/src/plugins/input/CMakeLists.txt b/src/plugins/input/CMakeLists.txt index de07ed619..7fd9d0547 100644 --- a/src/plugins/input/CMakeLists.txt +++ b/src/plugins/input/CMakeLists.txt @@ -1,5 +1,9 @@ add_subdirectory(raw) +if (ENABLE_INPUT_SOCK) + add_subdirectory(sock) +endif() + if (ENABLE_INPUT_PCAP) add_subdirectory(pcap) endif() diff --git a/src/plugins/input/sock/CMakeLists.txt b/src/plugins/input/sock/CMakeLists.txt new file mode 100644 index 000000000..63d59151d --- /dev/null +++ b/src/plugins/input/sock/CMakeLists.txt @@ -0,0 +1,31 @@ +project(ipfixprobe-input-sock VERSION 1.0.0 DESCRIPTION "ipfixprobe-input-sock plugin") + +add_library(ipfixprobe-input-sock MODULE + src/sock.cpp + src/sock.hpp + ../parser/parser.cpp + ../parser/parser.hpp +) + +set_target_properties(ipfixprobe-input-sock PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN YES +) +target_include_directories(ipfixprobe-input-sock PRIVATE + ${SOCK_INCLUDE_DIRS} + ${CMAKE_SOURCE_DIR}/include/ + ${CMAKE_SOURCE_DIR}/src/plugins/input/parser +) + +target_compile_definitions(ipfixprobe-input-sock PRIVATE + WITH_SOCK +) + +target_link_libraries(ipfixprobe-input-sock PRIVATE + ${SOCK_LIBRARIES} + telemetry::telemetry +) + +install(TARGETS ipfixprobe-input-sock + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/input/" +) diff --git a/src/plugins/input/sock/README.md b/src/plugins/input/sock/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/input/sock/src/sock.cpp b/src/plugins/input/sock/src/sock.cpp new file mode 100644 index 000000000..2dd5f0585 --- /dev/null +++ b/src/plugins/input/sock/src/sock.cpp @@ -0,0 +1,278 @@ +/** + * @file + * @brief Switch records reader from unix domain sockets. + * This is useful plugin for devices with IPFIX support in silicon. + * A switch record identified by the device can be sent to + * this input plugin via a unix domain socket for processing + * exporting to a collector. + * @author Lokesh dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "sock.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace ipxp { + +// Print debug message if debugging is allowed. +#define DEBUG_MSG(format, ...) fprintf(stderr, format, ##__VA_ARGS__) +// Process code if debugging is allowed. +#define DEBUG_CODE(code) code + + +static const PluginManifest sockPluginManifest = { + .name = "sock", + .description = "sock input plugin for reading ipfix flow records from a unix domain socket.", + .pluginVersion = "1.0.0", + .apiVersion = "1.0.0", + .usage = + []() { + SockOptParser parser; + parser.usage(std::cout); + }, +}; + +SockReader::SockReader(const std::string& params) + : sock(-1) +{ + init(params.c_str()); +} + +SockReader::~SockReader() +{ + close(); +} + +void SockReader::init(const char* params) +{ + SockOptParser parser; + try { + parser.parse(params); + } catch (ParserError& e) { + throw PluginError(e.what()); + } + + if (parser.m_sock.empty()) { + throw PluginError("specify socket path"); + } + + open_sock(parser.m_sock); +} + +void SockReader::close() +{ + if (sock >= 0) { + ::close(sock); + sock = -1; + } +} + +void SockReader::open_sock(const std::string& m_sock) +{ + int server_sock, len, rc; + struct sockaddr_un server_sockaddr; + + server_sock = socket(AF_UNIX, SOCK_DGRAM, 0); + if (server_sock == -1) { + throw PluginError( + std::string("could not create AF_UNIX socket: ") + strerror(errno)); + } + + server_sockaddr.sun_family = AF_UNIX; + strcpy(server_sockaddr.sun_path, m_sock.c_str()); + len = sizeof(server_sockaddr); + unlink(m_sock.c_str()); + rc = bind(server_sock, (struct sockaddr*)&server_sockaddr, len); + if (rc == -1) { + ::close(server_sock); + throw PluginError( + std::string("bind failed: ") + strerror(errno)); + } + sock = server_sock; +} + +void SockReader::set_packet(Packet* pkt, struct SwitchRecordData* recordData) +{ + char dst_str[INET6_ADDRSTRLEN]; + char src_str[INET6_ADDRSTRLEN]; + + DEBUG_CODE(char timestamp[32]; time_t time = recordData->start_time.tv_sec; + strftime(timestamp, sizeof(timestamp), "%FT%T", localtime(&time));); + DEBUG_MSG("Time:\t\t\t%s.%06lu\n", timestamp, recordData->start_time.tv_usec); + DEBUG_MSG("Source interface:\t%u\n", recordData->src_if); + + pkt->ts = recordData->start_time; + pkt->end_ts = recordData->end_time; + pkt->end_reason = recordData->end_reason; + pkt->ip_version = recordData->ip_version; + pkt->source_interface = recordData->src_if; + pkt->src_port = 0; + pkt->dst_port = 0; + pkt->ip_proto = 0; + pkt->ip_ttl = 0; + pkt->ip_flags = 0; + pkt->ip_payload_len = 0; + pkt->tcp_flags = 0; + pkt->tcp_window = 0; + pkt->tcp_options = 0; + pkt->tcp_mss = 0; + memcpy(pkt->dst_mac, recordData->dst_mac, sizeof(recordData->dst_mac)); + memcpy(pkt->src_mac, recordData->src_mac, sizeof(recordData->src_mac)); + pkt->ethertype = recordData->eth_type; + pkt->vlan_id = recordData->vlan_id; + pkt->ip_tos = recordData->tos; + + DEBUG_CODE( + char src_mac[18]; // ether_ntoa missing on some platforms + char dst_mac[18]; + uint8_t *p = (uint8_t *) pkt->src_mac; + snprintf(src_mac, sizeof(src_mac), "%02x:%02x:%02x:%02x:%02x:%02x", p[0], p[1], p[2], p[3], p[4], p[5]); + p = (uint8_t *) pkt->dst_mac; + snprintf(dst_mac, sizeof(dst_mac), "%02x:%02x:%02x:%02x:%02x:%02x", p[0], p[1], p[2], p[3], p[4], p[5]); + ); + DEBUG_MSG("\tDest mac:\t%s\n", dst_mac); + DEBUG_MSG("\tSrc mac:\t%s\n", src_mac); + DEBUG_MSG("\tEthertype:\t%#06x\n", pkt->ethertype); + DEBUG_MSG("\tVLAN:\t%u\n", pkt->vlan_id); + + if (pkt->ip_version == 4) { + pkt->src_ip.v4 = recordData->src_ip.s_addr; + pkt->dst_ip.v4 = recordData->dst_ip.s_addr; + inet_ntop(AF_INET, &recordData->src_ip, src_str, 16); + inet_ntop(AF_INET, &recordData->dst_ip, dst_str, 16); + DEBUG_MSG("IPv4 header:\n"); + } + else if (pkt->ip_version == 6) { + memcpy(pkt->src_ip.v6, recordData->src_ip6.s6_addr, 16); + memcpy(pkt->dst_ip.v6, recordData->dst_ip6.s6_addr, 16); + inet_ntop(AF_INET6, &recordData->src_ip6, src_str, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &recordData->dst_ip6, dst_str, INET6_ADDRSTRLEN); + DEBUG_MSG("IPv6 header:\n"); + } + + pkt->ip_proto = recordData->ip_proto; + pkt->ip_len = recordData->ip_length; + pkt->ip_ttl = recordData->ip_ttl; + pkt->ip_flags = recordData->ip_flags; + pkt->ip_payload_len = recordData->ip_payload_len; + + DEBUG_MSG("\tHDR version:\t%u\n", pkt->ip_version); + DEBUG_MSG("\tHDR length:\t%u\n", pkt->ip_payload_len); + DEBUG_MSG("\tTotal length:\t%u\n", pkt->ip_len); + DEBUG_MSG("\tTOS:\t\t%u\n", pkt->ip_tos); + DEBUG_MSG("\tProtocol:\t%u\n", pkt->ip_proto); + DEBUG_MSG("\tSrc addr:\t%s\n", src_str); + DEBUG_MSG("\tDest addr:\t%s\n", dst_str); + DEBUG_MSG("\tFlags:\t\t%#x\n", pkt->ip_flags); + DEBUG_MSG("\tTTL:\t\t%u\n", pkt->ip_ttl); + + pkt->src_port = recordData->src_port; + pkt->dst_port = recordData->dst_port; + if (pkt->ip_proto == IPPROTO_TCP) { + pkt->tcp_flags = recordData->tcp_control_bits; + pkt->tcp_window = recordData->tcp_window; + pkt->tcp_seq = recordData->tcp_seq; + pkt->tcp_ack = recordData->tcp_ack; + DEBUG_MSG("TCP header:\n"); + DEBUG_MSG("\tSrc port:\t%u\n", pkt->src_port); + DEBUG_MSG("\tDest port:\t%u\n", pkt->dst_port); + DEBUG_MSG("\tFlags:\t%u\n", pkt->tcp_flags); + DEBUG_MSG("\tSEQ:\t\t%#x\n", pkt->tcp_seq); + DEBUG_MSG("\tACK SEQ:\t%#x\n", pkt->tcp_ack); + DEBUG_MSG("\tWindow:\t\t%u\n", pkt->tcp_window); + } + if (pkt->ip_proto == IPPROTO_UDP) { + DEBUG_MSG("UDP header:\n"); + DEBUG_MSG("\tSrc port:\t%u\n", pkt->src_port); + DEBUG_MSG("\tDest port:\t%u\n", pkt->dst_port); + } + + pkt->pkt_cnt = recordData->pkt_cnt; + pkt->byte_cnt = recordData->byte_cnt; + DEBUG_MSG("Packet count %u byte count: %lu\n", pkt->pkt_cnt, pkt->byte_cnt); +} + +InputPlugin::Result SockReader::get(PacketBlock& pblock) +{ + int bytes_rec = -1; + struct sockaddr_un peer_sock; + int len; + Packet* pkt; + struct SwitchRecordData* recordData; + struct SwitchRecordHdr* recordHdr; + uint8_t recordHdrBuffer[sizeof(struct SwitchRecordHdr)]; + int recordBuffer_size; + uint8_t* recordBuffer = NULL; + + bytes_rec = recvfrom(sock, recordHdrBuffer, sizeof(struct SwitchRecordHdr), MSG_PEEK | MSG_DONTWAIT, + (struct sockaddr*) &peer_sock, (socklen_t*) &len); + if (bytes_rec == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return Result::TIMEOUT; + } else { + ::close(sock); + throw PluginError(std::string("recvfrom failed: ") + strerror(errno)); + } + } else { + recordHdr = (struct SwitchRecordHdr*) recordHdrBuffer; + DEBUG_MSG("Switch record version: %u num_records %u bytes_rec %d\n", + recordHdr->version, recordHdr->num_records, bytes_rec); + + if (recordHdr->version == SWITCH_RECORD_VERSION_V1) { + recordBuffer_size = sizeof(struct SwitchRecordHdr) + + (sizeof(struct SwitchRecordData) * recordHdr->num_records); + + recordBuffer = (uint8_t*) malloc(recordBuffer_size); + if (!recordBuffer) { + ::close(sock); + throw PluginError("not enough memory"); + } else { + bytes_rec = recvfrom(sock, recordBuffer, recordBuffer_size, 0, + (struct sockaddr*) &peer_sock, (socklen_t*) &len); + if (bytes_rec == -1) { + ::close(sock); + throw PluginError(std::string("recvfrom failed: ") + strerror(errno)); + } else { + pblock.cnt = 0; + DEBUG_MSG("bytes_rec :%d \n", bytes_rec); + recordData = (struct SwitchRecordData*) (recordBuffer + sizeof(struct SwitchRecordHdr)); + for (int i = 0; i < recordHdr->num_records; i++, recordData++) { + pkt = &pblock.pkts[pblock.cnt]; + if (recordData) { + DEBUG_MSG("Record count: %d\n", i); + set_packet(pkt, recordData); + pblock.cnt++; + pblock.bytes += pkt->ip_len; + m_seen += recordData->pkt_cnt; + m_parsed += recordData->pkt_cnt; + } + } + } + } + } + } + free(recordBuffer); + return pblock.cnt ? Result::PARSED : Result::NOT_PARSED; +} + +static const PluginRegistrar sockRegistrar(sockPluginManifest); +} // namespace ipxp diff --git a/src/plugins/input/sock/src/sock.hpp b/src/plugins/input/sock/src/sock.hpp new file mode 100644 index 000000000..e0f08bcdf --- /dev/null +++ b/src/plugins/input/sock/src/sock.hpp @@ -0,0 +1,141 @@ +/** + * @file + * @brief Switch records reader using unix domain sockets + * @author Lokesh dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include + + +namespace ipxp { + +#define SWITCH_RECORD_VERSION_V1 1 + +class SockOptParser : public OptionsParser { + public: + std::string m_sock; + + SockOptParser() + : OptionsParser( + "sock", + "Input plugin for reading records from a unix domain socket") + , m_sock("") + { + register_option( + "s", + "sock", + "PATH", + "Unix domain socket path", + [this](const char* arg) { + m_sock = arg; + return true; + }, + OptionFlags::RequiredArgument); + } +}; + +class SockReader : public InputPlugin { + public: + SockReader(const std::string& params); + ~SockReader(); + void init(const char* params); + void close(); + OptionsParser* get_parser() const { return new SockOptParser(); } + std::string get_name() const { return "sock"; } + InputPlugin::Result get(PacketBlock& packets); + + private: + int sock; + void open_sock(const std::string& m_sock); + void set_packet(Packet* pkt, struct SwitchRecordData* recordData); +}; + +struct __attribute__((packed)) SwitchRecordData { + struct timeval start_time; + struct timeval end_time; + uint8_t end_reason; + uint8_t unused; + + uint32_t pkt_cnt; + uint32_t drop_cnt; + uint64_t byte_cnt; + uint32_t src_if; + + uint8_t dst_mac[6]; + uint8_t src_mac[6]; + uint16_t eth_type; + uint32_t vlan_id; + + uint8_t ip_version; + uint8_t ip_proto; + uint8_t tos; + uint8_t ip_ttl; + uint8_t ip_flags; + uint16_t ip_length; /* Length of IP header + its payload */ + uint16_t ip_payload_len; /* Length of IP payload */ + + uint16_t src_port; + uint16_t dst_port; + struct in_addr src_ip; + struct in_addr dst_ip; + struct in6_addr src_ip6; + struct in6_addr dst_ip6; + + uint8_t tcp_control_bits; + uint16_t tcp_window; + uint32_t tcp_seq; + uint32_t tcp_ack; + + /** + * \brief Constructor. + */ + SwitchRecordData() + : start_time({0, 0}) + , end_time({0, 0}) + , end_reason(0) + , pkt_cnt(0) + , drop_cnt(0) + , byte_cnt(0) + , src_if(0) + , dst_mac() + , src_mac() + , eth_type(0) + , vlan_id(0) + , ip_version(0) + , ip_proto(0) + , tos(0) + , ip_ttl(0) + , ip_flags(0) + , ip_length(0) + , ip_payload_len(0) + , src_port(0) + , dst_port(0) + , src_ip({0}) + , dst_ip({0}) + , src_ip6({0}) + , dst_ip6({0}) + , tcp_control_bits(0) + , tcp_window(0) + , tcp_seq(0) + , tcp_ack(0) + { + } +}; + + struct __attribute__((packed)) SwitchRecordHdr { + uint8_t version; + uint8_t unused; + uint16_t num_records; + }; +} // namespace ipxp + diff --git a/src/plugins/process/CMakeLists.txt b/src/plugins/process/CMakeLists.txt index a47322075..d8e9d4319 100644 --- a/src/plugins/process/CMakeLists.txt +++ b/src/plugins/process/CMakeLists.txt @@ -29,3 +29,7 @@ if (ENABLE_PROCESS_EXPERIMENTAL) add_subdirectory(ntp) add_subdirectory(nettisa) endif() + +if (ENABLE_INPUT_SOCK) + add_subdirectory(sockpktinfo) +endif() diff --git a/src/plugins/process/sockpktinfo/CMakeLists.txt b/src/plugins/process/sockpktinfo/CMakeLists.txt new file mode 100644 index 000000000..e2977bb91 --- /dev/null +++ b/src/plugins/process/sockpktinfo/CMakeLists.txt @@ -0,0 +1,27 @@ +project(ipfixprobe-process-sockpktinfo VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-sockpktinfo plugin") + +add_library(ipfixprobe-process-sockpktinfo MODULE + src/sockpktinfo.cpp + src/sockpktinfo.hpp +) + +set_target_properties(ipfixprobe-process-sockpktinfo PROPERTIES + CXX_VISIBILITY_PRESET hidden + VISIBILITY_INLINES_HIDDEN YES +) + +target_include_directories(ipfixprobe-process-sockpktinfo PRIVATE + ${CMAKE_SOURCE_DIR}/include/ +) + +if(ENABLE_NEMEA) + target_link_libraries(ipfixprobe-process-sockpktinfo PRIVATE + -Wl,--whole-archive ipfixprobe-nemea-fields -Wl,--no-whole-archive + unirec::unirec + trap::trap + ) +endif() + +install(TARGETS ipfixprobe-process-sockpktinfo + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/process/" +) diff --git a/src/plugins/process/sockpktinfo/README.md b/src/plugins/process/sockpktinfo/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp b/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp new file mode 100644 index 000000000..275c872a0 --- /dev/null +++ b/src/plugins/process/sockpktinfo/src/sockpktinfo.cpp @@ -0,0 +1,62 @@ +/** + * @file + * @brief Plugin for parsing packet info arriving via the "sock" input plugin. + * @author Lokesh Dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "sockpktinfo.hpp" + +#include + +#include +#include + +namespace ipxp { + +static const PluginManifest sockpktinfoPluginManifest = { + .name = "sockpktinfo", + .description = "Sock input plugin packet information process plugin.", + .pluginVersion = "1.0.0", + .apiVersion = "1.0.0", + .usage = + []() { + OptionsParser parser( + "sockpktinfo", + "Process additional information coming in via the sock input plugin use"); + parser.usage(std::cout); + }, +}; + +SOCKPKTINFOPlugin::SOCKPKTINFOPlugin(const std::string& params, int pluginID) + : ProcessPlugin(pluginID) +{ + init(params.c_str()); +} + +ProcessPlugin* SOCKPKTINFOPlugin::copy() +{ + return new SOCKPKTINFOPlugin(*this); +} + +int SOCKPKTINFOPlugin::post_create(Flow& rec, const Packet& pkt) +{ + auto ext = new RecordExtSOCKPKTINFO(m_pluginID); + ext->ing_phy_interface = pkt.source_interface; + ext->drop_packets = pkt.drop_cnt; + + /* Update packet count and byte count received from sock input plugin */ + rec.src_packets = pkt.pkt_cnt; + rec.src_bytes = pkt.byte_cnt; + rec.add_extension(ext); + return FLOW_FLUSH; +} + +static const PluginRegistrar + sockpktinfoRegistrar(sockpktinfoPluginManifest); + +} // namespace ipxp diff --git a/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp b/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp new file mode 100644 index 000000000..0bf847724 --- /dev/null +++ b/src/plugins/process/sockpktinfo/src/sockpktinfo.hpp @@ -0,0 +1,120 @@ +/** + * @file + * @brief Plugin for parsing packet info arriving via the "sock" input plugin. + * @author Lokesh Dhoundiyal + * @date 2025 + * + * Copyright (c) 2025 CESNET + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include + +#ifdef WITH_NEMEA +#include "fields.h" +#endif + +#include +#include +#include + +//#include +#include +#include +#include +#include + +namespace ipxp { + +#define SOCKPKTINFO_UNIREC_TEMPLATE "ING_PHY_INTERFACE,DROPS" + +UR_FIELDS( + uint32 ING_PHY_INTERFACE, + uint64 DROPS) + +/** + * \brief Flow record extension header for storing parsed SOCKPKTINFO data. + */ +struct RecordExtSOCKPKTINFO : public RecordExt { + uint32_t ing_phy_interface; + uint32_t drop_packets; + + RecordExtSOCKPKTINFO(int pluginID) + : RecordExt(pluginID) + , ing_phy_interface(0) + , drop_packets(0) + { + } + + #ifdef WITH_NEMEA + virtual void fill_unirec(ur_template_t* tmplt, void* record) + { + ur_set(tmplt, record, F_ING_PHY_INTERFACE, ing_phy_interface); + ur_set(tmplt, record, F_DROPS, drop_packets); + } + + const char* get_unirec_tmplt() const + { + return SOCKPKTINFO_UNIREC_TEMPLATE; + } + #endif + + int fill_ipfix(uint8_t* buffer, int size) override + { + const int LEN = sizeof(ing_phy_interface) + sizeof(drop_packets); + if (size < LEN) { + return -1; + } + *(uint32_t*)buffer = ntohl(ing_phy_interface); + *(uint32_t*)(buffer + 4) = ntohl(drop_packets); + + return LEN; + } + + const char** get_ipfix_tmplt() const + { + static const char* ipfix_template[] = { + IPFIX_SOCKPKTINFO_TEMPLATE(IPFIX_FIELD_NAMES) + NULL + }; + return ipfix_template; + } + + std::string get_text() const + { + std::ostringstream out; + out << "ing_phy_interface=\"" << ing_phy_interface << '"' << ",drop_packets=\"" << drop_packets << '"'; + return out.str(); + } +}; + +/** + * \brief Process plugin for parsing SOCKPKTINFO packets. + */ +class SOCKPKTINFOPlugin : public ProcessPlugin { + public: + SOCKPKTINFOPlugin(const std::string& params, int pluginID); + OptionsParser* get_parser() const + { + return new OptionsParser( + "sockpktinfo", + "Parse SOCKPKTINFO traffic"); + } + std::string get_name() const + { + return "sockpktinfo"; + } + RecordExt* get_ext() const + { + return new RecordExtSOCKPKTINFO(m_pluginID); + } + ProcessPlugin* copy(); + + int post_create(Flow& rec, const Packet& pkt); +}; + +} // namespace ipxp + diff --git a/src/plugins/storage/cache/src/cache.cpp b/src/plugins/storage/cache/src/cache.cpp index bda8bb4ac..0b85f42f5 100644 --- a/src/plugins/storage/cache/src/cache.cpp +++ b/src/plugins/storage/cache/src/cache.cpp @@ -68,6 +68,8 @@ void FlowRecord::erase() m_flow.dst_bytes = 0; m_flow.src_tcp_flags = 0; m_flow.dst_tcp_flags = 0; + m_flow.end_reason = 0; + m_flow.ip_tos = 0; } void FlowRecord::reuse() { @@ -99,6 +101,13 @@ void FlowRecord::create(const Packet& pkt, uint64_t hash) m_flow.time_first = pkt.ts; m_flow.time_last = pkt.ts; + if (pkt.end_ts.tv_sec || pkt.end_ts.tv_usec) { + m_flow.time_last = pkt.end_ts; + } + + if (pkt.end_reason) { + m_flow.end_reason = pkt.end_reason; + } m_flow.flow_hash = hash; memcpy(m_flow.src_mac, pkt.src_mac, 6); @@ -107,12 +116,14 @@ void FlowRecord::create(const Packet& pkt, uint64_t hash) if (pkt.ip_version == IP::v4) { m_flow.ip_version = pkt.ip_version; m_flow.ip_proto = pkt.ip_proto; + m_flow.ip_tos = pkt.ip_tos; m_flow.src_ip.v4 = pkt.src_ip.v4; m_flow.dst_ip.v4 = pkt.dst_ip.v4; m_flow.src_bytes = pkt.ip_len; } else if (pkt.ip_version == IP::v6) { m_flow.ip_version = pkt.ip_version; m_flow.ip_proto = pkt.ip_proto; + m_flow.ip_tos = pkt.ip_tos; memcpy(m_flow.src_ip.v6, pkt.src_ip.v6, 16); memcpy(m_flow.dst_ip.v6, pkt.dst_ip.v6, 16); m_flow.src_bytes = pkt.ip_len; From c80f7be17f8af5d47b96fadda267a804a285e2da Mon Sep 17 00:00:00 2001 From: Lokesh Dhoundiyal Date: Tue, 17 Jun 2025 12:59:38 +1200 Subject: [PATCH 2/5] ipfix: Add export fd check Make sure fd is valid before exporting data to the collector. --- src/plugins/output/ipfix/src/ipfix.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/output/ipfix/src/ipfix.cpp b/src/plugins/output/ipfix/src/ipfix.cpp index 2095d28ed..0d4e65b09 100644 --- a/src/plugins/output/ipfix/src/ipfix.cpp +++ b/src/plugins/output/ipfix/src/ipfix.cpp @@ -879,7 +879,7 @@ int IPFIXExporter::send_packet(ipfix_packet_t* packet) auto data = packetDataBuffer.getCompressed(); /* sendto() does not guarantee that everything will be send in one piece */ - while (sent < dataLen) { + while (fd != -1 && sent < dataLen) { /* Send data to collector (TCP and SCTP ignores last two arguments) */ ret = sendto( fd, From a440fce4ad4c91de0312d1791f7783d5e1c5fc47 Mon Sep 17 00:00:00 2001 From: Lokesh Dhoundiyal Date: Tue, 17 Jun 2025 12:52:31 +1200 Subject: [PATCH 3/5] ipfixprobe: Enable SIG_IGN on SIG_PIPE If the sock stream connection with the collector is disrupted and socket is still being written to then it can generate a SIGPIPE which is not being handled. This patch allows SIGPIPE signal to be ignored. The EPIPE error is handled after the sendto call. --- src/core/ipfixprobe.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/ipfixprobe.cpp b/src/core/ipfixprobe.cpp index aceb789b7..c375e51be 100644 --- a/src/core/ipfixprobe.cpp +++ b/src/core/ipfixprobe.cpp @@ -76,9 +76,7 @@ void register_handlers() signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); signal(SIGSEGV, signal_handler); -#ifdef WITH_NEMEA signal(SIGPIPE, SIG_IGN); -#endif } void error(std::string msg) From f557bb6911e619032fc92514cdc5428f1adbfafe Mon Sep 17 00:00:00 2001 From: Lokesh Dhoundiyal Date: Wed, 25 Jun 2025 16:28:20 +1200 Subject: [PATCH 4/5] plugin: Make lz4 support conditional --- CMakeLists.txt | 1 + cmake/dependencies.cmake | 4 +++- pkg/rpm/ipfixprobe-msec.spec.in | 4 ++++ pkg/rpm/ipfixprobe-nemea.spec.in | 4 ++++ pkg/rpm/ipfixprobe.spec.in | 6 +++++- src/plugins/output/ipfix/CMakeLists.txt | 10 ++++++++-- src/plugins/output/ipfix/src/ipfix.cpp | 21 +++++++++++++++------ src/plugins/output/ipfix/src/ipfix.hpp | 7 ++++++- 8 files changed, 46 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bc731597e..a25033a53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,7 @@ option(ENABLE_INPUT_PCAP "Enable build of input PCAP plugin" option(ENABLE_INPUT_DPDK "Enable build of input DPDK plugin" OFF) option(ENABLE_INPUT_NFB "Enable build of input NFB plugin" OFF) option(ENABLE_OUTPUT_UNIREC "Enable build of output UNIREC plugin" OFF) +option(ENABLE_OUTPUT_LZ4 "Enable Build of output LZ4 compression" OFF) option(ENABLE_PROCESS_EXPERIMENTAL "Enable build of experimental process plugins" OFF) option(ENABLE_MILLISECONDS_TIMESTAMP "Compile ipfixprobe with miliseconds timestamp precesion" OFF) option(ENABLE_NEMEA "Enable build of NEMEA plugins" OFF) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index 2459ed45c..a07a8859b 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -4,7 +4,9 @@ find_package(PkgConfig REQUIRED) find_package(Threads REQUIRED) find_package(Atomic REQUIRED) find_package(Unwind REQUIRED) -find_package(LZ4 REQUIRED) +if (ENABLE_OUTPUT_LZ4) + find_package(LZ4 REQUIRED) +endif() find_package(OpenSSL REQUIRED) if (ENABLE_INPUT_PCAP) diff --git a/pkg/rpm/ipfixprobe-msec.spec.in b/pkg/rpm/ipfixprobe-msec.spec.in index 43088601d..fc8b8a242 100644 --- a/pkg/rpm/ipfixprobe-msec.spec.in +++ b/pkg/rpm/ipfixprobe-msec.spec.in @@ -31,13 +31,17 @@ BuildRequires: cmake >= 3.12 BuildRequires: libunwind-devel BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml diff --git a/pkg/rpm/ipfixprobe-nemea.spec.in b/pkg/rpm/ipfixprobe-nemea.spec.in index c93f92606..a97e61d2c 100644 --- a/pkg/rpm/ipfixprobe-nemea.spec.in +++ b/pkg/rpm/ipfixprobe-nemea.spec.in @@ -35,14 +35,18 @@ BuildRequires: cmake >= 3.12 BuildRequires: libunwind-devel BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: nemea-framework-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml diff --git a/pkg/rpm/ipfixprobe.spec.in b/pkg/rpm/ipfixprobe.spec.in index 456eb1cf1..81ecc30ad 100644 --- a/pkg/rpm/ipfixprobe.spec.in +++ b/pkg/rpm/ipfixprobe.spec.in @@ -38,13 +38,17 @@ BuildRequires: cmake >= 3.12 BuildRequires: libunwind-devel BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig +%if %{with lz4} BuildRequires: lz4-devel +%endif BuildRequires: openssl-devel BuildRequires: git Requires: libatomic Requires: fuse3 +%if %{with lz4} Requires: lz4 +%endif Requires: openssl Requires: python3 Requires: python3-pyyaml @@ -113,7 +117,7 @@ Experimental process plugins. %if 0%{?rhel} == 8 source /opt/rh/gcc-toolset-14/enable %endif -%cmake -DCMAKE_BUILD_TYPE=Release %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} +%cmake -DCMAKE_BUILD_TYPE=Release %{?with_output_lz4:-DENABLE_OUTPUT_LZ4=ON} %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} %cmake_build %install diff --git a/src/plugins/output/ipfix/CMakeLists.txt b/src/plugins/output/ipfix/CMakeLists.txt index 41ad2d80d..2ab3db20e 100644 --- a/src/plugins/output/ipfix/CMakeLists.txt +++ b/src/plugins/output/ipfix/CMakeLists.txt @@ -15,10 +15,16 @@ target_include_directories(ipfixprobe-output-ipfix PRIVATE ${CMAKE_SOURCE_DIR}/include/ ) -target_link_libraries(ipfixprobe-output-ipfix PRIVATE - lz4::lz4 +if (ENABLE_OUTPUT_LZ4) + target_link_libraries(ipfixprobe-output-ipfix PRIVATE + lz4::lz4 ) +target_compile_definitions(ipfixprobe-output-ipfix PRIVATE + WITH_LZ4 +) +endif() + install( TARGETS ipfixprobe-output-ipfix LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/output/" diff --git a/src/plugins/output/ipfix/src/ipfix.cpp b/src/plugins/output/ipfix/src/ipfix.cpp index 0d4e65b09..c219edf64 100644 --- a/src/plugins/output/ipfix/src/ipfix.cpp +++ b/src/plugins/output/ipfix/src/ipfix.cpp @@ -19,7 +19,9 @@ #include #include #include +#ifdef WITH_LZ4 #include +#endif #include #include #include @@ -170,14 +172,16 @@ void IPFIXExporter::init(const char* params) dir_bit_field = parser.m_dir; templateRefreshTime = parser.m_template_refresh_time; - int res; + int res = -1; // check if compression is enabled if (parser.m_lz4_compression) { +#ifdef WITH_LZ4 res = packetDataBuffer.init( true, LZ4_COMPRESSBOUND(mtu) + CompressBuffer::C_ADD_SIZE, // mtu * 3 is arbitrary value, it should be more than mtu * 2 std::max(parser.m_lz4_buffer_size, mtu * 3)); +#endif } else { res = packetDataBuffer.init(false, 0, mtu); } @@ -1189,7 +1193,9 @@ CompressBuffer::CompressBuffer() , readSize(0) , lastReadIndex(0) , lastReadSize(0) +#ifdef WITH_LZ4 , lz4Stream(nullptr) +#endif { } @@ -1216,12 +1222,12 @@ int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize) return -1; } compressedSize = compressSize; - +#ifdef WITH_LZ4 lz4Stream = LZ4_createStream(); if (!lz4Stream) { return -1; } - +#endif shouldResetConnection = true; return 0; @@ -1297,7 +1303,7 @@ int CompressBuffer::compress() readSize = 0; return compressedSize; } - +#ifdef WITH_LZ4 // resize the buffer if it may not be large enough if (compressedSize < LZ4_COMPRESSBOUND(readSize) + C_ADD_SIZE) { auto newSize = LZ4_COMPRESSBOUND(readSize); @@ -1366,6 +1372,9 @@ int CompressBuffer::compress() readSize = 0; return res + (com - compressed); +#else + return 0; +#endif } const uint8_t* CompressBuffer::getCompressed() const @@ -1425,12 +1434,12 @@ void CompressBuffer::close() compressed = nullptr; compressedSize = 0; } - +#ifdef WITH_LZ4 if (lz4Stream) { LZ4_freeStream(lz4Stream); lz4Stream = nullptr; } - +#endif shouldResetConnection = false; shouldCompress = false; readIndex = 0; diff --git a/src/plugins/output/ipfix/src/ipfix.hpp b/src/plugins/output/ipfix/src/ipfix.hpp index d2f7e9f74..6f2481b2f 100644 --- a/src/plugins/output/ipfix/src/ipfix.hpp +++ b/src/plugins/output/ipfix/src/ipfix.hpp @@ -24,7 +24,9 @@ #include #include #include +#ifdef WITH_LZ4 #include +#endif #define COUNT_IPFIX_TEMPLATES(T) +1 @@ -185,6 +187,7 @@ class IpfixOptParser : public OptionsParser { return true; }, OptionFlags::NoArgument); +#ifdef WITH_LZ4 register_option( "c", "lz4-compression", @@ -196,6 +199,7 @@ class IpfixOptParser : public OptionsParser { return true; }, OptionFlags::NoArgument); +#endif register_option( "s", "lz4-buffer-size", @@ -539,9 +543,10 @@ class CompressBuffer { // last compressed data position size_t lastReadIndex; size_t lastReadSize; - +#ifdef WITH_LZ4 // compression stream used by lz4 LZ4_stream_t* lz4Stream; +#endif }; class IPFIXExporter : public OutputPlugin { From 3edc098bd7a3bbe4025ca9ee3579fa3ec631dad2 Mon Sep 17 00:00:00 2001 From: Lokesh Dhoundiyal Date: Thu, 26 Jun 2025 12:27:58 +1200 Subject: [PATCH 5/5] plugin: Make unwind support conditional --- CMakeLists.txt | 2 +- cmake/dependencies.cmake | 5 ++++- pkg/rpm/ipfixprobe-msec.spec.in | 2 ++ pkg/rpm/ipfixprobe-nemea.spec.in | 2 ++ pkg/rpm/ipfixprobe.spec.in | 4 +++- src/core/CMakeLists.txt | 9 ++++++++- src/core/ipfixprobe.cpp | 2 ++ src/core/stacktrace.cpp | 2 ++ 8 files changed, 24 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a25033a53..86c0ec408 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,7 @@ option(ENABLE_OUTPUT_LZ4 "Enable Build of output LZ4 compression" option(ENABLE_PROCESS_EXPERIMENTAL "Enable build of experimental process plugins" OFF) option(ENABLE_MILLISECONDS_TIMESTAMP "Compile ipfixprobe with miliseconds timestamp precesion" OFF) option(ENABLE_NEMEA "Enable build of NEMEA plugins" OFF) - +option(ENABLE_UNWIND "Enable Build with Lib unwind" OFF) option(ENABLE_RPMBUILD "Enable build of RPM package" ON) option(ENABLE_TESTS "Build tests (make test)" OFF) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index a07a8859b..048d8be76 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -3,7 +3,10 @@ find_package(PkgConfig REQUIRED) find_package(Threads REQUIRED) find_package(Atomic REQUIRED) -find_package(Unwind REQUIRED) +if (ENABLE_UNWIND) + find_package(Unwind REQUIRED) +endif() + if (ENABLE_OUTPUT_LZ4) find_package(LZ4 REQUIRED) endif() diff --git a/pkg/rpm/ipfixprobe-msec.spec.in b/pkg/rpm/ipfixprobe-msec.spec.in index fc8b8a242..3948d568a 100644 --- a/pkg/rpm/ipfixprobe-msec.spec.in +++ b/pkg/rpm/ipfixprobe-msec.spec.in @@ -28,7 +28,9 @@ BuildRequires: gcc-c++ >= 10 %endif BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig %if %{with lz4} diff --git a/pkg/rpm/ipfixprobe-nemea.spec.in b/pkg/rpm/ipfixprobe-nemea.spec.in index a97e61d2c..3205168b4 100644 --- a/pkg/rpm/ipfixprobe-nemea.spec.in +++ b/pkg/rpm/ipfixprobe-nemea.spec.in @@ -32,7 +32,9 @@ BuildRequires: gcc-c++ >= 10 BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig %if %{with lz4} diff --git a/pkg/rpm/ipfixprobe.spec.in b/pkg/rpm/ipfixprobe.spec.in index 81ecc30ad..53650bf46 100644 --- a/pkg/rpm/ipfixprobe.spec.in +++ b/pkg/rpm/ipfixprobe.spec.in @@ -35,7 +35,9 @@ BuildRequires: gcc-c++ >= 10 BuildRequires: make BuildRequires: cmake >= 3.12 +%if %{with unwind} BuildRequires: libunwind-devel +%endif BuildRequires: gcc-toolset-14-libatomic-devel BuildRequires: pkgconfig %if %{with lz4} @@ -117,7 +119,7 @@ Experimental process plugins. %if 0%{?rhel} == 8 source /opt/rh/gcc-toolset-14/enable %endif -%cmake -DCMAKE_BUILD_TYPE=Release %{?with_output_lz4:-DENABLE_OUTPUT_LZ4=ON} %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} +%cmake -DCMAKE_BUILD_TYPE=Release %{?with_unwind:-DENABLE_UNWIND=ON} %{?with_output_lz4:-DENABLE_OUTPUT_LZ4=ON} %{?with_input_pcap:-DENABLE_INPUT_PCAP=ON} %{?with_input_sock:-DENABLE_INPUT_SOCK=ON} %{?with_input_dpdk:-DENABLE_INPUT_DPDK=ON} %{?with_input_nfb:-DENABLE_INPUT_NFB=ON} %{?with_process_experimental: -DENABLE_PROCESS_EXPERIMENTAL=ON} %cmake_build %install diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 340f69d25..3f463ead5 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -30,10 +30,17 @@ target_link_libraries(ipfixprobe-core telemetry::telemetry telemetry::appFs atomic::atomic - unwind::unwind ${CMAKE_DL_LIBS} ) +if (ENABLE_UNWIND) +target_link_libraries(ipfixprobe-core unwind::unwind) + +target_compile_definitions(ipfixprobe-core PRIVATE + WITH_UNWIND +) +endif() + add_executable(ipfixprobe main.cpp) target_link_libraries(ipfixprobe ${CORE_LIB}) target_link_options(ipfixprobe PRIVATE -Wl,--export-dynamic) diff --git a/src/core/ipfixprobe.cpp b/src/core/ipfixprobe.cpp index c375e51be..2ba9f36c9 100644 --- a/src/core/ipfixprobe.cpp +++ b/src/core/ipfixprobe.cpp @@ -65,7 +65,9 @@ void signal_handler(int sig) { (void) sig; if (sig == SIGSEGV) { +#ifdef WITH_UNWIND st_dump(STDERR_FILENO, sig); +#endif abort(); } stop = 1; diff --git a/src/core/stacktrace.cpp b/src/core/stacktrace.cpp index 9654a7cd7..79109ee4b 100644 --- a/src/core/stacktrace.cpp +++ b/src/core/stacktrace.cpp @@ -36,6 +36,7 @@ #define UNW_LOCAL_ONLY #include "stacktrace.hpp" +#ifdef WITH_UNWIND #include namespace ipxp { @@ -192,3 +193,4 @@ void st_dump(int fd, int sig) } } // namespace ipxp +#endif