2020-06-21

6: A Python Objects Pipe, Which Conveys Objects Serially

<The previous article in this series | The table of contents of this series | The next article in this series>

Serially, because otherwise memory could be used up. A Python equivalent of 'java.io.PipedWriter' + 'java.io.PipedReader', but for objects.

Topics


About: The Python programming language


Starting Context


  • The reader has basic knowledge of the Python programming language.

Target Context


  • The reader will know how an objects pipe can be implemented in Python.

Main Body


1: The Motivation and the Plan Are the Same as for the Already Introduced Java Objects Pipe


Hypothesizer 7
In fact, an article in another series has already introduced a Java objects pipe.

I will not repeat the motivation and the plan described there.

Although Python has no equivalent of Java's 'java.io.PipedWriter' + 'java.io.PipedReader' (as far as I know), this objects pipe can be used instead, because any character is an object (in Python, a character is just a 'str' of length 1).


2: The Code and Its Explanation


Hypothesizer 7
This is the code of my objects pipe class and its related classes.

'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.py'

@Python Source Code
from typing import Generic
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import cast
import sys
from threading import Condition
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException

T = TypeVar ("T")

class ObjectsPipe (Generic [T]):
	def __init__ (a_this: "ObjectsPipe", a_bufferLength: int, a_notificationIsDelayed: bool) -> None:
		a_this.i_threadCondition: Condition
		a_this.i_objects: List [object]
		a_this.i_bufferLength: int = 0
		# No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_dataStartIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_dataUntilIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
		a_this.i_isFinishedWriting: bool = False
		a_this.i_isFinishedReading: bool = False
		a_this.i_notificationIsDelayed: bool = False
		
		a_this.i_threadCondition = Condition ()
		a_this.i_bufferLength = a_bufferLength
		a_this.i_objects = [None] * a_this.i_bufferLength
		a_this.i_notificationIsDelayed = a_notificationIsDelayed
	
	def __del__ (a_this: "ObjectsPipe") -> None:
		pass
	
	def isEmptyWithoutLocking (a_this: "ObjectsPipe") -> bool:
		return a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
	
	def isFullWithoutLocking (a_this: "ObjectsPipe") -> bool:
		return (a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == a_this.i_bufferLength) or (a_this.i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataStartIndex == a_this.i_dataUntilIndex)
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
		if a_this.i_isFinishedReading:
			raise NoMoreNeedsException ("")
		if a_this.i_isFinishedWriting:
			a_this.i_isFinishedWriting = False
		while (True):
			if a_this.isFullWithoutLocking ():
				try:
					if a_timeOutPeriodInMilliseconds == -1:
						a_this.i_threadCondition.wait ()
					elif a_timeOutPeriodInMilliseconds == 0:
						pass
					else:
						a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
				except (Exception) as l_exception:
					Publisher.logErrorInformation (l_exception)
			# Checked again because the status may have changed while this thread was waiting.
			if a_this.i_isFinishedReading:
				raise NoMoreNeedsException ("")
			if not a_this.isFullWithoutLocking ():
				l_wasEmpty: bool = a_this.isEmptyWithoutLocking ()
				if a_this.i_dataUntilIndex == a_this.i_bufferLength:
					a_this.i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object
					a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1
				else:
					a_this.i_objects [a_this.i_dataUntilIndex] = a_object
					a_this.i_dataUntilIndex = a_this.i_dataUntilIndex + 1
				if ((not a_this.i_notificationIsDelayed) and l_wasEmpty) or (a_this.i_notificationIsDelayed and a_this.isFullWithoutLocking ()):
					a_this.i_threadCondition.notify_all ()
				return
			else:
				if a_timeOutPeriodInMilliseconds != -1:
					raise TimeOutException ("")
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
		l_readObject: Optional [T] = None
		if a_this.i_isFinishedReading:
			a_this.i_isFinishedReading = False
		while True:
			if a_this.isEmptyWithoutLocking ():
				if not a_this.i_isFinishedWriting:
					try:
						if a_timeOutPeriodInMilliseconds == -1:
							a_this.i_threadCondition.wait ()
						elif a_timeOutPeriodInMilliseconds == 0:
							pass
						else:
							a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
					except (Exception) as l_exception:
						Publisher.logErrorInformation (l_exception)
				else:
					raise NoMoreDataException ("")
			# Checked again because the status may have changed while this thread was waiting.
			if not a_this.isEmptyWithoutLocking ():
				l_wasFull: bool = a_this.isFullWithoutLocking ()
				l_readObject = cast (T, a_this.i_objects [a_this.i_dataStartIndex])
				a_this.i_objects [a_this.i_dataStartIndex] = None
				a_this.i_dataStartIndex = a_this.i_dataStartIndex + 1
				if a_this.i_dataStartIndex == a_this.i_dataUntilIndex:
					a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
					a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
				else:
					if a_this.i_dataStartIndex == a_this.i_bufferLength:
						a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
				if ((not a_this.i_notificationIsDelayed) and l_wasFull) or (a_this.i_notificationIsDelayed and a_this.isEmptyWithoutLocking ()):
					a_this.i_threadCondition.notify_all ()
				return l_readObject
			else:
				if a_this.i_isFinishedWriting:
					raise NoMoreDataException ("")
				if a_timeOutPeriodInMilliseconds != -1:
					raise TimeOutException ("")
	
	def isEmpty (a_this: "ObjectsPipe") -> bool:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.isEmptyWithoutLocking ()
		finally:
			a_this.i_threadCondition.release ()
	
	def isFull (a_this: "ObjectsPipe") -> bool:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.isFullWithoutLocking ()
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def write (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds)
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def write_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
		try:
			a_this.i_threadCondition.acquire ()
			l_writtenLength: int = 0
			# Waits (if necessary) only for the first object; after that, writes only while there is room, and returns the number of the objects actually written.
			while l_writtenLength < a_length:
				if l_writtenLength > 0 and a_this.isFullWithoutLocking ():
					break
				try:
					a_this.writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds)
				except (NoMoreNeedsException) as l_exception:
					if l_writtenLength == 0:
						raise l_exception
					break
				l_writtenLength = l_writtenLength + 1
			return l_writtenLength
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def read (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
		try:
			a_this.i_threadCondition.acquire ()
			return a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
		finally:
			a_this.i_threadCondition.release ()
	
	# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
	def read_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
		try:
			a_this.i_threadCondition.acquire ()
			l_readLength: int = 0
			# Waits (if necessary) only for the first object; after that, reads only the objects already stored, and returns the number of the objects actually read.
			while l_readLength < a_length:
				if l_readLength > 0 and a_this.isEmptyWithoutLocking ():
					break
				a_objects [a_offset + l_readLength] = a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
				l_readLength = l_readLength + 1
			return l_readLength
		finally:
			a_this.i_threadCondition.release ()
	
	
	def readWholeData (a_this: "ObjectsPipe") -> List [Optional [T]]:
		try:
			a_this.i_threadCondition.acquire ()
			l_objectsList: List [Optional [T]] = []
			while True:
				try:
					l_objectsList.append (a_this.readWithoutLocking ())
				except (NoMoreDataException) as l_exception:
					break
			return l_objectsList
		finally:
			a_this.i_threadCondition.release ()
	
	def finishWriting (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedWriting = True
			a_this.i_threadCondition.notify_all ()
		finally:
			a_this.i_threadCondition.release ()
	
	def finishReading (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedReading = True
			a_this.i_threadCondition.notify_all ()
		finally:
			a_this.i_threadCondition.release ()
	
	def reset (a_this: "ObjectsPipe") -> None:
		try:
			a_this.i_threadCondition.acquire ()
			a_this.i_isFinishedWriting = False
			a_this.i_isFinishedReading = False
			a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
			a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
		finally:
			a_this.i_threadCondition.release ()


'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'

@Python Source Code
~

class GeneralConstantsConstantsGroup:
	~
	c_iterationStartNumber: int = 0
	~


'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.py'

@Python Source Code

class NoMoreDataException (Exception):
	def __init__ (a_this: "NoMoreDataException", a_message: str) -> None:
		
		super ().__init__ (a_message)


'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.py'

@Python Source Code

class NoMoreNeedsException (Exception):
	def __init__ (a_this: "NoMoreNeedsException", a_message: str) -> None:
		
		super ().__init__ (a_message)


'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py' is omitted entirely, because the method used here just writes logs.
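If you want to try the class without my library, the following is a minimal stand-in for the omitted 'Publisher'. It is only a sketch. It assumes that passing the exception to a standard 'logging' logger is all 'logErrorInformation' needs to do, and the logger name 'theBiasPlanet' is made up.

```python
import logging


class Publisher:
    """Hypothetical stand-in for theBiasPlanet's 'Publisher'; only the one method used by 'ObjectsPipe'."""

    # Assumed logger name, not taken from the original library.
    i_logger: logging.Logger = logging.getLogger("theBiasPlanet")

    @staticmethod
    def logErrorInformation(a_exception: BaseException) -> None:
        # Logs the exception message at ERROR level, attaching the exception (and its traceback, if any).
        Publisher.i_logger.error("%s", a_exception, exc_info=a_exception)
```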

'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.py'

@Python Source Code

class TimeOutException (Exception):
	def __init__ (a_this: "TimeOutException", a_message: str) -> None:
		
		super ().__init__ (a_message)


The explanation of the code is almost the same as that for the Java version, but I will give the whole explanation here, duplication included.

Note that the code has mypy annotations (as all my Python code does).

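The constructor takes the buffer length and the notification timing mode ('a_notificationIsDelayed'), which decides when the waiting threads are awakened. When the mode is 'False', the waiting readers are notified as soon as a write makes the empty buffer non-empty, and the waiting writers are notified as soon as a read makes the full buffer non-full. When the mode is 'True', the notification is delayed until a write fills the buffer or a read empties it.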
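The notification rule can be isolated into two small functions. This sketch mirrors the conditions in 'writeWithoutLocking' and 'readWithoutLocking', and the function names are hypothetical. 'notifying_writes' simulates writes into an initially empty buffer and reports which writes would notify the readers.

```python
from typing import List


def should_notify_after_write(notification_is_delayed: bool, was_empty: bool, is_full_now: bool) -> bool:
    # Not delayed: wake the waiting readers as soon as the empty buffer receives an object.
    # Delayed: wake them only when the write has filled the buffer.
    return (not notification_is_delayed and was_empty) or (notification_is_delayed and is_full_now)


def should_notify_after_read(notification_is_delayed: bool, was_full: bool, is_empty_now: bool) -> bool:
    # Not delayed: wake the waiting writers as soon as the full buffer gets a free slot.
    # Delayed: wake them only when the read has emptied the buffer.
    return (not notification_is_delayed and was_full) or (notification_is_delayed and is_empty_now)


def notifying_writes(buffer_length: int, notification_is_delayed: bool) -> List[int]:
    """Returns the 1-based numbers of the writes (into an initially empty buffer) that would notify."""
    notified: List[int] = []
    stored = 0
    for write_number in range(1, buffer_length + 1):
        was_empty = stored == 0
        stored += 1
        if should_notify_after_write(notification_is_delayed, was_empty, stored == buffer_length):
            notified.append(write_number)
    return notified


print(notifying_writes(16, False))  # [1]
print(notifying_writes(16, True))   # [16]
```

With 'False', a waiting reader can start as soon as one object is available. With 'True', it sleeps until all 16 slots are full.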

'i_objects' is the buffer.

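'i_dataStartIndex' and 'i_dataUntilIndex' are the start index (inclusive) and the until index (exclusive) of the area that holds data. The buffer is used circularly, so the data area may wrap around the end of the buffer, in which case 'i_dataUntilIndex' is smaller than or equal to 'i_dataStartIndex'. When the buffer is empty, both are '0' ('GeneralConstantsConstantsGroup.c_iterationStartNumber'). The buffer is full when 'i_dataStartIndex' is '0' and 'i_dataUntilIndex' is the buffer length, or when 'i_dataStartIndex' is not '0' and equals 'i_dataUntilIndex'.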
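The index bookkeeping can be tried in isolation with a single-threaded model. This sketch copies the empty/full tests and the wrap-around steps of 'writeWithoutLocking' and 'readWithoutLocking'. The class name is hypothetical, and it raises exceptions where the real pipe would wait.

```python
from typing import List, Optional


class RingBufferModel:
    """Hypothetical, single-threaded model of the index bookkeeping in 'ObjectsPipe'."""

    def __init__(self, buffer_length: int) -> None:
        self.objects: List[Optional[object]] = [None] * buffer_length
        self.start = 0  # index of the oldest stored object (inclusive)
        self.until = 0  # index just past the newest stored object (exclusive)

    def is_empty(self) -> bool:
        # (0, 0) is reserved for "empty".
        return self.start == 0 and self.until == 0

    def is_full(self) -> bool:
        length = len(self.objects)
        return (self.start == 0 and self.until == length) or (self.start != 0 and self.start == self.until)

    def write(self, an_object: object) -> None:
        if self.is_full():
            raise OverflowError("full: the real pipe would wait here")
        if self.until == len(self.objects):
            # The end of the buffer is reached, but slots have been freed at the front: wrap around.
            self.objects[0] = an_object
            self.until = 1
        else:
            self.objects[self.until] = an_object
            self.until += 1

    def read(self) -> Optional[object]:
        if self.is_empty():
            raise LookupError("empty: the real pipe would wait here")
        an_object = self.objects[self.start]
        self.objects[self.start] = None  # drops the reference so that the object can be freed
        self.start += 1
        if self.start == self.until:
            self.start = self.until = 0  # the last object was taken: back to the "empty" state
        elif self.start == len(self.objects):
            self.start = 0  # wrap around
        return an_object


ring = RingBufferModel(3)
for letter in "abc":
    ring.write(letter)
print(ring.read(), ring.is_full())  # a False
ring.write("d")  # stored at index 0, after the wrap-around
print(ring.start, ring.until, ring.is_full())  # 1 1 True
print([ring.read() for _ in range(3)], ring.is_empty())  # ['b', 'c', 'd'] True
```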

All the supposedly-public methods except the constructor run while holding the lock of the 'a_this.i_threadCondition' object. "Supposedly-public" means intended (by me) to be called from outside the class; Python has no access modifiers that could enforce such a restriction. The '~WithoutLocking' methods, by contrast, are supposedly-protected: they assume that the caller already holds the lock.
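In Python, the 'acquire ()' + 'try'/'finally' + 'release ()' pattern can also be written as a 'with' statement on the 'Condition'. The 'with' form releases the lock even when an exception is raised. Below is a sketch of the same public/'~WithoutLocking' split using that idiom; the class and its names are hypothetical.

```python
from threading import Condition, Lock


class GuardedCounter:
    """Hypothetical class, only to show the locking split used by 'ObjectsPipe'."""

    def __init__(self) -> None:
        # A plain Lock is passed so that the example can check whether it is held;
        # 'Condition ()' without an argument would create an RLock.
        self._lock = Lock()
        self._condition = Condition(self._lock)
        self._count = 0

    def incrementWithoutLocking(self) -> None:
        # Supposedly-protected: the caller must already hold the lock.
        if self._count >= 3:
            raise OverflowError("the counter is full")
        self._count += 1

    def increment(self) -> None:
        # Supposedly-public: 'with' acquires the lock and releases it on exit, even on an exception.
        with self._condition:
            self.incrementWithoutLocking()
            self._condition.notify_all()
```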

The 'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)' method loops because there may be multiple readers. A reader that has been waiting on the empty buffer may be awakened only to find that other readers have already taken the written objects, and then it has to wait again.

Likewise, the 'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)' method loops because there may be multiple writers.
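Those loops follow the usual rule that a condition has to be re-checked after every wake-up. Python's 'threading.Condition.wait_for' wraps that loop. The sketch below uses a hypothetical 'TinyBuffer' class, unbounded and without the finished flags. Three readers compete for objects, and each one waits again when another reader has taken the object it was woken for.

```python
import threading
from collections import deque
from typing import Deque, List


class TinyBuffer:
    """Hypothetical unbounded buffer, only to show why waiting has to happen in a loop."""

    def __init__(self) -> None:
        self._items: Deque[int] = deque()
        self._condition = threading.Condition()

    def put(self, value: int) -> None:
        with self._condition:
            self._items.append(value)
            # Wakes every waiting reader, although only one of them can get this object.
            self._condition.notify_all()

    def take(self, timeout_seconds: float) -> int:
        with self._condition:
            # 'wait_for' re-checks the predicate after every wake-up, so a reader that finds
            # the object already taken by another reader simply waits again.
            if not self._condition.wait_for(lambda: len(self._items) > 0, timeout_seconds):
                raise TimeoutError("no object arrived in time")
            return self._items.popleft()


buffer = TinyBuffer()
results: List[int] = []
readers = [threading.Thread(target=lambda: results.append(buffer.take(5.0))) for _ in range(3)]
for reader in readers:
    reader.start()
for value in range(3):
    buffer.put(value)
for reader in readers:
    reader.join()
print(sorted(results))  # [0, 1, 2]
```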

The 'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)' method, after the pipe has been declared finished reading, behaves differently from the 'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)' method after the pipe has been declared finished writing. The former raises 'NoMoreNeedsException' immediately, because the writer will not need to write anything more at all. The latter raises 'NoMoreDataException' only after the buffer has become empty, because the reader will want to read up the objects already stored in the buffer.
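The asymmetry can be seen in a single-threaded model that keeps only the end-of-stream rules. This is a sketch. The class and exception names are hypothetical stand-ins, and unlike 'ObjectsPipe', the model does not clear the flags when writing or reading resumes.

```python
from collections import deque
from typing import Deque, Generic, TypeVar

T = TypeVar("T")


class NoMoreDataError(Exception):
    """Hypothetical stand-in for 'NoMoreDataException'."""


class NoMoreNeedsError(Exception):
    """Hypothetical stand-in for 'NoMoreNeedsException'."""


class EndOfStreamModel(Generic[T]):
    """Single-threaded model of the end-of-stream rules only (no blocking, no size limit)."""

    def __init__(self) -> None:
        self._items: Deque[T] = deque()
        self._finished_writing = False
        self._finished_reading = False

    def write(self, an_object: T) -> None:
        # Once the reader has given up, writing is pointless: fail at once, whatever is stored.
        if self._finished_reading:
            raise NoMoreNeedsError()
        self._items.append(an_object)

    def read(self) -> T:
        # Objects already stored are still delivered after 'finish_writing'.
        if self._items:
            return self._items.popleft()
        if self._finished_writing:
            raise NoMoreDataError()
        raise RuntimeError("empty: the real pipe would wait here")

    def finish_writing(self) -> None:
        self._finished_writing = True

    def finish_reading(self) -> None:
        self._finished_reading = True


model = EndOfStreamModel[int]()
model.write(1)
model.write(2)
model.finish_writing()
print(model.read(), model.read())  # 1 2
try:
    model.read()
except NoMoreDataError:
    print("no more data")  # printed: the stored objects were read up first
```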

Be aware that I have not tested the code intensively yet, although I have used it in some simple cases, one of which is shown in the next section.


3: An Example Usage and an Execution Result


Hypothesizer 7
This is a sample program that uses the objects pipe.

@Python Source Code
from typing import Optional
import sys
from threading import Thread
import traceback
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.performanceMeasuringHandling.PerformanceMeasurer import PerformanceMeasurer
from theBiasPlanet.coreUtilities.pipes.ObjectsPipe import ObjectsPipe

~
	~
	
	@staticmethod
	def prepareIntegers (a_writer: "ObjectsPipe [int]") -> None:
		l_iterationIndex: int
		for l_iterationIndex in range (0, 512, 1):
			try:
				a_writer.write (l_iterationIndex)
			except (NoMoreNeedsException) as l_exception:
				break
			sys.stdout.write ("### written: {0:d}\n".format (l_iterationIndex))
			sys.stdout.flush ()
	
	@staticmethod
	def processIntegers (a_reader: "ObjectsPipe [int]") -> None:
		l_integer: Optional [int] = None
		l_numberOfMultipleOf10s: int = 0
		while True:
			try:
				l_integer = a_reader.read ()
			except (NoMoreDataException) as l_exception:
				break
			sys.stdout.write ("### read: {0:d}\n".format (l_integer))
			sys.stdout.flush ()
			if l_integer % 10 == 0:
				l_numberOfMultipleOf10s = l_numberOfMultipleOf10s + 1
				sys.stdout.write ("### a multiple of 10s is found.\n")
				sys.stdout.flush ()
		sys.stdout.write ("### the number of multiple of 10s is {0:d}.\n".format (l_numberOfMultipleOf10s))
		sys.stdout.flush ()
	
	@staticmethod
	def test2 () -> None:
		l_integersPipe: "ObjectsPipe [int]" = ObjectsPipe [int] (16, True)
		PerformanceMeasurer.setStartTime ()
		def l_subThreadFunction () -> None:
			try:
				Test1Test.prepareIntegers (l_integersPipe)
			except (Exception) as l_exception:
				sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
			finally:
				try:
					l_integersPipe.finishWriting ()
				except (Exception) as l_exception:
					sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
		l_subThread: Thread = Thread (target = l_subThreadFunction)
		l_subThread.start ()
		Test1Test.processIntegers (l_integersPipe)
		l_subThread.join ()
		sys.stdout.write ("### The elapsed time is {0:,d} ns.\n".format (PerformanceMeasurer.getElapseTimeInNanoSeconds ()))

~

This is an output on my single-core, single-CPU Linux computer. The core count may not affect the output anyway, because CPython has the appalling Global Interpreter Lock, which lets only one thread execute Python bytecode at a time.

@Output
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . Well, what happened is, I guess, as follows. The writer acted first and wrote '0' and '1'. The reader read '0' and '1' and then had to wait, because the pipe had become empty. The writer filled the buffer (with '2' through '17') and awoke the reader. The reader began to read, and so on. . . . Note that '1' must have been written before the reader read it, although "### written: 1" was shown late. Immediately after the 'write' method finished, control switched to the reading thread, and the message was delayed until control returned to the writing thread.

When the notification timing mode ('a_notificationIsDelayed') is set to 'False', this is the output.

@Output
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### read: 2
### read: 3
### written: 3
### written: 4
### read: 4
### read: 5
### written: 5
### written: 6
### read: 6
### read: 7
### written: 7
### written: 8
### read: 8
### read: 9
### written: 9
### written: 10
### read: 10
### a multiple of 10s is found.
### read: 11
### written: 11
### written: 12
### read: 12
### read: 13
### written: 13
### written: 14
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### written: 509
### written: 510
### written: 511
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.

. . . I guess it went as follows. The writer acted first and wrote '0' and '1'. The reader read '0' and '1' and then had to wait, because the pipe had become empty. The writer wrote '2', awoke the reader, and wrote '3'. The reader began reading, and so on. . . . Again, "### written: 1" was shown late.

Anyway, in this mode, only a small number of the slots in the buffer tend to be used.

I measured the times taken (with the costly messaging removed) in the 2 modes, 5 times for each mode. The times, in nanoseconds, were {'14,296,000', '15,188,000', '14,926,000', '15,518,000', and '14,495,000': the average -> '14,884,600'} and {'15,321,000', '13,819,000', '15,230,000', '14,191,000', and '14,009,000': the average -> '14,514,000'}, respectively. I guess that the difference is not significant. The averages differ by about 370,000 ns, which is smaller than the spread of the times within either mode. . . . By the way, I have noticed that the Python version is much faster than the Java version. I am not sure why. That CPython is implemented in C is unlikely to be the explanation by itself, because the usual Java virtual machine is native code as well and also compiles frequently executed code just in time.
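One quick way to put a number on "not significant" is Welch's t statistic, computed from the figures above with the standard library only. The variable names are hypothetical. With only five runs per mode, this is a rough check, not a rigorous test. The statistic comes out below 1. A two-sided test at the 5 % level would need roughly 2.4 here, since Welch's approximation gives about 7 degrees of freedom for these data.

```python
from math import sqrt
from statistics import mean, variance
from typing import List

# The elapsed times (in nanoseconds) listed above, in the same order.
first_mode_times: List[int] = [14_296_000, 15_188_000, 14_926_000, 15_518_000, 14_495_000]
second_mode_times: List[int] = [15_321_000, 13_819_000, 15_230_000, 14_191_000, 14_009_000]


def welch_t(a: List[int], b: List[int]) -> float:
    """Welch's t statistic: the difference of the means divided by its estimated standard error."""
    standard_error = sqrt(variance(a) / len(a) + variance(b) / len(b))
    return (mean(a) - mean(b)) / standard_error


print(mean(first_mode_times), mean(second_mode_times))  # 14884600 14514000
print(round(welch_t(first_mode_times, second_mode_times), 2))  # 0.96
```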


4: The Conclusion and Beyond


Hypothesizer 7
Now, I have an objects pipe.

The benefit of any objects pipe is that memory will not be exhausted however many objects are processed. In fact, the above sample program never stores more than 16 objects in the pipe, however large the number of iterations is set.
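As a comparison only, and not as a replacement for my class, the sample program's arrangement can be reproduced with the standard library's 'queue.Queue'. Its 'put' blocks while 'maxsize' objects are queued, so the backlog is bounded in the same way. In this sketch, 'None' serves as an end marker in place of 'finishWriting ()', and the function name is hypothetical.

```python
import queue
import threading
from typing import Optional, Tuple

BUFFER_LENGTH = 16


def run_pipeline(number_of_integers: int) -> Tuple[int, int]:
    """Returns (the number of multiples of 10 read, the largest number of objects seen queued)."""
    pipe: "queue.Queue[Optional[int]]" = queue.Queue(maxsize=BUFFER_LENGTH)
    largest_backlog = 0

    def produce() -> None:
        nonlocal largest_backlog
        for integer in range(number_of_integers):
            pipe.put(integer)  # blocks while BUFFER_LENGTH objects are already queued
            largest_backlog = max(largest_backlog, pipe.qsize())
        pipe.put(None)  # end marker that plays the role of 'finishWriting ()'

    producer = threading.Thread(target=produce)
    producer.start()
    multiples_of_10 = 0
    while True:
        integer = pipe.get()
        if integer is None:  # the writer has finished
            break
        if integer % 10 == 0:
            multiples_of_10 += 1
    producer.join()
    return multiples_of_10, largest_backlog


print(run_pipeline(512)[0])  # 52
```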

The objects pipe can be used as a string pipe as it is. However, some extra methods are useful for handling strings, so in the next article I will create a specific string pipe class that inherits the objects pipe class.

