summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2018-08-06 18:35:13 +0200
committerSuren A. Chilingaryan <csa@suren.me>2018-08-06 18:35:13 +0200
commit09b7e077f10fe324774f759086f3596947d69b19 (patch)
treed9d910844704459a5390382949d02648f3a3f5a9
parent76affa8334acbd21f3a1186fdaace1efe93e2e31 (diff)
downloadods-09b7e077f10fe324774f759086f3596947d69b19.tar.gz
ods-09b7e077f10fe324774f759086f3596947d69b19.tar.bz2
ods-09b7e077f10fe324774f759086f3596947d69b19.tar.xz
ods-09b7e077f10fe324774f759086f3596947d69b19.zip
Send multiple packets in one system call
-rw-r--r--config.cfg4
-rw-r--r--src/DetectorModule/DetectorModule.cpp92
-rw-r--r--src/DetectorModule/DetectorModule.h8
-rw-r--r--src/UDPClient/UDPClient.cpp13
-rw-r--r--src/UDPClient/UDPClient.h1
5 files changed, 94 insertions, 24 deletions
diff --git a/config.cfg b/config.cfg
index 87bdb78..d5f6978 100644
--- a/config.cfg
+++ b/config.cfg
@@ -1,10 +1,10 @@
numberOfFanDetectors = 432
-dataInputPath = "<change_path>/DetModStream/data_pumpe_repaired/"
+dataInputPath = "/mnt/ands/testdata/data_pumpe_repaired/"
dataFileName = "data_pumpe_repaired_DetModNr_"
dataFileEnding = ".fx"
numberOfPlanes = 2
samplingRate = 1
scanRate = 2000
numberOfDataFrames = 500
-numberOfProjectionsPerPacket = 500
+numberOfProjectionsPerPacket = 40
numberOfDetectorsPerModule = 16
diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp
index 169c5a5..e7e0272 100644
--- a/src/DetectorModule/DetectorModule.cpp
+++ b/src/DetectorModule/DetectorModule.cpp
@@ -15,12 +15,27 @@
#include <exception>
#include <fstream>
-void timer_start(std::function<void(void)> func, unsigned int interval){
- std::thread([func, interval]() {
+void timer_start(std::function<void(int)> func, unsigned int interval, unsigned int max_packets){
+ std::thread([func, interval, max_packets]() {
+ int packets = 1;
+ auto next = std::chrono::high_resolution_clock::now();
while (true)
{
- func();
- std::this_thread::sleep_for(std::chrono::microseconds(interval));
+ func(packets);
+
+ next += std::chrono::microseconds(packets * interval);
+ auto now = std::chrono::high_resolution_clock::now();
+ if (now > next) {
+ std::chrono::nanoseconds late = now - next;
+ packets = 1 + (late.count() / interval / 1000);
+ if (packets > max_packets)
+ packets = max_packets;
+ } else {
+ packets = 1;
+ }
+
+ std::this_thread::sleep_until(next);
+// std::this_thread::sleep_for(std::chrono::microseconds(interval));
}
}).detach();
}
@@ -29,7 +44,8 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
detectorID_{detectorID},
numberOfDetectorsPerModule_{16},
index_{0u},
- client_{address, detectorID+4000}{
+ client_{address, detectorID+4000},
+ max_packets_{1000u}{
printf("Creating %d\n", detectorID);
@@ -37,7 +53,10 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!");
}
- sendBuffer_.resize(numberOfProjectionsPerPacket_*numberOfDetectorsPerModule_*sizeof(unsigned short)+sizeof(std::size_t));
+ sendBuffer_.resize(max_packets_);
+ for(auto &it: sendBuffer_) {
+ it.resize(numberOfProjectionsPerPacket_ * numberOfDetectorsPerModule_ * sizeof(unsigned short) + sizeof(size_t) + sizeof(short int));
+ }
//read the input data from the file corresponding to the detectorModuleID
readInput();
@@ -46,7 +65,7 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
printf("Created %d\n", detectorID);
}
-auto DetectorModule::send() -> void{
+auto DetectorModule::send(int packets = 1) -> void{
BOOST_LOG_TRIVIAL(debug) << "Detectormodule " << detectorID_ << " :sending udp packet with index " << index_ << ".";
int numberOfParts = numberOfProjections_/numberOfProjectionsPerPacket_;
// sendBuffer_[0] = (sizeof(std::size_t)) & 0xff;
@@ -57,23 +76,56 @@ auto DetectorModule::send() -> void{
// sendBuffer_[5] = (sizeof(std::size_t) >> 40) & 0xff;
// sendBuffer_[6] = (sizeof(std::size_t) >> 48) & 0xff;
// sendBuffer_[7] = (sizeof(std::size_t) >> 56) & 0xff;
- unsigned int bufferSizeIndex = index_ % 1000;
- unsigned int sinoSize = numberOfDetectorsPerModule_*numberOfProjectionsPerPacket_;
- *reinterpret_cast<int*>(sendBuffer_.data()) = index_;
- *reinterpret_cast<unsigned short*>(sendBuffer_.data()+sizeof(std::size_t)) = partID_;
- std::copy(((char*)buffer_.data())+sinoSize*(bufferSizeIndex*numberOfParts+partID_)*sizeof(unsigned short), ((char*)buffer_.data())+(sinoSize*(1+bufferSizeIndex*numberOfParts+partID_))*sizeof(unsigned short), sendBuffer_.begin()+sizeof(std::size_t)+sizeof(unsigned short));
- BOOST_LOG_TRIVIAL(debug) << "INDEX: " << (bufferSizeIndex*numberOfParts+partID_);
- client_.send(sendBuffer_.data(), sendBuffer_.size());
- partID_ = (partID_+1) % numberOfParts;
- if(partID_ == 0)
- ++index_;
+
+
+ struct mmsghdr msg[packets];
+ struct iovec msgvec[packets];
+
+ unsigned int hdrSize = sizeof(size_t) + sizeof(short int);
+ unsigned int sinoSize = numberOfDetectorsPerModule_ * numberOfProjectionsPerPacket_;
+
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < packets; i++) {
+ unsigned int bufferSizeIndex = index_ % 1000;
+
+ char *ptr = sendBuffer_[i].data();
+
+ msgvec[i].iov_base = sendBuffer_[i].data();
+ msgvec[i].iov_len = sendBuffer_[i].size();
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+
+ *reinterpret_cast<size_t*>(ptr) = index_ * numberOfParts + partID_;
+ *reinterpret_cast<unsigned short*>(ptr + sizeof(size_t)) = partID_;
+ memcpy(ptr + hdrSize, buffer_.data() + sinoSize * (bufferSizeIndex * numberOfParts + partID_), sinoSize * sizeof(unsigned short));
+
+ partID_ = (partID_ + 1) % numberOfParts;
+ if (partID_ == 0) ++index_;
+ }
+
+ client_.msend(packets, msg);
+
+ auto ts = std::chrono::high_resolution_clock::now();
+ std::chrono::nanoseconds d = ts - ts_;
+ counter_ += packets;
+ if (d.count() >= 1000000000) {
+ printf("Packets %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", counter_, sendBuffer_[0].size(), 8. * counter_ * sendBuffer_[0].size() / 1024 / 1024 / 1024, 1. * d.count() / 1000000);
+ counter_ = 0;
+ ts_ = ts;
+ }
}
auto DetectorModule::sendPeriodically(unsigned int timeIntervall) -> void {
- std::function<void(void)> f = [=]() {
- this->send();
+ counter_ = 0;
+ ips_ = 1000000. / ((double)timeIntervall);
+ ts_ = std::chrono::high_resolution_clock::now();
+
+ std::function<void(int)> f = [=](int packets = 1) {
+ this->send(packets);
};
- timer_start(f, timeIntervall);
+ timer_start(f, timeIntervall, max_packets_);
}
auto DetectorModule::readInput() -> void {
diff --git a/src/DetectorModule/DetectorModule.h b/src/DetectorModule/DetectorModule.h
index afe4d04..f959857 100644
--- a/src/DetectorModule/DetectorModule.h
+++ b/src/DetectorModule/DetectorModule.h
@@ -27,7 +27,7 @@ public:
private:
std::vector<unsigned short> buffer_;
- std::vector<char> sendBuffer_;
+ std::vector<std::vector<char>> sendBuffer_;
int detectorID_;
UDPClient client_;
@@ -40,12 +40,16 @@ private:
unsigned int numberOfFrames_;
std::string path_, fileName_, fileEnding_;
+ unsigned int max_packets_;
std::size_t index_;
+ std::size_t ips_;
+ std::size_t counter_;
+ std::chrono::high_resolution_clock::time_point ts_;
unsigned short partID_{0};
auto readConfig(const std::string& configFile) -> bool;
auto readInput() -> void;
- auto send() -> void;
+ auto send(int packets) -> void;
};
diff --git a/src/UDPClient/UDPClient.cpp b/src/UDPClient/UDPClient.cpp
index 1d427ba..b9d55d0 100644
--- a/src/UDPClient/UDPClient.cpp
+++ b/src/UDPClient/UDPClient.cpp
@@ -64,12 +64,21 @@
{
throw udp_client_server_runtime_error(("invalid address or port: \"" + addr + ":" + decimal_port + "\"").c_str());
}
+
f_socket = socket(f_addrinfo->ai_family, SOCK_DGRAM | SOCK_CLOEXEC, IPPROTO_UDP);
if(f_socket == -1)
{
freeaddrinfo(f_addrinfo);
throw udp_client_server_runtime_error(("could not create socket for: \"" + addr + ":" + decimal_port + "\"").c_str());
}
+
+ if (connect(f_socket, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen))
+ {
+ close(f_socket);
+ freeaddrinfo(f_addrinfo);
+ throw udp_client_server_runtime_error(("could not connect socket for: \"" + addr + ":" + decimal_port + "\"").c_str());
+ }
+
printf("Created client %d\n", f_port);
}
@@ -140,3 +149,7 @@
int UDPClient::send(const char *msg, std::size_t size){
return sendto(f_socket, msg, size, 0, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen);
}
+
+ int UDPClient::msend(int n, struct mmsghdr *msg){
+ return sendmmsg(f_socket, msg, n, 0);
+ }
diff --git a/src/UDPClient/UDPClient.h b/src/UDPClient/UDPClient.h
index f6cf0d6..f1c8a8d 100644
--- a/src/UDPClient/UDPClient.h
+++ b/src/UDPClient/UDPClient.h
@@ -33,6 +33,7 @@ public:
std::string get_addr() const;
int send(const char *msg, size_t size);
+ int msend(int n, struct mmsghdr *msg);
private:
int f_socket;