From 76affa8334acbd21f3a1186fdaace1efe93e2e31 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Wed, 25 Jul 2018 15:57:55 +0200 Subject: Recieve multiple packets in one system call --- analyze.sh | 27 ++++++++++ src/ReceiverThreads/ReceiverThreads.cpp | 88 +++++++++++++++++++++++++++------ src/UDPServer/UDPServer.cpp | 16 ++++++ src/UDPServer/UDPServer.h | 1 + 4 files changed, 116 insertions(+), 16 deletions(-) create mode 100755 analyze.sh diff --git a/analyze.sh b/analyze.sh new file mode 100755 index 0000000..9c11bd6 --- /dev/null +++ b/analyze.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +sleep=1 + +stats1=($(ethtool -S ens11 | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) +sleep $sleep +stats2=($(ethtool -S ens11 | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) + +for i in "${!stats1[@]}"; do + diff=$(bc <<< "(${stats2[$i]} - ${stats1[$i]}) / $sleep") + + if [ $((i & 1)) -eq 0 ]; then + if [ $i -lt 2 ]; then + echo -n "Total:" + elif [ $i -lt 4 ]; then + echo -n "Phi :" + else + echo -n "Queue:" + fi + printf "packets: %9.3f kpps" $(bc -l <<< "1 * $diff / 1000") + else + printf ", bandwidth: %9.3f Gb/s\n" $(bc -l <<< "8. * $diff / 1024 / 1024 / 1024") + fi + +#echo $i +done + diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp index 3d22c66..e60971e 100644 --- a/src/ReceiverThreads/ReceiverThreads.cpp +++ b/src/ReceiverThreads/ReceiverThreads.cpp @@ -26,24 +26,80 @@ ReceiverThreads::ReceiverThreads(const std::string& address, const int timeInter } auto ReceiverThreads::receiverThread(const int port) -> void { - UDPServer server = UDPServer(address_, port); - std::vector buf(33000); - std::size_t lastIndex{0}; + int max_packets = 100; + int max_packet_size = 65535; + + UDPServer server = UDPServer(address_, port); + std::vector> buffers; + + std::size_t rcv_index = 0; + std::size_t rcv_packets = 0; + std::size_t rcv_size = 0; + + std::size_t lastIndex{0}; + std::size_t loss = 0; + + struct mmsghdr msg[max_packets]; + struct iovec msgvec[max_packets]; + + buffers.resize(max_packets); + + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < max_packets; i++) { + buffers[i].resize(max_packet_size); + + msgvec[i].iov_base = buffers[i].data(); + msgvec[i].iov_len = buffers[i].size(); + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + } + + BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + + double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; + auto ts_last = std::chrono::high_resolution_clock::now(); while(true){ - int bytes = server.timed_recv((char*)buf.data(), 65536, timeIntervall_); - if(bytes < 0){ - break; - } - BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; - std::size_t index = *((std::size_t *)buf.data()); - int diff = index - lastIndex - 1; - if(diff > 0){ - loss_ += diff; - BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; - } - lastIndex = index; + int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + + if (packets >= 0) { + for (int i = 0; i < packets; i++) { + int bytes = msg[i].msg_len; + unsigned short *buf = reinterpret_cast(msgvec[i].iov_base); + + rcv_packets++; + rcv_size += bytes; + +// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; + std::size_t index =*((std::size_t *)buf); + int diff = index - lastIndex - 1; + if(diff > 0){ + loss += diff; + BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; + } + +/* if (port == 4000) { + printf("%i:%i:%i:%i,", index, diff, loss, i); + }*/ + + lastIndex = index; + } + } + + auto ts = std::chrono::high_resolution_clock::now(); + std::chrono::nanoseconds d = ts - ts_last; + if (d.count() >= 1000000000) { + printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); + rcv_packets = 0; + rcv_size = 0; + rcv_index = lastIndex; + loss = 0; + ts_last = ts; + } } - BOOST_LOG_TRIVIAL(info) << "Lost " << loss_ << " from " << lastIndex << " packets; (" << loss_/(double)lastIndex*100.0 << "%)"; + + BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; + loss_ += loss; } diff --git a/src/UDPServer/UDPServer.cpp b/src/UDPServer/UDPServer.cpp index 8c9decf..42166b4 100644 --- a/src/UDPServer/UDPServer.cpp +++ b/src/UDPServer/UDPServer.cpp @@ -81,6 +81,13 @@ UDPServer::UDPServer(const std::string& addr, int port) close(f_socket); throw udp_client_server_runtime_error(("could not bind UDP socket with: \"" + addr + ":" + decimal_port + "\"").c_str()); } + +/* + int a = 134217728; + if (setsockopt(f_socket, SOL_SOCKET, SO_RCVBUF, &a, sizeof(int)) == -1) { + fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); + } +*/ } /** \brief Clean up the UDP server. @@ -184,3 +191,12 @@ int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_s) // our socket has data return ::recv(f_socket, msg, max_size, 0); } + +int UDPServer::mrecv(int n, struct mmsghdr *msg, int max_wait_s) +{ + struct timespec timeout; + timeout.tv_sec = max_wait_s; + timeout.tv_nsec = 0; + + return recvmmsg(f_socket, msg, n, MSG_WAITFORONE, &timeout); +} diff --git a/src/UDPServer/UDPServer.h b/src/UDPServer/UDPServer.h index 22f33b3..ed0e033 100644 --- a/src/UDPServer/UDPServer.h +++ b/src/UDPServer/UDPServer.h @@ -34,6 +34,7 @@ public: int recv(char *msg, size_t max_size); int timed_recv(char *msg, size_t max_size, int max_wait_ms); + int mrecv(int n, struct mmsghdr *msg, int max_wait_s); private: int f_socket; -- cgit v1.2.1