Builder for defining tests relating to IReactorThreads.

Method test_getThreadPool reactor.getThreadPool() returns an instance of ThreadPool which starts when reactor.run() is called and stops before it returns.
Method test_suggestThreadPoolSize reactor.suggestThreadPoolSize() sets the maximum size of the reactor threadpool.
Method test_delayedCallFromThread A function scheduled with IReactorThreads.callFromThread invoked from a delayed call is run immediately in the next reactor iteration.
Method test_callFromThread A function scheduled with IReactorThreads.callFromThread invoked from another thread is run in the reactor thread.
Method test_stopThreadPool When the reactor stops, ReactorBase._stopThreadPool drops the reactor's direct reference to its internal threadpool and removes the associated startup and shutdown triggers.
Method test_stopThreadPoolWhenStartedAfterReactorRan Shutting down the thread pool requires special handling when the pool was started after the reactor was run.
Method test_cleanUpThreadPoolEvenBeforeReactorIsRun When the reactor has its shutdown event fired before it is run, the thread pool is completely destroyed.
Method test_isInIOThread The reactor registers itself as the I/O thread when it runs so that twisted.python.threadable.isInIOThread returns True if it is called in the thread the reactor is running in.
Method test_isNotInIOThread The reactor registers itself as the I/O thread when it runs so that twisted.python.threadable.isInIOThread returns False if it is called in a different thread than the reactor is running in.

Inherited from ReactorBuilder:

Class Variable skippedReactors A dict mapping FQPN strings of reactors for which the tests defined by this class will be skipped to strings giving the skip message.
Class Variable requiredInterfaces A list of interfaces which the reactor must provide or these tests will be skipped. The default, None, means that no interfaces are required.
Instance Variable reactorFactory A no-argument callable which returns the reactor to use for testing.
Instance Variable originalHandler The SIGCHLD handler which was installed when setUp ran and which will be re-installed when tearDown runs.
Method setUp Clear the SIGCHLD handler, if there is one, to ensure an environment like the one which exists prior to a call to reactor.run.
Method tearDown Restore the original SIGCHLD handler and reap processes as long as there seem to be any remaining.
Method unbuildReactor Clean up any resources which may have been allocated for the given reactor by its creation or by a test which used it.
Method buildReactor Create and return a reactor using self.reactorFactory.
Method getTimeout Determine how long to run the test before considering it failed.
Method runReactor Run the reactor for at most the given amount of time.
Class Method makeTestCaseClasses Create a SynchronousTestCase subclass which mixes in cls for each known reactor and return a dict mapping their names to them.
Instance Variable _reactors A list of FQPN strings giving the reactors for which SynchronousTestCases will be created.
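
The makeTestCaseClasses behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not the Twisted implementation: BuilderSketch, ExampleTestsBuilder, the reactor names, and the reactorName attribute are hypothetical, and unittest.TestCase stands in for SynchronousTestCase.

```python
import unittest


class BuilderSketch:
    """Hypothetical stand-in for a ReactorBuilder-style mixin."""

    # Assumed FQPN strings; these reactors do not exist anywhere.
    _reactors = ["example.reactors.SelectLike", "example.reactors.PollLike"]

    @classmethod
    def makeTestCaseClasses(cls):
        classes = {}
        for fqpn in cls._reactors:
            short = fqpn.split(".")[-1]
            name = (cls.__name__ + "." + short + "Tests").replace(".", "_")
            # Mix the builder into a concrete TestCase and record which
            # reactor the generated class is meant to exercise.
            classes[name] = type(
                name, (cls, unittest.TestCase), {"reactorName": fqpn}
            )
        return classes


class ExampleTestsBuilder(BuilderSketch):
    def test_reactorName(self):
        self.assertIn(self.reactorName, self._reactors)


generated = ExampleTestsBuilder.makeTestCaseClasses()
```

A test module would typically expose the generated classes to the test runner with something like globals().update(ExampleTestsBuilder.makeTestCaseClasses()), so that one builder yields one test case class per reactor.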

def test_getThreadPool(self):

reactor.getThreadPool() returns an instance of ThreadPool which starts when reactor.run() is called and stops before it returns.

def test_suggestThreadPoolSize(self):

reactor.suggestThreadPoolSize() sets the maximum size of the reactor threadpool.

def test_delayedCallFromThread(self):

A function scheduled with IReactorThreads.callFromThread invoked from a delayed call is run immediately in the next reactor iteration.

When invoked from the reactor thread, previous implementations of IReactorThreads.callFromThread would skip the pipe/socket-based wake-up step, assuming the reactor would wake up on its own. However, this meant the reactor did not notice an insert into the thread queue at the right time (in this case, an insert made after the thread queue had already been processed for that reactor iteration).
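
The timing problem can be sketched with a toy event loop built only on the standard library. This is an illustration under assumptions, not Twisted's reactor: MiniReactor, iterate, and the delayed parameter are hypothetical names, and a socket pair plays the role of the waker. Each iteration drains the thread queue first, then runs a delayed call, then blocks in select(). Because callFromThread always writes a wake-up byte, a call queued by the delayed call ends the blocking wait at once instead of waiting out the timeout.

```python
import select
import socket
import time
from collections import deque


class MiniReactor:
    """Hypothetical single-threaded loop with a socket-based waker."""

    def __init__(self):
        self._wakeRead, self._wakeWrite = socket.socketpair()
        self._threadCalls = deque()

    def callFromThread(self, f, *args):
        self._threadCalls.append((f, args))
        # Always wake the loop, even when called from the loop's own thread.
        self._wakeWrite.send(b"x")

    def iterate(self, timeout, delayed=None):
        # 1. Process only the calls that were queued before this iteration.
        for _ in range(len(self._threadCalls)):
            f, args = self._threadCalls.popleft()
            f(*args)
        # 2. Run the delayed call; it may queue more work after step 1.
        if delayed is not None:
            delayed()
        # 3. Block until the waker fires or the timeout expires.
        ready, _, _ = select.select([self._wakeRead], [], [], timeout)
        if ready:
            self._wakeRead.recv(4096)
        return bool(ready)

    def close(self):
        self._wakeRead.close()
        self._wakeWrite.close()


reactor = MiniReactor()
ran = []
start = time.monotonic()
# The delayed call schedules work after the queue was already drained.
woke = reactor.iterate(
    5.0, delayed=lambda: reactor.callFromThread(ran.append, "called")
)
# The next iteration picks the queued call up.
reactor.iterate(0)
elapsed = time.monotonic() - start
reactor.close()
```

If the send() in callFromThread were skipped for calls made from the loop's own thread, the first iterate() would sit in select() for the full five seconds before the queued function got a chance to run.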

def test_callFromThread(self):

A function scheduled with IReactorThreads.callFromThread invoked from another thread is run in the reactor thread.
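
The property under test can be sketched without Twisted, assuming a simple queue-driven loop. The names below (callFromThread, runLoop, record, _STOP) are local stand-ins, not Twisted APIs. A worker thread hands a function to the loop, and the function records the identifier of the thread that actually executes it.

```python
import queue
import threading

_calls = queue.Queue()
_STOP = object()  # sentinel telling the loop to exit


def callFromThread(f, *args):
    """Queue f to be run by the loop thread; safe to call from any thread."""
    _calls.put((f, args))


def runLoop():
    """Run queued calls in the current thread until the sentinel arrives."""
    while True:
        f, args = _calls.get()
        if f is _STOP:
            return
        f(*args)


seen = {}


def record(key):
    seen[key] = threading.get_ident()


def worker():
    record("worker")                     # runs in the worker thread
    callFromThread(record, "scheduled")  # runs later, in the loop thread
    callFromThread(_STOP)


loopThread = threading.get_ident()
t = threading.Thread(target=worker)
t.start()
runLoop()  # the main thread plays the role of the reactor thread
t.join()
```

After the loop exits, seen["scheduled"] holds the loop thread's identifier while seen["worker"] holds a different one.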

def test_stopThreadPool(self):

When the reactor stops, ReactorBase._stopThreadPool drops the reactor's direct reference to its internal threadpool and removes the associated startup and shutdown triggers.

This is the case of the thread pool being created before the reactor is run.

def test_stopThreadPoolWhenStartedAfterReactorRan(self):

Shutting down the thread pool requires special handling when the pool was started after the reactor was run.

Some implementation background: The thread pool is started with callWhenRunning, which returns a system trigger ID only when it is invoked before the reactor is started.

This is the case of the thread pool being created after the reactor is started.
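
The two cases can be sketched with a toy model of the startup-trigger behavior. Everything here is hypothetical (MiniReactor, removeStartupTrigger, forgetTrigger) and only mirrors the description above: callWhenRunning hands back an ID when the reactor has not started yet, and otherwise calls the function immediately and returns None, so cleanup code has to tolerate a missing ID.

```python
class MiniReactor:
    """Hypothetical reactor exposing only the startup-trigger behavior."""

    def __init__(self):
        self.running = False
        self._startupTriggers = {}
        self._nextID = 0

    def callWhenRunning(self, f, *args):
        if self.running:
            # Already running: call now; there is no trigger to hand back.
            f(*args)
            return None
        self._nextID += 1
        self._startupTriggers[self._nextID] = (f, args)
        return self._nextID

    def removeStartupTrigger(self, triggerID):
        del self._startupTriggers[triggerID]

    def start(self):
        self.running = True
        for f, args in list(self._startupTriggers.values()):
            f(*args)
        self._startupTriggers.clear()


def forgetTrigger(reactor, triggerID):
    """Cleanup that works whether or not a trigger ID was ever issued."""
    if triggerID is not None:
        reactor.removeStartupTrigger(triggerID)


started = []

early = MiniReactor()
earlyID = early.callWhenRunning(started.append, "before")  # returns an ID

late = MiniReactor()
late.start()
lateID = late.callWhenRunning(started.append, "after")  # runs now, returns None
```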

def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):

When the reactor has its shutdown event fired before it is run, the thread pool is completely destroyed.

For what it's worth, the reason we support this behavior at all is that Trial does this.

This is the case of the thread pool being created without the reactor being started at all.

def test_isInIOThread(self):

The reactor registers itself as the I/O thread when it runs so that twisted.python.threadable.isInIOThread returns True if it is called in the thread the reactor is running in.

def test_isNotInIOThread(self):

The reactor registers itself as the I/O thread when it runs so that twisted.python.threadable.isInIOThread returns False if it is called in a different thread than the reactor is running in.
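
Both I/O thread checks rest on the same idea, which can be sketched with the standard library alone. The functions below are local stand-ins defined for this example, not imports from twisted.python.threadable: one thread registers its identifier, and the check compares the caller's identifier against the registered one.

```python
import threading

_ioThread = None  # identifier of the registered I/O thread, if any


def registerAsIOThread():
    """Mark the calling thread as the I/O thread."""
    global _ioThread
    _ioThread = threading.get_ident()


def isInIOThread():
    """Return True only when called from the registered I/O thread."""
    return _ioThread == threading.get_ident()


results = {}

# The main thread plays the role of the thread the reactor runs in.
registerAsIOThread()
results["reactor"] = isInIOThread()


def check():
    results["other"] = isInIOThread()


t = threading.Thread(target=check)
t.start()
t.join()
```

Here results["reactor"] is True and results["other"] is False, matching the two behaviors the tests describe.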

API Documentation for twisted, generated by pydoctor at 2020-03-25 17:34:30.