A specialized runner for distributed trial. The runner launches a number of local worker processes which will run tests.

Method __init__ Undocumented
Method writeResults Write test run final outcome to result.
Method createLocalWorkers Create local worker protocol instances and return them.
Method launchWorkerProcesses Spawn processes from a list of process protocols.
Method run Spawn local worker processes and load tests. After that, run them.
Method runUntilFailure Run the tests with local worker processes until they fail.
Instance Variable _workerNumber the number of workers to be spawned. (type: int)
Instance Variable _stream stream which the reporter will use.
Instance Variable _reporterFactory the reporter class to be used.
Method _makeResult Make reporter factory, and wrap it with a DistReporter.
Method _driveWorker Drive a LocalWorkerAMP instance, iterating the tests and calling run for every one of them.
_workerNumber =
the number of workers to be spawned. (type: int)
_stream =
stream which the reporter will use.
_reporterFactory =
the reporter class to be used.
def _makeResult(self):

Make reporter factory, and wrap it with a DistReporter.

def __init__(self, reporterFactory, workerNumber, workerArguments, stream=None, tracebackFormat='default', realTimeErrors=False, uncleanWarnings=False, logfile='test.log', workingDirectory='_trial_temp'):
Undocumented
def writeResults(self, result):

Write test run final outcome to result.

ParametersresultA TestResult which will print errors and the summary.
def createLocalWorkers(self, protocols, workingDirectory):

Create local worker protocol instances and return them.

ParametersprotocolsAn iterable of LocalWorkerAMP instances.
workingDirectoryThe base path in which we should run the workers. (type: str)
ReturnsA list of quantity LocalWorker instances.
def launchWorkerProcesses(self, spawner, protocols, arguments):

Spawn processes from a list of process protocols.

ParametersspawnerA IReactorProcess.spawnProcess implementation.
protocolsAn iterable of ProcessProtocol instances.
argumentsExtra arguments passed to the processes.
def _driveWorker(self, worker, result, testCases, cooperate):

Drive a LocalWorkerAMP instance, iterating the tests and calling run for every one of them.

ParametersworkerThe LocalWorkerAMP to drive.
resultThe global DistReporter instance.
testCasesThe global list of tests to iterate.
cooperateThe cooperate function to use, to be customized in tests. (type: function)
ReturnsA Deferred firing when all the tests are finished.
def run(self, suite, reactor=None, cooperate=cooperate, untilFailure=False):

Spawn local worker processes and load tests. After that, run them.

ParameterssuiteA tests suite to be run.
reactorThe reactor to use, to be customized in tests. (type: A provider of twisted.internet.interfaces.IReactorProcess)
cooperateThe cooperate function to use, to be customized in tests. (type: function)
untilFailureIf True, continue to run the tests until they fail. (type: bool.)
ReturnsThe test result. (type: DistReporter)
def runUntilFailure(self, suite):

Run the tests with local worker processes until they fail.

ParameterssuiteA tests suite to be run.
API Documentation for twisted, generated by pydoctor at 2020-03-25 17:34:30.