
I am working on a side project with friends and am building a thread pool with Boost threads and asio. I already have a basic pool up and running but wanted to be able to delete threads from the pool. The class below keeps a vector of std::shared_ptr<boost::thread>.

The issue I am having is that when I run a simple test, the program never exits. It appears to reach the end (see output) but never terminates. I suspect the threads are still alive; I have tried to kill them, but to no avail. I did some searching on clearing a vector of shared pointers, and it seems you really shouldn't have to. Also, the thread_group call to join_all() should join and end all threads, or at least I thought so.

I am still learning Boost threads, asio, and shared_ptr, so any help or advice would be greatly appreciated.

Compile:

g++ -std=c++14 main.cpp -lboost_system -lboost_thread -lpthread

Class:

  class experimentalPool {
      private:
        boost::asio::io_service ioService;
        boost::asio::io_service::work work;
        boost::thread_group threads;
        std::vector<std::shared_ptr<boost::thread>> workers;
        std::size_t poolsize;
      public:
        experimentalPool(): work( ioService ) {
            poolsize = boost::thread::hardware_concurrency();
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        experimentalPool( std::size_t psize )
          : work( ioService ),
            poolsize( psize )
        {
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        template <typename F, typename... Args>
        void add_work(F f, Args... args){
            ioService.post(boost::bind(f, args...));
        };

        ~experimentalPool(){
          ioService.stop();
          try {
            std::cout << "here" << "\n";
            threads.join_all();
            //threads.interrupt_all();
            std::cout << "here" << "\n";
            //for (size_t i = 0; i < workers.size(); i++) {
              //(*workers[i]).interrupt();
            //}
          }
          catch ( const std::exception& ) {std::cout << "Caught Exception" << "\n";}
        }

        std::size_t size() const { return poolsize;}
    };

Main:

void add1(int& num){
    std::cout << ++num << std::endl;
}

int main(){

    int temp = 5;

    experimentalPool xpool(2);
    std::cout << "Xpool " << xpool.size() << "\n";
    xpool.add_work(add1, temp);

    std::this_thread::sleep_for (std::chrono::seconds(1));
    std::cout << "End" << std::endl;
    return 0;
}

Output:

Xpool 1
6
here
xpool here
xpool here
^C
ap@ubuntu:~/Desktop$ 
  • Have you checked if the io_service has successfully stopped? Commented Jan 24, 2018 at 21:24
  • Run with asan/ubsan and you'll see it actually crashes and burns, if it appears to "hang" it's merely because of Undefined Behaviour Commented Jan 24, 2018 at 22:07
  • I haven't checked whether the io_service has actually stopped. It gets past that call, so I just assumed it had, but that's a good point. I'll go ahead and compile it with asan and see if that helps. Thanks, all! Commented Jan 25, 2018 at 2:47

2 Answers


To be honest I looked at the code for a good few minutes and got more and more concerned.

Why are threads in two collections (thread_group and the vector)? Why are threads owned in another collection? Why is the work object permanent? Why are you axing the service instead of letting the pool drain?

Why is poolsize redundantly stored?

Why is the constructor 99% duplicated?

Why is add1 taking a mutable reference, but the bind is by value?
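To make the last point concrete, here is a minimal sketch of what binding by value does to a function that takes a mutable reference. It uses std::bind rather than boost::bind so that it builds without Boost; both store a copy of each bound argument unless it is wrapped in std::ref. The names increment, bump_via_copy and bump_via_ref are made up for this illustration, and increment returns the new value instead of printing it.

```cpp
#include <functional>

// Same shape as add1 in the question: increments through a reference.
int increment(int& num) { return ++num; }

// Bound by value: the binder stores its own copy of `value`,
// so the increment never reaches the caller's variable.
int bump_via_copy(int& value) {
    auto task = std::bind(increment, value);
    task();        // increments the copy held inside `task`
    return value;  // the caller's variable is unchanged
}

// Bound through std::ref: the binder stores a reference wrapper,
// so the increment is applied to the caller's variable.
int bump_via_ref(int& value) {
    auto task = std::bind(increment, std::ref(value));
    task();
    return value;  // the caller's variable was incremented
}
```

With a variable holding 5, bump_via_copy leaves it at 5 while bump_via_ref leaves it at 6. In the pool, the same applies to add_work: passing temp posts a copy, passing std::ref(temp) posts a reference, and in the latter case the caller has to keep temp alive until the task has run.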

Simplify

Live On Coliru

#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <thread>

namespace Experimental { 
    namespace ba = boost::asio;

    class Pool {
      private:
        ba::io_service ioService;
        boost::thread_group threads;
        boost::optional<ba::io_service::work> work { ioService };

      public:
        Pool(std::size_t psize = boost::thread::hardware_concurrency()) {
            for (size_t i = 0; i < psize; i++) {
                threads.create_thread(boost::bind(&ba::io_service::run, &ioService));
            }
        }

        template <typename F, typename... Args> void add_work(F f, Args... args) {
            ioService.post(boost::bind(f, args...));
        }

        ~Pool() {
            work.reset();
            threads.join_all();
        }

        std::size_t size() const { return threads.size(); }
    };
}

void add1(int &num) { std::cout << ++num << std::endl; }

int main() {
    int temp = 5;

    Experimental::Pool xpool(2);
    std::cout << "Xpool " << xpool.size() << "\n";
    xpool.add_work(add1, std::ref(temp));

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "End" << std::endl;
    std::cout << "temp: " << temp << "\n";
}

Prints

Xpool 2
6
End
temp: 6

And it is only ~50% of the code.


7 Comments

Loose other notes: handle exceptions emanating from io_service::run (see also).
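A rough sketch of what handling those exceptions can look like: an exception thrown by a handler propagates out of io_service::run() on the thread that executed it, so the usual approach is to catch it in the worker and call run() again. The helper below is hypothetical (run_worker is not a Boost function) and takes any callable, so it builds without Boost; in the pool, the callable would be assumed to wrap the call to ioService.run().

```cpp
#include <exception>
#include <iostream>

// Hypothetical helper: `run` stands in for a call such as ioService.run().
// Returns how many exceptions were caught before `run` returned normally.
template <typename Run>
int run_worker(Run run) {
    int caught = 0;
    for (;;) {
        try {
            run();          // returns normally once there is no more work
            return caught;
        } catch (const std::exception& e) {
            ++caught;
            std::cerr << "handler threw: " << e.what() << "\n";
            // fall through to the next iteration and re-enter run()
        }
    }
}
```

Each pool thread would then be started on something like run_worker with a lambda that calls run(), instead of binding io_service::run directly. Exceptions not derived from std::exception still escape this sketch.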
Thanks for the great answer! The reason I have threads in both the thread_group and the vector is that thread_group needs the thread object in order to remove it. So I figured that if I saved the threads in a vector, I could add the ability to remove a thread by popping it from the vector and passing it to the thread_group. But I will have to try your solution tomorrow! :)
Also, thanks for all your advice and the points you made. I definitely see what you are talking about.
If you insist on tracking the thread objects (why?) here's how you could write it: coliru.stacked-crooked.com/a/9318ce6b4cd3545d
Wow, awesome! Thanks a bunch for the help! I guess the only reason I wanted to track the individual thread objects was so that I could pass them to boost::thread_group::remove_thread(). Would you recommend against that feature? Or would it be better to use a different object instead of thread_group?

Your fatal problem is here:

std::shared_ptr<boost::thread> t1(new boost::thread(
    boost::bind(&boost::asio::io_service::run, &ioService)));
threads.add_thread(t1.get());

From the documentation of boost::thread_group (http://www.boost.org/doc/libs/1_42_0/doc/html/thread/thread_management.html#thread.thread_management.threadgroup.destructor), the effect of its destructor is:

Destroy *this and delete all boost::thread objects in the group.

Thus boost::thread_group takes exclusive ownership of the threads added to it. You cannot also hold them in a shared_ptr, because then both the shared_ptr and the thread_group destructor delete the same boost::thread object.

To solve it, read the linked documentation for boost::thread_group carefully, use create_thread(), and keep the boost::thread* it returns as a non-owning raw pointer instead.

1 Comment

Thanks for the answer! I'll have to take a look at the documentation and get back!
