I am trying to improve my code that involves a thread (threadA) handling some UDP communication and another thread (threadB) that works with the output of that communication.
My basic idea is to .push() new data into a std::queue within threadA.
threadB is looking for new data periodically - if it gets new data it processes it. Then .pop is called and the data is deleted.
Implementation:
threadAwill calladdNewElement()any time new UDP data arrives.threadBwill callgetNewestPtr()to access the data.when done with the data,
threadBwill calldoneWithNewest().
I am managing the access to the data (which comes in the form of std::array) through the use of std::shared_ptr. I know that the shared_ptr will manage the access to the pointer in a thread safe manner, but the underlying object is not going to be managed. Here lies my problem - in my implementation of the class, I am not sure where to put in memory locks, to ensure thread safety. My implementation already works, I have been testing it for a few days without problems, but I have the feeling I am doing something wrong, as I have not used any std::mutex. Anyway, here is my code:
commbufferfifo.h
#ifndef COMMBUFFERFIFO_H
#define COMMBUFFERFIFO_H
#include <queue>
#include <array>
#include <memory>
const size_t COMMLENGTH = 56;
/**
* @brief The commbufferfifo class
* @details used to safely share data between the comm thread and the visu thread
*/
class commbufferfifo
{
public:
commbufferfifo();
/**
* @brief getNewestPtr only accessed in visu thread
* @return returns nullptr if fifo is empty!
*/
std::shared_ptr<std::array<int16_t, COMMLENGTH>> getNewestPtr();///< @brief
void doneWithNewest();///< @brief call when newest object can be deleted
void addNewElement(const std::array<int16_t, COMMLENGTH> &commBuffer); ///< @brief only accessed in comm thread
private:
std::queue<std::shared_ptr<std::array<int16_t, COMMLENGTH>>> m_fifo;
};
#endif // COMMBUFFERFIFO_H
commbufferfifo.cpp
#include "commbufferfifo.h"
commbufferfifo::commbufferfifo()
{
}
std::shared_ptr<std::array<int16_t, COMMLENGTH> > commbufferfifo::getNewestPtr()
{
if (m_fifo.empty())
return nullptr;
else
return m_fifo.front();
}
void commbufferfifo::doneWithNewest()
{
if (m_fifo.empty())
return;
else
m_fifo.pop();
}
void commbufferfifo::addNewElement(const std::array<int16_t, COMMLENGTH> &commBuffer)
{
m_fifo.push(std::make_shared<std::array<int16_t, COMMLENGTH>>(commBuffer));
}
some pseudo usage: main.cpp
#include "includes/commbufferfifo.h"
#include <unistd.h>
void threadA(std::shared_ptr<commbufferfifo> myPointer){
std::array<int16_t, COMMLENGTH> arr;
for (int iteration = 0; iteration < 1000; iteration++){
arr.at(0) = iteration;
myPointer->addNewElement(arr);
usleep(100);
}
}
int main()
{
auto sharedFifo = std::make_shared<commbufferfifo>();
std::thread tA(&threadA, sharedFifo);
usleep(10000);
for(;;){
if (sharedFifo->getNewestPtr() == nullptr)
break;
std::cout << "sharedFifo->getNewestPtr()->at(0) = " << sharedFifo->getNewestPtr()->at(0) << "\n"; //use data
sharedFifo->doneWithNewest(); //data will be deleted
usleep(1000);
}
tA.join();
std::cout << "------------------------------------\n";
std::cout << "done" << std::endl;
return 0;
}
```