I'm working on a Windows TCP socket-based listener in C++ where the Listener() function runs in a separate thread. It waits for image frames from a detector device. I have implemented an Abort() function that is supposed to interrupt the Listener() thread safely mid-transfer, even when no data is being received. My issue is: If I call Abort() during idle time between detector data transmissions, the Listener() function doesn’t exit immediately. It continues waiting inside recv()/select() until data actually arrives from the detector. Only then it proceeds and exits.
This works correctly if I call Abort() during an active data transmission phase (i.e., when detector is actively sending data). Expected Behavior: When Abort() is triggered, it sets abortFlag = true and shuts down ClientSocket.
The Listener() should detect this abort and return immediately, regardless of whether the detector is currently sending data or not. Actual Behavior: If detector is idle when Abort() is triggered, Listener() hangs until next data is received.
Only then the function proceeds and exits.
Constraints: I cannot flush or trigger the detector from software—it only sends data in hardware-triggered chunks.
I need the system to behave robustly even when the detector is not sending any data.
What I’ve Tried: Added select() with timeout in all recv() loops (100ms).
Set socket to non-blocking using ioctlsocket(ClientSocket, FIONBIO, &mode);.
Explicitly call shutdown(ClientSocket, SD_BOTH) in Abort() to unblock the listener.
Checked that abortFlag is being set and checked properly. Also I am using QT signal and slots mechanism to send abort signal.
void DETECTOR::Abort(){
abortFlag.store(true);
qDebug() << "Abort flag set";
if (ClientSocket != INVALID_SOCKET) {
shutdown(ClientSocket, SD_BOTH);
closesocket(ClientSocket);
ClientSocket = INVALID_SOCKET;
qDebug() << "Socket closed during abort";
}
StopSequence(); // Optional hardware-specific cleanup
}
int DETECTOR::Listener(XcAPI& det, const API_SETTINGS& api, const DETECTOR_INFO& info)
{
abortFlag.store(false);
SOCKET ListenSocket = INVALID_SOCKET;
ClientSocket = INVALID_SOCKET;
sockaddr_in service;
char* recvbuf = NULL;
char* framebuf = NULL;
uint8_t* images = NULL;
bool complete = false;
std::cout << "Listener starting...\n";
qDebug() << "Listener starting...\n";
int rc = 0;
try
{
ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ListenSocket == INVALID_SOCKET) {
throw std::runtime_error("socket failed with error: " + std::to_string(WSAGetLastError()));
}
service.sin_family = AF_INET;
if (inet_pton(AF_INET, info.host_ip, &service.sin_addr) <= 0) {
throw std::runtime_error("inet_pton error occurred on host address " + std::string(info.host_ip));
}
service.sin_port = htons(info.port);
std::cout << "Binding socket...\n";
int iResult = bind(ListenSocket, (SOCKADDR*)&service, sizeof(service));
if (iResult == SOCKET_ERROR) {
closesocket(ListenSocket);
throw std::runtime_error("bind failed for host address " + std::string(info.host_ip));
}
std::cout << "Done binding\n";
qDebug() << "Started Listening...";
iResult = listen(ListenSocket, 1);
if (iResult == SOCKET_ERROR) {
closesocket(ListenSocket);
throw std::runtime_error("listen failed with error: " + std::to_string(WSAGetLastError()));
}
XGET_ReturnCode xrc = det.Acquire();
if (xrc != XGET_RC_OK) {
closesocket(ListenSocket);
PrintError(det);
throw std::runtime_error("API Acquire error");
}
ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET) {
closesocket(ListenSocket);
throw std::runtime_error("accept failed with error: " + std::to_string(WSAGetLastError()));
}
closesocket(ListenSocket);
// Set ClientSocket to non-blocking
u_long mode = 1;
ioctlsocket(ClientSocket, FIONBIO, &mode);
recvbuf = new char[info.sequence_header_size];
int i = 0, j = 0;
// Read sequence header
while (j < info.sequence_header_size && !abortFlag.load())
{
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(ClientSocket, &rfds);
timeval tv = { 0, 100000 }; // 100ms
int sel = select(0, &rfds, NULL, NULL, &tv);
if (sel > 0 && FD_ISSET(ClientSocket, &rfds)) {
i = recv(ClientSocket, recvbuf + j, info.sequence_header_size - j, 0);
if (i <= 0) break;
j += i;
} else if (abortFlag.load()) {
qDebug() << "Aborted during header reception.";
throw std::runtime_error("Aborted during header reception.");
}
}
SCAN_HEADER_V2* SequenceHeaderStruc = (SCAN_HEADER_V2*)recvbuf;
int ImageByteSize = GetFrameByteSize(*SequenceHeaderStruc);
int iImageByteSizeWithDescriptor = GetFrameByteSizeWithDescriptor(*SequenceHeaderStruc, info.frame_descriptor_size);
int nrImages = api.number_of_continious_frames;
int iWidth = SequenceHeaderStruc->size_x;
int iHeight = SequenceHeaderStruc->size_y;
int iEnergyMode = SequenceHeaderStruc->Energy_mode;
PrintSequenceHeaderInfo(*SequenceHeaderStruc);
framebuf = new char[iImageByteSizeWithDescriptor];
images = new uint8_t[(size_t)ImageByteSize * nrImages];
size_t recv_frames = 0;
while (!abortFlag.load() && recv_frames < nrImages)
{
j = 0;
memset(framebuf, 0, ImageByteSize);
while (j < iImageByteSizeWithDescriptor && !abortFlag.load())
{
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(ClientSocket, &rfds);
timeval tv = { 0, 100000 };
int sel = select(0, &rfds, NULL, NULL, &tv);
if (sel > 0 && FD_ISSET(ClientSocket, &rfds)) {
i = recv(ClientSocket, framebuf + j, iImageByteSizeWithDescriptor - j, 0);
if (i <= 0) break;
j += i;
} else if (abortFlag.load()) {
emit DetectorInfoLog("DET", "Acquisition Aborted (during frame recv)");
complete = true;
qDebug() << "++++++aborting detector++++" << complete;
break;
}
}
if (j == iImageByteSizeWithDescriptor) {
memcpy(images + (recv_frames * ImageByteSize), framebuf, ImageByteSize);
if (recv_frames % 59 == 0) std::cout << "Frame num " << recv_frames << "\n";
recv_frames++;
double percent = (static_cast<double>(recv_frames) / nrImages) * 100;
emit progress(percent);
emit transferCurrentFrame(complete, images, recv_frames, iEnergyMode == 0 ? 1 : 2);
}
QCoreApplication::processEvents();
}
// Save images if not aborted or some frames were received
bool interlaced = (!api.TDS_Enabled || api.TDS_Step_Size == 0) ? info.dual_frame_mode_interlaced : true;
if (recv_frames > 0) {
if (abortFlag.load()) {
nrImages = 59 * (int(recv_frames / 59));
}
SaveImages(images, iWidth, iHeight, nrImages, SequenceHeaderStruc->pixel_size, iEnergyMode == 0 ? 1 : 2, interlaced, api);
emit progress(100);
}
}
catch (std::exception& ex)
{
std::cout << "Exception at Listener: " << ex.what() << "\n";
rc = -2;
}
delete[] recvbuf;
delete[] framebuf;
delete[] images;
if (ClientSocket != INVALID_SOCKET) {
shutdown(ClientSocket, SD_BOTH);
closesocket(ClientSocket);
ClientSocket = INVALID_SOCKET;
}
return rc;
}
Also tried to use QT network framework like below but its still having same problem
int DETECTOR::Listener(XcAPI& det, const API_SETTINGS& api, const DETECTOR_INFO& info)
{
abortFlag.store(false);
QTcpServer server;
QTcpSocket* socket = nullptr;
char* recvbuf = nullptr;
char* framebuf = nullptr;
uint8_t* images = nullptr;
bool complete = false;
int rc = 0;
qDebug() << "Listener starting...";
try {
if (!server.listen(QHostAddress(info.host_ip), info.port)) {
throw std::runtime_error("Failed to bind: " + server.errorString().toStdString());
}
qDebug() << "Started Listening...";
XGET_ReturnCode xrc = det.Acquire();
if (xrc != XGET_RC_OK) {
server.close();
PrintError(det);
throw std::runtime_error("API Acquire error");
}
// Wait for incoming connection (with abort check)
while (!server.hasPendingConnections()) {
if (abortFlag.load()) throw std::runtime_error("Aborted before connection");
QCoreApplication::processEvents();
}
socket = server.nextPendingConnection();
server.close();
if (!socket) throw std::runtime_error("Failed to accept connection");
socket->setReadBufferSize(10 * 1024 * 1024);
recvbuf = new char[info.sequence_header_size];
int totalRead = 0;
while (totalRead < info.sequence_header_size && !abortFlag.load()) {
if (!socket->waitForReadyRead(100)) continue;
int bytesRead = socket->read(recvbuf + totalRead, info.sequence_header_size - totalRead);
if (bytesRead <= 0) throw std::runtime_error("Failed reading sequence header");
totalRead += bytesRead;
}
SCAN_HEADER_V2* SequenceHeaderStruc = (SCAN_HEADER_V2*)recvbuf;
int ImageByteSize = GetFrameByteSize(*SequenceHeaderStruc);
int iImageByteSizeWithDescriptor = GetFrameByteSizeWithDescriptor(*SequenceHeaderStruc, info.frame_descriptor_size);
int nrImages = api.number_of_continious_frames;
int iWidth = SequenceHeaderStruc->size_x;
int iHeight = SequenceHeaderStruc->size_y;
int iEnergyMode = SequenceHeaderStruc->Energy_mode;
PrintSequenceHeaderInfo(*SequenceHeaderStruc);
framebuf = new char[iImageByteSizeWithDescriptor];
images = new uint8_t[(size_t)ImageByteSize * (size_t)nrImages];
size_t recv_frames = 0;
while (!abortFlag.load() && recv_frames < nrImages) {
memset(framebuf, 0, ImageByteSize);
totalRead = 0;
while (totalRead < iImageByteSizeWithDescriptor && !abortFlag.load()) {
if (!socket->waitForReadyRead(100)) continue;
int bytesRead = socket->read(framebuf + totalRead, iImageByteSizeWithDescriptor - totalRead);
if (bytesRead <= 0) throw std::runtime_error("Error receiving frame");
totalRead += bytesRead;
}
memcpy(images + (recv_frames * ImageByteSize), framebuf, ImageByteSize);
if (recv_frames % 59 == 0) qDebug() << "Frame num" << recv_frames;
recv_frames++;
double percent = (static_cast<double>(recv_frames) / nrImages) * 100;
emit progress(percent);
emit transferCurrentFrame(complete, images, recv_frames, iEnergyMode == 0 ? 1 : 2);
QCoreApplication::processEvents();
}
bool interlaced = (!api.TDS_Enabled || api.TDS_Step_Size == 0) ? info.dual_frame_mode_interlaced : true;
if (recv_frames > 0) {
if (abortFlag.load()) {
nrImages = 59 * static_cast<int>(recv_frames / 59);
}
SaveImages(images, iWidth, iHeight, nrImages, SequenceHeaderStruc->pixel_size,
iEnergyMode == 0 ? 1 : 2, interlaced, api);
emit progress(100);
}
} catch (std::exception& ex) {
qDebug() << "Exception at Listener:" << ex.what();
rc = -2;
}
delete[] recvbuf;
delete[] framebuf;
delete[] images;
if (socket) {
socket->disconnectFromHost();
socket->waitForDisconnected();
delete socket;
}
return rc;
}
Question: How can I make sure recv() or select()/recv() combination returns immediately when Abort() is called during detector idle periods?
Are there edge cases where shutdown() or closesocket() doesn't unblock recv() or select()? Should I use a different mechanism (e.g. signaling with eventfd or using a dummy pipe/socketpair)?
sendandrecvfunctions, which doesn't really have that kind of functionality.INVALID_SOCKETandclosesocket()implies you are targeting Windows, yes? If so, then consider usingWSAEventSelect()+WSAWaitForMultipleEvents()instead ofselect(). That way, you can wait on 2WSAEVENTs at the same time - one for the socket and one for the abort - and act on whichever on satisfies the wait.