summaryrefslogtreecommitdiffstats
path: root/src/ReceiverThreads/ReceiverThreads.cpp
blob: e5c339b762826a15ea1bb596ab773ec08167e684 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*
 * Copyright 2016
 * 
 * ReceiverThreads.cpp
 *
 *  Created on: 21.07.2016
 *      Author: Tobias Frust
 */

#include "ReceiverThreads.h"
#include "../UDPServer/UDPServer.h"

#include <boost/log/trivial.hpp>

ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort)
   : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {

   for(auto i = 0; i < numberOfDetectorModules; i++){
      receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i);
   }

   for(auto i = 0; i < numberOfDetectorModules; i++){
      receiverModules_[i].join();
   }

}

auto ReceiverThreads::receiverThread(const int port) -> void {
    int max_packets = 100;
    int max_packet_size = 65535;

    UDPServer server = UDPServer(address_, port);
    std::vector<std::vector<char>> buffers;

    std::size_t rcv_index = 0;
    std::size_t rcv_packets = 0;
    std::size_t rcv_size  = 0;
    
    std::size_t lastIndex{0};
    std::size_t loss = 0;
   
    struct mmsghdr msg[max_packets];
    struct iovec msgvec[max_packets];

    buffers.resize(max_packets);

    memset(msg, 0, sizeof(msg));
    memset(msgvec, 0, sizeof(msgvec));
    for (int i = 0; i < max_packets; i++) {
	buffers[i].resize(max_packet_size);

	msgvec[i].iov_base = buffers[i].data();
	msgvec[i].iov_len = buffers[i].size();
	msg[i].msg_hdr.msg_iov = &msgvec[i];
	msg[i].msg_hdr.msg_iovlen = 1;
    }

   
   printf("Listening %d\n", port);
   BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;

   double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
   auto ts_last = std::chrono::high_resolution_clock::now();
   while(true){
	int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);

	if (packets >= 0) {
	  for (int i = 0; i < packets; i++) {
	    int bytes = msg[i].msg_len;
	    unsigned short *buf =  reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
	
	    rcv_packets++;
	    rcv_size += bytes;

//	    BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
    	    std::size_t index =*((std::size_t *)buf);
    	    int diff = index - lastIndex - 1;
    	    if(diff > 0){
        	loss += diff;
        	BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
    	    }

/*    	    if (port == 4000) {
    		printf("%i:%i:%i:%i,", index, diff, loss, i);
    	    }*/

    	    lastIndex = index;
    	  }
    	}
     
	auto ts = std::chrono::high_resolution_clock::now();
	std::chrono::nanoseconds d = ts - ts_last;
	if (d.count() >= 1000000000) {
	    printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
	    rcv_packets = 0;
	    rcv_size = 0;
	    rcv_index = lastIndex;
	    loss = 0;
	    ts_last = ts;
	}
   }
   
   BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
   loss_ += loss;
}