tf.compat.v1.train.SyncReplicasOptimizer

Class to synchronize, aggregate gradients and pass them to the optimizer.

Inherits From: Optimizer

tf.compat.v1.train.SyncReplicasOptimizer(
    opt, replicas_to_aggregate, total_num_replicas=None, variable_averages=None,
    variables_to_average=None, use_locking=False, name='sync_replicas'
)

This class is deprecated. For synchronous training, please use Distribution Strategies.

In a typical asynchronous training environment, it's common to have some stale gradients. For example, with N-replica asynchronous training, gradients are applied to the variables N times independently. Depending on each replica's training speed, some gradients might be calculated from copies of the variables that are several steps old (N-1 steps on average). This optimizer avoids stale gradients by collecting gradients from all replicas, averaging them, and then applying them to the variables in one shot, after which replicas can fetch the new variables and continue.
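To make the difference concrete, here is a minimal pure-Python sketch, not TensorFlow code. It assumes a single scalar variable, a toy quadratic loss per replica, and hypothetical helper names (`async_update`, `sync_update`). The asynchronous case shows just one possible interleaving, in which every replica reads the same copy of the variable and the gradients are then applied one after another.

```python
def gradient(w, target):
    # Gradient of the toy loss 0.5 * (w - target) ** 2 (an assumption for illustration).
    return w - target


def async_update(w, targets, learning_rate):
    """Each replica's gradient is applied independently.

    All replicas read the same snapshot of w, so every gradient applied
    after the first one was computed from a stale copy of the variable.
    """
    snapshot = w
    for target in targets:
        w = w - learning_rate * gradient(snapshot, target)
    return w


def sync_update(w, targets, learning_rate):
    """Gradients from all replicas are averaged and applied in one shot."""
    grads = [gradient(w, target) for target in targets]
    return w - learning_rate * sum(grads) / len(grads)


# Three replicas, each with its own minibatch (represented by a target value).
targets = [1.0, 2.0, 3.0]
w_async = async_update(0.0, targets, learning_rate=0.1)  # three separate updates
w_sync = sync_update(0.0, targets, learning_rate=0.1)    # one averaged update
```

In this toy setting the asynchronous path moves the variable three times using gradients from the same old value, while the synchronous path moves it once using the mean gradient.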

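The following accumulators/queue are created:

* N gradient accumulators, one per variable to train. Replicas push gradients into them; each accumulator averages the gradients once enough have been collected and drops stale ones.
* 1 token queue, into which the optimizer pushes the new global_step value after all variables are updated.

The following local variable is created:

* sync_rep_local_step, one per replica. Compared against the global_step in each accumulator to check for staleness of the gradients.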

The optimizer adds nodes to the graph to collect gradients and pause the trainers until variables are updated. For the Parameter Server job:

  1. An accumulator is created for each variable, and each replica pushes the gradients into the accumulators instead of directly applying them to the variables.
  2. Each accumulator averages once enough gradients (replicas_to_aggregate) have been accumulated.
  3. Apply the averaged gradients to the variables.
  4. Only after all variables have been updated, increment the global step.
  5. Only after step 4, push global_step into the token_queue, once for each worker replica. The workers can then fetch the global step, use it to update their local_step variable, and start the next batch. Note that some workers can consume multiple minibatches while others may not consume even one, because each worker fetches minibatches as long as a token exists. If one worker is stuck for some reason and does not consume a token, another worker can use it.
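The five steps above can be sketched in plain Python. This is an illustrative model only, not the TensorFlow implementation: `GradientAccumulator` and `chief_step` are hypothetical names, variables are plain floats, the update rule is assumed to be simple SGD, and the token queue is a `collections.deque`.

```python
from collections import deque


class GradientAccumulator:
    """Toy stand-in for one per-variable accumulator (hypothetical, not the TF op)."""

    def __init__(self):
        self.global_step = 0
        self._grads = []

    def push(self, grad, local_step):
        # Gradients computed against an older global step are stale and dropped.
        if local_step < self.global_step:
            return False
        self._grads.append(grad)
        return True

    def count(self):
        return len(self._grads)

    def take_average(self):
        avg = sum(self._grads) / len(self._grads)
        self._grads.clear()
        return avg


def chief_step(state, accumulators, token_queue, replicas_to_aggregate,
               total_num_replicas, learning_rate=0.1):
    """Run steps 2-5 once; return False if not enough gradients have arrived."""
    if any(acc.count() < replicas_to_aggregate for acc in accumulators.values()):
        return False
    # Steps 2-3: average each accumulator and apply the result to its variable.
    for name, acc in accumulators.items():
        state["variables"][name] -= learning_rate * acc.take_average()
    # Step 4: only after all variables are updated, increment the global step.
    state["global_step"] += 1
    for acc in accumulators.values():
        acc.global_step = state["global_step"]
    # Step 5: push the new global step once per worker replica.
    for _ in range(total_num_replicas):
        token_queue.append(state["global_step"])
    return True


state = {"variables": {"w": 1.0}, "global_step": 0}
accumulators = {"w": GradientAccumulator()}
token_queue = deque()

# Step 1: two replicas push gradients instead of applying them directly.
accumulators["w"].push(2.0, local_step=0)
accumulators["w"].push(4.0, local_step=0)
applied = chief_step(state, accumulators, token_queue,
                     replicas_to_aggregate=2, total_num_replicas=2)

# A gradient computed before the update is now stale and is rejected.
stale_accepted = accumulators["w"].push(5.0, local_step=0)
```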

For the replicas:

  1. Start a step: fetch variables and compute gradients.
  2. Once the gradients have been computed, push them into the gradient accumulators. Each accumulator checks for staleness and drops stale gradients.
  3. After pushing all the gradients, dequeue an updated value of global_step from the token queue and record that step in the replica's local_step variable. Note that this is effectively a barrier.
  4. Start the next batch.
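The barrier behavior of the replica loop can be illustrated with threads and `queue.Queue`. This is a hedged sketch under several assumptions: one scalar variable, the same toy quadratic loss on every worker, `replicas_to_aggregate` equal to the number of workers, and a hypothetical `run_simulation` helper. It is not how TensorFlow implements the queues.

```python
import queue
import threading


def run_simulation(num_workers=3, num_updates=3, learning_rate=0.5):
    state = {"w": 0.0, "global_step": 0}
    grad_queue = queue.Queue()    # stands in for the gradient accumulator
    token_queue = queue.Queue()   # carries new global_step values to workers
    batches_done = [0] * num_workers

    def worker(worker_id):
        local_step = 0
        while local_step < num_updates:
            # 1. Fetch the variable and compute a gradient of 0.5 * (w - 3) ** 2.
            grad = state["w"] - 3.0
            # 2. Push the gradient together with the step it was computed at.
            grad_queue.put((grad, local_step))
            batches_done[worker_id] += 1
            # 3. Block until a token is available: this is the barrier.
            local_step = token_queue.get()
            # 4. The loop then starts the next batch.

    def chief():
        for _ in range(num_updates):
            fresh = []
            while len(fresh) < num_workers:
                grad, local_step = grad_queue.get()
                if local_step >= state["global_step"]:  # drop stale gradients
                    fresh.append(grad)
            state["w"] -= learning_rate * sum(fresh) / len(fresh)
            state["global_step"] += 1
            for _ in range(num_workers):
                token_queue.put(state["global_step"])

    threads = [threading.Thread(target=worker, args=(i,), daemon=True)
               for i in range(num_workers)]
    threads.append(threading.Thread(target=chief, daemon=True))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["w"], batches_done


final_w, batches_per_worker = run_simulation()
```

Because any waiting worker may take any token, the per-worker counts in `batches_per_worker` can differ from run to run, while their total stays at `num_workers * num_updates`.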

Usage

import tensorflow as tf

# Create any optimizer to update the variables, say a simple SGD:
opt = tf.compat.v1.train.GradientDescentOptimizer(learning_rate=0.1)

# Wrap the optimizer with SyncReplicasOptimizer with 50 replicas: at each
# step the optimizer collects 50 gradients before applying them to the
# variables. Note that if you want to have 2 backup replicas, you can change
# total_num_replicas=52 and make sure this number matches how many physical
# replicas you started in your job.
opt = tf.compat.v1.train.SyncReplicasOptimizer(opt, replicas_to_aggregate=50,
                                               total_num_replicas=50)

# Some models have startup_delays to help stabilize the model but when using
# sync_replicas training, set it to 0.

# Now you can call `minimize()` or `compute_gradients()` and
# `apply_gradients()` normally. `total_loss` is your model's loss tensor.
global_step = tf.compat.v1.train.get_or_create_global_step()
training_op = opt.minimize(total_loss, global_step=global_step)

# You can create the hook which handles initialization and queues.
# `is_chief` should be True only on the chief worker.
sync_replicas_hook = opt.make_session_run_hook(is_chief)

In the training program, every worker will run the training_op as if not synchronized.

with tf.compat.v1.train.MonitoredTrainingSession(
    master=workers[worker_id].target, is_chief=is_chief,
    hooks=[sync_replicas_hook]) as mon_sess:
  while not mon_sess.should_stop():
    mon_sess.run(training_op)

To use SyncReplicasOptimizer with an Estimator, pass sync_replicas_hook when calling fit:

my_estimator = DNNClassifier(..., optimizer=opt)
my_estimator.fit(..., hooks=[sync_replicas_hook])

Args:

Methods

apply_gradients

apply_gradients(
    grads_and_vars, global_step=None, name=None
)

Apply gradients to variables.

This contains most of the synchronization implementation and also wraps the apply_gradients() from the real optimizer.

Args:

Returns:

Raises:

compute_gradients

compute_gradients(
    *args, **kwargs
)

Compute gradients of "loss" for the variables in "var_list".

This simply wraps the compute_gradients() from the real optimizer. The gradients are aggregated later, in apply_gradients(), so that the user can modify them first if needed, for example by clipping with the per-replica global norm. Clipping by the global norm of the aggregated gradients can work poorly, because one replica's huge gradients can swamp the gradients from the other replicas.
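The effect of clipping before versus after aggregation can be seen in a small pure-Python sketch. The helpers below (`global_norm`, `clip_by_global_norm`, `average`) are written here for illustration and the gradient values are made up; in a TensorFlow program the per-replica clipping would typically be done with `tf.clip_by_global_norm` between `compute_gradients()` and `apply_gradients()`.

```python
import math


def global_norm(grads):
    # Euclidean norm over all gradient components of one replica.
    return math.sqrt(sum(g * g for g in grads))


def clip_by_global_norm(grads, clip_norm):
    # Rescale the whole gradient list only if its global norm exceeds clip_norm.
    norm = global_norm(grads)
    if norm <= clip_norm:
        return list(grads)
    scale = clip_norm / norm
    return [g * scale for g in grads]


def average(per_replica_grads):
    # Component-wise mean across replicas.
    n = len(per_replica_grads)
    return [sum(column) / n for column in zip(*per_replica_grads)]


# Two well-behaved replicas and one replica with a huge gradient.
replica_grads = [[0.4, -0.3], [0.4, -0.3], [300.0, 400.0]]

# Clip each replica first, then aggregate: the outlier is bounded to norm 1.
clip_then_average = average([clip_by_global_norm(g, 1.0) for g in replica_grads])

# Aggregate first, then clip: the result points almost entirely along the outlier.
average_then_clip = clip_by_global_norm(average(replica_grads), 1.0)
```

With per-replica clipping, the second component of the aggregated gradient stays close to zero because the two ordinary replicas still contribute; clipping after aggregation leaves a unit-norm gradient that follows the outlier's direction.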

Args:

Returns:

A list of (gradient, variable) pairs.

get_chief_queue_runner

get_chief_queue_runner()

Returns the QueueRunner for the chief to execute.

This includes the operations to synchronize replicas: aggregate gradients, apply them to the variables, increment the global step, and insert tokens into the token queue.

Note that this can only be called after apply_gradients(), which actually generates this QueueRunner.

Returns:

A QueueRunner for chief to execute.

Raises:

get_init_tokens_op

get_init_tokens_op(
    num_tokens=-1
)

Returns the op to fill the sync_token_queue with the tokens.

This should be executed at the beginning of the chief/sync thread so that, even if total_num_replicas is less than replicas_to_aggregate, the model can still proceed, because the replicas can compute multiple steps per variable update. Make sure that num_tokens >= replicas_to_aggregate - total_num_replicas.
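The constraint on num_tokens can be checked with simple arithmetic. The sketch below uses hypothetical helper names (`min_init_tokens`, `check_num_tokens`) and only encodes the inequality stated above; it makes no claim about what the library does when num_tokens is left at its default of -1.

```python
def min_init_tokens(replicas_to_aggregate, total_num_replicas):
    """Smallest num_tokens satisfying
    num_tokens >= replicas_to_aggregate - total_num_replicas."""
    return max(0, replicas_to_aggregate - total_num_replicas)


def check_num_tokens(num_tokens, replicas_to_aggregate, total_num_replicas):
    """Raise ValueError if the first step could not collect enough gradients."""
    needed = min_init_tokens(replicas_to_aggregate, total_num_replicas)
    if num_tokens < needed:
        raise ValueError(
            "Too few tokens: %d given, at least %d needed" % (num_tokens, needed))
    return num_tokens


# 10 replicas must together contribute 50 gradients per update, so at least
# 40 extra tokens are needed to let replicas run several batches in step one.
needed_small_cluster = min_init_tokens(replicas_to_aggregate=50, total_num_replicas=10)

# With 2 backup replicas (52 total) no extra tokens are required.
needed_with_backups = min_init_tokens(replicas_to_aggregate=50, total_num_replicas=52)
```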

Args:

Returns:

An op for the chief/sync replica to fill the token queue.

Raises:

get_name

get_name()

get_slot

get_slot(
    *args, **kwargs
)

Return a slot named "name" created for "var" by the Optimizer.

This simply wraps the get_slot() from the actual optimizer.

Args:

Returns:

The Variable for the slot if it was created, None otherwise.

get_slot_names

get_slot_names(
    *args, **kwargs
)

Return a list of the names of slots created by the Optimizer.

This simply wraps the get_slot_names() from the actual optimizer.

Args:

Returns:

A list of strings.

make_session_run_hook

make_session_run_hook(
    is_chief, num_tokens=-1
)

Creates a hook to handle SyncReplicasHook ops such as initialization.

minimize

minimize(
    loss, global_step=None, var_list=None, gate_gradients=GATE_OP,
    aggregation_method=None, colocate_gradients_with_ops=False, name=None,
    grad_loss=None
)

Add operations to minimize loss by updating var_list.

This method simply combines calls to compute_gradients() and apply_gradients(). If you want to process the gradients before applying them, call compute_gradients() and apply_gradients() explicitly instead of using this function.

Args:

Returns:

An Operation that updates the variables in var_list. If global_step was not None, that operation also increments global_step.

Raises:

Eager Compatibility

When eager execution is enabled, loss should be a Python function that takes no arguments and computes the value to be minimized. Minimization (and gradient computation) is done with respect to the elements of var_list if not None, else with respect to any trainable variables created during the execution of the loss function. gate_gradients, aggregation_method, colocate_gradients_with_ops and grad_loss are ignored when eager execution is enabled.

variables

variables()

Fetches a list of optimizer variables in the default graph.

This wraps variables() from the actual optimizer. It does not include the SyncReplicasOptimizer's local step.

Returns:

A list of variables.

Class Variables