summaryrefslogtreecommitdiffstats
path: root/src/ReceiverThreads.vma/ReceiverThreads.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ReceiverThreads.vma/ReceiverThreads.cpp')
-rw-r--r--src/ReceiverThreads.vma/ReceiverThreads.cpp166
1 files changed, 0 insertions, 166 deletions
diff --git a/src/ReceiverThreads.vma/ReceiverThreads.cpp b/src/ReceiverThreads.vma/ReceiverThreads.cpp
deleted file mode 100644
index 650a840..0000000
--- a/src/ReceiverThreads.vma/ReceiverThreads.cpp
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2016
- *
- * ReceiverThreads.cpp
- *
- * Created on: 21.07.2016
- * Author: Tobias Frust
- */
-
-#include "ReceiverThreads.h"
-#include "../UDPServer/UDPServer.h"
-
-#include <boost/log/trivial.hpp>
-
-ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules)
- : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
-
- for(auto i = 0; i < numberOfDetectorModules; i++){
- receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i);
- }
-
- for(auto i = 0; i < numberOfDetectorModules; i++){
- receiverModules_[i].join();
- }
-
-}
-
-auto ReceiverThreads::receiverThread(const int port) -> void {
- int max_packets = 100;
- int max_packet_size = 65535;
-
-
- struct vma_ap_t *vma = vma_get_api();
- if (!vma) throw "Can't get LibVMA API"
-
- struct vma_packet_desc_t vma_packet;
- struct vma_buff_t *vma_buf;
- int vma_buf_offset;
-
- int vma_ring_fd;
- vma->get_socket_rings_fds(fd?, &vma_ring_fd, 1);
- if (vma_ring_fd < 0) throw "Can't get ring fds";
-
- while (true) {
- // free vma packets
-
-
- struct vma_completion_t vma_comps[max_packets];
- int n_packets = vma->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0);
- if (!n_packets) continue;
-
- for (int i = 0; i < n_packets; i++) {
- switch (vma_comps[i].events) {
- case VMA_SOCKETXTREME_PACKET:
- break;
- case EPOLLERR:
- case EPOLLRDHUP:
- printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
- throw "Polling error";
- default:
- printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
- throw "Polling error";
- }
-
- int packet_len = vma_comps[i].packet.total_len;
- int n_buffs = vma_comps[i].packet.num_bufs;
-
- vma_comps.packet.buff_lst
-
- vma_comps[i].packet.buff_lst->payload
-/*
-<------><------><------><------><------>conn = conns_in[vma_comps.user_data] vma_comps.packet.total_len;
-<------><------><------><------><------>conn->vma_packet.num_bufs = vma_comps.packet.num_bufs;
-<------><------><------><------><------>conn->vma_packet.total_len = vma_comps.packet.total_len;
-<------><------><------><------><------>conn->vma_packet.buff_lst = vma_comps.packet.buff_lst;
-<------><------><------><------><------>conn->vma_buf = conn->vma_packet.buff_lst;
-<------><------><------><------><------>conn->vma_buf_offset = 0;
-*/
-/*
- ret = _min((_config.msg_size - conn->msg_len), (conn->vma_buf->len - conn->vma_buf_offset));
- memcpy(((uint8_t *)msg_hdr) + conn->msg_len,
- ((uint8_t *)conn->vma_buf->payload) + conn->vma_buf_offset,
- ret);
-*/
- _vma_api->socketxtreme_free_vma_packets(&conn->vma_packet, 1);
- }
-
-
-auto ReceiverThreads::receiverThread(const int port) -> void {
- int max_packets = 100;
- int max_packet_size = 65535;
-
- UDPServer server = UDPServer(address_, port);
- std::vector<std::vector<char>> buffers;
-
- std::size_t rcv_index = 0;
- std::size_t rcv_packets = 0;
- std::size_t rcv_size = 0;
-
- std::size_t lastIndex{0};
- std::size_t loss = 0;
-
- struct mmsghdr msg[max_packets];
- struct iovec msgvec[max_packets];
-
- buffers.resize(max_packets);
-
- memset(msg, 0, sizeof(msg));
- memset(msgvec, 0, sizeof(msgvec));
- for (int i = 0; i < max_packets; i++) {
- buffers[i].resize(max_packet_size);
-
- msgvec[i].iov_base = buffers[i].data();
- msgvec[i].iov_len = buffers[i].size();
- msg[i].msg_hdr.msg_iov = &msgvec[i];
- msg[i].msg_hdr.msg_iovlen = 1;
- }
-
-
- BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
-
- double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
- auto ts_last = std::chrono::high_resolution_clock::now();
- while(true){
- int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
-
- if (packets >= 0) {
- for (int i = 0; i < packets; i++) {
- int bytes = msg[i].msg_len;
- unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
-
- rcv_packets++;
- rcv_size += bytes;
-
-// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
- std::size_t index =*((std::size_t *)buf);
- int diff = index - lastIndex - 1;
- if(diff > 0){
- loss += diff;
- BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
- }
-
-/* if (port == 4000) {
- printf("%i:%i:%i:%i,", index, diff, loss, i);
- }*/
-
- lastIndex = index;
- }
- }
-
- auto ts = std::chrono::high_resolution_clock::now();
- std::chrono::nanoseconds d = ts - ts_last;
- if (d.count() >= 1000000000) {
- printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
- rcv_packets = 0;
- rcv_size = 0;
- rcv_index = lastIndex;
- loss = 0;
- ts_last = ts;
- }
- }
-
- BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
- loss_ += loss;
-}
-