Three Threads Stream
About
Many applications are performing an editing of their input. Especially in image processing. They all need to buffer their input, if they want to handle the processing. Especially, if you want to process the reading, modify and writing of data not in one thread. So, you need buffers between the threads.Here it describes the problem and gives a solution in form of an implementation.
Afterwards this document discuss the timing problematic with too slow or too fast threads.
Below, you can add some comments about your opinion. Don't hesitate to ask some questions.
Problem
As in "about" described, we want to process data for reading, modifying and writing. Of course, we can use one single data address to process the reading, modifying and writing in one single thread. But now we want to do this with three different threads. One for reading, one for modifying and one for writing.So you can say: lets use one data address for all three threads. To avoid a race condition (simultaneous writing and reading), we use a mutex, which we can lock.
The issue is, that when we use a mutex, it could be, that only the input thread is accessing the data, while the modify thread is every time again waiting on the lock. This happens, because when the modify thread is checking, if the mutex is free again, the input thread has lock the mutex again. In that way, the input thread is much more faster than the modifying thread. And in this way, we omit a lot of data to modify and write.
The modifying thread is called working thread further on. And the writing thread is called output thread in the implementation.
So it is not suitable to use a mutex here. Because, we have too fast access on the mutex. More detailed, the threads run in different speed, so it can be, that one thread is running twice as fast as the other. Or one thread is running at whole, while the other thread do nothing.
So at least we need two conditions: not_full and not_empty to indicate the different states of the cyclic buffer, if only one thread is running. This buffer and conditions are invented by batptiste-wicht[1].
The implementation
The buffer "BoundedBuffer"[1] is slightly modified. It is now a template based class.We need two buffers. One to transfer the input data to working thread, called inputToWorkingBuffer, and one to transfer modified data to output thread called workingToOutputBuffer. There are two operations on that buffers. One for fetching the last data and one for depositing new data.
There are streams handling the fetching and depositing of the buffers. See the operators << and >> therefore. The code is also improved by r-value references. The r-value references permit the data to be moved into the buffer and moved from the buffer. So the copy process is reduced as much as possible. In the end, there are only one copy from input stream to buffer and one copy to output stream.
The BoundedBuffer
The main
There are three threads in the main declared: inputThread, workingThread and outputThread. With get(), we wait of finishing the threads in reverse order of creation. std::ref(buffer) is needed on calling threads, because the buffer should be changed outside of the threads.
If all works correctly, we get the output of only even numbers from 0 to ttt::inputMax.
Timing Issues
There are 2^3 timing issues. This is because, every thread can be too slow or too fast. Those, timing issues are handled by some tricks. For example, if the outputThread is too fast, we can repeat the last frame.All tricks are:
- If working is too slow or input too fast, overwrite old data in inputToWorkingBuffer.
- If input is too slow or working too fast, repeat return of last data in inputToWorkingBuffe.
- If working is too slow or output too fast, repeat display of last data.
- If output is too slow or working too fast, overwrite old data in workingToOutputBuffer.
fetch() must be changed to:
And we have to change deposit() slightly. All correct code will be in the gitHub[2].
The front size_t needs to be changed to atomic, because it is used in fetch() and deposit().
Have a lot of fun, Sebastian
References
[1] http://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html
[2] https://github.com/SebastianBoehmer/example/tree/master/threeThreadsStream