summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTobias Frust <tobiasfrust@gmail.com>2016-07-21 09:33:42 +0200
committerTobias Frust <tobiasfrust@gmail.com>2016-07-21 09:33:42 +0200
commitcdf8aac7e3a88df0fb93586bbf47b17c192ae2fc (patch)
treead9cc18aeefd5f7923ab0106bc2e580490ba087f
parent409e2fd20af5620066796e43410a92521376b2c1 (diff)
downloadods-cdf8aac7e3a88df0fb93586bbf47b17c192ae2fc.tar.gz
ods-cdf8aac7e3a88df0fb93586bbf47b17c192ae2fc.tar.bz2
ods-cdf8aac7e3a88df0fb93586bbf47b17c192ae2fc.tar.xz
ods-cdf8aac7e3a88df0fb93586bbf47b17c192ae2fc.zip
added ReceiverThreads for tests with 40GBE
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/CMakeLists.txt~14
-rw-r--r--src/DetectorModule/DetectorModule.cpp2
-rw-r--r--src/ReceiverThreads/ReceiverThreads.cpp48
-rw-r--r--src/ReceiverThreads/ReceiverThreads.h35
-rw-r--r--src/UDPServer/UDPServer.cpp26
-rw-r--r--src/main_server.cpp33
7 files changed, 122 insertions, 37 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d77a039..d4ee49d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -45,6 +45,7 @@ set(SOURCES_CLIENT
)
set(SOURCES_SERVER
+ "${CMAKE_SOURCE_DIR}/ReceiverThreads/ReceiverThreads.cpp"
"${CMAKE_SOURCE_DIR}/UDPServer/UDPServer.cpp"
"${CMAKE_SOURCE_DIR}/main_server.cpp"
)
diff --git a/src/CMakeLists.txt~ b/src/CMakeLists.txt~
index 80196d9..ebe2f50 100644
--- a/src/CMakeLists.txt~
+++ b/src/CMakeLists.txt~
@@ -31,9 +31,18 @@ include_directories(
${BOOST_INCLUDE_DIRS}
)
+set(LINK_LIBRARIES ${LINK_LIBRARIES}
+ ${LIBCONFIGPP_LIBRARY}
+ ${Boost_LIBRARIES}
+)
+
set(SOURCES_CLIENT
+ "${CMAKE_SOURCE_DIR}/ConfigReader/ConfigReader.cpp"
"${CMAKE_SOURCE_DIR}/UDPClient/UDPClient.cpp"
- "${CMAKE_SOURCE_DIR}/main.cpp"
+ "${CMAKE_SOURCE_DIR}/DetectorModule/DetectorModule.cpp"
+ "${CMAKE_SOURCE_DIR}/Detector/Detector.cpp"
+ "${CMAKE_SOURCE_DIR}/main_client.cpp"
+ "${CMAKE_SOURCE_DIR}/ReceiverThreads/ReceiverThreads.cpp"
)
set(SOURCES_SERVER
@@ -43,5 +52,6 @@ set(SOURCES_SERVER
add_executable(onlineDetectorSimulatorServer ${SOURCES_SERVER})
add_executable(onlineDetectorSimulatorClient ${SOURCES_CLIENT})
-
+target_link_libraries(onlineDetectorSimulatorClient ${LINK_LIBRARIES})
+target_link_libraries(onlineDetectorSimulatorServer ${LINK_LIBRARIES})
diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp
index bee50e9..9c3d98f 100644
--- a/src/DetectorModule/DetectorModule.cpp
+++ b/src/DetectorModule/DetectorModule.cpp
@@ -28,7 +28,7 @@ void timer_start(std::function<void(void)> func, unsigned int interval){
DetectorModule::DetectorModule(const int detectorID, const std::string& address, const std::string& configPath) :
detectorID_{detectorID},
numberOfDetectorsPerModule_{16},
- index_{0},
+ index_{1},
client_{address, detectorID+4000} {
printf("Creating %d\n", detectorID);
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp
new file mode 100644
index 0000000..6f389f2
--- /dev/null
+++ b/src/ReceiverThreads/ReceiverThreads.cpp
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016
+ *
+ * ReceiverThreads.cpp
+ *
+ * Created on: 21.07.2016
+ * Author: Tobias Frust
+ */
+
+#include "ReceiverThreads.h"
+#include "../UDPServer/UDPServer.h"
+
+#include <boost/log/trivial.hpp>
+
+ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules)
+ : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} {
+
+ for(auto i = 0; i < numberOfDetectorModules; i++){
+ receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i);
+ }
+
+ for(auto i = 0; i < numberOfDetectorModules; i++){
+ receiverModules_[i].join();
+ }
+
+}
+
+auto ReceiverThreads::receiverThread(const int port) -> void {
+ UDPServer server = UDPServer(address_, port);
+ std::vector<unsigned short> buf(16000);
+ std::size_t lastIndex{0};
+ BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
+ while(true){
+ int bytes = server.timed_recv((char*)buf.data(), buf.size()*sizeof(unsigned short), timeIntervall_);
+ if(bytes < 0){
+ break;
+ }
+ std::size_t index = *((std::size_t *)buf.data());
+ int diff = index - lastIndex - 1;
+ if(diff > 0){
+ BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
+ }
+ loss_ += diff;
+ lastIndex = index;
+ }
+ BOOST_LOG_TRIVIAL(info) << "Lost " << loss_ << " from " << lastIndex << " packets; (" << loss_/(double)lastIndex << "%)";
+}
+
diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h
new file mode 100644
index 0000000..7cb04c0
--- /dev/null
+++ b/src/ReceiverThreads/ReceiverThreads.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016
+ *
+ * ReceiverThreads.h
+ *
+ * Created on: 21.07.2016
+ * Author: Tobias Frust
+ */
+
+#ifndef RECEIVERTHREADS_H_
+#define RECEIVERTHREADS_H_
+
+#include <vector>
+#include <thread>
+
+class ReceiverThreads {
+public:
+ ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules);
+
+ auto run() -> void;
+private:
+ auto receiverThread(const int port) -> void;
+
+ std::vector<std::thread> receiverModules_;
+
+ std::size_t loss_;
+
+ int timeIntervall_;
+ int numberOfDetectorModules_;
+
+ std::string address_;
+
+};
+
+#endif /* RECEIVERTHREADS_H_ */
diff --git a/src/UDPServer/UDPServer.cpp b/src/UDPServer/UDPServer.cpp
index 3a50d0c..8c9decf 100644
--- a/src/UDPServer/UDPServer.cpp
+++ b/src/UDPServer/UDPServer.cpp
@@ -168,31 +168,19 @@ int UDPServer::recv(char *msg, size_t max_size)
*
* \param[in] msg The buffer where the message will be saved.
* \param[in] max_size The size of the \p msg buffer in bytes.
- * \param[in] max_wait_ms The maximum number of milliseconds to wait for a message.
+ * \param[in] max_wait_s The maximum number of seconds to wait for a message.
*
* \return -1 if an error occurs or the function timed out, the number of bytes received otherwise.
*/
-int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_ms)
+int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_s)
{
fd_set s;
FD_ZERO(&s);
FD_SET(f_socket, &s);
struct timeval timeout;
- timeout.tv_sec = max_wait_ms / 1000;
- timeout.tv_usec = (max_wait_ms % 1000) * 1000;
- int retval = select(f_socket + 1, &s, &s, &s, &timeout);
- if(retval == -1)
- {
- // select() set errno accordingly
- return -1;
- }
- if(retval > 0)
- {
- // our socket has data
- return ::recv(f_socket, msg, max_size, 0);
- }
-
- // our socket has no data
- errno = EAGAIN;
- return -1;
+ timeout.tv_sec = max_wait_s;
+ timeout.tv_usec = 0;
+ setsockopt(f_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(struct timeval));
+ // our socket has data
+ return ::recv(f_socket, msg, max_size, 0);
}
diff --git a/src/main_server.cpp b/src/main_server.cpp
index 6e936e4..90b3835 100644
--- a/src/main_server.cpp
+++ b/src/main_server.cpp
@@ -1,4 +1,5 @@
#include "UDPServer/UDPServer.h"
+#include "ReceiverThreads/ReceiverThreads.h"
#include <boost/log/core.hpp>
#include <boost/log/trivial.hpp>
@@ -30,9 +31,9 @@ int main (int argc, char *argv[]){
initLog();
std::string address = "localhost";
- int port = 4002;
-
- UDPServer server = UDPServer(address, port);
+// int port = 4002;
+//
+// UDPServer server = UDPServer(address, port);
std::size_t length{32768};
std::size_t lastIndex{0};
@@ -41,6 +42,8 @@ int main (int argc, char *argv[]){
std::cout << "Receiving UDP packages: " << std::endl;
+ ReceiverThreads(address, 10, 27);
+
// for(auto i = 0; i < 27; i++){
// std::function<void(void)> f = [=]() {
// server.recv();
@@ -48,18 +51,18 @@ int main (int argc, char *argv[]){
// start();
// }
- while(true){
- int bytes = server.recv((char*)buf.data(), length);
- std::size_t index = *((std::size_t *)buf.data());
- if(index%1000 == 99) printf("%lu\n", index);
-
- if(lastIndex != (index-1))
- BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order!";
-
- lastIndex = index;
-
- BOOST_LOG_TRIVIAL(debug) << "Server: Received " << bytes << " Bytes with Index " << index;
- }
+// while(true){
+// int bytes = server.recv((char*)buf.data(), length);
+// std::size_t index = *((std::size_t *)buf.data());
+// if(index%1000 == 99) printf("%lu\n", index);
+//
+// if(lastIndex != (index-1))
+// BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order!";
+//
+// lastIndex = index;
+//
+// BOOST_LOG_TRIVIAL(debug) << "Server: Received " << bytes << " Bytes with Index " << index;
+// }
return 0;