
I'm working on a TCP socket-based listener for Windows in C++, where the Listener() function runs in a separate thread and waits for image frames from a detector device. I have implemented an Abort() function that is supposed to interrupt the Listener() thread safely at any point, even when no data is being received. My issue: if I call Abort() during idle time between detector data transmissions, Listener() doesn’t exit immediately. It keeps waiting inside recv()/select() until data actually arrives from the detector, and only then proceeds and exits.

Everything works correctly if I call Abort() during an active data transmission phase (i.e., while the detector is actively sending data).

Expected Behavior: When Abort() is triggered, it sets abortFlag = true and shuts down ClientSocket. Listener() should detect this and return immediately, regardless of whether the detector is currently sending data.

Actual Behavior: If the detector is idle when Abort() is triggered, Listener() hangs until the next data is received. Only then does the function proceed and exit.

Constraints: I cannot flush or trigger the detector from software—it only sends data in hardware-triggered chunks.

I need the system to behave robustly even when the detector is not sending any data.

What I’ve Tried:

  • Added select() with a 100 ms timeout in all recv() loops.
  • Set the socket to non-blocking using ioctlsocket(ClientSocket, FIONBIO, &mode);.
  • Explicitly called shutdown(ClientSocket, SD_BOTH) in Abort() to unblock the listener.
  • Checked that abortFlag is being set and checked properly. I am also using the Qt signals and slots mechanism to deliver the abort signal.
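For reference, the pattern these steps are meant to implement can be sketched in portable C++, with no sockets and no Qt involved. This is only an illustration, not the detector code: pollUntilAbort and abortLatencyMs are hypothetical names, and the 100 ms sleep stands in for select() timing out with no data. The sketch assumes the flag is stored from a different thread than the one polling it. With Qt signals and slots, that holds only if the Abort() slot is not queued onto the listener thread's own event loop, since a queued slot runs only when that thread processes events.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the receive loop: each iteration checks the flag,
// then "waits" 100 ms, as select() would when it times out with no data.
// Returns true if the loop ended because the flag was set.
bool pollUntilAbort(const std::atomic<bool>& abortFlag,
                    std::chrono::milliseconds giveUpAfter)
{
    const auto deadline = std::chrono::steady_clock::now() + giveUpAfter;
    while (std::chrono::steady_clock::now() < deadline) {
        if (abortFlag.load()) {
            return true;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return abortFlag.load();
}

// Runs the loop on a worker thread and sets the flag from the calling thread
// after 250 ms of idle time. Returns the milliseconds between setting the flag
// and the worker returning, or -1 if the worker never saw the flag.
long long abortLatencyMs()
{
    std::atomic<bool> abortFlag{false};
    bool aborted = false;

    std::thread listener([&] {
        aborted = pollUntilAbort(abortFlag, std::chrono::seconds(5));
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    const auto flagged = std::chrono::steady_clock::now();
    abortFlag.store(true);  // the part of Abort() that matters here
    listener.join();
    const auto returned = std::chrono::steady_clock::now();

    if (!aborted) {
        return -1;
    }
    return std::chrono::duration_cast<std::chrono::milliseconds>(returned - flagged).count();
}
```

When the flag really is set from another thread, abortLatencyMs() should come back within about one timeout period, even though no "data" ever arrives. If the real listener only reacts once data shows up, one thing worth checking is which thread Abort() actually executes on.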

void DETECTOR::Abort(){
    abortFlag.store(true);
    qDebug() << "Abort flag set";

    if (ClientSocket != INVALID_SOCKET) {
        shutdown(ClientSocket, SD_BOTH);
        closesocket(ClientSocket);
        ClientSocket = INVALID_SOCKET;
        qDebug() << "Socket closed during abort";
    }

    StopSequence(); // Optional hardware-specific cleanup
}


int DETECTOR::Listener(XcAPI& det, const API_SETTINGS& api, const DETECTOR_INFO& info)
{
    abortFlag.store(false);
    SOCKET ListenSocket = INVALID_SOCKET;
    ClientSocket = INVALID_SOCKET;
    sockaddr_in service;
    char* recvbuf = NULL;
    char* framebuf = NULL;
    uint8_t* images = NULL;
    bool complete = false;

    std::cout << "Listener starting...\n";
    qDebug() << "Listener starting...\n";

    int rc = 0;
    try
    {
        ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (ListenSocket == INVALID_SOCKET) {
            throw std::runtime_error("socket failed with error: " + std::to_string(WSAGetLastError()));
        }

        service.sin_family = AF_INET;
        if (inet_pton(AF_INET, info.host_ip, &service.sin_addr) <= 0) {
            throw std::runtime_error("inet_pton error occurred on host address " + std::string(info.host_ip));
        }
        service.sin_port = htons(info.port);

        std::cout << "Binding socket...\n";
        int iResult = bind(ListenSocket, (SOCKADDR*)&service, sizeof(service));
        if (iResult == SOCKET_ERROR) {
            closesocket(ListenSocket);
            throw std::runtime_error("bind failed for host address " + std::string(info.host_ip));
        }

        std::cout << "Done binding\n";
        qDebug() << "Started Listening...";
        iResult = listen(ListenSocket, 1);
        if (iResult == SOCKET_ERROR) {
            closesocket(ListenSocket);
            throw std::runtime_error("listen failed with error: " + std::to_string(WSAGetLastError()));
        }

        XGET_ReturnCode xrc = det.Acquire();
        if (xrc != XGET_RC_OK) {
            closesocket(ListenSocket);
            PrintError(det);
            throw std::runtime_error("API Acquire error");
        }

        ClientSocket = accept(ListenSocket, NULL, NULL);
        if (ClientSocket == INVALID_SOCKET) {
            closesocket(ListenSocket);
            throw std::runtime_error("accept failed with error: " + std::to_string(WSAGetLastError()));
        }

        closesocket(ListenSocket);

        // Set ClientSocket to non-blocking
        u_long mode = 1;
        ioctlsocket(ClientSocket, FIONBIO, &mode);

        recvbuf = new char[info.sequence_header_size];
        int i = 0, j = 0;

        // Read sequence header
        while (j < info.sequence_header_size && !abortFlag.load())
        {
            fd_set rfds;
            FD_ZERO(&rfds);
            FD_SET(ClientSocket, &rfds);
            timeval tv = { 0, 100000 }; // 100ms

            int sel = select(0, &rfds, NULL, NULL, &tv);
            if (sel > 0 && FD_ISSET(ClientSocket, &rfds)) {
                i = recv(ClientSocket, recvbuf + j, info.sequence_header_size - j, 0);
                if (i <= 0) break;
                j += i;
            } else if (abortFlag.load()) {
                qDebug() << "Aborted during header reception.";
                throw std::runtime_error("Aborted during header reception.");
            }
        }

        SCAN_HEADER_V2* SequenceHeaderStruc = (SCAN_HEADER_V2*)recvbuf;
        int ImageByteSize = GetFrameByteSize(*SequenceHeaderStruc);
        int iImageByteSizeWithDescriptor = GetFrameByteSizeWithDescriptor(*SequenceHeaderStruc, info.frame_descriptor_size);

        int nrImages = api.number_of_continious_frames;
        int iWidth = SequenceHeaderStruc->size_x;
        int iHeight = SequenceHeaderStruc->size_y;
        int iEnergyMode = SequenceHeaderStruc->Energy_mode;

        PrintSequenceHeaderInfo(*SequenceHeaderStruc);
        framebuf = new char[iImageByteSizeWithDescriptor];
        images = new uint8_t[(size_t)ImageByteSize * nrImages];

        size_t recv_frames = 0;
        while (!abortFlag.load() && recv_frames < nrImages)
        {
            j = 0;
            memset(framebuf, 0, ImageByteSize);

            while (j < iImageByteSizeWithDescriptor && !abortFlag.load())
            {
                fd_set rfds;
                FD_ZERO(&rfds);
                FD_SET(ClientSocket, &rfds);
                timeval tv = { 0, 100000 };

                int sel = select(0, &rfds, NULL, NULL, &tv);
                if (sel > 0 && FD_ISSET(ClientSocket, &rfds)) {
                    i = recv(ClientSocket, framebuf + j, iImageByteSizeWithDescriptor - j, 0);
                    if (i <= 0) break;
                    j += i;
                } else if (abortFlag.load()) {
                    emit DetectorInfoLog("DET", "Acquisition Aborted (during frame recv)");
                    complete = true;
                    qDebug() << "++++++aborting detector++++" << complete;
                    break;
                }
            }

            if (j == iImageByteSizeWithDescriptor) {
                memcpy(images + (recv_frames * ImageByteSize), framebuf, ImageByteSize);
                if (recv_frames % 59 == 0) std::cout << "Frame num " << recv_frames << "\n";
                recv_frames++;

                double percent = (static_cast<double>(recv_frames) / nrImages) * 100;
                emit progress(percent);
                emit transferCurrentFrame(complete, images, recv_frames, iEnergyMode == 0 ? 1 : 2);
            }
            QCoreApplication::processEvents();
        }

        // Save images if not aborted or some frames were received
        bool interlaced = (!api.TDS_Enabled || api.TDS_Step_Size == 0) ? info.dual_frame_mode_interlaced : true;
        if (recv_frames > 0) {
            if (abortFlag.load()) {
                nrImages = 59 * (int(recv_frames / 59));
            }
            SaveImages(images, iWidth, iHeight, nrImages, SequenceHeaderStruc->pixel_size, iEnergyMode == 0 ? 1 : 2, interlaced, api);
            emit progress(100);
        }
    }
    catch (std::exception& ex)
    {
        std::cout << "Exception at Listener: " << ex.what() << "\n";
        rc = -2;
    }

    delete[] recvbuf;
    delete[] framebuf;
    delete[] images;

    if (ClientSocket != INVALID_SOCKET) {
        shutdown(ClientSocket, SD_BOTH);
        closesocket(ClientSocket);
        ClientSocket = INVALID_SOCKET;
    }

    return rc;
}

I also tried the Qt network classes, as shown below, but the problem is the same:

int DETECTOR::Listener(XcAPI& det, const API_SETTINGS& api, const DETECTOR_INFO& info)
{
    abortFlag.store(false);
    QTcpServer server;
    QTcpSocket* socket = nullptr;
    char* recvbuf = nullptr;
    char* framebuf = nullptr;
    uint8_t* images = nullptr;
    bool complete = false;
    int rc = 0;

    qDebug() << "Listener starting...";

    try {
        if (!server.listen(QHostAddress(info.host_ip), info.port)) {
            throw std::runtime_error("Failed to bind: " + server.errorString().toStdString());
        }
        qDebug() << "Started Listening...";

        XGET_ReturnCode xrc = det.Acquire();
        if (xrc != XGET_RC_OK) {
            server.close();
            PrintError(det);
            throw std::runtime_error("API Acquire error");
        }

        // Wait for incoming connection (with abort check)
        while (!server.hasPendingConnections()) {
            if (abortFlag.load()) throw std::runtime_error("Aborted before connection");
            QCoreApplication::processEvents();
        }

        socket = server.nextPendingConnection();
        server.close();
        if (!socket) throw std::runtime_error("Failed to accept connection");
        socket->setReadBufferSize(10 * 1024 * 1024);

        recvbuf = new char[info.sequence_header_size];
        int totalRead = 0;
        while (totalRead < info.sequence_header_size && !abortFlag.load()) {
            if (!socket->waitForReadyRead(100)) continue;
            int bytesRead = socket->read(recvbuf + totalRead, info.sequence_header_size - totalRead);
            if (bytesRead <= 0) throw std::runtime_error("Failed reading sequence header");
            totalRead += bytesRead;
        }

        SCAN_HEADER_V2* SequenceHeaderStruc = (SCAN_HEADER_V2*)recvbuf;
        int ImageByteSize = GetFrameByteSize(*SequenceHeaderStruc);
        int iImageByteSizeWithDescriptor = GetFrameByteSizeWithDescriptor(*SequenceHeaderStruc, info.frame_descriptor_size);
        int nrImages = api.number_of_continious_frames;
        int iWidth = SequenceHeaderStruc->size_x;
        int iHeight = SequenceHeaderStruc->size_y;
        int iEnergyMode = SequenceHeaderStruc->Energy_mode;

        PrintSequenceHeaderInfo(*SequenceHeaderStruc);
        framebuf = new char[iImageByteSizeWithDescriptor];
        images = new uint8_t[(size_t)ImageByteSize * (size_t)nrImages];

        size_t recv_frames = 0;
        while (!abortFlag.load() && recv_frames < nrImages) {
            memset(framebuf, 0, ImageByteSize);
            totalRead = 0;
            while (totalRead < iImageByteSizeWithDescriptor && !abortFlag.load()) {
                if (!socket->waitForReadyRead(100)) continue;
                int bytesRead = socket->read(framebuf + totalRead, iImageByteSizeWithDescriptor - totalRead);
                if (bytesRead <= 0) throw std::runtime_error("Error receiving frame");
                totalRead += bytesRead;
            }

            memcpy(images + (recv_frames * ImageByteSize), framebuf, ImageByteSize);
            if (recv_frames % 59 == 0) qDebug() << "Frame num" << recv_frames;
            recv_frames++;

            double percent = (static_cast<double>(recv_frames) / nrImages) * 100;
            emit progress(percent);
            emit transferCurrentFrame(complete, images, recv_frames, iEnergyMode == 0 ? 1 : 2);
            QCoreApplication::processEvents();
        }

        bool interlaced = (!api.TDS_Enabled || api.TDS_Step_Size == 0) ? info.dual_frame_mode_interlaced : true;
        if (recv_frames > 0) {
            if (abortFlag.load()) {
                nrImages = 59 * static_cast<int>(recv_frames / 59);
            }
            SaveImages(images, iWidth, iHeight, nrImages, SequenceHeaderStruc->pixel_size,
                    iEnergyMode == 0 ? 1 : 2, interlaced, api);
            emit progress(100);
        }

    } catch (std::exception& ex) {
        qDebug() << "Exception at Listener:" << ex.what();
        rc = -2;
    }

    delete[] recvbuf;
    delete[] framebuf;
    delete[] images;

    if (socket) {
        socket->disconnectFromHost();
        socket->waitForDisconnected();
        delete socket;
    }

    return rc;
}

Question: How can I make sure that recv(), or the select()/recv() combination, returns immediately when Abort() is called during detector idle periods?

Are there edge cases where shutdown() or closesocket() doesn't unblock recv() or select()? Should I use a different mechanism (e.g. signaling with eventfd or using a dummy pipe/socketpair)?
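To make the last idea concrete, here is a minimal sketch of the wake-up mechanism on POSIX, where poll() and socketpair() exist. It is not Winsock code: Windows has no socketpair() or eventfd, and its select() accepts sockets only, so there the second descriptor would have to be something like a connected loopback socket or an event object. The names waitForDataOrAbort and requestAbort are hypothetical.

```cpp
#include <cassert>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

enum class WaitResult { Data, Aborted, Timeout, Error };

// Blocks until dataFd is readable, wakeFd is readable (abort requested),
// or timeoutMs elapses. If both are ready, the abort wins.
// A poll() failure (including EINTR) is reported as Error; a caller may retry.
WaitResult waitForDataOrAbort(int dataFd, int wakeFd, int timeoutMs)
{
    pollfd fds[2];
    fds[0].fd = dataFd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = wakeFd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    const int ready = poll(fds, 2, timeoutMs);
    if (ready < 0) {
        return WaitResult::Error;
    }
    if (ready == 0) {
        return WaitResult::Timeout;
    }
    if (fds[1].revents & (POLLIN | POLLHUP | POLLERR)) {
        return WaitResult::Aborted;
    }
    if (fds[0].revents & (POLLIN | POLLHUP | POLLERR)) {
        return WaitResult::Data;  // the following recv() reports EOF or errors
    }
    return WaitResult::Error;
}

// What a hypothetical Abort() would do: write one byte to the other end of
// the wake pair so that the waiting thread's poll() returns at once.
bool requestAbort(int wakeWriteFd)
{
    const char byte = 1;
    return write(wakeWriteFd, &byte, 1) == 1;
}
```

The listener would call waitForDataOrAbort() before every recv(), using one end of a socketpair(AF_UNIX, SOCK_STREAM, 0, ...) pair as wakeFd, while Abort() calls requestAbort() on the other end. The point of the design is that the wait no longer depends on the detector sending anything: the abort path produces its own readable descriptor.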

  • You seem to be using Qt. Why aren't you using the Qt network functionality? It might be possible using the Qt framework rather than the lower-level send and recv functions, which don't really have that kind of functionality. Commented Jul 9 at 20:54
  • @Someprogrammerdude thank you for responding. I tried the Qt framework, but it still has the same problem. I have added the code above; let me know your thoughts, or if you can point me to any documentation or the proper way of doing it. Commented Jul 10 at 22:33
  • Your use of INVALID_SOCKET and closesocket() implies you are targeting Windows, yes? If so, then consider using WSAEventSelect()+WSAWaitForMultipleEvents() instead of select(). That way, you can wait on 2 WSAEVENTs at the same time - one for the socket and one for the abort - and act on whichever one satisfies the wait. Commented Jul 10 at 22:51
  • @RemyLebeau I tried using your suggestion but its still having same problem. If you want I can share the code. But for some reason they socket keeps on waiting for the data in a loop and not exiting when abort is pressed. Its keep on checking for data only when new data arrives it checks abort flag and exits. Commented Jul 11 at 20:23
  • "its still having same problem" - then you are doing something fundamentally wrong. "I can share the code" - the relevant code belongs in your question, in a minimal reproducible example. "they socket keeps on waiting for the data in a loop and not exiting when abort is pressed" - then you are not using the abort correctly. I see a number of places in your code where adequate error handling is lacking... Commented Jul 12 at 2:12
