It has to be serially because otherwise, the memory would be used up. A C++ for-objects equivalent of 'java.io.PipedWriter' + 'java.io.PipedReader'.
Topics
About: C++
The table of contents of this article
- Starting Context
- Target Context
- Orientation
- Main Body
- 1: The Motivation and the Plan Are the Same for an Already Introduced Java Objects Pipe
- 2: The Code and Its Explanation
- 3: An Example Usage and an Execution Result
Starting Context
- The reader has a basic knowledge on C++.
Target Context
- The reader will know an objects pipe.
Orientation
A C++ string pipe will be introduced in the next article.
A Java objects pipe has been introduced in an article of another series.
A C# objects pipe will be introduced in an article of another series.
A Python objects pipe has been introduced in an article of another series.
Main Body
1: The Motivation and the Plan Are the Same for an Already Introduced Java Objects Pipe
Hypothesizer 7
In fact, an article of another series has already introduced a Java objects pipe.
I refrain from repeating the motivation and the plan that are described there.
2: The Code and Its Explanation
Hypothesizer 7
This is the code of my objects pipe class template and its related classes.
'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.hpp'
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilities_pipes_ObjectsPipe_hpp__
#define __theBiasPlanet_coreUtilities_pipes_ObjectsPipe_hpp__
#include <condition_variable>
#include <list>
#include <mutex>
#include <optional>
#include "theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.hpp"
using namespace ::std;
using namespace ::theBiasPlanet::coreUtilities::constantsGroups;
namespace theBiasPlanet {
namespace coreUtilities {
namespace pipes {
template <typename T> class ObjectsPipe {
protected:
recursive_mutex i_mutex;
condition_variable_any i_threadCondition;
T * i_objects;
int i_bufferSize = 0;
// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
int i_dataStartIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
int i_dataUntilIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
bool i_isFinishedWriting = false;
bool i_isFinishedReading = false;
bool i_notificationIsDelayed = false;
virtual bool isEmptyWithoutLocking ();
virtual bool isFullWithoutLocking ();
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual void writeWithoutLocking (T const & a_object, unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds = -1);
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual T readWithoutLocking (unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds = -1);
public:
ObjectsPipe (int const & a_bufferSize, bool const & a_notificationIsDelayed);
virtual ~ObjectsPipe ();
virtual bool isEmpty ();
virtual bool isFull ();
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual void write (T const & a_object, long const & a_timeOutPeriodInMilliseconds = -1);
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual int write (T const * const a_objects, int const & a_offset, int const & a_length, long const & a_timeOutPeriodInMilliseconds = 0);
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual T read (long const & a_timeOutPeriodInMilliseconds = -1);
// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
virtual int read (T * const a_objects, int const & a_offset, int const & a_length, long const & a_timeOutPeriodInMilliseconds = -1);
virtual list <T> readWholeData ();
virtual void finishWriting ();
virtual void finishReading ();
virtual void reset ();
};
}
}
}
#endif
'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.tpp'
@C++ Source Code
#include "theBiasPlanet/coreUtilities/pipes/ObjectsPipe.hpp"
#include <chrono>
#include <exception>
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.hpp"
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.hpp"
#include "theBiasPlanet/coreUtilities/messagingHandling/Publisher.hpp"
#include "theBiasPlanet/coreUtilities/timersHandling/TimeOutException.hpp"
using namespace ::std::chrono;
using namespace ::theBiasPlanet::coreUtilities::inputsHandling;
using namespace ::theBiasPlanet::coreUtilities::messagingHandling;
using namespace ::theBiasPlanet::coreUtilities::timersHandling;
namespace theBiasPlanet {
namespace coreUtilities {
namespace pipes {
template <typename T> ObjectsPipe <T>::ObjectsPipe (int const & a_bufferSize, bool const & a_notificationIsDelayed): i_objects (new T [a_bufferSize]), i_bufferSize (a_bufferSize), i_notificationIsDelayed (a_notificationIsDelayed) {
}
template <typename T> ObjectsPipe <T>::~ObjectsPipe () {
delete i_objects;
}
template <typename T> bool ObjectsPipe <T>::isEmptyWithoutLocking () {
return i_dataStartIndex == GeneralConstantsConstantsGroup::c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup::c_iterationStartNumber;
}
template <typename T> bool ObjectsPipe <T>::isFullWithoutLocking () {
return (i_dataStartIndex == GeneralConstantsConstantsGroup::c_iterationStartNumber && i_dataUntilIndex == i_bufferSize) || (i_dataStartIndex != GeneralConstantsConstantsGroup::c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
}
template <typename T> void ObjectsPipe <T>::writeWithoutLocking (T const & a_object, unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds) {
if (i_isFinishedReading) {
throw NoMoreNeedsException ("");
}
if (i_isFinishedWriting) {
i_isFinishedWriting = false;
}
while (true) {
if (isFullWithoutLocking ()) {
try {
if (a_timeOutPeriodInMilliseconds == -1) {
i_threadCondition.wait (*a_lock);
}
else if (a_timeOutPeriodInMilliseconds == 0) {
}
else {
i_threadCondition.wait_for (*a_lock, milliseconds (a_timeOutPeriodInMilliseconds));
}
}
catch (exception & l_exception) {
Publisher::logErrorInformation (l_exception);
}
}
// Checked again because the status may have changed while this thread was waiting.
if (i_isFinishedReading) {
throw NoMoreNeedsException ("");
}
if (!isFullWithoutLocking ()) {
bool l_wasEmpty = isEmptyWithoutLocking ();
if (i_dataUntilIndex == i_bufferSize) {
i_objects [GeneralConstantsConstantsGroup::c_iterationStartNumber] = a_object;
i_dataUntilIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber + 1;
}
else {
i_objects [i_dataUntilIndex] = a_object;
i_dataUntilIndex ++;
}
if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
i_threadCondition.notify_all ();
}
return;
}
else {
if (a_timeOutPeriodInMilliseconds != -1) {
throw TimeOutException ("");
}
}
}
}
template <typename T> T ObjectsPipe <T>::readWithoutLocking (unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds) {
T l_readObject;
if (i_isFinishedReading) {
i_isFinishedReading = false;
}
while (true) {
if (isEmptyWithoutLocking ()) {
if (!i_isFinishedWriting) {
try {
if (a_timeOutPeriodInMilliseconds == -1) {
i_threadCondition.wait (*a_lock);
}
else if (a_timeOutPeriodInMilliseconds == 0) {
}
else {
i_threadCondition.wait_for (*a_lock, milliseconds (a_timeOutPeriodInMilliseconds));
}
}
catch (exception & l_exception) {
Publisher::logErrorInformation (l_exception);
}
}
else {
throw NoMoreDataException ("");
}
}
// Checked again because the status may have changed while this thread was waiting.
if (!isEmptyWithoutLocking ()) {
bool l_wasFull = isFullWithoutLocking ();
l_readObject = i_objects [i_dataStartIndex];
// 'i_objects [i_dataStartIndex]' is left although is invalidated.
i_dataStartIndex ++;
if (i_dataStartIndex == i_dataUntilIndex) {
i_dataStartIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
i_dataUntilIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
}
else {
if (i_dataStartIndex == i_bufferSize) {
i_dataStartIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
}
}
if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
i_threadCondition.notify_all ();
}
return l_readObject;
}
else {
if (i_isFinishedWriting) {
throw NoMoreDataException ("");
}
if (a_timeOutPeriodInMilliseconds != -1) {
throw TimeOutException ("");
}
}
}
}
template <typename T> bool ObjectsPipe <T>::isEmpty () {
unique_lock <recursive_mutex> l_lock (i_mutex);
return isEmptyWithoutLocking ();
}
template <typename T> bool ObjectsPipe <T>::isFull () {
unique_lock <recursive_mutex> l_lock (i_mutex);
return isFullWithoutLocking ();
}
template <typename T> void ObjectsPipe <T>::write (T const & a_object, long const & a_timeOutPeriodInMilliseconds) {
unique_lock <recursive_mutex> l_lock (i_mutex);
writeWithoutLocking (a_object, &l_lock, a_timeOutPeriodInMilliseconds);
}
template <typename T> int ObjectsPipe <T>::write (T const * const a_objects, int const & a_offset, int const & a_length, long const & a_timeOutPeriodInMilliseconds) {
unique_lock <recursive_mutex> l_lock (i_mutex);
int l_writtenLength = 0;
for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
try {
if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
writeWithoutLocking (a_objects [a_offset + l_writtenLength], &l_lock, a_timeOutPeriodInMilliseconds);
}
}
catch (NoMoreNeedsException & l_exception) {
if (l_writtenLength == 0) {
throw l_exception;
}
else {
break;
}
}
}
return l_writtenLength;
}
template <typename T> T ObjectsPipe <T>::read (long const & a_timeOutPeriodInMilliseconds) {
unique_lock <recursive_mutex> l_lock (i_mutex);
return readWithoutLocking (&l_lock, a_timeOutPeriodInMilliseconds);
}
template <typename T> int ObjectsPipe <T>::read (T * const a_objects, int const & a_offset, int const & a_length, long const & a_timeOutPeriodInMilliseconds) {
unique_lock <recursive_mutex> l_lock (i_mutex);
int l_readLength = 0;
for (; l_readLength < a_length; l_readLength ++) {
if ( (l_readLength == 0) || ! (isEmptyWithoutLocking ())) {
a_objects [a_offset + l_readLength] = readWithoutLocking (&l_lock, a_timeOutPeriodInMilliseconds);
}
else {
break;
}
}
return l_readLength;
}
template <typename T> list <T> ObjectsPipe <T>::readWholeData () {
unique_lock <recursive_mutex> l_lock (i_mutex);
list <T> l_objectsList;
while (true) {
try {
l_objectsList.push_back (readWithoutLocking (&l_lock));
}
catch (NoMoreDataException & l_exception) {
break;
}
}
return l_objectsList;
}
template <typename T> void ObjectsPipe <T>::finishWriting () {
unique_lock <recursive_mutex> l_lock (i_mutex);
i_isFinishedWriting = true;
i_threadCondition.notify_all ();
}
template <typename T> void ObjectsPipe <T>::finishReading () {
unique_lock <recursive_mutex> l_lock (i_mutex);
i_isFinishedReading = true;
i_threadCondition.notify_all ();
}
template <typename T> void ObjectsPipe <T>::reset () {
unique_lock <recursive_mutex> l_lock (i_mutex);
i_isFinishedWriting = false;
i_isFinishedReading = false;
i_dataStartIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
i_dataUntilIndex = GeneralConstantsConstantsGroup::c_iterationStartNumber;
}
}
}
}
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.hpp'
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilities_constantsGroups_GeneralConstantsConstantsGroup_hpp__
#define __theBiasPlanet_coreUtilities_constantsGroups_GeneralConstantsConstantsGroup_hpp__
#include <string>
~
#include "theBiasPlanet/coreUtilities/visualCplusplusSpecificHeaders/VisualCplusplusSpecificDefinitions.hpp"
using namespace ::std;
~
namespace theBiasPlanet {
namespace coreUtilities {
namespace constantsGroups {
class __theBiasPlanet_coreUtilities_symbolExportingOrImportingForVisualCplusplus__ GeneralConstantsConstantsGroup {
public:
~
static int const c_iterationStartNumber;
~
};
}
}
}
#endif
'theBiasPlanet/coreUtilities/staticVariablesInitializer/StaticVariablesInitializer.cpp'
@C++ Source Code
~
#include "theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.hpp"
~
~
namespace theBiasPlanet {
namespace coreUtilities {
namespace constantsGroups {
~
int const GeneralConstantsConstantsGroup::c_iterationStartNumber (0);
~
}
~
}
}
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.hpp'
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilities_inputsHandling_NoMoreDataException_hpp__
#define __theBiasPlanet_coreUtilities_inputsHandling_NoMoreDataException_hpp__
#include <exception>
#include <string>
#include "theBiasPlanet/coreUtilities/visualCplusplusSpecificHeaders/VisualCplusplusSpecificDefinitions.hpp"
using namespace ::std;
namespace theBiasPlanet {
namespace coreUtilities {
namespace inputsHandling {
class __theBiasPlanet_coreUtilities_symbolExportingOrImportingForVisualCplusplus__ NoMoreDataException : public exception {
private:
string i_message;
public:
NoMoreDataException (string a_message);
virtual ~NoMoreDataException ();
virtual char const * what () const throw () override;
};
}
}
}
#endif
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.cpp'
@C++ Source Code
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.hpp"
namespace theBiasPlanet {
namespace coreUtilities {
namespace inputsHandling {
NoMoreDataException::NoMoreDataException (string a_message) : exception (), i_message (a_message) {
}
NoMoreDataException::~NoMoreDataException () {
}
char const * NoMoreDataException::what () const throw () {
return i_message.c_str ();
}
}
}
}
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.hpp'
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilities_inputsHandling_NoMoreNeedsException_hpp__
#define __theBiasPlanet_coreUtilities_inputsHandling_NoMoreNeedsException_hpp__
#include <exception>
#include <string>
#include "theBiasPlanet/coreUtilities/visualCplusplusSpecificHeaders/VisualCplusplusSpecificDefinitions.hpp"
using namespace ::std;
namespace theBiasPlanet {
namespace coreUtilities {
namespace inputsHandling {
class __theBiasPlanet_coreUtilities_symbolExportingOrImportingForVisualCplusplus__ NoMoreNeedsException : public exception {
private:
string i_message;
public:
NoMoreNeedsException (string a_message);
virtual ~NoMoreNeedsException ();
virtual char const * what () const throw () override;
};
}
}
}
#endif
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.cpp'
@C++ Source Code
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.hpp"
namespace theBiasPlanet {
namespace coreUtilities {
namespace inputsHandling {
NoMoreNeedsException::NoMoreNeedsException (string a_message) : exception (), i_message (a_message) {
}
NoMoreNeedsException::~NoMoreNeedsException () {
}
char const * NoMoreNeedsException::what () const throw () {
return i_message.c_str ();
}
}
}
}
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.hpp' and 'theBiasPlanet/coreUtilities/messagingHandling/Publisher.cpp' are totally omitted because the used method just writes logs.
'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.hpp'
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilities_inputsHandling_TimeOutException_hpp__
#define __theBiasPlanet_coreUtilities_inputsHandling_TimeOutException_hpp__
#include <exception>
#include <string>
#include "theBiasPlanet/coreUtilities/visualCplusplusSpecificHeaders/VisualCplusplusSpecificDefinitions.hpp"
using namespace ::std;
namespace theBiasPlanet {
namespace coreUtilities {
namespace timersHandling {
class __theBiasPlanet_coreUtilities_symbolExportingOrImportingForVisualCplusplus__ TimeOutException : public exception {
private:
string i_message;
public:
TimeOutException (string a_message);
virtual ~TimeOutException ();
virtual char const * what () const throw () override;
};
}
}
}
#endif
'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.cpp'
@C++ Source Code
#include "theBiasPlanet/coreUtilities/timersHandling/TimeOutException.hpp"
namespace theBiasPlanet {
namespace coreUtilities {
namespace timersHandling {
TimeOutException::TimeOutException (string a_message) : exception (), i_message (a_message) {
}
TimeOutException::~TimeOutException () {
}
char const * TimeOutException::what () const throw () {
return i_message.c_str ();
}
}
}
}
The explanation of the code is almost the same for the Java version, but I will do the explanation including the duplication.
The constructor takes the buffer size and the algorithm choice for awaking the readers.
'i_objects' is the buffer.
'i_dataStartIndex' and 'i_dataUntilIndex' are the start index and the until (without including) index of the valued area; when the buffer is empty, they are '0' and '0', respectively.
All the public method templates except the constructor are synchronized on the 'i_mutex' mutex.
The 'readWithoutLocking (unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds)' method template has the loop on the basis that there may be some multiple readers: a reader that had been waiting on the empty buffer may have been awaken to find out that the written objects have been already snatched by the other readers, being required to wait once more.
Likewise, the 'writeWithoutLocking (T const & a_object, unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds)' method template has the loop on the basis that there may be some multiple writers.
The behavior of the 'writeWithoutLocking (T const & a_object, unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds)' method template when the pipe has been declared finished reading is somehow different from the behavior of the 'readWithoutLocking (unique_lock <recursive_mutex> * a_lock, long const & a_timeOutPeriodInMilliseconds = -1)' method template when the pipe has been declared finished writing, because the writer will not need to write any more at all, while the reader will want to read up the objects already stored in the buffer.
Be aware that I have not intensively tested the code, yet, although I have used it in some simple cases, one of which is shown in the next section.
3: An Example Usage and an Execution Result
Hypothesizer 7
This is a sample program that uses the objects pipe.
@C++ Source Code
#ifndef __theBiasPlanet_coreUtilitiesTests_pipesTest1_Test1Test_hpp__
#define __theBiasPlanet_coreUtilitiesTests_pipesTest1_Test1Test_hpp__
#include <string>
#include "theBiasPlanet/coreUtilities/pipes/ObjectsPipe.hpp"
~
using namespace ::std;
using namespace ::theBiasPlanet::coreUtilities::pipes;
namespace theBiasPlanet {
namespace coreUtilitiesTests {
namespace pipesTest1 {
class Test1Test {
public:
static int main (int const & a_numberOfArguments, char const * const a_argumentsArray []);
static void test2 ();
~
static void prepareIntegers (ObjectsPipe <int> * a_writer);
static void processIntegers (ObjectsPipe <int> * a_reader);
~
};
}
}
}
#endif
#include "theBiasPlanet/coreUtilitiesTests/pipesTest1/Test1Test.hpp"
#include <iostream>
#include <thread>
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.hpp"
#include "theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.hpp"
#include "theBiasPlanet/coreUtilities/performanceMeasuringHandling/PerformanceMeasurer.hpp"
using namespace ::theBiasPlanet::coreUtilities::inputsHandling;
using namespace ::theBiasPlanet::coreUtilities::performanceMeasuringHandling;
namespace theBiasPlanet {
namespace coreUtilitiesTests {
namespace pipesTest1 {
int Test1Test::main (int const & a_numberOfArguments, char const * const a_argumentsArray []) {
test2 ();
return 0;
}
void Test1Test::prepareIntegers (ObjectsPipe <int> * a_writer) {
for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
try {
a_writer->write (l_iterationIndex);
}
catch (NoMoreNeedsException const & l_exception) {
break;
}
cout << "### written: " << l_iterationIndex << endl << flush;
}
}
void Test1Test::processIntegers (ObjectsPipe <int> * a_reader) {
int l_integer = -1;
int l_numberOfMultipleOf10s = 0;
while (true) {
try {
l_integer = a_reader->read ();
}
catch (NoMoreDataException const & l_exception) {
break;
}
cout << "### read: " << l_integer << endl << flush;
if (l_integer % 10 == 0) {
l_numberOfMultipleOf10s ++;
cout << "### a multiple of 10s is found." << endl << flush;
}
}
cout << "### the number of multiple of 10s is " << l_numberOfMultipleOf10s << "." << endl << flush;
}
void Test1Test::test2 () {
ObjectsPipe <int> l_integersPipe (16, true);
PerformanceMeasurer::setStartTime ();
thread l_subThread = thread ( [&l_integersPipe] () -> void {
try {
prepareIntegers (&l_integersPipe);
}
catch (exception const & l_exception) {
cout << l_exception.what () << endl << flush;
}
try {
l_integersPipe.finishWriting ();
}
catch (exception const & l_exception) {
cout << l_exception.what () << endl << flush;
}
});
processIntegers (&l_integersPipe);
l_subThread.join ();
cout << "### The elapsed time is " << PerformanceMeasurer::getElapseTimeInNanoSeconds () << " ns." << endl << flush;
}
~
}
}
}
This is an output, in my single-core single-CPU Linux computer (which should be influencing the output).
@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
. . . Well, what happened is that the reader acted first and was made waiting because the pipe was empty; the writer wrote to the full and awoke the reader; the reader began to read; and so on, I guess. . . . Note that '15' should have been written before the reader was awoken although "### written: 15" was delayed shown: immediately after the 'write' method was finished, the control switched to the reading thread, with the messaging delayed until the control was returned to the writing thread.
When the notification timing mode is set 'false', this is an output.
@Output
### read: 0
### a multiple of 10s is found.
### written: 0
### read: 1
### written: 1
### read: 2
### written: 2
### read: 3
### written: 3
### read: 4
### written: 4
### read: 5
### written: 5
### read: 6
### written: 6
### read: 7
### written: 7
### read: 8
### written: 8
### read: 9
### written: 9
### read: 10
### a multiple of 10s is found.
### written: 10
### read: 11
### written: 11
### read: 12
### written: 12
### read: 13
### written: 13
### read: 14
### written: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
. . . The reader acted first and was made waiting because the pipe was empty; the writer wrote '0' and awoke the reader; the reader began to read; and so on, I guess. . . . Again, "### written: 0" was delayed shown.
Anyway, only small number of slots in the buffer seem to tend to be used.
As I have measured the times taken (with the costly messagings eliminated) in the 2 modes (5 times each mode), the times were {'16,074,118', '15,751,967', '14,949,643', '2,771,212', and '11,706,119'} and {'15,099,104', '15,159,545', '3,928,233', '32,723,356', and '13,755,261'}, respectively, in nanoseconds. From '2,771,212' to '16,074,118' and from '3,928,233' to '32,723,356'? The results are too unstable to allow any serious analysis. . . .