tf.train.Coordinator

A coordinator for threads.

tf.train.Coordinator(
    clean_stop_exception_types=None
)

This class implements a simple mechanism to coordinate the termination of a set of threads.

Usage:

# Create a coordinator.
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate.
coord.join(threads)

Any of the threads can call coord.request_stop() to ask all the threads to stop. To cooperate with such a request, each thread must check coord.should_stop() regularly. coord.should_stop() returns True as soon as coord.request_stop() has been called.

A typical thread running with a coordinator will do something like:

while not coord.should_stop():
  ...do some work...
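The cooperative-stop pattern above can be sketched without TensorFlow. The MiniCoordinator class below is a hypothetical stand-in built on threading.Event; it mirrors only the request_stop(), should_stop() and join() names and is not the library's implementation.

```python
import threading
import time


class MiniCoordinator:
    """Hypothetical stand-in that mirrors request_stop/should_stop/join only."""

    def __init__(self):
        self._stop_event = threading.Event()

    def request_stop(self):
        # Flip the shared flag that every worker polls.
        self._stop_event.set()

    def should_stop(self):
        return self._stop_event.is_set()

    def join(self, threads):
        # Block until every thread has returned.
        for t in threads:
            t.join()


def worker(coord, counts, index):
    # Poll should_stop() between units of work.
    while not coord.should_stop():
        counts[index] += 1
        if index == 0 and counts[index] >= 5:
            # Any thread may ask all the threads to stop.
            coord.request_stop()
        time.sleep(0.001)


coord = MiniCoordinator()
counts = [0, 0, 0]
threads = [
    threading.Thread(target=worker, args=(coord, counts, i)) for i in range(3)
]
for t in threads:
    t.start()
coord.join(threads)
all_stopped = not any(t.is_alive() for t in threads)
```

Worker 0 requests the stop after five iterations; the other workers notice it the next time they poll should_stop().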

Exception handling:

A thread can report an exception to the coordinator as part of the request_stop() call. The exception will be re-raised from the coord.join() call.

Thread code:

try:
  while not coord.should_stop():
    ...do some work...
except Exception as e:
  coord.request_stop(e)

Main code:

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate.
  coord.join(threads)
except Exception as e:
  ...exception that was passed to coord.request_stop()
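As a rough illustration of how a reported exception can travel from a worker thread to the caller of join(), the sketch below uses a hypothetical MiniCoordinator stand-in built on the standard library. It keeps only the first reported exception and re-raises it after the threads finish; the real class does more than this.

```python
import threading
import time


class MiniCoordinator:
    """Hypothetical stand-in: records the first reported exception."""

    def __init__(self):
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._exc = None

    def request_stop(self, ex=None):
        with self._lock:
            if not self._stop_event.is_set():
                # Only the first report is kept.
                self._exc = ex
                self._stop_event.set()

    def should_stop(self):
        return self._stop_event.is_set()

    def join(self, threads):
        for t in threads:
            t.join()
        # Re-raise the recorded exception in the joining thread.
        if self._exc is not None:
            raise self._exc


def failing_worker(coord):
    try:
        while not coord.should_stop():
            raise ValueError("bad batch")
    except Exception as e:
        coord.request_stop(e)


def idle_worker(coord):
    while not coord.should_stop():
        time.sleep(0.001)


coord = MiniCoordinator()
threads = [
    threading.Thread(target=failing_worker, args=(coord,)),
    threading.Thread(target=idle_worker, args=(coord,)),
]
for t in threads:
    t.start()

try:
    coord.join(threads)
    caught = None
except ValueError as e:
    caught = e
```

Here the main thread ends up holding the ValueError raised in failing_worker, and idle_worker exits because the same call set the stop flag.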

To simplify the thread implementation, the Coordinator provides a context manager, stop_on_exception(), that automatically requests a stop if an exception is raised. Using the context manager, the thread code above can be written as:

with coord.stop_on_exception():
  while not coord.should_stop():
    ...do some work...

Grace period for stopping:

After a thread has called coord.request_stop(), the other threads have a fixed time to stop. This is called the 'stop grace period' and defaults to 2 minutes. If any of the threads is still alive after the grace period expires, coord.join() raises a RuntimeError reporting the laggards.

try:
  ...
  coord = Coordinator()
  # Start a number of threads, passing the coordinator to each of them.
  ...start thread 1...(coord, ...)
  ...start thread N...(coord, ...)
  # Wait for all the threads to terminate, give them 10s grace period
  coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
  ...one of the threads took more than 10s to stop after request_stop()
  ...was called.
except Exception:
  ...exception that was passed to coord.request_stop()
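The grace-period behaviour can be approximated with plain threads. In the sketch below, join_with_grace_period is a hypothetical helper (not part of TensorFlow) that gives the threads a shared deadline once a stop has been requested and raises RuntimeError naming any thread that is still alive.

```python
import threading
import time


def join_with_grace_period(threads, stop_event, stop_grace_period_secs):
    """Hypothetical helper: join threads, allowing a grace period after stop."""
    # Wait until a stop is requested or every thread has already finished.
    while any(t.is_alive() for t in threads) and not stop_event.wait(0.01):
        pass
    # All threads share one deadline, measured from this point.
    deadline = time.monotonic() + stop_grace_period_secs
    for t in threads:
        t.join(max(0.0, deadline - time.monotonic()))
    stragglers = [t.name for t in threads if t.is_alive()]
    if stragglers:
        raise RuntimeError(
            "Threads still running after grace period: " + ", ".join(stragglers)
        )


stop_event = threading.Event()


def cooperative():
    # Checks the stop flag regularly, so it exits quickly.
    while not stop_event.is_set():
        time.sleep(0.005)


def slow():
    # Ignores the stop request for longer than the grace period.
    time.sleep(1.0)


threads = [
    threading.Thread(target=cooperative, name="cooperative"),
    threading.Thread(target=slow, name="slow"),
]
for t in threads:
    t.start()
stop_event.set()

try:
    join_with_grace_period(threads, stop_event, stop_grace_period_secs=0.2)
    error_message = None
except RuntimeError as e:
    error_message = str(e)

threads[1].join()  # Let the straggler finish before the program exits.
```

Only the thread named "slow" is reported, because the cooperative thread sees the flag and returns well inside the grace period.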

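Args:

clean_stop_exception_types: Optional tuple of Exception types that should cause a clean stop of the coordinator. If an exception of one of these types is reported to request_stop(ex), the coordinator behaves as if request_stop(None) was called. Defaults to (tf.errors.OutOfRangeError,), which input queues use to signal the end of input.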

Attributes:

Methods

clear_stop

clear_stop()

Clears the stop flag.

After this is called, calls to should_stop() will return False.

join

join(
    threads=None, stop_grace_period_secs=120, ignore_live_threads=False
)

Wait for threads to terminate.

This call blocks until a set of threads has terminated. The set of threads is the union of the threads passed in the threads argument and the list of threads that registered with the coordinator by calling Coordinator.register_thread().

After the threads stop, if an exc_info was passed to request_stop, that exception is re-raised.

Grace period handling: When request_stop() is called, threads are given 'stop_grace_period_secs' seconds to terminate. If any of them is still alive after that period expires, a RuntimeError is raised. Note that if an exc_info was passed to request_stop() then it is raised instead of that RuntimeError.

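Args:

threads: List of threading.Thread objects. The started threads to join, in addition to the registered threads.

stop_grace_period_secs: Number of seconds given to threads to stop after request_stop() has been called.

ignore_live_threads: If False, raises an error if any of the threads are still alive after stop_grace_period_secs.

Raises:

RuntimeError: If any thread is still alive after request_stop() is called and the grace period expires.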

raise_requested_exception

raise_requested_exception()

If an exception has been passed to request_stop, this raises it.

register_thread

register_thread(
    thread
)

Register a thread to join.

Args:

thread: A Python thread to join.

request_stop

request_stop(
    ex=None
)

Request that the threads stop.

After this is called, calls to should_stop() will return True.

Note: If an exception is being passed in, it must be in the context of handling the exception (i.e. try: ... except Exception as ex: ...) and not a newly created one.

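Args:

ex: Optional Exception, or Python exc_info tuple as returned by sys.exc_info(). If this is the first call to request_stop(), the corresponding exception is recorded and re-raised from join().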

should_stop

should_stop()

Check if stop was requested.

Returns:

True if a stop was requested.

stop_on_exception

@contextlib.contextmanager
stop_on_exception(
    *args, **kwds
)

Context manager to request stop when an Exception is raised.

Code that uses a coordinator must catch exceptions and pass them to the request_stop() method to stop the other threads managed by the coordinator.

This context handler simplifies the exception handling. Use it as follows:

with coord.stop_on_exception():
  # Any exception raised in the body of the with
  # clause is reported to the coordinator before terminating
  # the execution of the body.
  ...body...

This is completely equivalent to the slightly longer code:

try:
  ...body...
except:
  coord.request_stop(sys.exc_info())

Yields:

nothing.
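The equivalence described above can be sketched with contextlib. RecordingCoordinator is a hypothetical stand-in that only stores what is passed to request_stop(); it is meant to show the shape of such a context manager, not the library's actual code.

```python
import contextlib
import sys


class RecordingCoordinator:
    """Hypothetical stand-in that records the first reported exc_info."""

    def __init__(self):
        self.exc_info = None

    def request_stop(self, ex=None):
        if self.exc_info is None:
            self.exc_info = ex

    @contextlib.contextmanager
    def stop_on_exception(self):
        try:
            yield
        except:  # Bare except mirrors the documented equivalent form.
            # Report the exception instead of letting it propagate.
            self.request_stop(sys.exc_info())


coord = RecordingCoordinator()
reached_after = False
with coord.stop_on_exception():
    raise KeyError("missing")
# Execution continues here because the context manager handled the exception.
reached_after = True
```

Because the except clause does not re-raise, the exception ends the body of the with block but not the surrounding thread function; in this sketch it is left on the coordinator for later inspection.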

wait_for_stop

wait_for_stop(
    timeout=None
)

Wait till the Coordinator is told to stop.

Args:

timeout: Float. Sleep for up to that many seconds waiting for should_stop() to become True.

Returns:

True if the Coordinator is told to stop, False if the timeout expired.
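The two return values can be illustrated with threading.Event, whose wait(timeout) has the same True/False contract. This is a standard-library stand-in for the stop flag, used here as an assumption about the semantics rather than a call into the Coordinator itself.

```python
import threading

stop_event = threading.Event()

# Nobody requests a stop, so the wait ends when the timeout expires.
timed_out_result = stop_event.wait(timeout=0.05)

# Another thread requests a stop shortly after the wait begins.
timer = threading.Timer(0.05, stop_event.set)
timer.start()
stopped_result = stop_event.wait(timeout=5.0)
timer.join()
```

The first wait returns False after the timeout; the second returns True as soon as the flag is set, well before its 5 second limit.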