A universal solution, which lets any thread start waiting, be interrupted, and restart waiting at any time. The input can be also non-standard.
Topics
About: The Python programming language
The table of contents of this article
- Starting Context
- Target Context
- Orientation
- Main Body
- 1: My Requirement
- 2: Those Pieces of Advice That Do Not Work
- 3: The Plan
- 4: The Code
- 5: An Example Usage and an Execution Result
Starting Context
- The reader has a basic knowledge on the Python programming language.
Target Context
- The reader will have a universal solution for interrupting any standard input wait in Python.
Orientation
Here is a universal solution for interrupting any standard input wait in Python.
The string pipe introduced in the previous article is used.
The haltable reader created here will be ported to some other programming languages in some other series (for Java, C++, and C#).
Main Body
1: My Requirement
Hypothesizer 7
My requirement is this: to create a thread that waits for the standard input; after a while, to stop the thread; after a while, to create another thread that waits for the standard input; and so on.
I thought that that would be quite easy; I thought that there would be a way to stop the thread, a way to interrupt the standard input wait, a way to non-blocking-ly peek the input, or a way to timeout the wait.
2: Those Pieces of Advice That Do Not Work
Hypothesizer 7
A prevalent piece of advice for stopping a thread is to use a flag or a like mechanism . . .
You know, that does not work for me: the thread is not looping in Python, but is being stuck in a single method call!
Anther piece of advice is to use the 'ctypes.pythonapi.PyThreadState_SetAsyncExc' method.
Nice, but unfortunately, that does not work, either, because that method can interrupt only between Python lines, not inside any single Python line that consists of some C code.
Then, can I interrupt the standard input wait?
Well, the Python 'io.TextIOWrapper' class (whose instance 'sys.stdin' is) does not have such a method.
Then, can I non-blocking-ly peek the input?
The class does not seem to support that . . .
Then, can I timeout the wait?
That does not seem so . . .
Well, what then?
Another piece of advice is to close the input stream.
You know, I cannot close the standard input stream, because subsequent threads have to use it (the closed standard input stream cannot be reopened, right?).
I wondered whether I could programmatically send a dummy input datum to wake the thread, but that seems impossible with only the standard Python.
Another piece of advice is to send a 'signal.SIGINT' signal to the process.
Nice, but unfortunately, that works only if the thread is the main thread . . .
Another piece of advice is to use the 'select.select (rlist, wlist, xlist[, timeout])' function to check the state of the standard input.
Well, that seemed very promising, but had turned out to not work, either, because reading the single character disturbs the method: I mean, when the 4 characters string "ABC¥n" is inputted, the method reports that the input is ready, all right, but after reading only the single character "A", the method reports that the input is not ready, leaving my program to wonder whether it should read another character. . . . You know, as the string length is unknown, my program cannot know how many characters are there to be read.
Another piece of advice is to make the thread a daemon thread.
Obviously, that does not work, because that stops the thread only when the program ends, which is not what I want.
So, all the pieces of advice do not work! Why do I have to be stuck with this seemingly easy task, indeed?
3: The Plan
Hypothesizer 7
It is important to understand that something is impossible, because only after that, we can begin to consider the problem on that condition.
I have now understood that it is impossible to stop the thread that is waiting for the standard input. . . . No, I have to state it more correctly: it is impossible to stop the thread that is waiting for the standard input DIRECTLY.
That correct statement opens a way to a new idea: so, my thread should wait for the standard input INDIRECTLY. I mean, there should be a daemon thread that keeps waiting for the standard input and relays input data to my threads. It will be fine if the relaying mechanism is interruptible.
In fact, I already have an interruptible relaying mechanism: the string pipe introduced in the previous article.
It will be better to create a haltable standard input reader class as a subclass of a haltable input reader class, than to implement the logic in the program of the immediate concern.
4: The Code
Hypothesizer 7
This is the code of my haltable input reader class, which has mypy annotations (as my any Python code does).
'theBiasPlanet/coreUtilities/inputs/HaltableReader.py'
@Python Source Code
from typing import List
from typing import Optional
from typing import TextIO
from collections import OrderedDict
from io import StringIO
from threading import Thread
from theBiasPlanet.coreUtilities.constantsGroups.GeneralConstantsConstantsGroup import GeneralConstantsConstantsGroup
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.locks.SharableLock import SharableLock
from theBiasPlanet.coreUtilities.messagingHandling.Publisher import Publisher
from theBiasPlanet.coreUtilities.pipes.StringPipe import StringPipe
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
class HaltableReader:
def __init__ (a_this: "HaltableReader", a_underlyingStream: TextIO, a_bufferLength: int) -> None:
a_this.i_underlyingStream: TextIO
a_this.i_bufferLength: int
a_this.i_dispatchDataThread: Optional [Thread] = None
a_this.i_subscriberIdentificationToStringPipeMap: "OrderedDict [str, StringPipe]" = OrderedDict ()
a_this.i_sharableLock: "SharableLock" = SharableLock ()
a_this.i_underlyingStream = a_underlyingStream
a_this.i_bufferLength = a_bufferLength
# Any subscriber has to read consistently, or the other subscribers may be stuck because the dispatching thread will be stuck waiting to write to the string pipe for the neglecting subscriber.
def addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
try:
a_this.i_sharableLock.lockExclusively ()
a_this.i_subscriberIdentificationToStringPipeMap.update ({a_subscriberIdentification: StringPipe (a_this.i_bufferLength, False)})
finally:
a_this.i_sharableLock.unlockExclusively ()
def removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str) -> None:
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe: "StringPipe" = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
l_stringPipe.finishWriting ()
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
try:
a_this.i_sharableLock.lockExclusively ()
a_this.i_subscriberIdentificationToStringPipeMap.pop (a_subscriberIdentification)
finally:
a_this.i_sharableLock.unlockExclusively ()
def startDispatchDataThread (a_this: "HaltableReader") -> None:
if a_this.i_dispatchDataThread is None:
def l_dispatchDataThreadFunction () -> None:
try:
l_data: str
while True:
l_data = a_this.i_underlyingStream.read (1)
if l_data == "":
break
l_subscriberIdentification: str = None
try:
a_this.i_sharableLock.lockSharedly ()
for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].writeWholeString (StringIO (l_data))
finally:
a_this.i_sharableLock.unlockSharedly ()
except (EOFError) as l_exception:
None
except (Exception) as l_exception:
Publisher.logErrorInformation (l_exception)
finally:
try:
a_this.i_sharableLock.lockSharedly ()
for l_subscriberIdentification in a_this.i_subscriberIdentificationToStringPipeMap:
a_this.i_subscriberIdentificationToStringPipeMap [l_subscriberIdentification].finishWriting ()
finally:
a_this.i_sharableLock.unlockSharedly ()
a_this.i_dispatchDataThread = Thread (target = l_dispatchDataThreadFunction)
a_this.i_dispatchDataThread.daemon = True
a_this.i_dispatchDataThread.start ()
def close (a_this: "HaltableReader") -> None:
a_this.i_underlyingStream.close ()
def read (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return l_stringPipe.readString (a_maximumLength, a_timeOutPeriodInMilliseconds)
else:
raise NoMoreNeedsException ("")
def readLine (a_this: "HaltableReader", a_subscriberIdentification: str, a_maximumLength: int, a_timeOutPeriodInMilliseconds: int = -1) -> str:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap.get (a_subscriberIdentification)
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return l_stringPipe.readStringLine (a_maximumLength, a_timeOutPeriodInMilliseconds)
else:
raise NoMoreNeedsException ("")
def isReady (a_this: "HaltableReader", a_subscriberIdentification: str) -> bool:
l_stringPipe: "Optional [StringPipe]" = None
try:
a_this.i_sharableLock.lockSharedly ()
try:
l_stringPipe = a_this.i_subscriberIdentificationToStringPipeMap [a_subscriberIdentification]
except (KeyError) as l_exception:
None
finally:
a_this.i_sharableLock.unlockSharedly ()
if l_stringPipe is not None:
return not (l_stringPipe.isEmpty ())
else:
return False
'theBiasPlanet/coreUtilities/constantsGroups/GeneralConstantsConstantsGroup.py'
@Python Source Code
~
class GeneralConstantsConstantsGroup:
c_emptyString: str = ""
~
'theBiasPlanet/coreUtilities/locks/SharableLock.py'
@Python Source Code
from typing import Optional
import threading
from threading import Condition
from threading import Thread
class SharableLock:
def __init__ (a_this: "SharableLock") -> None:
a_this.i_sharingLockingNumberOfTimes: int = 0
a_this.i_exclusiveLockingThread: Optional [Thread] = None
a_this.i_exclusiveLockingNumberOfTimes: int = 0
a_this.i_threadCondition: Condition = Condition ()
def lockSharedly (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
while a_this.i_exclusiveLockingNumberOfTimes > 0:
a_this.i_threadCondition.wait ()
a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes + 1
a_this.i_threadCondition.release ()
def lockExclusively (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
while a_this.i_sharingLockingNumberOfTimes > 0 or (a_this.i_exclusiveLockingThread is not None and threading.current_thread () != a_this.i_exclusiveLockingThread):
a_this.i_threadCondition.wait ()
if a_this.i_exclusiveLockingThread is None:
a_this.i_exclusiveLockingThread = threading.current_thread ()
a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes + 1
a_this.i_threadCondition.release ()
def unlockSharedly (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
if a_this.i_sharingLockingNumberOfTimes > 0:
a_this.i_sharingLockingNumberOfTimes = a_this.i_sharingLockingNumberOfTimes - 1
if a_this.i_sharingLockingNumberOfTimes == 0:
a_this.i_threadCondition.notify_all ()
a_this.i_threadCondition.release ()
def unlockExclusively (a_this: "SharableLock") -> None:
a_this.i_threadCondition.acquire ()
if a_this.i_exclusiveLockingNumberOfTimes > 0:
a_this.i_exclusiveLockingNumberOfTimes = a_this.i_exclusiveLockingNumberOfTimes - 1
if a_this.i_exclusiveLockingNumberOfTimes == 0:
a_this.i_exclusiveLockingThread = None
a_this.i_threadCondition.notify_all ()
a_this.i_threadCondition.release ()
'theBiasPlanet/coreUtilities/messagingHandling/Publisher.py' is totally omitted because the used method just writes logs.
This is the code of my haltable standard input reader class.
'theBiasPlanet/coreUtilities/inputs/HaltableStandardInputReader.py'
@Python Source Code
import sys
from theBiasPlanet.coreUtilities.constantsGroups.DefaultValuesConstantsGroup import DefaultValuesConstantsGroup
from theBiasPlanet.coreUtilities.inputs.HaltableReader import HaltableReader
class HaltableStandardInputReader (HaltableReader):
s_singletonInstance: "HaltableStandardInputReader" = None
def __init__ (a_this: "HaltableStandardInputReader") -> None:
super ().__init__ (sys.stdin, DefaultValuesConstantsGroup.c_smallBufferSize)
@staticmethod
def getInstance () -> "HaltableStandardInputReader":
if HaltableStandardInputReader.s_singletonInstance is None:
HaltableStandardInputReader.s_singletonInstance = HaltableStandardInputReader ()
return HaltableStandardInputReader.s_singletonInstance
There supposed to be only one standard input reader class instance, which is supposed to be gotten from the 'HaltableStandardInputReader.getInstance ()' method in order to make the instance a singleton (although it cannot be forced because Python does not allow any private constructor (or method), regrettably).
The 'HaltableReader.startDispatchDataThread' method has to be called in order to start the relaying thread (can be called multiple times without any harm).
Any thread can be registered as a subscriber to the standard input reader class instance by calling the 'HaltableReader.addSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)' method, and unsubscribing by the 'HaltableReader.removeSubscriber (a_this: "HaltableReader", a_subscriberIdentification: str)' method call will interrupt any input wait by the thread.
If the thread wants to use timeout for the wait, it can do so.
Multiple threads can subscribe to the standard input reader class instance, although that is not included in my immediate requirement.
I guess that there is no case for which this solution does not work.
5: An Example Usage and an Execution Result
Hypothesizer 7
This is an example usage.
@Python Source Code
from typing import List
from typing import Optional
from datetime import datetime
import sys
import threading
from threading import Thread
import time
from theBiasPlanet.coreUtilities.inputs.HaltableStandardInputReader import HaltableStandardInputReader
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreDataException import NoMoreDataException
from theBiasPlanet.coreUtilities.inputsHandling.NoMoreNeedsException import NoMoreNeedsException
from theBiasPlanet.coreUtilities.timersHandling.TimeOutException import TimeOutException
class Test1Test:
@staticmethod
def main (a_arguments: List [str]) -> None:
Test1Test.test4 ()
@staticmethod
def test4 () -> None:
l_haltableStandardInputReader: "HaltableStandardInputReader" = HaltableStandardInputReader.getInstance ()
l_haltableStandardInputReader.startDispatchDataThread ()
def l_workerThreadFunction () -> None:
l_data: Optional [str] = None
l_subscriberIdentification: str = str (id (threading.current_thread ()))
while True:
try:
l_data = l_haltableStandardInputReader.readLine (l_subscriberIdentification, 10, 10 * 1000)
sys.stdout.write ("### {0:s}: {1:s}: the standard input data: {2:s}\n".format (str (datetime.now ()), l_subscriberIdentification, l_data))
sys.stdout.flush ()
except (NoMoreDataException) as l_exception:
break
except (NoMoreNeedsException) as l_exception:
break
except (TimeOutException) as l_exception:
None
sys.stdout.write ("### {0:s}: {1:s}: ended\n".format (str (datetime.now ()), l_subscriberIdentification))
sys.stdout.flush ()
l_workerThreadA: Thread = Thread (target = l_workerThreadFunction)
l_subscriberIdentificationForWorkerThreadA: str = str (id (l_workerThreadA))
l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadA)
l_workerThreadB: Thread = Thread (target = l_workerThreadFunction)
l_subscriberIdentificationForWorkerThreadB: str = str (id (l_workerThreadB))
l_haltableStandardInputReader.addSubscriber (l_subscriberIdentificationForWorkerThreadB)
l_workerThreadA.start ()
l_workerThreadB.start ()
sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
sys.stdout.flush ()
time.sleep (30)
l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadA)
l_workerThreadA.join ()
sys.stdout.write ("### {0:s}: sleeping . . .\n".format (str (datetime.now ())))
sys.stdout.flush ()
time.sleep (30)
l_haltableStandardInputReader.removeSubscriber (l_subscriberIdentificationForWorkerThreadB)
l_workerThreadB.join ()
if __name__ == "__main__":
Test1Test.main (sys.argv)
This is an output.
@Output
### 2020-08-03 14:34:09.851530: sleeping . . .
ABC
### 2020-08-03 14:34:14.924620: 139963775811104: the standard input data: ABC
### 2020-08-03 14:34:14.924868: 139963775840512: the standard input data: ABC
### 2020-08-03 14:34:39.853083: 139963775811104: ended
### 2020-08-03 14:34:39.853977: sleeping . . .
DEF
### 2020-08-03 14:34:44.147788: 139963775840512: the standard input data: DEF
### 2020-08-03 14:35:09.854674: 139963775840512: ended