2020-11-01

2: A C# Objects Pipe, Which Conveys Objects Serially

<The previous article in this series | The table of contents of this series | The next article in this series>

Serially, because otherwise the memory would be used up. A C# equivalent, for objects, of 'java.io.PipedWriter' + 'java.io.PipedReader'.

Topics


About: C#

Starting Context


  • The reader has a basic knowledge of C#.

Target Context


  • The reader will know what an objects pipe is and how to implement and use one in C#.

Orientation


A C# string pipe will be introduced in the next article.

A Java objects pipe has been introduced in an article of another series.

A C++ objects pipe has been introduced in an article of another series.

A Python objects pipe has been introduced in an article of another series.


Main Body


1: The Motivation and the Plan Are the Same as for the Already Introduced Java Objects Pipe


Hypothesizer 7
In fact, an article of another series has already introduced a Java objects pipe.

I refrain from repeating the motivation and the plan that are described there.


2: The Code and Its Explanation


Hypothesizer 7
This is the code of my objects pipe class and its related classes.

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.cs'

@C# Source Code
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace pipes {
			using System;
			using System.Collections.Generic;
			using System.Runtime.CompilerServices;
			using System.Threading;
			using theBiasPlanet.coreUtilities.constantsGroups;
			using theBiasPlanet.coreUtilities.inputsHandling;
			using theBiasPlanet.coreUtilities.messagingHandling;
			using theBiasPlanet.coreUtilities.timersHandling;
			
			public class ObjectsPipe <T> {
				protected Object [] i_objects;
				protected Int32 i_bufferSize = 0;
				// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
				protected Int32 i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				protected Int32 i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				protected Boolean i_isFinishedWriting = false;
				protected Boolean i_isFinishedReading = false;
				protected Boolean i_notificationIsDelayed = false;
				
				public ObjectsPipe (Int32 a_bufferSize, Boolean a_notificationIsDelayed) {
					i_bufferSize = a_bufferSize;
					i_objects = new Object [i_bufferSize];
					i_notificationIsDelayed = a_notificationIsDelayed;
				}
				
				~ObjectsPipe () {
				}
				
				protected Boolean isEmptyWithoutLocking () {
					return i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
				
				protected Boolean isFullWithoutLocking () {
					return (i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == i_bufferSize) || (i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				protected void writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1) {
					if (i_isFinishedReading) {
						throw new NoMoreNeedsException ("");
					}
					if (i_isFinishedWriting) {
						i_isFinishedWriting = false;
					}
					while (true) {
						if (isFullWithoutLocking ()) {
							try {
								if (a_timeOutPeriodInMilliseconds == -1) {
									Monitor.Wait (this);
								}
								else if (a_timeOutPeriodInMilliseconds == 0) {
								}
								else {
									Monitor.Wait (this, a_timeOutPeriodInMilliseconds);
								}
							}
							catch (Exception l_exception) {
								Publisher.logErrorInformation (l_exception);
							}
						}
						// Checked again because the status may have changed while this thread was waiting.
						if (i_isFinishedReading) {
							throw new NoMoreNeedsException ("");
						}
						if (!isFullWithoutLocking ()) {
							Boolean l_wasEmpty = isEmptyWithoutLocking ();
							if (i_dataUntilIndex == i_bufferSize) {
								i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object;
								i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1;
							}
							else {
								i_objects [i_dataUntilIndex] = a_object;
								i_dataUntilIndex ++;
							}
							if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
								Monitor.PulseAll (this);
							}
							return;
						}
						else {
							if (a_timeOutPeriodInMilliseconds != -1) {
								throw new TimeOutException ("");
							}
						}
					}
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				protected T readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1) {
					T l_readObject;
					if (i_isFinishedReading) {
						i_isFinishedReading = false;
					}
					while (true) {
						if (isEmptyWithoutLocking ()) {
							if (!i_isFinishedWriting) {
								try {
									if (a_timeOutPeriodInMilliseconds == -1) {
										Monitor.Wait (this);
									}
									else if (a_timeOutPeriodInMilliseconds == 0) {
									}
									else {
										Monitor.Wait (this, a_timeOutPeriodInMilliseconds);
									}
								}
								catch (Exception l_exception) {
									Publisher.logErrorInformation (l_exception);
								}
							}
							else {
								throw new NoMoreDataException ("");
							}
						}
						// Checked again because the status may have changed while this thread was waiting.
						if (!isEmptyWithoutLocking ()) {
							Boolean l_wasFull = isFullWithoutLocking ();
							l_readObject = (T) i_objects [i_dataStartIndex];
							i_objects [i_dataStartIndex] = null;
							i_dataStartIndex ++;
							if (i_dataStartIndex == i_dataUntilIndex) {
								i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
								i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
							}
							else {
								if (i_dataStartIndex == i_bufferSize) {
									i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
								}
							}
							if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
								Monitor.PulseAll (this);
							}
							return l_readObject;
						}
						else {
							if (i_isFinishedWriting) {
								throw new NoMoreDataException ("");
							}
							if (a_timeOutPeriodInMilliseconds != -1) {
								throw new TimeOutException ("");
							}
						}
					}
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Boolean isEmpty () {
					return isEmptyWithoutLocking ();
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Boolean isFull () {
					return isFullWithoutLocking ();
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void write (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1) {
					writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Int32 write (T [] a_objects, Int32 a_offset, Int32 a_length, Int32 a_timeOutPeriodInMilliseconds = -1) {
					Int32 l_writtenLength = 0;
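					for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
						try {
							// Only the first object is waited for; the rest are written only while there is room.
							if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
								writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds);
							}
							else {
								// Stops here so that the unwritten objects are not counted as written.
								break;
							}
						}
						catch (NoMoreNeedsException) {
							if (l_writtenLength == 0) {
								// Rethrows while preserving the original stack trace.
								throw;
							}
							else {
								break;
							}
						}
					}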
					return l_writtenLength;
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public T read (Int32 a_timeOutPeriodInMilliseconds = -1) {
					return readWithoutLocking (a_timeOutPeriodInMilliseconds);
				}
				
				// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
				[MethodImpl (MethodImplOptions.Synchronized)]
				public Int32 read (T [] a_objects, Int32 a_offset, Int32 a_length, Int32 a_timeOutPeriodInMilliseconds = -1) {
					Int32 l_readLength = 0;
					for (; l_readLength < a_length; l_readLength ++) {
						if ( (l_readLength == 0) || !isEmptyWithoutLocking ()) {
							a_objects [a_offset + l_readLength] = readWithoutLocking (a_timeOutPeriodInMilliseconds);
						}
						else {
							break;
						}
					}
					return l_readLength;
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public List <T> readWholeData () {
					List <T> l_objectsList = new List <T> ();
					while (true) {
						try {
							l_objectsList.Add (readWithoutLocking ());
						}
						catch (NoMoreDataException) {
							break;
						}
					}
					return l_objectsList;
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void finishWriting () {
					i_isFinishedWriting = true;
					Monitor.PulseAll (this);
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void finishReading () {
					i_isFinishedReading = true;
					Monitor.PulseAll (this);
				}
				
				[MethodImpl (MethodImplOptions.Synchronized)]
				public void reset () {
					i_isFinishedWriting = false;
					i_isFinishedReading = false;
					i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.cs'

@C# Source Code
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace inputsHandling {
			using System;
			
			public class NoMoreDataException : Exception {
				public NoMoreDataException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.cs'

@C# Source Code
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace inputsHandling {
			using System;
			
			public class NoMoreNeedsException : Exception {
				public NoMoreNeedsException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.cs'

@C# Source Code
namespace theBiasPlanet {
	namespace coreUtilities {
		namespace timersHandling {
			using System;
			
			public class TimeOutException : Exception {
				public TimeOutException (String a_message) : base (a_message) {
				}
			}
		}
	}
}

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.cs' is omitted entirely because the method used just writes logs. 'GeneralConstantsConstantsGroup' is not shown either; the only constant used, 'c_iterationStartNumber', is '0'.

The explanation of the code is almost the same as that for the Java version, but I will give it here in full, duplication included.

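The constructor takes the buffer size and the choice of the notification timing: when 'a_notificationIsDelayed' is 'false', the waiting threads are awoken as soon as a write makes the empty buffer non-empty or a read makes the full buffer non-full; when it is 'true', they are awoken only when a write has made the buffer full or a read has made the buffer empty.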
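
The difference between the 2 timings can be traced without any thread, as a rough sketch. The 'NotificationTimingTracer' class and its method are hypothetical, made up only for this illustration; it assumes that the buffer starts empty and that no reader reads in the meantime, and it applies the same condition as the one in 'writeWithoutLocking'.

```csharp
using System;
using System.Collections.Generic;

// A hypothetical tracer: it is not a part of the pipe.
public class NotificationTimingTracer {
	// Returns the 1-based ordinals of the writes that would call 'Monitor.PulseAll',
	// when the objects are written into an empty buffer with no read in the meantime.
	public static List <Int32> getNotifyingWriteOrdinals (Int32 a_bufferSize, Int32 a_numberOfWrites, Boolean a_notificationIsDelayed) {
		List <Int32> l_ordinals = new List <Int32> ();
		Int32 l_numberOfStoredObjects = 0;
		for (Int32 l_ordinal = 1; l_ordinal <= a_numberOfWrites && l_numberOfStoredObjects < a_bufferSize; l_ordinal ++) {
			Boolean l_wasEmpty = (l_numberOfStoredObjects == 0);
			l_numberOfStoredObjects ++;
			Boolean l_isFull = (l_numberOfStoredObjects == a_bufferSize);
			// The same condition as in 'writeWithoutLocking'.
			if ( (!a_notificationIsDelayed && l_wasEmpty) || (a_notificationIsDelayed && l_isFull)) {
				l_ordinals.Add (l_ordinal);
			}
		}
		return l_ordinals;
	}
}
```

For a buffer size of '16' and 16 writes, the immediate mode notifies at the 1st write and the delayed mode notifies at the 16th write. With only 10 writes, the delayed mode never notifies from 'write', and a waiting reader stays waiting until 'finishWriting' pulses.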

'i_objects' is the buffer.

'i_dataStartIndex' and 'i_dataUntilIndex' are the start index and the until (exclusive) index of the valued area; when the buffer is empty, they are '0' and '0', respectively.
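
How the 2 indices move, including the wrapping around, can be followed with a small single-threaded sketch that keeps only the indices and stores no object. The 'IndicesTracer' class is hypothetical, made up for this illustration; its update rules are copied from 'writeWithoutLocking' and 'readWithoutLocking', with 'c_iterationStartNumber' taken to be '0'.

```csharp
using System;

// A hypothetical, single-threaded tracer of the 2 indices; it stores no objects.
public class IndicesTracer {
	private readonly Int32 i_bufferSize;
	public Int32 i_dataStartIndex = 0;
	public Int32 i_dataUntilIndex = 0;
	
	public IndicesTracer (Int32 a_bufferSize) {
		i_bufferSize = a_bufferSize;
	}
	
	public Boolean isEmpty () {
		return i_dataStartIndex == 0 && i_dataUntilIndex == 0;
	}
	
	public Boolean isFull () {
		return (i_dataStartIndex == 0 && i_dataUntilIndex == i_bufferSize) || (i_dataStartIndex != 0 && i_dataStartIndex == i_dataUntilIndex);
	}
	
	// The same index update as in 'writeWithoutLocking'.
	public void write () {
		if (isFull ()) {
			throw new InvalidOperationException ("The buffer is full.");
		}
		if (i_dataUntilIndex == i_bufferSize) {
			// Wraps around: the object would go to the index 0.
			i_dataUntilIndex = 1;
		}
		else {
			i_dataUntilIndex ++;
		}
	}
	
	// The same index update as in 'readWithoutLocking'.
	public void read () {
		if (isEmpty ()) {
			throw new InvalidOperationException ("The buffer is empty.");
		}
		i_dataStartIndex ++;
		if (i_dataStartIndex == i_dataUntilIndex) {
			// The last object has been read: back to the empty state.
			i_dataStartIndex = 0;
			i_dataUntilIndex = 0;
		}
		else if (i_dataStartIndex == i_bufferSize) {
			// Wraps around.
			i_dataStartIndex = 0;
		}
	}
}
```

With a buffer size of '3', 3 writes give ('0', '3'), which is full; 1 read and 1 more write give ('1', '1'), which is full again, now wrapped around; 3 more reads go through ('2', '1') and ('0', '1') back to ('0', '0').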

All the public methods except the constructor are synchronized on the 'this' object, via 'MethodImplOptions.Synchronized'.
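
As a reminder of what that attribute does for an instance method, the following sketch increments one counter from 2 threads, one through an attributed method and the other through a 'lock (this)' statement. The 'Counter' class is hypothetical, made up for this illustration. As both take the lock on the same instance, no increment is lost; that same lock is what allows the 'WithoutLocking' methods to call 'Monitor.Wait (this)' and 'Monitor.PulseAll (this)', which require the calling thread to own the lock.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;

// A hypothetical counter: it is not a part of the pipe.
public class Counter {
	private Int32 i_count = 0;
	
	// Locks on 'this' for the whole method body.
	[MethodImpl (MethodImplOptions.Synchronized)]
	public void incrementWithAttribute () {
		i_count ++;
	}
	
	// Locks on the same object explicitly.
	public void incrementWithLockStatement () {
		lock (this) {
			i_count ++;
		}
	}
	
	[MethodImpl (MethodImplOptions.Synchronized)]
	public Int32 getCount () {
		return i_count;
	}
	
	// Runs 2 threads, each of which increments the same counter a_incrementsPerThread times.
	public static Int32 runTwoThreads (Int32 a_incrementsPerThread) {
		Counter l_counter = new Counter ();
		Thread l_thread1 = new Thread (() => {
			for (Int32 l_index = 0; l_index < a_incrementsPerThread; l_index ++) {
				l_counter.incrementWithAttribute ();
			}
		});
		Thread l_thread2 = new Thread (() => {
			for (Int32 l_index = 0; l_index < a_incrementsPerThread; l_index ++) {
				l_counter.incrementWithLockStatement ();
			}
		});
		l_thread1.Start ();
		l_thread2.Start ();
		l_thread1.Join ();
		l_thread2.Join ();
		return l_counter.getCount ();
	}
}
```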

The 'readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1)' method has the loop because there may be multiple readers: a reader that had been waiting on the empty buffer may have been awoken only to find that the written objects have already been snatched by the other readers, and so has to wait once more.

Likewise, the 'writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1)' method has the loop because there may be multiple writers.

The behavior of the 'writeWithoutLocking (T a_object, Int32 a_timeOutPeriodInMilliseconds = -1)' method when the pipe has been declared finished reading is somewhat different from the behavior of the 'readWithoutLocking (Int32 a_timeOutPeriodInMilliseconds = -1)' method when the pipe has been declared finished writing: the writer does not need to write any more at all and gets 'NoMoreNeedsException' immediately, while the reader will want to read up the objects already stored in the buffer and gets 'NoMoreDataException' only after the buffer has become empty.
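
The need for the loops can be isolated in a minimal sketch of the same wait-in-a-loop pattern. The 'SingleSlot' class, which holds at most 1 integer, is hypothetical and much simpler than the pipe (no time-out, no finishing); it is only meant to show that the condition has to be checked again after each awakening, because 'Monitor.PulseAll' awakes all the waiters and only one of them gets the value.

```csharp
using System;
using System.Threading;

// A hypothetical 1-integer holder: it is not a part of the pipe.
public class SingleSlot {
	private Int32 i_value = 0;
	private Boolean i_hasValue = false;
	
	public void put (Int32 a_value) {
		lock (this) {
			// 'while', not 'if': another writer may have filled the slot before this thread reacquired the lock.
			while (i_hasValue) {
				Monitor.Wait (this);
			}
			i_value = a_value;
			i_hasValue = true;
			Monitor.PulseAll (this);
		}
	}
	
	public Int32 take () {
		lock (this) {
			// 'while', not 'if': another reader may have snatched the value before this thread reacquired the lock.
			while (!i_hasValue) {
				Monitor.Wait (this);
			}
			i_hasValue = false;
			Monitor.PulseAll (this);
			return i_value;
		}
	}
	
	// 1 writer puts 1 through a_count; 2 readers take a_count values in total; the sum of the taken values is returned.
	public static Int64 runOneWriterTwoReaders (Int32 a_count) {
		SingleSlot l_slot = new SingleSlot ();
		Int64 l_sum = 0;
		Int32 l_firstReaderCount = a_count / 2;
		Thread l_writer = new Thread (() => {
			for (Int32 l_value = 1; l_value <= a_count; l_value ++) {
				l_slot.put (l_value);
			}
		});
		Thread l_reader1 = new Thread (() => {
			for (Int32 l_index = 0; l_index < l_firstReaderCount; l_index ++) {
				Interlocked.Add (ref l_sum, l_slot.take ());
			}
		});
		Thread l_reader2 = new Thread (() => {
			for (Int32 l_index = 0; l_index < a_count - l_firstReaderCount; l_index ++) {
				Interlocked.Add (ref l_sum, l_slot.take ());
			}
		});
		l_reader1.Start ();
		l_reader2.Start ();
		l_writer.Start ();
		l_writer.Join ();
		l_reader1.Join ();
		l_reader2.Join ();
		return l_sum;
	}
}
```

If the 'while' in 'take' were an 'if', 2 readers awoken by the same pulse could both return the same value, and the sum would no longer be guaranteed to be the sum of 1 through 'a_count'.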

Be aware that I have not tested the code intensively yet, although I have used it in some simple cases, one of which is shown in the next section.


3: An Example Usage and an Execution Result


Hypothesizer 7
This is a sample program that uses the objects pipe.

@C# Source Code
namespace theBiasPlanet {
	namespace coreUtilitiesTests {
		namespace pipesTest1 {
			using System;
			using System.Threading;
			using theBiasPlanet.coreUtilities.inputsHandling;
			using theBiasPlanet.coreUtilities.pipes;
			
			public class Test1Test {
				// The entry point has to be named 'Main' for the C# compiler to find it.
				public static void Main (String [] a_argumentsArray) {
					test2 ();
				}
				
				public static void prepareIntegers (ObjectsPipe <Int32> a_writer) {
					for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
						try {
							a_writer.write (l_iterationIndex);
						}
						catch (NoMoreNeedsException) {
							break;
						}
						Console.Out.WriteLine (String.Format ("### written: {0:d}", l_iterationIndex));
						Console.Out.Flush ();
					}
				}
				
				private static void processIntegers (ObjectsPipe <Int32> a_reader) {
					Int32 l_integer = -1;
					int l_numberOfMultipleOf10s = 0;
					while (true) {
						try {
							l_integer = a_reader.read ();
						}
						catch (NoMoreDataException) {
							break;
						}
						Console.Out.WriteLine (String.Format ("### read: {0:d}", l_integer));
						Console.Out.Flush ();
						if (l_integer % 10 == 0) {
							l_numberOfMultipleOf10s ++;
							Console.Out.WriteLine (String.Format ("### a multiple of 10s is found."));
							Console.Out.Flush ();
						}
					}
					Console.Out.WriteLine (String.Format ("### the number of multiple of 10s is {0:d}.", l_numberOfMultipleOf10s));
					Console.Out.Flush ();
				}
				
				private static void test2 () {
					ObjectsPipe <Int32> l_integersPipe = new ObjectsPipe <Int32> (16, true);
					//ObjectsPipe <Int32> l_integersPipe = new ObjectsPipe <Int32> (16, false);
					Thread l_subThread = new Thread (() => {
						try {
							prepareIntegers (l_integersPipe);
						}
						catch (Exception l_exception) {
							Console.Out.WriteLine (l_exception.ToString ());
						}
						finally {
							try {
								l_integersPipe.finishWriting ();
							}
							catch (Exception l_exception) {
								Console.Out.WriteLine (l_exception.ToString ());
							}
						}
					});
					l_subThread.Start ();
					processIntegers (l_integersPipe);
					l_subThread.Join ();
				}
			}
		}
	}
}

This is an output on my single-core, single-CPU Linux computer with Mono (that environment should be influencing the output).

@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . Well, what happened is, I guess, that the reader acted first and was made to wait because the pipe was empty; the writer filled the buffer and awoke the reader; the reader began to read; and so on. . . . Note that '15' must have been written before the reader was awoken, although "### written: 15" was shown late: immediately after the 'write' method finished, the control switched to the reading thread, and the message was delayed until the control returned to the writing thread.

When the notification timing mode is set to 'false', this is the output.

@Output
### read: 0
### a multiple of 10s is found.
### written: 0
### read: 1
### written: 1
### read: 2
### written: 2
### read: 3
### written: 3
### read: 4
### written: 4
### read: 5
### written: 5
### read: 6
### written: 6
### read: 7
### written: 7
### read: 8
### written: 8
### read: 9
### written: 9
### read: 10
### a multiple of 10s is found.
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### written: 18
### written: 19
### written: 20
### written: 21
### written: 22
### written: 23
### written: 24
### written: 25
### written: 26
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . The reader acted first and was made to wait because the pipe was empty; the writer wrote '0' and awoke the reader; the reader began to read; and so on, I guess. . . . Again, "### written: 0" was shown late.

In those executions, a clear difference appeared only for a little while at the beginning.

I measured the times taken (with the costly message outputs eliminated) in the 2 modes, 5 times for each mode: the times were {'176,234,000', '93,412,000', '107,652,000', '147,381,000', and '130,540,000'} and {'99,691,000', '260,851,000', '216,234,000', '147,965,000', and '208,565,000'}, respectively, in nanoseconds (obviously, there is no such precision, really). Because the times are so unstable, I do not draw any particular conclusion.
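
How the times were taken is not shown above. One possible way, only as an assumption, is to wrap the run in 'System.Diagnostics.Stopwatch' and convert the ticks to nanoseconds; the 'ElapsedTimeMeasurer' class below is hypothetical and is not the measurement code actually used for the figures above.

```csharp
using System;
using System.Diagnostics;

// A hypothetical helper: an assumption about how such times could be taken.
public class ElapsedTimeMeasurer {
	// Runs a_action once and returns the elapsed time in nanoseconds.
	public static Int64 measureInNanoseconds (Action a_action) {
		Stopwatch l_stopwatch = Stopwatch.StartNew ();
		a_action ();
		l_stopwatch.Stop ();
		// 'Stopwatch.Frequency' is the number of ticks per second.
		return (Int64) (l_stopwatch.ElapsedTicks * (1000000000.0 / Stopwatch.Frequency));
	}
}
```

The action passed in would be the whole of what 'test2' does, with the 'Console.Out.WriteLine' calls removed. The resolution depends on the platform timer, which is one more reason not to read much into the low-order digits.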

