Serially because otherwise, the memory would be used up. A for-objects Python equivalent to 'java.io.PipedWriter' + 'java.io.PipedReader'.
Topics
About: The Python programming language
The table of contents of this article
- Starting Context
- Target Context
- Main Body
- 1: The Motivation and the Plan Are the Same for an Already Introduced Java Objects Pipe
- 2: The Code and Its Explanation
- 3: An Example Usage and an Execution Result
- 4: The Conclusion and Beyond
Starting Context
- The reader has basic knowledge on the Python programming language.
Target Context
- The reader will know an objects pipe.
Main Body
1: The Motivation and the Plan Are the Same for an Already Introduced Java Objects Pipe
Hypothesizer 7
In fact, an article of another series has already introduced a Java objects pipe.
I refrain from repeating the motivation and the plan that are described there.
Although there is no Python equivalent for Java 'java.io.PipedWriter' + 'java.io.PipedReader' (as far as I know), this objects pipe can be used instead, because any character is an object.
2: The Code and Its Explanation
Hypothesizer 7
This is the code of my objects pipe class and its related classes.
'theBiasPlanet/coreUtilities/pipes/ObjectsPipe.py'
@Python Source Code
from typing import Generic
from typing import List
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import cast
import sys
from threading import Condition
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
T = TypeVar ("T")
class ObjectsPipe (Generic [T]):
def __init__ (a_this: "ObjectsPipe", a_bufferLength: int, a_notificationIsDelayed: bool) -> None:
a_this.i_threadCondition: Condition
a_this.i_objects: List [object]
a_this.i_bufferLength: int = 0
# No data: i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber && i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataStartIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex: int = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_isFinishedWriting: bool = False
a_this.i_isFinishedReading: bool = False
a_this.i_notificationIsDelayed: bool = False
a_this.i_threadCondition = Condition ()
a_this.i_bufferLength = a_bufferLength
a_this.i_objects = [None] * a_this.i_bufferLength
a_this.i_notificationIsDelayed = a_notificationIsDelayed
def __del__ (a_this: "ObjectsPipe") -> None:
None
def isEmptyWithoutLocking (a_this: "ObjectsPipe") -> bool:
return a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber
def isFullWithoutLocking (a_this: "ObjectsPipe") -> bool:
return (a_this.i_dataStartIndex == GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataUntilIndex == a_this.i_bufferLength) or (a_this.i_dataStartIndex != GeneralConstantsConstantsGroup.c_iterationStartNumber and a_this.i_dataStartIndex == a_this.i_dataUntilIndex)
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
if a_this.i_isFinishedReading:
raise NoMoreNeedsException ("")
if a_this.i_isFinishedWriting:
a_this.i_isFinishedWriting = False
while (True):
if a_this.isFullWithoutLocking ():
try:
if a_timeOutPeriodInMilliseconds == -1:
a_this.i_threadCondition.wait ()
elif a_timeOutPeriodInMilliseconds == 0:
None
else:
a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
# Checked again because the status may have changed while this thread was waiting.
if a_this.i_isFinishedReading:
raise NoMoreNeedsException ("")
if not a_this.isFullWithoutLocking ():
l_wasEmpty: bool = a_this.isEmptyWithoutLocking ()
if a_this.i_dataUntilIndex == a_this.i_bufferLength:
a_this.i_objects [GeneralConstantsConstantsGroup.c_iterationStartNumber] = a_object
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber + 1
else:
a_this.i_objects [a_this.i_dataUntilIndex] = a_object
a_this.i_dataUntilIndex = a_this.i_dataUntilIndex + 1
if ((not a_this.i_notificationIsDelayed) and l_wasEmpty) or (a_this.i_notificationIsDelayed and a_this.isFullWithoutLocking ()):
a_this.i_threadCondition.notifyAll ()
return
else:
if a_timeOutPeriodInMilliseconds != -1:
raise TimeOutException ("")
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
l_readObject: Optional [T] = None
if a_this.i_isFinishedReading:
a_this.i_isFinishedReading = False
while True:
if a_this.isEmptyWithoutLocking ():
if not a_this.i_isFinishedWriting:
try:
if a_timeOutPeriodInMilliseconds == -1:
a_this.i_threadCondition.wait ()
elif a_timeOutPeriodInMilliseconds == 0:
None
else:
a_this.i_threadCondition.wait (a_timeOutPeriodInMilliseconds / 1000)
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
else:
raise NoMoreDataException ("")
# Checked again because the status may have changed while this thread was waiting.
if not a_this.isEmptyWithoutLocking ():
l_wasFull: bool = a_this.isFullWithoutLocking ()
l_readObject = cast (T, a_this.i_objects [a_this.i_dataStartIndex])
a_this.i_objects [a_this.i_dataStartIndex] = None
a_this.i_dataStartIndex = a_this.i_dataStartIndex + 1
if a_this.i_dataStartIndex == a_this.i_dataUntilIndex:
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
else:
if a_this.i_dataStartIndex == a_this.i_bufferLength:
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
if ((not a_this.i_notificationIsDelayed) and l_wasFull) or (a_this.i_notificationIsDelayed and a_this.isEmptyWithoutLocking ()):
a_this.i_threadCondition.notifyAll ()
return l_readObject
else:
if a_this.i_isFinishedWriting:
raise NoMoreDataException ("")
if a_timeOutPeriodInMilliseconds != -1:
raise TimeOutException ("")
def isEmpty (a_this: "ObjectsPipe") -> bool:
try:
a_this.i_threadCondition.acquire ()
return a_this.isEmptyWithoutLocking ()
finally:
a_this.i_threadCondition.release ()
def isFull (a_this: "ObjectsPipe") -> bool:
try:
a_this.i_threadCondition.acquire ()
return a_this.isFullWithoutLocking ()
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def write (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1) -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.writeWithoutLocking (a_object, a_timeOutPeriodInMilliseconds)
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def write_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
try:
a_this.i_threadCondition.acquire ()
l_writtenLength: int = 0
for l_writtenLength in range (0, a_length, 1):
try:
if (l_writtenLength == 0) or not a_this.isFullWithoutLocking ():
a_this.writeWithoutLocking (a_objects [a_offset + l_writtenLength], a_timeOutPeriodInMilliseconds)
except (NoMoreNeedsException) as l_exception:
if l_writtenLength == 0:
raise l_exception
else:
break
return l_writtenLength
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def read (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1) -> Optional [T]:
try:
a_this.i_threadCondition.acquire ()
return a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
finally:
a_this.i_threadCondition.release ()
# a_timeOutPeriodInMilliseconds: -1 -> waits indefinitely, 0 -> not wait
def read_1 (a_this: "ObjectsPipe", a_objects: List [Optional [T]], a_offset: int, a_length: int, a_timeOutPeriodInMilliseconds: int = -1) -> int:
try:
a_this.i_threadCondition.acquire ()
l_readLength: int = 0
for l_readLength in range (0, a_length, 1):
if (l_readLength == 0) or not a_this.isEmptyWithoutLocking ():
a_objects [a_offset + l_readLength] = a_this.readWithoutLocking (a_timeOutPeriodInMilliseconds)
else:
break
return l_readLength
finally:
a_this.i_threadCondition.release ()
def readWholeData (a_this: "ObjectsPipe") -> List [Optional [T]]:
try:
a_this.i_threadCondition.acquire ()
l_objectsList: List [Optional [T]] = []
while True:
try:
l_objectsList.append (a_this.readWithoutLocking ())
except (NoMoreDataException) as l_exception:
break
return l_objectsList
finally:
a_this.i_threadCondition.release ()
def finishWriting (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedWriting = True
a_this.i_threadCondition.notifyAll ()
finally:
a_this.i_threadCondition.release ()
def finishReading (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedReading = True
a_this.i_threadCondition.notifyAll ()
finally:
a_this.i_threadCondition.release ()
def reset (a_this: "ObjectsPipe") -> None:
try:
a_this.i_threadCondition.acquire ()
a_this.i_isFinishedWriting = False
a_this.i_isFinishedReading = False
a_this.i_dataStartIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
a_this.i_dataUntilIndex = GeneralConstantsConstantsGroup.c_iterationStartNumber
finally:
a_this.i_threadCondition.release ()
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'
@Python Source Code
~
class GeneralConstantsConstantsGroup:
~
c_iterationStartNumber: int = 0
~
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreDataException.py'
@Python Source Code
class NoMoreDataException (Exception):
def __init__ (a_this: "NoMoreDataException", a_message: str) -> None:
super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/inputsHandling/NoMoreNeedsException.py'
@Python Source Code
class NoMoreNeedsException (Exception):
def __init__ (a_this: "NoMoreNeedsException", a_message: str) -> None:
super ().__init__ (a_message)
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py' is totally omitted because the used method just writes logs.
'theBiasPlanet/coreUtilities/timersHandling/TimeOutException.py'
@Python Source Code
class TimeOutException (Exception):
def __init__ (a_this: "TimeOutException", a_message: str) -> None:
super ().__init__ (a_message)
The explanation of the code is almost the same for the Java version, but I will do the explanation including the duplication.
Note that the code has mypy annotations (as my any Python code does).
The constructor takes the buffer size and the algorithm choice for awaking the readers.
'i_objects' is the buffer.
'i_dataStartIndex' and 'i_dataUntilIndex' are the start index and the until (without including) index of the valued area; when the buffer is empty, they are '0' and '0', respectively.
All the supposedly-public methods except the constructor are synchronized on the 'a_this.i_threadCondition' object ("supposedly-public" means being intended (by me) to be used from outside the class, although Python does not allow any non-public method). In fact, the '~WithoutLocking' methods are supposedly-protected.
The 'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)' method has the loop on the basis that there may be some multiple readers: a reader that had been waiting on the empty buffer may have been awaken to find out that the written objects have been already snatched by the other readers, being required to wait once more.
Likewise, the 'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)' method has the loop on the basis that there may be some multiple writers.
The behavior of the 'writeWithoutLocking (a_this: "ObjectsPipe", a_object: Optional [T], a_timeOutPeriodInMilliseconds: int = -1)' method when the pipe has been declared finished reading is somehow different from the behavior of the 'readWithoutLocking (a_this: "ObjectsPipe", a_timeOutPeriodInMilliseconds: int = -1)' method when the pipe has been declared finished writing, because the writer will not need to write any more at all, while the reader will want to read up the objects already stored in the buffer.
Be aware that I have not intensively tested the code, yet, although I have used it in some simple cases, one of which is shown in the next section.
3: An Example Usage and an Execution Result
Hypothesizer 7
This is a sample program that uses the objects pipe.
@Python Source Code
from typing import Optional
import sys
from threading import Thread
import traceback
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.performanceMeasuringHandling.PerformanceMeasurer import PerformanceMeasurer
from theBiasPlanet.coreUtilities.pipes.ObjectsPipe import ObjectsPipe
~
~
@staticmethod
def prepareIntegers (a_writer: "ObjectsPipe [int]") -> None:
l_iterationIndex: int
for l_iterationIndex in range (0, 512, 1):
try:
a_writer.write (l_iterationIndex)
except (NoMoreNeedsException) as l_exception:
break
sys.stdout.write ("### written: {0:d}\n".format (l_iterationIndex))
sys.stdout.flush ()
@staticmethod
def processIntegers (a_reader: "ObjectsPipe [int]") -> None:
l_integer: Optional [int] = None
l_numberOfMultipleOf10s: int = 0
while True:
try:
l_integer = a_reader.read ()
except (NoMoreDataException) as l_exception:
break
sys.stdout.write ("### read: {0:d}\n".format (l_integer))
sys.stdout.flush ();
if l_integer % 10 == 0:
l_numberOfMultipleOf10s = l_numberOfMultipleOf10s + 1
sys.stdout.write ("### a multiple of 10s is found.\n".format ())
sys.stdout.flush ();
sys.stdout.write ("### the number of multiple of 10s is {0:d}.\n".format (l_numberOfMultipleOf10s))
sys.stdout.flush ()
@staticmethod
def test2 () -> None:
l_integersPipe: "ObjectsPipe [int]" = ObjectsPipe [int] (16, True)
PerformanceMeasurer.setStartTime ()
def l_subThreadFunction () -> None:
try:
Test1Test.prepareIntegers (l_integersPipe)
except (Exception) as l_exception:
sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
finally:
try:
l_integersPipe.finishWriting ()
except (Exception) as l_exception:
sys.stdout.write ("{0:s}: {1:s}\n".format (str (l_exception), traceback.format_exc ()))
l_subThread: Thread = Thread (target = l_subThreadFunction)
l_subThread.start ()
Test1Test.processIntegers (l_integersPipe)
l_subThread.join ()
sys.stdout.write ("### The elapsed time is {0:,d} ns.\n".format (PerformanceMeasurer.getElapseTimeInNanoSeconds ()))
~
This is an output, in my single-core single-CPU Linux computer (which may not be influencing the output, because Python has the appalling Global Interpreter Lock).
@Output
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### written: 3
### written: 4
### written: 5
### written: 6
### written: 7
### written: 8
### written: 9
### written: 10
### written: 11
### written: 12
### written: 13
### written: 14
### written: 15
### written: 16
### written: 17
### read: 2
### read: 3
### read: 4
### read: 5
### read: 6
### read: 7
### read: 8
### read: 9
### read: 10
### a multiple of 10s is found.
### read: 11
### read: 12
### read: 13
### read: 14
### read: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### read: 487
### read: 488
### read: 489
### read: 490
### a multiple of 10s is found.
### read: 491
### read: 492
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### written: 509
### written: 510
### written: 511
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
. . . Well, what happened is that the writer acted first and wrote '0' and '1'; the reader read '0' and '1' and was made waiting because the pipe became empty; the writer wrote to the full and awoke the reader; the reader began to read; and so on, I guess. . . . Note that '1' should have been written before the reader started although "### written: 1" was delayed shown: immediately after the 'write' method was finished, the control switched to the reading thread, with the messaging delayed until the control was returned to the writing thread;
When the notification timing mode is set 'False', this is the output.
@Output
### written: 0
### read: 0
### a multiple of 10s is found.
### read: 1
### written: 1
### written: 2
### read: 2
### read: 3
### written: 3
### written: 4
### read: 4
### read: 5
### written: 5
### written: 6
### read: 6
### read: 7
### written: 7
### written: 8
### read: 8
### read: 9
### written: 9
### written: 10
### read: 10
### a multiple of 10s is found.
### read: 11
### written: 11
### written: 12
### read: 12
### read: 13
### written: 13
### written: 14
### read: 14
### read: 15
### written: 15
~
### written: 496
### written: 497
### written: 498
### written: 499
### written: 500
### written: 501
### written: 502
### written: 503
### written: 504
### written: 505
### written: 506
### written: 507
### written: 508
### read: 493
### read: 494
### read: 495
### read: 496
### read: 497
### read: 498
### read: 499
### read: 500
### a multiple of 10s is found.
### read: 501
### read: 502
### read: 503
### read: 504
### read: 505
### read: 506
### read: 507
### read: 508
### written: 509
### written: 510
### written: 511
### read: 509
### read: 510
### a multiple of 10s is found.
### read: 511
### the number of multiple of 10s is 52.
. . . The writer acted first and wrote '0' and '1'; the reader read '0' and '1' and was made waiting because the pipe became empty; the writer wrote '2', awoke the reader, and wrote '3'; the reader began reading; and so on, I guess. . . . Again, "### written: 1" was delayed shown.
Anyway, only small number of slots in the buffer seem to tend to be used.
As I have measured the times taken (with the costly messagings eliminated) in the 2 modes (5 times each mode), the times were {'14,296,000', '15,188,000', '14,926,000', '15,518,000', and '14,495,000': the average -> '14,884,600'} and {'15,321,000', '13,819,000', '15,230,000', '14,191,000', and '14,009,000': the average -> '14,514,000'}, respectively, in nanoseconds. I guess that the difference is not significant. . . . By the way, I have noticed that the Python version is much faster than the Java version, which is because I am using CPython, which is C based? Probably.
4: The Conclusion and Beyond
Hypothesizer 7
Now, I have an objects pipe.
The benefit of any objects pipe is that the memory space will not be exhausted however many objects are processed. In fact, the above sample program does not cause any buffer overflow however large the number of iteration is set.
The objects pipe can be used a string pipe as it is, but as some extra methods are useful for handling string pipe instances, I will create a specific string pipe class that inherits the objects pipe class, in the next article.