Showing posts with label Let Me Understand the Java Programming Language. Show all posts

2020-04-05

3: Java Threads Synchronization with 'wait'/'notify'/'notifyAll'

<The previous article in this series | The table of contents of this series | The next article in this series>

These are some points about threads synchronization with 'wait', 'notify', and 'notifyAll' in Java, which may spare one some slips.

Topics


About: Java

The table of contents of this article


Starting Context


  • The reader has a basic knowledge of the Java programming language.

Target Context


  • The reader will know some points to note about threads synchronization with 'wait', 'notify', and 'notifyAll' in Java.

Orientation


Hypothesizer 7
I tend to slip on threads synchronization with 'wait', 'notify', and 'notifyAll' in Java.

Is that my problem you are not concerned with? . . . Congratulations! . . . However, I guess that if so, you will not be reading this.

I assert that threads synchronization without 'wait' is not any problem: it is just that only a single thread can enter the guarded blocks.

However, when 'wait' is involved, the situation becomes more complicated, on which I tend to slip.

Basically, it is just a matter of carelessness, but vowing to be more careful next time is not an effective strategy, in my experience.

So, I have decided that I should record some points to note as a reference to turn to for threads synchronization.


Main Body


1: The Relation Between Lock and Monitor


Hypothesizer 7
First, let me clarify the relation between lock and monitor.

As a note, I talk only about Java lock and Java monitor. As more general concepts, lock and monitor may not be as described in this article.

I clarify it not because I enjoy pointing out wrong usages of those terms by some people, but because it helps in clarifying the behaviors of threads synchronization.

Lock is a binary semaphore, which can be owned by only a single thread.

So, why is monitor required? . . . It is because just a bare lock is not enough for synchronizing threads.

For one thing, there has to be a list in which the threads anticipating to acquire the lock have to stay (note that I have not used the term, 'waiting'). I will call the list 'anticipating threads list' (which is not a widely accepted term).

For another thing, there has to be another list in which the waiting threads have to stay (I use the term, 'wait', in a specific meaning, not in the general meaning). I will call the list 'waiting threads list' (which is not a widely accepted term, either).

'Waiting' here means having called the 'wait' method, with the waiting time (if specified) not yet expired. I mean, any waiting thread is not really anticipating to acquire the lock, but anticipating to be notified to move to the anticipating threads list.

Here, I have assumed that any waiting thread first moves to the anticipating threads list and then acquires the lock. In fact, implementation-wise, one waiting thread may directly acquire the lock without taking the trouble of moving to the anticipating threads list, but at least the others of all the waiting threads notified by 'notifyAll' have to move to the anticipating threads list (because only one of all the waiting threads can directly acquire the lock), and concept-wise, it will be good to think that all the waiting threads first move to the anticipating threads list, even if one of them really skips the part.

So, it is important to know that there are 3 states for threads: owning the lock (only one thread can be owning the lock at the same time), anticipating to acquire the lock, and waiting to become able to anticipate to acquire the lock.

To return to what monitor is, monitor is a structure that manages the concerned threads in the 3 states.

In fact, I am not sure whether I should say "I use a monitor, which uses a lock" or should say "I use a monitor and a lock", but the difference will be a matter of the Java standard libraries implementation that has no consequence for users either way.
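The "anticipating" and "waiting" states can usually be observed through 'Thread.getState ()', which reports 'BLOCKED' for a thread trying to enter a guarded block and 'WAITING' for a thread inside 'wait ()' without a time limit. The following is only a minimal sketch: the class and method names are hypothetical and not part of this article's code, and the polling helper is just one way to give the threads time to settle.

```java
// A sketch under assumptions; the names here are illustrative, not from the article's code.
class MonitorStatesSketch {
	// Polls (for about 5 seconds at most) until the thread shows the expected state.
	private static void awaitState (Thread a_thread, Thread.State a_state) {
		try {
			for (int l_trialIndex = 0; l_trialIndex < 500 && a_thread.getState () != a_state; l_trialIndex ++) {
				Thread.sleep (10);
			}
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
	}
	
	// Returns {the state of an anticipating thread, the state of a waiting thread}.
	static Thread.State [] observeStates () {
		final Object l_monitorOwner = new Object ();
		Thread l_waitingThread = new Thread (() -> {
			synchronized (l_monitorOwner) {
				try {
					l_monitorOwner.wait ();
				}
				catch (InterruptedException l_exception) {
					// just leaves the guarded block
				}
			}
		});
		Thread l_anticipatingThread = new Thread (() -> {
			synchronized (l_monitorOwner) {
				// nothing to do: entering the guarded block is the point
			}
		});
		Thread.State [] l_states = new Thread.State [2];
		l_waitingThread.start ();
		awaitState (l_waitingThread, Thread.State.WAITING);
		synchronized (l_monitorOwner) {
			// This thread owns the lock now, so the thread started here can only anticipate.
			l_anticipatingThread.start ();
			awaitState (l_anticipatingThread, Thread.State.BLOCKED);
			l_states [0] = l_anticipatingThread.getState ();
			l_states [1] = l_waitingThread.getState ();
			l_monitorOwner.notifyAll ();
		}
		try {
			l_anticipatingThread.join ();
			l_waitingThread.join ();
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		return l_states;
	}
	
	public static void main (String [] a_arguments) {
		Thread.State [] l_states = observeStates ();
		System.out.println (String.format ("### anticipating -> '%s', waiting -> '%s'", l_states [0], l_states [1]));
	}
}
```

The third state, owning the lock, is the thread that runs the 'synchronized' block in 'observeStates ()' while the other two are observed.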


2: The Relation Between a Monitor and an Object


Hypothesizer 7
A monitor (with a lock) is built into the 'Object' class, so every object has its own monitor.

However, what is guarded in the guarded blocks is not necessarily the state of the object that owns the monitor.

Enigmatic? Let me look at an example.

@Java Source Code
public class ClassA {
	private ClassB i_classB = new ClassB ();
	private int i_integer = 0;
	
	public void synchronizedNoWaitingNoNotifyingMethod (int a_threadIdentification, int a_sleepTime) throws InterruptedException {
		System.out.println (String.format ("### the thread -> '%d' before the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
		synchronized (i_classB) {
			i_integer ++;
			System.out.println (String.format ("### the thread -> '%d' in     the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
			System.out.flush ();
			Thread.sleep (a_sleepTime);
		}
		System.out.println (String.format ("### the thread -> '%d' after  the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
	}
	
	public void synchronizedWaitingMethod (int a_threadIdentification, int a_sleepTime) throws InterruptedException {
		System.out.println (String.format ("### the thread -> '%d' before the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
		synchronized (i_classB) {
			i_integer ++;
			System.out.println (String.format ("### the thread -> '%d' in     the guarded block before waiting: 'i_integer' -> '%d'", a_threadIdentification, i_integer));
			System.out.flush ();
			i_classB.wait ();
			System.out.println (String.format ("### the thread -> '%d' in     the guarded block after  waiting: 'i_integer' -> '%d'", a_threadIdentification, i_integer));
			System.out.flush ();
			Thread.sleep (a_sleepTime);
		}
		System.out.println (String.format ("### the thread -> '%d' after  the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
	}
	
	public void synchronizedNotifyingMethod (int a_threadIdentification, int a_sleepTime) throws InterruptedException {
		System.out.println (String.format ("### the thread -> '%d' before the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
		synchronized (i_classB) {
			i_integer ++;
			System.out.println (String.format ("### the thread -> '%d' in     the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
			System.out.flush ();
			i_classB.notifyAll ();
			Thread.sleep (a_sleepTime);
		}
		System.out.println (String.format ("### the thread -> '%d' after  the guarded block               : 'i_integer' -> '%d'", a_threadIdentification, i_integer));
		System.out.flush ();
	}
}

public class ClassB {
}

public class Test1Test {
	private Test1Test () {
	}
	
	public static void main (String [] a_arguments) throws Exception {
		Test1Test.test ();
	}
	
	public static void test () throws Exception {
		ClassA l_classA = new ClassA ();
		Thread l_subThread0 = new Thread (() -> {
			try {
				l_classA.synchronizedNoWaitingNoNotifyingMethod (0, 10000);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		Thread l_subThread1 = new Thread (() -> {
			try {
				l_classA.synchronizedNoWaitingNoNotifyingMethod (1, 0);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		Thread l_subThread2 = new Thread (() -> {
			try {
				l_classA.synchronizedWaitingMethod (2, 0);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		Thread l_subThread3 = new Thread (() -> {
			try {
				l_classA.synchronizedWaitingMethod (3, 0);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		Thread l_subThread4 = new Thread (() -> {
			try {
				l_classA.synchronizedNotifyingMethod (4, 0);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		Thread l_subThread5 = new Thread (() -> {
			try {
				l_classA.synchronizedNoWaitingNoNotifyingMethod (5, 0);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
		});
		l_subThread0.start ();
		Thread.sleep (1000);
		l_subThread5.start ();
		Thread.sleep (1000);
		l_subThread4.start ();
		Thread.sleep (1000);
		l_subThread3.start ();
		Thread.sleep (1000);
		l_subThread2.start ();
		Thread.sleep (1000);
		l_subThread1.start ();
		l_subThread0.join ();
		l_subThread1.join ();
		l_subThread2.join ();
		l_subThread3.join ();
		l_subThread4.join ();
		l_subThread5.join ();
	}
}

Certainly, that is unnatural: why do I not use 'this' instead of 'i_classB'? But anyway, what is guarded in the guarded blocks is the state of a 'ClassA' instance, not the state of 'i_classB', whose monitor is used: 'i_classB' is used solely for the monitoring purpose.

And I have to use 'i_classB.wait ()', etc., not just 'wait ()', which means 'this.wait ()', in the above example.
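What happens if the wrong object is used can be checked with a small sketch like the following, whose class and method names are hypothetical. It relies on the documented behavior that 'wait', 'notify', and 'notifyAll' throw 'IllegalMonitorStateException' when the calling thread does not own the monitor of the object on which the method is called; 'notifyAll' is used here only because it never blocks.

```java
// A sketch with hypothetical names: the monitor used for 'synchronized' and the one used for notifying must match.
class WrongMonitorSketch {
	private final Object i_monitorOwner = new Object ();
	
	// Notifies on 'this' although only the lock of 'i_monitorOwner' is owned.
	String notifyOnThis () {
		synchronized (i_monitorOwner) {
			try {
				this.notifyAll ();
				return "notified";
			}
			catch (IllegalMonitorStateException l_exception) {
				return "IllegalMonitorStateException";
			}
		}
	}
	
	// Notifies on the object whose lock is really owned.
	String notifyOnMonitorOwner () {
		synchronized (i_monitorOwner) {
			try {
				i_monitorOwner.notifyAll ();
				return "notified";
			}
			catch (IllegalMonitorStateException l_exception) {
				return "IllegalMonitorStateException";
			}
		}
	}
	
	public static void main (String [] a_arguments) {
		WrongMonitorSketch l_sketch = new WrongMonitorSketch ();
		System.out.println (String.format ("### on 'this'          : %s", l_sketch.notifyOnThis ()));
		System.out.println (String.format ("### on 'i_monitorOwner': %s", l_sketch.notifyOnMonitorOwner ()));
	}
}
```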


3: The Number of Threads Inside the Guarded Blocks


Hypothesizer 7
I have to get rid of the notion, "only one thread can enter the guarded blocks at the same time".

Instead, I have to adopt this: "only one thread can be running in the guarded blocks at the same time".

In fact, if a thread enters a guarded block and waits inside it, another thread can enter any guarded block and also may wait inside it. So, any number of threads can be inside the guarded blocks.

However, I can be sure that only one thread is really running inside the guarded blocks.
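That any number of threads can be inside the guarded blocks can be counted with a sketch like this one. The class, its fields, and its methods are hypothetical names for illustration; each thread enters the guarded block, increments a counter, and waits inside, while an observing thread waits until all of them are inside.

```java
// A sketch with hypothetical names: several threads stay inside one guarded block by waiting there.
class ThreadsInsideSketch {
	private final Object i_monitorOwner = new Object ();
	private int i_numberOfThreadsInside = 0;
	private boolean i_isReleased = false;
	
	// Enters the guarded block and waits inside it until released.
	private void enterAndWait () {
		synchronized (i_monitorOwner) {
			i_numberOfThreadsInside ++;
			i_monitorOwner.notifyAll (); // lets the observer re-check the count
			try {
				while (!i_isReleased) {
					i_monitorOwner.wait ();
				}
			}
			catch (InterruptedException l_exception) {
				// just leaves the guarded block
			}
			i_numberOfThreadsInside --;
		}
	}
	
	// Returns how many threads were inside the guarded block at the same time.
	int countThreadsInside (int a_numberOfThreads) {
		Thread [] l_threads = new Thread [a_numberOfThreads];
		for (int l_threadIndex = 0; l_threadIndex < a_numberOfThreads; l_threadIndex ++) {
			l_threads [l_threadIndex] = new Thread (this::enterAndWait);
			l_threads [l_threadIndex].start ();
		}
		int l_observedNumber = 0;
		try {
			synchronized (i_monitorOwner) {
				while (i_numberOfThreadsInside < a_numberOfThreads) {
					i_monitorOwner.wait ();
				}
				l_observedNumber = i_numberOfThreadsInside;
				i_isReleased = true;
				i_monitorOwner.notifyAll ();
			}
			for (Thread l_thread: l_threads) {
				l_thread.join ();
			}
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		return l_observedNumber;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (String.format ("### threads inside at once -> '%d'", new ThreadsInsideSketch ().countThreadsInside (3)));
	}
}
```

Only one of those threads is running inside the block at any moment; the others are either waiting or anticipating.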


4: The State Change After the Waiting


Hypothesizer 7
This is obvious on careful thought, but seems to be a usual stumbling block for me: the state recognition before going into waiting is no longer valid after the waiting.

After the thread has gone into waiting, some other threads may have entered the guarded blocks and changed the state (in fact, usually, that is the purpose of waiting).

So, the resuming thread has to assume that it is in a new world order now, and look around the world with unprejudiced eyes.
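A common way to "look around again" is to put 'wait' inside a 'while' loop that re-checks the condition, instead of an 'if'. The following is a minimal sketch with hypothetical names: two threads each take one item, items are put one at a time, and each 'put' wakes every waiting thread, so a woken thread may find that the item is already gone.

```java
// A sketch with hypothetical names: the condition is re-checked after every waiting.
class GuardedWaitSketch {
	private final Object i_monitorOwner = new Object ();
	private int i_numberOfItems = 0;
	private int i_lowestNumberOfItems = 0;
	
	void put () {
		synchronized (i_monitorOwner) {
			i_numberOfItems ++;
			i_monitorOwner.notifyAll ();
		}
	}
	
	void take () {
		synchronized (i_monitorOwner) {
			try {
				// 'while', not 'if': another thread may have taken the item before this thread resumed.
				while (i_numberOfItems == 0) {
					i_monitorOwner.wait ();
				}
			}
			catch (InterruptedException l_exception) {
				return;
			}
			i_numberOfItems --;
			i_lowestNumberOfItems = Math.min (i_lowestNumberOfItems, i_numberOfItems);
		}
	}
	
	// Lets 2 threads take while 2 items are put one by one; returns {the final number, the lowest number}.
	int [] run () {
		Thread l_taker0 = new Thread (this::take);
		Thread l_taker1 = new Thread (this::take);
		l_taker0.start ();
		l_taker1.start ();
		try {
			put ();
			Thread.sleep (100);
			put ();
			l_taker0.join ();
			l_taker1.join ();
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		synchronized (i_monitorOwner) {
			return new int [] {i_numberOfItems, i_lowestNumberOfItems};
		}
	}
	
	public static void main (String [] a_arguments) {
		int [] l_result = new GuardedWaitSketch ().run ();
		System.out.println (String.format ("### final -> '%d', lowest -> '%d'", l_result [0], l_result [1]));
	}
}
```

With 'if' in place of 'while', a thread woken by the first 'put' could decrement the count after the other thread had already taken the item, driving the count below zero.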


5: What 'notify' or 'notifyAll' does


Hypothesizer 7
I cannot assume that 'notify' or 'notifyAll' makes a notified thread begin to run.

First, 'notify' or 'notifyAll' does not release the lock, so, until the notifying thread leaves the guarded blocks, there is no possibility that any notified thread begins to run.

Second, even after the notifying thread leaves the guarded blocks, the notifying thread may just keep running in a single core single CPU system, because why not? Or a non-notified anticipating thread, instead of a notified thread, may acquire the lock.

I understand that notifying means just moving a waiting thread or all the waiting threads into the anticipating threads list; which thread is given the lock among the anticipating threads including the non-notified ones is another story.

In fact, this is a result of running the 'Test1Test' code of the section 2 on a single-core, single-CPU Linux computer; the result may, of course, be different in another environment or at another timing.

@Output
### the thread -> '0' before the guarded block               : 'i_integer' -> 0.
### the thread -> '0' in     the guarded block               : 'i_integer' -> 1.
### the thread -> '5' before the guarded block               : 'i_integer' -> 1.
### the thread -> '4' before the guarded block               : 'i_integer' -> 1.
### the thread -> '3' before the guarded block               : 'i_integer' -> 1.
### the thread -> '2' before the guarded block               : 'i_integer' -> 1.
### the thread -> '1' before the guarded block               : 'i_integer' -> 1.
### the thread -> '0' after  the guarded block               : 'i_integer' -> 1.
### the thread -> '1' in     the guarded block               : 'i_integer' -> 2.
### the thread -> '2' in     the guarded block before waiting: 'i_integer' -> 3.
### the thread -> '3' in     the guarded block before waiting: 'i_integer' -> 4.
### the thread -> '4' in     the guarded block               : 'i_integer' -> 5.
### the thread -> '1' after  the guarded block               : 'i_integer' -> 2.
### the thread -> '4' after  the guarded block               : 'i_integer' -> 5.
### the thread -> '5' in     the guarded block               : 'i_integer' -> 6.
### the thread -> '5' after  the guarded block               : 'i_integer' -> 6.
### the thread -> '3' in     the guarded block after  waiting: 'i_integer' -> 6.
### the thread -> '2' in     the guarded block after  waiting: 'i_integer' -> 6.
### the thread -> '3' after  the guarded block               : 'i_integer' -> 6.
### the thread -> '2' after  the guarded block               : 'i_integer' -> 6.

That result means the following.

1) The thread, '0', entered the guarded blocks and stayed there for a long time.
2) Meanwhile, the threads, '5', '4', '3', '2', '1', entered the anticipating threads list, in that order.
3) The thread, '0', left the guarded blocks.
4) The thread, '1', entered and left the guarded blocks (although the "after the guarded block" message for the thread, '1', comes later, the thread, '1', should have left the guarded blocks immediately, which is proven by the value of 'i_integer').
5) The thread, '2', entered the guarded blocks and began to wait.
6) The thread, '3', entered the guarded blocks and began to wait.
7) The thread, '4', entered and left the guarded blocks, while notifying the threads, '2' and '3'.
8) The thread, '5', entered and left the guarded blocks.
9) The thread, '3', left the guarded blocks.
10) The thread, '2', left the guarded blocks.

Well, although the relation between the order in which the messages were issued and the order in which they are displayed is uncertain (as "the thread -> '1' after the guarded block" comes later), it is certain that the thread, '5', resumed running earlier than the notified threads, '2' and '3', did, because the notified threads saw the state changed by the thread, '5'.

As an aside, unexpectedly, threads seem to tend to be given the lock in the LIFO order rather than in the FIFO order, in my environment.
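The first point of this section, that 'notify' or 'notifyAll' does not release the lock, can be checked in isolation with a sketch like the following. The names are hypothetical; the notifying thread keeps sleeping inside the guarded block after notifying, and the events are recorded in the order in which they happen.

```java
import java.util.ArrayList;
import java.util.List;

// A sketch with hypothetical names: the notified thread cannot resume until the notifying thread leaves the guarded block.
class NotifyKeepsLockSketch {
	static List <String> run () {
		final Object l_monitorOwner = new Object ();
		final List <String> l_events = new ArrayList <String> ();
		final boolean [] l_isNotified = {false};
		Thread l_waitingThread = new Thread (() -> {
			synchronized (l_monitorOwner) {
				try {
					while (!l_isNotified [0]) {
						l_monitorOwner.wait ();
					}
					l_events.add ("resumed");
				}
				catch (InterruptedException l_exception) {
					l_events.add ("interrupted");
				}
			}
		});
		l_waitingThread.start ();
		try {
			// Gives the thread time to really begin to wait.
			while (l_waitingThread.getState () != Thread.State.WAITING) {
				Thread.sleep (10);
			}
			synchronized (l_monitorOwner) {
				l_isNotified [0] = true;
				l_monitorOwner.notifyAll ();
				l_events.add ("notified");
				Thread.sleep (200); // the lock is still owned by this thread
				l_events.add ("leaving");
			}
			l_waitingThread.join ();
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		return l_events;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (String.format ("### the events: %s", run ()));
	}
}
```

The "resumed" event can be recorded only after "leaving", however long the notifying thread stays in the guarded block after notifying.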


6: The Conclusion and Beyond


Hypothesizer 7
Now, I have some points to turn to when I code threads synchronization.

As some bugs related to threads synchronization tend to cause time-consuming debugging, writing the code right from the beginning will be very beneficial.

I will record such points to note also for each of some other Java features in some future articles.



2020-03-29

2: A Java Objects Pipe, Which Conveys Objects Serially


Serially because otherwise, the memory would be used up. Does for objects what 'java.io.PipedWriter' + 'java.io.PipedReader' does for characters.

Topics


About: The Java programming language

The table of contents of this article


Starting Context


  • The reader has a basic knowledge of the Java programming language.

Target Context


  • The reader will know an objects pipe.

Orientation



Main Body


1: The Harm of Using Up the Memory


Hypothesizer 7
A major harm that can be caused by a program is using up the memory, which could paralyze the whole computer.

It is all the more annoying because the computer is occupied doing what is futile: thrashing. . . . If the system were busy doing my calculations, I would appreciate the effort.

The memory has been used up because the program has hoarded a lot of data, but, I have to ask, did those data really have to be piled up in the memory?

Let me think of an example.

@Java Source Code
import java.util.ArrayList;
import java.util.List;

~
	~
	
	public static ArrayList <Integer> prepareAndHoardIntegers () {
		ArrayList <Integer> l_integers = new ArrayList <Integer> ();
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integers.add (Integer.valueOf (l_iterationIndex));
		}
		return l_integers;
	}
	
	public static void processPreparedAndHoardedIntegers (List <Integer> a_integers)  {
		int l_numberOfMultipleOf10s = 0;
		for (Integer l_integer: a_integers) {
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test00 () throws Exception {
		ArrayList <Integer> l_integers = prepareAndHoardIntegers ();
		processPreparedAndHoardedIntegers (l_integers);
	}
~

That uses up the memory (if the data amount is large enough) because all the objects are stored up in the memory first, before beginning to be processed.

So, should it do just like this?

@Java Source Code
~
	~
	
	public static void prepareAndProcessIntegers () {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test01 () {
		prepareAndProcessIntegers ();
	}
~

. . . Well, the problem is, the role of the preparing function is meant to be just to prepare the objects, not to process the objects. . . . In fact, the preparing function does not know anything about what to do with the objects.

Certainly, there is an end run, in which the function takes a processor instance of a specific interface, like this.

@Java Source Code
~
	~
	
	static interface IntegerProcessor {
		void process (Integer a_integer);
		void summarize ();
	}
	
	static public class AIntegerProcessor implements IntegerProcessor {
		private int i_numberOfMultipleOf10s = 0;
		
		@Override
		public void process (Integer a_integer) {
			System.out.println (String.format ("### read: %d", a_integer.intValue ()));
			System.out.flush ();
			if (a_integer.intValue () % 10 == 0) {
				i_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		
		@Override
		public void summarize () {
			System.out.println (String.format ("### the number of multiple of 10s is %d.", i_numberOfMultipleOf10s));
			System.out.flush ();
		}
	}
	
	public static void prepareAndProcessIntegers (IntegerProcessor a_integerProcessor) {
		Integer l_integer = null;
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			l_integer = Integer.valueOf (l_iterationIndex);
			a_integerProcessor.process (l_integer);
		}
		a_integerProcessor.summarize ();
	}
	
	public static void test02 () {
		prepareAndProcessIntegers (new AIntegerProcessor ());
	}
~

. . . However, the problem is, it is not necessarily the case that all the objects can be processed at a sitting: in the above code, the timing for each object to be processed is inescapably trapped inside the 'prepareAndProcessIntegers (IntegerProcessor a_integerProcessor)' function.

More generally speaking, as the preparing function is meant to be purely for preparing objects, the design should not be deformed cheap-jack-ly, or the deformation could erode the whole code.

In order to solve the problem without deforming anything, I need an objects pipe, which conveys objects serially from a place to another.


2: 'java.io.PipedWriter' + 'java.io.PipedReader', as an Inspiration


Hypothesizer 7
In fact, Java has 'java.io.PipedWriter' + 'java.io.PipedReader', if what are to be conveyed are characters.

They are used in a pair, and when some characters are serially written into a 'java.io.PipedWriter' instance, they are serially read from the 'java.io.PipedReader' instance that is connected to the 'java.io.PipedWriter' instance.

This is an example usage.

@Java Source Code
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.Reader;
import java.io.Writer;
~
	
	public static void prepareString (Writer a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			a_writer.write (String.format ("%d", l_iterationIndex));
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processString (Reader a_reader) throws Exception {
		int l_character = -1;
		int l_numberOf0s = 0;
		int l_codeOf0 = '0';
		while ((l_character = a_reader.read ()) != -1) {
			System.out.println (String.format ("### read: %c", (char) l_character));
			if (l_character == l_codeOf0) {
				l_numberOf0s ++;
				System.out.println (String.format ("### a 0 is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of 0s is %d.", l_numberOf0s));
		System.out.flush ();
	}
	
	public static void test1 () throws Exception {
		PipedWriter l_pipedStringWriter = new PipedWriter ();
		PipedReader l_pipedStringReader = new PipedReader (l_pipedStringWriter, 16);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareString (l_pipedStringWriter);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_pipedStringWriter.close ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processString (l_pipedStringReader);
		l_pipedStringReader.close ();
		l_subThread.join ();
	}
~

This is an output.

@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### read: 0
### 0 is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 1
### read: 0
### 0 is found.
### read: 1
### read: 1
### read: 1
### read: 2
~
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### read: 5
### read: 0
### 0 is found.
### read: 6
### read: 5
### read: 0
### 0 is found.
### read: 7
### read: 5
### read: 0
### 0 is found.
### read: 8
### read: 5
### read: 0
### 0 is found.
### read: 9
### read: 5
### read: 1
### read: 0
### 0 is found.
### read: 5
### written: 511
### read: 1
### read: 1
### the number of 0s is 102.

What is good about it?

That "16" is the buffer size of the pipe, which means that the pipe never hoards more data than that. The writer writes if the pipe is not filled, or waits; the reader reads if the pipe is not empty (of course, the read data are removed from the pipe), or waits.

The point is, the reader can read the pipe at the timing of its liking: the reader is not forced to immediately read a datum just because the writer has written the datum; the buffer may be filled if the reader neglects to read for a while, but the reader can just let the writer wait; on the other hand, if the reader happens to try to read the pipe to find the pipe empty, the reader will wait, which will let the writer do writing.


3: The Plan


Hypothesizer 7
So, is the object pipe just a buffer to which the writer writes and from which the reader reads?

Well, basically, yes, but the buffer has to be intelligent enough to appropriately handle concurrent accesses, allowing an access, making an access wait, awaking a waiting access, etc.

Besides, the writer has to have a means to give notice that it has finished writing; the reader, to give notice that it has finished reading.

Besides, the writer or the reader may want to time out in writing or reading.

The buffer has to be used cyclically in order to avoid moving data around: for example, when the buffer size is 16 and the writer writes 10 objects, the reader reads 5 objects, and the writer writes 10 objects, the valued area changes from '' to '0->9' to '5->9' to '5->15;0->3' (which I represent as just '5->3').
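The index arithmetic of that example can be traced with a minimal, non-thread-safe sketch like this one. It is not the class of this article: the names are hypothetical, and it keeps a start index and a count (an assumption made for brevity) instead of a start index and an until index.

```java
// A sketch with hypothetical names: a cyclic buffer that never moves the stored objects around.
class CyclicBufferSketch {
	private final Object [] i_objects;
	private int i_startIndex = 0;
	private int i_numberOfObjects = 0;
	
	CyclicBufferSketch (int a_bufferLength) {
		i_objects = new Object [a_bufferLength];
	}
	
	// Returns false if the buffer is full.
	boolean write (Object a_object) {
		if (i_numberOfObjects == i_objects.length) {
			return false;
		}
		// The modulo makes the position wrap around to the head of the array.
		i_objects [(i_startIndex + i_numberOfObjects) % i_objects.length] = a_object;
		i_numberOfObjects ++;
		return true;
	}
	
	// Returns null if the buffer is empty.
	Object read () {
		if (i_numberOfObjects == 0) {
			return null;
		}
		Object l_object = i_objects [i_startIndex];
		i_objects [i_startIndex] = null;
		i_startIndex = (i_startIndex + 1) % i_objects.length;
		i_numberOfObjects --;
		return l_object;
	}
	
	// Describes the valued area like '5->9' or '5->15;0->3'.
	String valuedArea () {
		if (i_numberOfObjects == 0) {
			return "";
		}
		int l_lastIndex = (i_startIndex + i_numberOfObjects - 1) % i_objects.length;
		if (l_lastIndex >= i_startIndex) {
			return String.format ("%d->%d", i_startIndex, l_lastIndex);
		}
		return String.format ("%d->%d;0->%d", i_startIndex, i_objects.length - 1, l_lastIndex);
	}
	
	public static void main (String [] a_arguments) {
		CyclicBufferSketch l_buffer = new CyclicBufferSketch (16);
		for (int l_index = 0; l_index < 10; l_index ++) {
			l_buffer.write (Integer.valueOf (l_index));
		}
		System.out.println (String.format ("### after writing 10: '%s'", l_buffer.valuedArea ()));
		for (int l_index = 0; l_index < 5; l_index ++) {
			l_buffer.read ();
		}
		System.out.println (String.format ("### after reading 5 : '%s'", l_buffer.valuedArea ()));
		for (int l_index = 10; l_index < 20; l_index ++) {
			l_buffer.write (Integer.valueOf (l_index));
		}
		System.out.println (String.format ("### after writing 10: '%s'", l_buffer.valuedArea ()));
	}
}
```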

As for the algorithm to prioritize the writer over the reader, there may be some sophisticated options, but I have implemented 2 rather simple options: 1) the reader is awakened when the buffer has become full and 2) the reader is awakened when the buffer has become non-empty, while the writer is always awakened when the buffer has become non-full. . . . I have to be aware that the former option may make the reader wait unnecessarily for a long time if the writer does not write diligently, although it may be more efficient when the writer does so.

Usually, there are supposed to be a single writer and a single reader, but the possibility of multiple writers or multiple readers is not particularly ruled out, although I have not tested for such cases, yet.


4: The Code and Its Explanation


Hypothesizer 7
This is the code of my objects pipe class and its related classes.

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.java'

@Java Source Code
package theBiasPlanet.coreUtilities.pipes;

import java.util.ArrayList;
import java.util.List;
import theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;
import theBiasPlanet.coreUtilities.messagingHandling.Publisher;
import theBiasPlanet.coreUtilities.timersHandling.TimeOutException;

public class ObjectsPipe <T> {
	protected Object [] i_objects;
	protected int i_bufferLength = 0;
	// No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
	protected int i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected int i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	protected boolean i_isFinishedWriting = false;
	protected boolean i_isFinishedReading = false;
	protected boolean i_notificationIsDelayed = false;
	
	public ObjectsPipe (int a_bufferLength, boolean a_notificationIsDelayed) {
		i_bufferLength = a_bufferLength;
		i_objects = new Object [i_bufferLength];
		i_notificationIsDelayed = a_notificationIsDelayed;
	}
	
	@Override
	protected void finalize () {
	}
	
	protected boolean isEmptyWithoutLocking () {
		return i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
	
	protected boolean isFullWithoutLocking () {
		return (i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == i_bufferLength) || (i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataStartIndex == i_dataUntilIndex);
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected void writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		if (i_isFinishedReading) {
			throw new NoMoreNeedsException ("");
		}
		if (i_isFinishedWriting) {
			i_isFinishedWriting = false;
		}
		while (true) {
			if (isFullWithoutLocking ()) {
				try {
					if (a_timeOutPeriodInMilliseconds == -1) {
						this.wait ();
					}
					else if (a_timeOutPeriodInMilliseconds == 0) {
					}
					else {
						this.wait (a_timeOutPeriodInMilliseconds);
					}
				}
				catch (InterruptedException l_exception) {
					Publisher.logErrorInformation (l_exception);
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (i_isFinishedReading) {
				throw new NoMoreNeedsException ("");
			}
			if (!isFullWithoutLocking ()) {
				boolean l_wasEmpty = isEmptyWithoutLocking ();
				if (i_dataUntilIndex == i_bufferLength) {
					i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1;
				}
				else {
					i_objects [i_dataUntilIndex] = a_object;
					i_dataUntilIndex ++;
				}
				if ( (!i_notificationIsDelayed && l_wasEmpty) || (i_notificationIsDelayed && isFullWithoutLocking ())) {
					this.notifyAll ();
				}
				return;
			}
			else {
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	@SuppressWarnings("unchecked")
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	protected T readWithoutLocking (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		T l_readObject = null;
		if (i_isFinishedReading) {
			i_isFinishedReading = false;
		}
		while (true) {
			if (isEmptyWithoutLocking ()) {
				if (!i_isFinishedWriting) {
					try {
						if (a_timeOutPeriodInMilliseconds == -1) {
							this.wait ();
						}
						else if (a_timeOutPeriodInMilliseconds == 0) {
						}
						else {
							this.wait (a_timeOutPeriodInMilliseconds);
						}
					}
					catch (InterruptedException l_exception) {
						Publisher.logErrorInformation (l_exception);
					}
				}
				else {
					throw new NoMoreDataException ("");
				}
			}
			// Checked again because the status may have changed while this thread was waiting.
			if (!isEmptyWithoutLocking ()) {
				boolean l_wasFull = isFullWithoutLocking ();
				l_readObject = (T) i_objects [i_dataStartIndex];
				i_objects [i_dataStartIndex] = null;
				i_dataStartIndex ++;
				if (i_dataStartIndex == i_dataUntilIndex) {
					i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
				}
				else {
					if (i_dataStartIndex == i_bufferLength) {
						i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
					}
				}
				if ( (!i_notificationIsDelayed && l_wasFull) || (i_notificationIsDelayed && isEmptyWithoutLocking ())) {
					this.notifyAll ();
				}
				return l_readObject;
			}
			else {
				if (i_isFinishedWriting) {
					throw new NoMoreDataException ("");
				}
				if (a_timeOutPeriodInMilliseconds != -1) {
					throw new TimeOutException ("");
				}
			}
		}
	}
	
	public synchronized boolean isEmpty () {
		return isEmptyWithoutLocking ();
	}
	
	public synchronized boolean isFull () {
		return isFullWithoutLocking ();
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized void write (T a_object, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized void write (T a_object) throws NoMoreNeedsException {
		try {
			write (a_object, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	public synchronized int write (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreNeedsException, TimeOutException {
		int l_writtenLength = 0;
		for (l_writtenLength = 0; l_writtenLength < a_length; l_writtenLength ++) {
			try {
				if (l_writtenLength == 0 || ! (isFullWithoutLocking ())) {
					writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds);
				}
				else {
					// The buffer is full: stops here so that the returned length counts only the objects really written.
					break;
				}
			}
			catch (NoMoreNeedsException l_exception) {
				if (l_writtenLength == 0) {
					throw l_exception;
				}
				else {
					break;
				}
			}
		}
		return l_writtenLength;
	}
	
	public synchronized int write (T [] a_objects, int a_offset, int a_length) throws NoMoreNeedsException {
		try {
			return write (a_objects, a_offset, a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> does not wait
	public synchronized T read (long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		return readWithoutLocking (a_timeOutPeriodInMilliseconds);
	}
	
	public synchronized T read () throws NoMoreDataException {
		try {
			return read (-1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return null;
		}
	}
	
	// a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> does not wait
	public synchronized int read (T [] a_objects, int a_offset, int a_length, long a_timeOutPeriodInMilliseconds) throws NoMoreDataException, TimeOutException {
		int l_readLength = 0;
		for (; l_readLength < a_length; l_readLength ++) {
			if ( (l_readLength == 0) || !isEmptyWithoutLocking ()) {
				a_objects [a_offset + l_readLength] = readWithoutLocking (a_timeOutPeriodInMilliseconds);
			}
			else {
				break;
			}
		}
		return l_readLength;
	}
	
	public synchronized int read (T [] a_objects, int a_offset, int a_length) throws NoMoreDataException {
		try {
			return read (a_objects,  a_offset,  a_length, -1);
		}
		catch (TimeOutException l_exception) {
			// impossible
			return 0;
		}
	}
	
	public synchronized List <T> readWholeData () {
		List <T> l_objectsList = new ArrayList <T> ();
		while (true) {
			try {
				l_objectsList.add (readWithoutLocking (-1));
			}
			catch (TimeOutException l_exception) {
				// impossible
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
		}
		return l_objectsList;
	}
	
	public synchronized void finishWriting () {
		i_isFinishedWriting = true;
		this.notifyAll ();
	}
	
	public synchronized void finishReading () {
		i_isFinishedReading = true;
		this.notifyAll ();
	}
	
	public synchronized void reset () {
		i_isFinishedWriting = false;
		i_isFinishedReading = false;
		i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
		i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber;
	}
}

'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.java'

@Java Source Code
package theBiasPlanet.coreUtilities.constantsGroups;

~

public interface GeneralConstantsConstantsGroup {
	~
	int c_iterationStartNumber = 0;
	~
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreDataException extends Exception {
	public NoMoreDataException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.inputsHandling;

public class NoMoreNeedsException extends Exception {
	public NoMoreNeedsException (String a_message) {
		super (a_message);
	}
}

'theBiasPlanet/coreUtilities/messagingHandling/Publisher.java' is omitted entirely because the only method used just writes logs.

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.java'

@Java Source Code
package theBiasPlanet.coreUtilities.timersHandling;

public class TimeOutException extends Exception {
	public TimeOutException (String a_message) {
		super (a_message);
	}
}

Note that because Java does not allow any primitive datum type as a generics type parameter, the objects pipe is literally an 'objects' pipe; of course, the wrapper class of any primitive datum type can be used.
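As a small illustration of that restriction, this is a sketch that uses a standard 'List' instead of the objects pipe; the class name, 'WrapperTypesSketch', and the method name, 'sum', are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperTypesSketch {
	// 'List <long>' would not compile; the wrapper class, 'Long', is used instead
	public static long sum (List <Long> a_values) {
		long l_total = 0L;
		for (Long l_value: a_values) {
			l_total += l_value.longValue ();
		}
		return l_total;
	}
	
	public static void main (String [] a_arguments) {
		List <Long> l_values = new ArrayList <Long> ();
		l_values.add (Long.valueOf (3L)); // explicit boxing
		l_values.add (4L); // autoboxing: the compiler inserts the boxing
		System.out.println (sum (l_values));
	}
}
```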

The constructor takes the buffer size and the choice of the algorithm for waking the readers.

'i_objects' is the buffer.

'i_dataStartIndex' and 'i_dataUntilIndex' are the start index and the until (exclusive) index of the valued area; when the buffer is empty, they are '0' and '0', respectively.
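The handling of the 2 indexes can be tried in isolation with a sketch like this, which is single-threaded and is not the pipe itself. The class name, 'RingIndexesSketch', and its members are hypothetical; the reading part mirrors the reading code shown above, while the writing part and the fullness check are my assumptions of how such indexes would have to behave.

```java
public class RingIndexesSketch {
	private final Object [] i_objects;
	private int i_startIndex = 0;
	private int i_untilIndex = 0;
	
	public RingIndexesSketch (int a_bufferLength) {
		i_objects = new Object [a_bufferLength];
	}
	
	public boolean isEmpty () {
		// both of the indexes are reset to 0 whenever the last object is taken
		return i_startIndex == 0 && i_untilIndex == 0;
	}
	
	public boolean isFull () {
		// assumption: the until index is allowed to be the buffer length, and wraps only at the next writing
		return (i_startIndex == 0 && i_untilIndex == i_objects.length) || (i_startIndex != 0 && i_startIndex == i_untilIndex);
	}
	
	public void put (Object a_object) {
		if (isFull ()) {
			throw new IllegalStateException ("The buffer is full.");
		}
		if (i_untilIndex == i_objects.length) {
			i_untilIndex = 0;
		}
		i_objects [i_untilIndex] = a_object;
		i_untilIndex ++;
	}
	
	public Object take () {
		if (isEmpty ()) {
			throw new IllegalStateException ("The buffer is empty.");
		}
		Object l_object = i_objects [i_startIndex];
		i_objects [i_startIndex] = null;
		i_startIndex ++;
		if (i_startIndex == i_untilIndex) {
			// the buffer has become empty
			i_startIndex = 0;
			i_untilIndex = 0;
		}
		else {
			if (i_startIndex == i_objects.length) {
				i_startIndex = 0;
			}
		}
		return l_object;
	}
}
```

With the buffer length of 3, for example, putting 'a', 'b', and 'c', taking 'a', and putting 'd' leaves both of the indexes '1', which means 'full', not 'empty', because 'empty' is always represented as '0' and '0'.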

All the public methods except the constructor are synchronized on the 'this' object.

The 'readWithoutLocking (long a_timeOutPeriodInMilliseconds)' method has the loop because there may be multiple readers: a reader that has been waiting on the empty buffer may be awoken only to find that the written objects have already been snatched by the other readers, and then it has to wait once more (such points to note about threads synchronization will be discussed in the next article).

Likewise, the 'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)' method has the loop because there may be multiple writers.
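The reason for the loops can be seen in a much reduced sketch like this, which is not the pipe code: the buffer is a single slot, there is no time-out, and the class name, 'GuardedWaitSketch', and its members are hypothetical. The point is only that the condition is rechecked in a 'while' after every awakening.

```java
public class GuardedWaitSketch {
	private Integer i_slot = null; // a single-slot "buffer", for brevity
	
	public synchronized void put (int a_value) throws InterruptedException {
		// 'while', not 'if': another writer may have refilled the slot before this thread reacquires the lock
		while (i_slot != null) {
			this.wait ();
		}
		i_slot = Integer.valueOf (a_value);
		this.notifyAll ();
	}
	
	public synchronized int take () throws InterruptedException {
		// 'while', not 'if': another reader may have snatched the value before this thread reacquires the lock
		while (i_slot == null) {
			this.wait ();
		}
		int l_value = i_slot.intValue ();
		i_slot = null;
		this.notifyAll ();
		return l_value;
	}
	
	// 1 writer and 2 readers exchange the integers from 1 through 100; returns the total taken by the readers
	public static int demonstrate () {
		final GuardedWaitSketch l_sketch = new GuardedWaitSketch ();
		final int [] l_sums = new int [2];
		Thread [] l_readers = new Thread [2];
		for (int l_readerIndex = 0; l_readerIndex < 2; l_readerIndex ++) {
			final int l_index = l_readerIndex;
			l_readers [l_index] = new Thread (() -> {
				try {
					for (int l_count = 0; l_count < 50; l_count ++) {
						l_sums [l_index] += l_sketch.take ();
					}
				}
				catch (InterruptedException l_exception) {
					Thread.currentThread ().interrupt ();
				}
			});
			l_readers [l_index].start ();
		}
		try {
			for (int l_value = 1; l_value <= 100; l_value ++) {
				l_sketch.put (l_value);
			}
			for (Thread l_reader: l_readers) {
				l_reader.join ();
			}
		}
		catch (InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		return l_sums [0] + l_sums [1];
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (demonstrate ()); // the sum of 1 through 100, which is 5050
	}
}
```

If 'while' were replaced with 'if' in the 'take' method, a reader awoken by 'notifyAll' could find the slot already emptied by the other reader and fail on the 'null'.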

The behavior of the 'writeWithoutLocking (T a_object, long a_timeOutPeriodInMilliseconds)' method when the pipe has been declared finished reading is somewhat different from the behavior of the 'readWithoutLocking (long a_timeOutPeriodInMilliseconds)' method when the pipe has been declared finished writing, because the writer will not need to write any more at all, while the reader will want to read out the objects already stored in the buffer.
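That asymmetry can be sketched as follows, under my reading of the above description and not as the pipe code itself: the buffer is an unbounded 'ArrayDeque' for brevity, and the class name, 'FinishAsymmetrySketch', the nested exception classes, and the 'demonstrate' method are hypothetical.

```java
import java.util.ArrayDeque;

public class FinishAsymmetrySketch {
	public static class NoMoreData extends Exception {}
	public static class NoMoreNeeds extends Exception {}
	
	private final ArrayDeque <String> i_buffer = new ArrayDeque <String> (); // unbounded, for brevity
	private boolean i_isFinishedWriting = false;
	private boolean i_isFinishedReading = false;
	
	public synchronized void write (String a_object) throws NoMoreNeeds {
		// the writer is refused at once, whatever the buffer state is
		if (i_isFinishedReading) {
			throw new NoMoreNeeds ();
		}
		i_buffer.addLast (a_object);
		this.notifyAll ();
	}
	
	public synchronized String read () throws NoMoreData, InterruptedException {
		// the reader is refused only after the stored objects have been read out
		while (i_buffer.isEmpty ()) {
			if (i_isFinishedWriting) {
				throw new NoMoreData ();
			}
			this.wait ();
		}
		return i_buffer.removeFirst ();
	}
	
	public synchronized void finishWriting () {
		i_isFinishedWriting = true;
		this.notifyAll ();
	}
	
	public synchronized void finishReading () {
		i_isFinishedReading = true;
		this.notifyAll ();
	}
	
	public static String demonstrate () {
		StringBuilder l_log = new StringBuilder ();
		FinishAsymmetrySketch l_pipe = new FinishAsymmetrySketch ();
		try {
			l_pipe.write ("a");
			l_pipe.write ("b");
			l_pipe.finishWriting ();
			l_log.append (l_pipe.read ()).append (l_pipe.read ()); // still readable
			l_pipe.read (); // nothing is left
		}
		catch (NoMoreData l_exception) {
			l_log.append (":no more data");
		}
		catch (NoMoreNeeds | InterruptedException l_exception) {
			throw new IllegalStateException (l_exception);
		}
		FinishAsymmetrySketch l_anotherPipe = new FinishAsymmetrySketch ();
		try {
			l_anotherPipe.write ("c");
			l_anotherPipe.finishReading ();
			l_anotherPipe.write ("d"); // refused although there is room
		}
		catch (NoMoreNeeds l_exception) {
			l_log.append (":no more needs");
		}
		return l_log.toString ();
	}
}
```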

Be aware that I have not tested the code intensively yet, although I have used it in some simple cases, one of which is shown in the next section.


5: An Example Usage and an Execution Result


Hypothesizer 7
This is a sample program that uses the objects pipe.

@Java Source Code
import theBiasPlanet.coreUtilities.pipes.ObjectsPipe;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException;
import theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException;

~
	~
	
	public static void prepareIntegers (ObjectsPipe <Integer> a_writer) throws Exception {
		for (int l_iterationIndex = 0; l_iterationIndex < 512; l_iterationIndex ++) {
			try {
				a_writer.write (Integer.valueOf (l_iterationIndex));
			}
			catch (NoMoreNeedsException l_exception) {
				break;
			}
			System.out.println (String.format ("### written: %d", l_iterationIndex));
			System.out.flush ();
		}
	}
	
	public static void processIntegers (ObjectsPipe <Integer> a_reader) throws Exception {
		Integer l_integer = null;
		int l_numberOfMultipleOf10s = 0;
		while (true) {
			try {
				l_integer = a_reader.read ();
			}
			catch (NoMoreDataException l_exception) {
				break;
			}
			System.out.println (String.format ("### read: %d", l_integer.intValue ()));
			System.out.flush ();
			if (l_integer.intValue () % 10 == 0) {
				l_numberOfMultipleOf10s ++;
				System.out.println (String.format ("### a multiple of 10s is found."));
				System.out.flush ();
			}
		}
		System.out.println (String.format ("### the number of multiple of 10s is %d.", l_numberOfMultipleOf10s));
		System.out.flush ();
	}
	
	public static void test2 () throws Exception {
		ObjectsPipe <Integer> l_integersPipe = new ObjectsPipe <Integer> (16, true);
		Thread l_subThread = new Thread (() -> {
			try {
				prepareIntegers (l_integersPipe);
			}
			catch (Exception l_exception) {
				l_exception.printStackTrace ();
			}
			finally {
				try {
					l_integersPipe.finishWriting ();
				}
				catch (Exception l_exception) {
					l_exception.printStackTrace ();
				}
			}
		});
		l_subThread.start ();
		processIntegers (l_integersPipe);
		l_subThread.join ();
	}
~

This is an output on my single-core, single-CPU Linux computer (which should be influencing the output).

@Output
### written: 0
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### read: 0
### a multiple of 10s is found.
### read: 1
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### written: 15
### written: 16
### written: 17
### read: 7
### read: 8
### written: 18
### written: 19
### written: 20
### written: 21
### read: 9
### read: 10
### a multiple of 10s is found.
### written: 22
### written: 23
### read: 11
### read: 12
### read: 13
### read: 14
### written: 24
### written: 25
### written: 26
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### read: 486
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . Well, what happened is, I guess, that the reader acted first and was made to wait because the pipe was empty; the writer wrote until the pipe was full and awoke the reader; the reader began to read; and so on. . . . Note that '15' should have been written before the reader was awoken, although "### written: 15" was shown late: immediately after the 'write' method finished, the control switched to the reading thread, and the message was delayed until the control returned to the writing thread.

When the notification timing mode is set to 'false', this is the output.

@Output
### read: 0
### a multiple of 10s is found.
### written: 0
### written: 1
### written: 2
### written: 3
### read: 1
### read: 2
### read: 3
### read: 4
### written: 4
### written: 5
### written: 6
### written: 7
### read: 5
### read: 6
### read: 7
### read: 8
### written: 8
### written: 9
### written: 10
### written: 11
### read: 9
### written: 12
### written: 13
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### written: 14
### written: 15
### written: 16
### written: 17
### read: 13
### read: 14
### read: 15
~
### written: 496
### read: 481
### read: 482
### read: 483
### read: 484
### read: 485
### read: 486
### read: 487
### read: 488
### read: 489
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . The reader acted first and was made to wait because the pipe was empty; the writer wrote '0' and awoke the reader; the reader began to read; and so on, I guess. . . . Again, "### written: 0" was shown late.

Anyway, only a small number of the slots in the buffer tend to be used, it seems.

When I measured the times taken (with the costly messaging eliminated) in the 2 modes (5 times for each mode), the times were {'123,607,654', '113,359,567', '129,854,341', '87,619,575', and '99,663,616': the average -> '110,820,951'} and {'108,841,378', '154,165,815', '140,925,679', '111,987,546', and '113,260,213': the average -> '125,836,126'}, respectively, in nanoseconds. Is the difference significant? . . . I have not assessed it statistically.
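As a possible first step toward such an assessment, this is a sketch that computes the means, the sample standard deviations, and the Welch's t statistic from the 10 measurements above. The class name, 'TimingComparisonSketch', and its members are hypothetical; the sketch stops at the statistic and does not derive any p-value, and with only 5 runs for each mode, any conclusion would be weak anyway.

```java
public class TimingComparisonSketch {
	// the measured times in nanoseconds, in the order they are listed in this article
	public static final long [] c_firstModeTimes = {123607654L, 113359567L, 129854341L, 87619575L, 99663616L};
	public static final long [] c_secondModeTimes = {108841378L, 154165815L, 140925679L, 111987546L, 113260213L};
	
	public static double mean (long [] a_values) {
		double l_sum = 0.0;
		for (long l_value: a_values) {
			l_sum += l_value;
		}
		return l_sum / a_values.length;
	}
	
	// the unbiased sample variance (the divisor is 'n - 1')
	public static double sampleVariance (long [] a_values) {
		double l_mean = mean (a_values);
		double l_sumOfSquares = 0.0;
		for (long l_value: a_values) {
			l_sumOfSquares += (l_value - l_mean) * (l_value - l_mean);
		}
		return l_sumOfSquares / (a_values.length - 1);
	}
	
	// the Welch's t statistic, which does not assume that the 2 variances are equal
	public static double welchT (long [] a_first, long [] a_second) {
		double l_standardError = Math.sqrt (sampleVariance (a_first) / a_first.length + sampleVariance (a_second) / a_second.length);
		return (mean (a_first) - mean (a_second)) / l_standardError;
	}
	
	public static void main (String [] a_arguments) {
		System.out.println (String.format ("### the 1st mean: %.0f, the standard deviation: %.0f", mean (c_firstModeTimes), Math.sqrt (sampleVariance (c_firstModeTimes))));
		System.out.println (String.format ("### the 2nd mean: %.0f, the standard deviation: %.0f", mean (c_secondModeTimes), Math.sqrt (sampleVariance (c_secondModeTimes))));
		System.out.println (String.format ("### the Welch's t statistic: %.3f", welchT (c_firstModeTimes, c_secondModeTimes)));
	}
}
```

The statistic would then have to be compared with a t distribution in order to judge the significance, which I leave out here.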


6: The Conclusion and Beyond


Hypothesizer 7
Now, I have an objects pipe.

The benefit of any objects pipe is that the memory space will not be exhausted however many objects are processed. In fact, the above example program does not cause any buffer overflow however large the number of iterations is set.

I will translate the Java version into a C++ version, a C# version, and a Python version in some future articles.


<The previous article in this series | The table of contents of this series | The next article in this series>