2020-03-29

2: A Java Objects Pipe, Which Conveys Objects Serially


Serially, because otherwise the memory would be used up. It does for objects what 'java.io.PipedWriter' + 'java.io.PipedReader' do for characters.

Topics


About: The Java programming language



Starting Context


  • The reader has a basic knowledge of the Java programming language.

Target Context


  • The reader will know what an objects pipe is.

Orientation



Main Body


1: The Harm of Using Up the Memory


Hypothesizer 7
A major harm that can be caused by a program is using up the memory, which could paralyze the whole computer.

It is all the more annoying because the computer is occupied with doing what is futile: thrashing. . . . If the system were busy doing my calculations, I would appreciate the effort.

The memory has been used up because the program has hoarded a lot of data, but, I have to ask, did those data really have to be piled up in the memory?

Let me think of an example.

@Java Source Code
import java.util.ArrayList;
import java.util.List;

~
	~
	
	public static ArrayList <Integer> prepareAndHoardIntegers () {
		ArrayList <Integer> l_integers = new ArrayList <Integer> ();
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integers.add (Integer.valueOf (l_iterationIndex));
		}
		return l_integers;
	}
	
	public static void processPreparedAndHoardedIntegers (List <Integer> a_integers)  {
		int l_numberOfMultipleOf10s = 0;
		for (Integer l_integer: a_integers) {
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test00 () throws Exception {
		ArrayList <Integer> l_integers = prepareAndHoardIntegers ();
		processPreparedAndHoardedIntegers (l_integers);
	}
~

That uses up the memory (if the data amount is large enough) because all the objects are stored in the memory first, before any of them begins to be processed.

So, should it be done just like this?

@Java Source Code
~
	~
	
	public static void prepareAndProcessIntegers () {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test01 () {
		prepareAndProcessIntegers ();
	}
~

. . . Well, the problem is that the role of the preparing function is just to prepare the objects, not to process them. . . . In fact, the preparing function does not know anything about what is to be done with the objects.

Certainly, there is a workaround, in which the function takes a processor instance of a specific interface, like this.

@Java Source Code
~
	~
	
	static interface IntegerProcessor {
		void process (Integer a_integer);
		void summarize ();
	}
	
	static public class AIntegerProcessor implements IntegerProcessor {
		private int i_numberOfMultipleOf10s = 0;
		
		@Override
		public void process (Integer a_integer) {
			System.out.println (String.format ("### read: %d", a_integer.intValue ()));
			System.out.flush ();
			if (a_integer.intValue () % 10 == 0) {
				i_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		
		@Override
		public void summarize () {
			System.out.println (String.format ("### the number of multiple of 10s is %d.", i_numberOfMultipleOf10s));
			System.out.flush ();
		}
	}
	
	public static void prepareAndProcessIntegers (IntegerProcessor a_integerProcessor) {
		Integer l_integer = null;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			a_integerProcessor.process (l_integer);
		}
		a_integerProcessor.summarize ();
	}
	
	public static void test02 () {
		prepareAndProcessIntegers (new AIntegerProcessor ());
	}
~

. . . However, the problem is that the objects cannot necessarily all be processed at one sitting: in the above code, the timing at which each object is processed is inescapably trapped inside the 'prepareAndProcessIntegers (IntegerProcessor a_integerProcessor)' function.

More generally speaking, as the preparing function is meant to be purely for preparing objects, the design should not be deformed as a cheap expedient, or the deformation could erode the whole code.

In order to solve the problem without deforming anything, I need an objects pipe, which conveys objects serially from one place to another.


2: 'java.io.PipedWriter' + 'java.io.PipedReader', as an Inspiration


Hypothesizer 7
In fact, Java has 'java.io.PipedWriter' + 'java.io.PipedReader' for the case in which what are to be conveyed are characters.

They are used as a pair: when some characters are serially written into a 'java.io.PipedWriter' instance, they can be serially read from the 'java.io.PipedReader' instance that is connected to that 'java.io.PipedWriter' instance.

This is an example usage.

@Java Source Code
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.Reader;
import java.io.Writer;
~
	
	public static void prepareString (Writer a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			a_writer.write (String.format ("%d", l_iterationIndex));
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processString (Reader a_reader) throws Exception {
		int l_character = -1;
		int l_numberOf0s = 0;
		int l_codeOf0 = '0';
		while ((l_character = a_reader.read ()) != -1) {
			System.out.println (String.format ("### read: %c", (char) l_character));
			if (l_character == l_codeOf0) {
				l_numberOf0s ++;
				System.out.println (String.format ("### 0 is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of 0s is %d.", l_numberOf0s));
		System.out.flush ();
	}
	
	public static void test1 () throws Exception {
		PipedWriter l_pipedStringWriter = new PipedWriter ();
		PipedReader l_pipedStringReader = new PipedReader (l_pipedStringWriter, 16);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareString (l_pipedStringWriter);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_pipedStringWriter.close ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processString (l_pipedStringReader);
		l_pipedStringReader.close ();
		l_subThread.join ();
	}
~

This is an output.

@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### read: 0
### 0 is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 1
### read: 0
### 0 is found.
### read: 1
### read: 1
### read: 1
### read: 2
~
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### read: 5
### read: 0
### 0 is found.
### read: 6
### read: 5
### read: 0
### 0 is found.
### read: 7
### read: 5
### read: 0
### 0 is found.
### read: 8
### read: 5
### read: 0
### 0 is found.
### read: 9
### read: 5
### read: 1
### read: 0
### 0 is found.
### read: 5
### written: 511
### read: 1
### read: 1
### the number of 0s is 102.

What is good about it?

That "16" is the buffer size of the pipe, which means that the pipe never hoards more than 16 characters. The writer writes if the pipe is not full, or otherwise waits; the reader reads if the pipe is not empty (of course, the read data are removed from the pipe), or otherwise waits.

The point is that the reader can read the pipe at the timing of its liking: the reader is not forced to read a datum immediately just because the writer has written it. The buffer may become full if the reader neglects to read for a while, but then the reader can just let the writer wait; on the other hand, if the reader tries to read the pipe and finds it empty, the reader will wait, which will let the writer do its writing.
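
The bounded behavior described above can be seen in a minimal sketch that uses 'java.util.concurrent.ArrayBlockingQueue' of the Java standard library as a stand-in for the pipe (it is not what this article implements; the class name 'BoundedBufferSketch' and the capacity '2' are assumptions made only for the illustration). 'offer' is the non-waiting version of writing: it returns 'false' when the buffer is full, which is the moment at which a waiting writer would be made to wait.

```java
import java.util.concurrent.ArrayBlockingQueue;

// A hypothetical demonstration class: not part of this article's code.
class BoundedBufferSketch {
	// Tries to put a_numberOfObjects integers into a queue of a_capacity slots without waiting, and returns how many were accepted.
	static int countAcceptedWithoutWaiting (int a_capacity, int a_numberOfObjects) {
		ArrayBlockingQueue <Integer> l_queue = new ArrayBlockingQueue <Integer> (a_capacity);
		int l_numberOfAcceptedObjects = 0;
		for (int l_iterationIndex = 0; l_iterationIndex < a_numberOfObjects; l_iterationIndex ++) {
			// 'offer' returns false instead of waiting when the queue is full.
			if (l_queue.offer (Integer.valueOf (l_iterationIndex))) {
				l_numberOfAcceptedObjects ++;
			}
		}
		return l_numberOfAcceptedObjects;
	}
	
	public static void main (String [] a_arguments) {
		// Nobody reads, so only as many objects as the capacity are accepted.
		System.out.println (countAcceptedWithoutWaiting (2, 5));
	}
}
```

With nobody reading, only 2 of the 5 objects are accepted: the buffer never hoards more than its capacity.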


3: The Plan


Hypothesizer 7
So, is the objects pipe just a buffer to which the writer writes and from which the reader reads?

Well, basically, yes, but the buffer has to be intelligent enough to handle concurrent accesses appropriately: allowing an access, making an access wait, awaking a waiting access, etc.

Besides, the writer has to have a means to declare that it has finished writing, and the reader has to have a means to declare that it has finished reading.

Besides, the writer or the reader may want to time out in writing or reading.

The buffer has to be used cyclically in order to avoid moving data around: for example, when the buffer size is 16 and the writer writes 10 objects, the reader reads 5 objects, and then the writer writes 10 more objects, the valued area changes from '' to '0->9' to '5->9' to '5->15;0->3' (which I represent as just '5->3').
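
The index transitions of that example can be traced by a minimal sketch like this, which tracks only a start index and an until (exclusive) index and stores no objects. The class name 'CyclicIndexesSketch' and its methods are hypothetical; note that because the until index is exclusive, the area '5->3' appears as the start index '5' and the until index '4'.

```java
// A hypothetical tracing class: it tracks only the 2 indexes and stores no objects.
class CyclicIndexesSketch {
	private final int i_bufferLength;
	private int i_dataStartIndex = 0;
	private int i_dataUntilIndex = 0; // exclusive
	
	CyclicIndexesSketch (int a_bufferLength) {
		i_bufferLength = a_bufferLength;
	}
	
	// Moves the until index as though 1 object were written; the caller guarantees that the buffer is not full.
	void advanceByWriting () {
		if (i_dataUntilIndex == i_bufferLength) {
			// The end of the array has been reached: the slot 0 is used next.
			i_dataUntilIndex = 1;
		}
		else {
			i_dataUntilIndex ++;
		}
	}
	
	// Moves the start index as though 1 object were read; the caller guarantees that the buffer is not empty.
	void advanceByReading () {
		i_dataStartIndex ++;
		if (i_dataStartIndex == i_dataUntilIndex) {
			// The buffer has become empty: both of the indexes go back to 0.
			i_dataStartIndex = 0;
			i_dataUntilIndex = 0;
		}
		else if (i_dataStartIndex == i_bufferLength) {
			i_dataStartIndex = 0;
		}
	}
	
	String getIndexes () {
		return String.format ("%d,%d", i_dataStartIndex, i_dataUntilIndex);
	}
	
	// Replays the example: 10 writes, 5 reads, and 10 more writes on a buffer of 16 slots.
	static String traceExample () {
		CyclicIndexesSketch l_indexes = new CyclicIndexesSketch (16);
		StringBuilder l_trace = new StringBuilder ();
		for (int l_iterationIndex = 0; l_iterationIndex < 10; l_iterationIndex ++) {
			l_indexes.advanceByWriting ();
		}
		l_trace.append (l_indexes.getIndexes ()).append (' ');
		for (int l_iterationIndex = 0; l_iterationIndex < 5; l_iterationIndex ++) {
			l_indexes.advanceByReading ();
		}
		l_trace.append (l_indexes.getIndexes ()).append (' ');
		for (int l_iterationIndex = 0; l_iterationIndex < 10; l_iterationIndex ++) {
			l_indexes.advanceByWriting ();
		}
		l_trace.append (l_indexes.getIndexes ());
		return l_trace.toString ();
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (traceExample ());
	}
}
```

The trace is "0,10 5,10 5,4": no object is ever moved, only the 2 indexes are.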

As for the algorithm for deciding when to awaken the other party, there may be some sophisticated options, but I have implemented 2 rather simple options: 1) the reader is awakened when the buffer has become full and 2) the reader is awakened when the buffer has become non-empty. Symmetrically, in the code shown in the next section, the writer is awakened 1) when the buffer has become empty or 2) when the buffer has ceased to be full. . . . I have to be aware that the former option may make the reader wait unnecessarily for a long time if the writer does not write diligently, although it may be more efficient when the writer does so.
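
As a minimal sketch of the difference between the 2 options on the reader side, the following evaluates the same condition as the one used after a write in the pipe code of the next section, on a buffer that starts empty and is never read meanwhile. The class name 'NotificationTimingSketch' and both of the method names are hypothetical.

```java
// A hypothetical helper class: it only evaluates when the reader would be notified.
class NotificationTimingSketch {
	// a_notificationIsDelayed: true -> the option 1 (on full), false -> the option 2 (on non-empty)
	static boolean readerIsToBeAwakened (boolean a_notificationIsDelayed, boolean a_wasEmpty, boolean a_isFullNow) {
		return (!a_notificationIsDelayed && a_wasEmpty) || (a_notificationIsDelayed && a_isFullNow);
	}
	
	// Returns the ordinal (1-based) of the first write into an empty buffer that causes a notification, supposing that no object is read meanwhile.
	static int getOrdinalOfFirstNotifyingWrite (int a_bufferLength, boolean a_notificationIsDelayed) {
		for (int l_numberOfStoredObjects = 0; l_numberOfStoredObjects < a_bufferLength; l_numberOfStoredObjects ++) {
			boolean l_wasEmpty = (l_numberOfStoredObjects == 0);
			boolean l_isFullNow = (l_numberOfStoredObjects + 1 == a_bufferLength);
			if (readerIsToBeAwakened (a_notificationIsDelayed, l_wasEmpty, l_isFullNow)) {
				return l_numberOfStoredObjects + 1;
			}
		}
		return -1;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (getOrdinalOfFirstNotifyingWrite (16, true));
		System.out.println (getOrdinalOfFirstNotifyingWrite (16, false));
	}
}
```

For a buffer of 16 slots, the option 1 notifies at the 16th write and the option 2 notifies at the 1st write, which is why the option 1 may keep the reader waiting if the writer is slow.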

Usually, there is supposed to be a single writer and a single reader, but the possibility of multiple writers or multiple readers is not particularly ruled out, although I have not tested such cases yet.


4: The Code and Its Explanation


Hypothesizer 7
This is the code of my objects pipe class and its related classes.

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.java'

@Java Source Code
package theBiasPlanet.coreUtilities.pipes;

import java.util.ArrayList;
import java.util.List;
import theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;
import theBiasPlanet.coreUtilities.messagingHandling.Publisher;
import theBiasPlanet.coreUtilities.timersHandling.TimeOutException;

public class ObjectsPipe <T> {
	protected Object [] i_objects;
	protected int i_bufferLength = 0;
	// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
	protected int i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected int i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected boolean i_isFinishedWriting = false;
	protected boolean i_isFinishedReading = false;
	protected boolean i_notificationIsDelayed = false;
	
	public ObjectsPipe (int a_bufferLength, boolean a_notificationIsDelayed) {
		i_bufferLength = a_bufferLength;
		i_objects = new Object [i_bufferLength];
		i_notificationIsDelayed = a_notificationIsDelayed;
	}
	
	@Override
	protected void finalize () {
	}
	
	protected boolean isEmptyWithoutLocking () {
		return i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
	
	protected boolean isFullWithoutLocking () {
		return (i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == i_bufferLength) || (i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected void writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		if (i_isFinishedReading) {
			throw new NoMoreNeedsException ("");
		}
		if (i_isFinishedWriting) {
			i_isFinishedWriting = false;
		}
		while (true) {
			if (isFullWithoutLocking ()) {
				try {
					if (a_timeOutPeriodInMilliseconds == -1) {
						this.wait ();
					}
					else if (a_timeOutPeriodInMilliseconds == 0) {
					}
					else {
						this.wait (a_timeOutPeriodInMilliseconds);
					}
				}
				catch (InterruptedException l_exception) {
					Publisher.logErrorInformation (l_exception);
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (i_isFinishedReading) {
				throw new NoMoreNeedsException ("");
			}
			if (!isFullWithoutLocking ()) {
				boolean l_wasEmpty = isEmptyWithoutLocking ();
				if (i_dataUntilIndex == i_bufferLength) {
					i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1;
				}
				else {
					i_objects [i_dataUntilIndex] = a_object;
					i_dataUntilIndex ++;
				}
				if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
					this.notifyAll ();
				}
				return;
			}
			else {
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	@SuppressWarnings("unchecked")
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected T readWithoutLocking (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		T l_readObject = null;
		if (i_isFinishedReading) {
			i_isFinishedReading = false;
		}
		while (true) {
			if (isEmptyWithoutLocking ()) {
				if (!i_isFinishedWriting) {
					try {
						if (a_timeOutPeriodInMilliseconds == -1) {
							this.wait ();
						}
						else if (a_timeOutPeriodInMilliseconds == 0) {
						}
						else {
							this.wait (a_timeOutPeriodInMilliseconds);
						}
					}
					catch (InterruptedException l_exception) {
						Publisher.logErrorInformation (l_exception);
					}
				}
				else {
					throw new NoMoreDataException ("");
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (!isEmptyWithoutLocking ()) {
				boolean l_wasFull = isFullWithoutLocking ();
				l_readObject = (T) i_objects [i_dataStartIndex];
				i_objects [i_dataStartIndex] = null;
				i_dataStartIndex ++;
				if (i_dataStartIndex == i_dataUntilIndex) {
					i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
				else {
					if (i_dataStartIndex == i_bufferLength) {
						i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					}
				}
				if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
					this.notifyAll ();
				}
				return l_readObject;
			}
			else {
				if (i_isFinishedWriting) {
					throw new NoMoreDataException ("");
				}
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	public synchronized boolean isEmpty () {
		return isEmptyWithoutLocking ();
	}
	
	public synchronized boolean isFull () {
		return isFullWithoutLocking ();
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized void write (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized void write (T a_object) throws NoMoreNeedsException {
		try {
			write (a_object, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized int write (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		int l_writtenLength = 0;
		for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
			try {
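				if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
					writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds);
				}
				else {
					// The buffer is full: stops here so that the returned length matches what has really been written.
					break;
				}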
			}
			catch (NoMoreNeedsException l_exception) {
				if (l_writtenLength == 0) {
					throw l_exception;
				}
				else {
					break;
				}
			}
		}
		return l_writtenLength;
	}
	
	public synchronized int write (T [] a_objects, int a_offset, int a_length) throws NoMoreNeedsException {
		try {
			return write (a_objects, a_offset, a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized T read (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		return readWithoutLocking (a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized T read () throws NoMoreDataException {
		try {
			return read (-1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return null;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized int read (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		int l_readLength = 0;
		for (; l_readLength < a_length; l_readLength ++) {
			if ( (l_readLength == 0) || !isEmptyWithoutLocking ()) {
				a_objects [a_offset + l_readLength] = readWithoutLocking (a_timeOutPeriodInMilliseconds);
			}
			else {
				break;
			}
		}
		return l_readLength;
	}
	
	public synchronized int read (T [] a_objects, int a_offset, int a_length) throws NoMoreDataException {
		try {
			return read (a_objects,  a_offset,  a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	public synchronized List <T> readWholeData () {
		List <T> l_objectsList = new ArrayList <T> ();
		while (true) {
			try {
				l_objectsList.add (readWithoutLocking (-1));
			}
			catch (TimeOutException l_exception) {
				// impossible
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
		}
		return l_objectsList;
	}
	
	public synchronized void finishWriting () {
		i_isFinishedWriting = true;
		this.notifyAll ();
	}
	
	public synchronized void finishReading () {
		i_isFinishedReading = true;
		this.notifyAll ();
	}
	
	public synchronized void reset () {
		i_isFinishedWriting = false;
		i_isFinishedReading = false;
		i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
		i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
}

'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.java'

@Java Source Code
package theBiasPlanet.coreUtilities.constantsGroups;

~

public interface GeneralConstantsConstantsGroup {
	~
	int c_iterationStartNumber = 0;
	~
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreDataException extends Exception {
	public NoMoreDataException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreNeedsException extends Exception {
	public NoMoreNeedsException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.java' is omitted entirely because the only method used just writes logs.

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.timersHandling;

public class TimeOutException extends Exception {
	public TimeOutException (String a_message) {
		super (a_message);
	}
}

Note that because no Java generics type parameter is allowed to be a primitive datum type, the objects pipe is an 'Objects' pipe, but of course, the wrapper class of any primitive datum type can be used.
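
As a minimal sketch of that restriction, with a hypothetical generic class 'HolderSketch' standing in for any class that has a type parameter, an 'int' value is conveyed via the wrapper class 'Integer'.

```java
// A hypothetical generic holder, standing in for any class that has a type parameter.
class HolderSketch <T> {
	private T i_object = null;
	
	void set (T a_object) {
		i_object = a_object;
	}
	
	T get () {
		return i_object;
	}
	
	static int storeAndRetrieve (int a_value) {
		// 'HolderSketch <int>' would not compile; the wrapper class, 'Integer', is used instead.
		HolderSketch <Integer> l_holder = new HolderSketch <Integer> ();
		l_holder.set (Integer.valueOf (a_value));
		return l_holder.get ().intValue ();
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (storeAndRetrieve (42));
	}
}
```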

The constructor takes the buffer size and the algorithm choice for awaking the readers.

'i_objects' is the buffer.

'i_dataStartIndex' and 'i_dataUntilIndex' are the start index (inclusive) and the until index (exclusive) of the valued area; when the buffer is empty, they are '0' and '0', respectively.
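
How a pair of those indexes is interpreted can be sketched like this. The 2 predicates follow the conditions of 'isEmptyWithoutLocking' and 'isFullWithoutLocking' above; the class name 'ValuedAreaSketch' and the method 'getNumberOfStoredObjects' are hypothetical additions for the illustration, not parts of the pipe class.

```java
// A hypothetical helper class that interprets a pair of the start index and the until (exclusive) index.
class ValuedAreaSketch {
	static boolean isEmpty (int a_dataStartIndex, int a_dataUntilIndex) {
		return a_dataStartIndex == 0 && a_dataUntilIndex == 0;
	}
	
	static boolean isFull (int a_dataStartIndex, int a_dataUntilIndex, int a_bufferLength) {
		return (a_dataStartIndex == 0 && a_dataUntilIndex == a_bufferLength) || (a_dataStartIndex != 0 && a_dataStartIndex == a_dataUntilIndex);
	}
	
	static int getNumberOfStoredObjects (int a_dataStartIndex, int a_dataUntilIndex, int a_bufferLength) {
		if (isEmpty (a_dataStartIndex, a_dataUntilIndex)) {
			return 0;
		}
		else if (a_dataUntilIndex > a_dataStartIndex) {
			// The valued area is contiguous.
			return a_dataUntilIndex - a_dataStartIndex;
		}
		else {
			// The valued area wraps around the end of the array.
			return a_bufferLength - a_dataStartIndex + a_dataUntilIndex;
		}
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (getNumberOfStoredObjects (0, 10, 16));
		System.out.println (getNumberOfStoredObjects (5, 4, 16));
		System.out.println (isFull (5, 5, 16));
	}
}
```

For example, the start index '5' and the until index '4' on a buffer of 16 slots mean 15 stored objects, and the start index '5' and the until index '5' mean a full buffer, not an empty one.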

All the public methods except the constructor are synchronized on the 'this' object.

The 'readWithoutLocking (long a_timeOutPeriodInMilliseconds)' method has the loop on the premise that there may be multiple readers: a reader that has been waiting on the empty buffer may be awoken only to find that the written objects have already been snatched by the other readers, in which case it is required to wait once more (such points to note about thread synchronization will be discussed in the next article).

Likewise, the 'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)' method has the loop on the premise that there may be multiple writers.
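
The reason for those loops can be sketched with a buffer of only 1 slot: each side re-checks its condition after every wake-up instead of assuming that the condition holds. This is a minimal sketch, not the pipe class itself; the class name 'SingleSlotSketch' and its methods are hypothetical, and timeouts and finishing are left out.

```java
// A hypothetical pipe of a single 'int' slot, only to show waiting in a loop.
class SingleSlotSketch {
	private int i_value = 0;
	private boolean i_isOccupied = false;
	
	synchronized void put (int a_value) throws InterruptedException {
		// Checked in a loop because another writer may have refilled the slot before this thread resumed.
		while (i_isOccupied) {
			this.wait ();
		}
		i_value = a_value;
		i_isOccupied = true;
		this.notifyAll ();
	}
	
	synchronized int take () throws InterruptedException {
		// Checked in a loop because another reader may have emptied the slot before this thread resumed.
		while (!i_isOccupied) {
			this.wait ();
		}
		i_isOccupied = false;
		this.notifyAll ();
		return i_value;
	}
	
	// Conveys 1, 2, ..., a_numberOfValues from a sub thread and returns the sum of what has been read.
	static int transferAndSum (int a_numberOfValues) {
		SingleSlotSketch l_slot = new SingleSlotSketch ();
		Thread l_writingThread = new Thread (() -> {
			try {
				for (int l_value = 1; l_value <= a_numberOfValues; l_value ++) {
					l_slot.put (l_value);
				}
			}
			catch (InterruptedException l_exception) {
				Thread.currentThread ().interrupt ();
			}
		});
		l_writingThread.start ();
		int l_sum = 0;
		try {
			for (int l_iterationIndex = 0; l_iterationIndex < a_numberOfValues; l_iterationIndex ++) {
				l_sum += l_slot.take ();
			}
			l_writingThread.join ();
		}
		catch (InterruptedException l_exception) {
			Thread.currentThread ().interrupt ();
		}
		return l_sum;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (transferAndSum (100));
	}
}
```

Conveying 1 through 100 gives the sum 5050 whatever the thread scheduling is, because no value is lost or read twice.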

The behavior of the 'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)' method when the pipe has been declared finished reading is somewhat different from the behavior of the 'readWithoutLocking (long a_timeOutPeriodInMilliseconds)' method when the pipe has been declared finished writing: the writer will not need to write any more at all, while the reader will want to read out the objects already stored in the buffer.
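
That asymmetry can be sketched with a single-threaded model like this, which is unbounded and never waits, unlike the real pipe. The class name 'FinishSemanticsSketch' is hypothetical, and returning 'false' or 'null' here stands in for the 'NoMoreNeedsException' or the 'NoMoreDataException' of the real pipe.

```java
import java.util.ArrayDeque;

// A hypothetical single-threaded model: it is unbounded and never waits, unlike the real pipe.
class FinishSemanticsSketch {
	private final ArrayDeque <String> i_buffer = new ArrayDeque <String> ();
	private boolean i_isFinishedWriting = false;
	private boolean i_isFinishedReading = false;
	
	void finishWriting () {
		i_isFinishedWriting = true;
	}
	
	void finishReading () {
		i_isFinishedReading = true;
	}
	
	// Returns false where the real pipe would throw 'NoMoreNeedsException'.
	boolean write (String a_object) {
		if (i_isFinishedReading) {
			// Rejected at once, whether the buffer has any room or not.
			return false;
		}
		i_buffer.addLast (a_object);
		return true;
	}
	
	// Returns null where the real pipe would throw 'NoMoreDataException'.
	String read () {
		if (i_buffer.isEmpty () && !i_isFinishedWriting) {
			throw new IllegalStateException ("The real pipe would wait here.");
		}
		// The objects already stored are still handed over after the writing has been finished.
		return i_buffer.pollFirst ();
	}
	
	static String demonstrate () {
		FinishSemanticsSketch l_drainedPipe = new FinishSemanticsSketch ();
		l_drainedPipe.write ("a");
		l_drainedPipe.write ("b");
		l_drainedPipe.finishWriting ();
		String l_readResults = l_drainedPipe.read () + "," + l_drainedPipe.read () + "," + l_drainedPipe.read ();
		FinishSemanticsSketch l_abandonedPipe = new FinishSemanticsSketch ();
		l_abandonedPipe.write ("a");
		l_abandonedPipe.finishReading ();
		boolean l_isAccepted = l_abandonedPipe.write ("b");
		return l_readResults + " " + l_isAccepted;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (demonstrate ());
	}
}
```

The result is "a,b,null false": after the writing is finished, the 2 stored objects are still read before the end is reported, while after the reading is finished, a write is refused immediately.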

Be aware that I have not tested the code intensively yet, although I have used it in some simple cases, one of which is shown in the next section.


5: An Example Usage and an Execution Result


Hypothesizer 7
This is a sample program that uses the objects pipe.

@Java Source Code
import theBiasPlanet.coreUtilities.pipes.ObjectsPipe;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;

~
	~
	
	public static void prepareIntegers (ObjectsPipe <Integer> a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			try {
				a_writer.write (Integer.valueOf (l_iterationIndex));
			}
			catch (NoMoreNeedsException l_exception) {
				break;
			}
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processIntegers (ObjectsPipe <Integer> a_reader) throws Exception {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		while (true) {
			try {
				l_integer = a_reader.read ();
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test2 () throws Exception {
		ObjectsPipe <Integer> l_integersPipe = new ObjectsPipe <Integer> (16, true);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareIntegers (l_integersPipe);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_integersPipe.finishWriting ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processIntegers (l_integersPipe);
		l_subThread.join ();
	}
~

This is an output on my single-core, single-CPU Linux computer (which should be influencing the output).

@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### written: 15
### written: 16
### written: 17
### read: 7
### read: 8
### written: 18
### written: 19
### written: 20
### written: 21
### read: 9
### read: 10
### a multiple of 10s is found.
### written: 22
### written: 23
### read: 11
### read: 12
### read: 13
### read: 14
### written: 24
### written: 25
### written: 26
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### read: 486
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . Well, what happened is, I guess, that the reader acted first and was made to wait because the pipe was empty; the writer wrote until the pipe was full and awoke the reader; the reader began to read; and so on. . . . Note that '15' must have been written before the reader was awoken although "### written: 15" was shown late: immediately after the 'write' method finished, the control switched to the reading thread, and the message was delayed until the control returned to the writing thread.

When the notification timing mode is set to 'false', this is the output.

@Output
### read: 0
### a multiple of 10s is found.
### written: 0
### written: 1
### written: 2
### written: 3
### read: 1
### read: 2
### read: 3
### read: 4
### written: 4
### written: 5
### written: 6
### written: 7
### read: 5
### read: 6
### read: 7
### read: 8
### written: 8
### written: 9
### written: 10
### written: 11
### read: 9
### written: 12
### written: 13
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### written: 14
### written: 15
### written: 16
### written: 17
### read: 13
### read: 14
### read: 15
~
### written: 496
### read: 481
### read: 482
### read: 483
### read: 484
### read: 485
### read: 486
### read: 487
### read: 488
### read: 489
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . The reader acted first and was made to wait because the pipe was empty; the writer wrote '0' and awoke the reader; the reader began to read; and so on, I guess. . . . Again, "### written: 0" was shown late.

Anyway, only a small number of slots in the buffer seem to tend to be used.

When I measured the times taken (with the costly messaging eliminated) in the 2 modes (5 times for each mode), the times were {'123,607,654', '113,359,567', '129,854,341', '87,619,575', and '99,663,616': the average -> '110,820,951'} and {'108,841,378', '154,165,815', '140,925,679', '111,987,546', and '113,260,213': the average -> '125,836,126'}, respectively, in nanoseconds. Is the difference significant? . . . I have not assessed it statistically.
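
As a check of the arithmetic only, the 2 averages can be recomputed from the 10 measured times like this (a minimal sketch; the class name 'TimesAveragesSketch' and its methods are hypothetical, and it does not reproduce how the times were measured).

```java
// A hypothetical helper class that recomputes the averages from the measured times quoted above.
class TimesAveragesSketch {
	static long [] getTimesInFirstMode () {
		return new long [] {123607654L, 113359567L, 129854341L, 87619575L, 99663616L};
	}
	
	static long [] getTimesInSecondMode () {
		return new long [] {108841378L, 154165815L, 140925679L, 111987546L, 113260213L};
	}
	
	// Returns the arithmetic mean, rounded to the nearest nanosecond.
	static long getAverage (long [] a_timesInNanoseconds) {
		long l_sum = 0;
		for (long l_time: a_timesInNanoseconds) {
			l_sum += l_time;
		}
		return Math.round ((double) l_sum / a_timesInNanoseconds.length);
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (getAverage (getTimesInFirstMode ()));
		System.out.println (getAverage (getTimesInSecondMode ()));
	}
}
```

It gives 110820951 and 125836126, which agree with the averages stated above; whether the gap between them means anything is a separate question that 5 samples per mode can hardly settle.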


6: The Conclusion and Beyond


Hypothesizer 7
Now, I have an objects pipe.

The benefit of any objects pipe is that the memory space will not be exhausted however many objects are processed. In fact, the above example program never holds more objects in the pipe than the buffer size, however large the number of iterations is set.

I will translate the Java version into a C++ version, a C# version, and a Python version in some future articles.

