TensorFlow source code study 5 -- synchronous and asynchronous training
Whether training is synchronous or asynchronous is determined by the optimizer.
1. Synchronous training
Synchronous training requires SyncReplicasOptimizer; see https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer . Any other optimizer used in distributed training trains asynchronously.
Synchronous training is implemented in def apply_gradients() in sync_replicas_optimizer.py. Suppose there are n parameters:
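On the PS side, n parameter collectors are created, one per parameter. In the code these are ConditionalAccumulator objects, or SparseConditionalAccumulator for sparse gradients. Each worker pushes the gradient it computed to the corresponding collector; the push is carried out by Send/Recv ops. Once a collector has received gradients from replicas_to_aggregate workers (normally all of them), it averages them and updates the parameter. After all parameters have been updated, global_step is incremented by 1. The new global_step is then enqueued into the shared token_queue, one token per worker, and each worker updates its global_step and starts the next step. In the implementation, this aggregate-and-update sequence is driven by the chief worker through get_chief_queue_runner(), while the ops themselves are placed on the PS.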
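On the worker side, each worker pulls the parameters it needs from the PS, computes the gradients, and pushes them to the corresponding collectors. After pushing, it dequeues the new global_step from token_queue and continues with the next step. If no token is available yet, the dequeue blocks, so the worker waits until the chief has finished the update and enqueued new tokens.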
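The flow above can be sketched in plain Python. This is a hypothetical simulation, not TensorFlow code. The function `run_sync_step` is made up for illustration. Python lists stand in for variables, and `queue.Queue` objects stand in for the per-parameter accumulators and the token queue. The sketch also simplifies the real optimizer: it always waits for every worker (replicas_to_aggregate equal to the number of workers) and does no stale-gradient filtering.

```python
import queue
import threading


def run_sync_step(params, worker_grads, lr=0.1, global_step=0):
    """Hypothetical simulation of one synchronous step.

    worker_grads[w][i] is worker w's gradient for parameter i.
    Returns (new_params, new_global_step, {worker_id: token_received}).
    """
    num_workers = len(worker_grads)
    params = list(params)
    collectors = [queue.Queue() for _ in params]  # one collector per parameter
    token_queue = queue.Queue()                   # shared token queue
    seen_steps = {}
    step = [global_step]                          # mutable cell for the closure

    def chief():
        # Wait until every worker has pushed a gradient for each parameter,
        # then average the gradients and apply the update.
        for i, collector in enumerate(collectors):
            grads = [collector.get() for _ in range(num_workers)]
            params[i] -= lr * sum(grads) / num_workers
        # Only after all parameters are updated: bump global_step and
        # enqueue one token per worker.
        step[0] += 1
        for _ in range(num_workers):
            token_queue.put(step[0])

    def worker(worker_id, grads):
        for i, g in enumerate(grads):
            collectors[i].put(g)                   # push gradient for parameter i
        seen_steps[worker_id] = token_queue.get()  # blocks until a token arrives

    threads = [threading.Thread(target=chief)]
    threads += [threading.Thread(target=worker, args=(w, g))
                for w, g in enumerate(worker_grads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return params, step[0], seen_steps
```

For example, `run_sync_step([1.0, 2.0], [[1.0, 2.0], [3.0, 4.0]])` averages the two workers' gradients to `[2.0, 3.0]` and applies them. Both workers stay blocked in `token_queue.get()` until the chief has enqueued the step-1 tokens.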
2. Asynchronous training
The training code uses GradientDescentOptimizer (a subclass of Optimizer) and calls its minimize() method. minimize() first calls compute_gradients() and then calls apply_gradients().
Asynchronous training is implemented in def apply_gradients() in optimizer.py (GradientDescentOptimizer does not override Optimizer's apply_gradients()). See https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow.
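The division of labor between minimize(), apply_gradients() and _apply_dense() can be mirrored by a small class hierarchy. This is a hypothetical, heavily simplified sketch, not TensorFlow's API:

- variables are entries in a dict;
- `grad_fn` stands in for automatic differentiation;
- `global_step` is a dict with a `"value"` key.

The point is the structure: the base class owns the shared driver, and the subclass only supplies the per-variable update rule.

```python
class Optimizer:
    """Hypothetical mirror of the Optimizer call chain."""

    def __init__(self, learning_rate):
        self._learning_rate = learning_rate

    def minimize(self, grad_fn, variables, global_step=None):
        # minimize() = compute_gradients() followed by apply_gradients().
        grads_and_vars = self.compute_gradients(grad_fn, variables)
        return self.apply_gradients(grads_and_vars, variables, global_step)

    def compute_gradients(self, grad_fn, variables):
        # grad_fn maps {name: value} -> {name: gradient}; missing names mean
        # "no gradient" and become None, as in TensorFlow.
        grads = grad_fn(variables)
        return [(grads.get(name), name) for name in variables]

    def apply_gradients(self, grads_and_vars, variables, global_step=None):
        # Shared driver: skip None gradients, delegate each update to
        # _apply_dense(), and bump global_step only after all updates.
        if not any(g is not None for g, _ in grads_and_vars):
            raise ValueError("No gradients provided for any variable")
        for grad, name in grads_and_vars:
            if grad is None:
                continue
            variables[name] = self._apply_dense(grad, variables[name])
        if global_step is not None:
            global_step["value"] += 1
        return variables

    def _apply_dense(self, grad, var):
        raise NotImplementedError  # subclasses supply the update rule


class GradientDescentOptimizer(Optimizer):
    # Only the update rule is overridden; apply_gradients() is inherited.
    def _apply_dense(self, grad, var):
        return var - self._learning_rate * grad
```

For f(w, b) = w² + (b − 1)², one `minimize` call with learning rate 0.25 from `{"w": 1.0, "b": 0.0}` yields `{"w": 0.5, "b": 0.5}`.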
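On the worker side, the worker pulls the parameters it needs from the PS. The pull takes no lock, so the values it reads may or may not include other workers' updates. The worker computes the gradients and sends them to the corresponding PS.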
On the PS side, once the PS receives a gradient, it updates the parameter according to the optimization algorithm (e.g. SGD, SGD with Momentum, Adagrad, Adam, etc.).
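To make the PS-side step concrete, here is a hypothetical sketch of a PS that applies each incoming gradient with a pluggable update rule. The formulas follow those documented for TensorFlow's ApplyGradientDescent, ApplyMomentum and ApplyAdagrad ops. The rest is assumed for illustration:

- the `ParameterServer` class;
- the `state` dict, which stands in for the optimizer "slots" that _create_slots() allocates next to each variable;
- the default hyperparameters. The Adagrad initial accumulator of 0.1 matches AdagradOptimizer's default.

```python
import math


def sgd(var, grad, state, lr=0.1):
    # var -= lr * grad
    return var - lr * grad


def momentum(var, grad, state, lr=0.1, mu=0.9):
    # accum = accum * momentum + grad; var -= lr * accum
    state["accum"] = state.get("accum", 0.0) * mu + grad
    return var - lr * state["accum"]


def adagrad(var, grad, state, lr=0.1, init=0.1):
    # accum += grad * grad; var -= lr * grad / sqrt(accum)
    state["accum"] = state.get("accum", init) + grad * grad
    return var - lr * grad / math.sqrt(state["accum"])


class ParameterServer:
    """Hypothetical PS holding one scalar variable and its slot state."""

    def __init__(self, value, rule):
        self.value = value
        self.rule = rule
        self.state = {}  # per-variable slots, e.g. the momentum accumulator

    def receive(self, grad):
        # Apply the gradient as soon as it arrives, with no waiting for
        # other workers (asynchronous).
        self.value = self.rule(self.value, grad, self.state)
        return self.value
```

With `momentum`, two gradients of 1.0 take the value from 1.0 to 0.9 and then to 0.71, because the accumulator grows from 1.0 to 1.9.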
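Note: in asynchronous training, suppose worker1 reads parameter w1, then worker2 also reads w1. worker1 applies its gradient, and then worker2 applies its gradient. worker2's gradient was computed from the old w1, so it ignores worker1's update (the stale-gradient problem). Also, because the update is not locked by default, concurrent updates to the same variable can race, and one worker's change may be overwritten by the other's. To serialize the updates, GradientDescentOptimizer's constructor provides a use_locking parameter. It protects the update itself, but it does not remove the staleness.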
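The interleaving described in the note can be replayed deterministically. In this hypothetical sketch, f(w) = w², so the gradient is 2w. The function `interleave` runs the schedule "worker1 reads, worker2 reads, worker1 writes, worker2 writes" in two flavors:

- each gradient is applied as a delta to the current value, as ApplyGradientDescent does;
- each worker does a racy read-modify-write, writing back a value computed from its own stale read.

Real lost updates come from unsynchronized concurrent memory access and are nondeterministic; this only replays one schedule.

```python
def interleave(w0, grad_fn, lr, apply_as_delta):
    """Replay: worker1 reads, worker2 reads, worker1 writes, worker2 writes."""
    read1 = w0                 # worker1 reads w
    read2 = w0                 # worker2 reads the same, not-yet-updated w
    g1 = grad_fn(read1)
    g2 = grad_fn(read2)        # stale: computed without worker1's update
    if apply_as_delta:
        w = w0 - lr * g1       # worker1 subtracts lr * g1 from the current value
        w = w - lr * g2        # worker2 subtracts lr * g2 from the current value
    else:
        w = read1 - lr * g1    # worker1 writes back its own result
        w = read2 - lr * g2    # worker2 overwrites it: worker1's update is lost
    return w


def sequential(w0, grad_fn, lr, steps):
    """Reference: one worker doing `steps` plain SGD steps, no staleness."""
    w = w0
    for _ in range(steps):
        w = w - lr * grad_fn(w)
    return w
```

With w0 = 1.0 and lr = 0.25, sequential SGD reaches 0.25 after two steps. The delta schedule overshoots to 0.0 because the second gradient is stale. The overwrite schedule ends at 0.5, as if only one update had happened.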
The code of Optimizer.apply_gradients() is as follows:
```python
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
  """Apply gradients to variables.

  This is the second part of `minimize()`. It returns an `Operation` that
  applies gradients.

  Args:
    grads_and_vars: List of (gradient, variable) pairs as returned by
      `compute_gradients()`.
    global_step: Optional `Variable` to increment by one after the
      variables have been updated.
    name: Optional name for the returned operation. Default to the
      name passed to the `Optimizer` constructor.

  Returns:
    An `Operation` that applies the specified gradients. If `global_step`
    was not None, that operation also increments `global_step`.

  Raises:
    TypeError: If `grads_and_vars` is malformed.
    ValueError: If none of the variables have gradients.
    RuntimeError: If you should use `_distributed_apply()` instead.
  """
  # This is a default implementation of apply_gradients() that can be shared
  # by most optimizers. It relies on the subclass implementing the following
  # methods: _create_slots(), _prepare(), _apply_dense(), and _apply_sparse().

  # Handle DistributionStrategy case.
  if distribute_lib.get_cross_tower_context():
    raise RuntimeError("Use `_distributed_apply()` instead of "
                       "`apply_gradients()` in a cross-tower context.")
  # TODO(isaprykin): Get rid of `has_distribution_strategy()` check by
  # always calling _distributed_apply(), using the default distribution
  # as needed.
  if distribute_lib.has_distribution_strategy():
    grads_and_vars = get_filtered_grad_fn(lambda _: grads_and_vars)()
    return distribute_lib.get_tower_context().merge_call(
        self._distributed_apply, grads_and_vars, global_step, name)

  # No DistributionStrategy case.
  grads_and_vars = tuple(grads_and_vars)  # Make sure repeat iteration works.
  if not grads_and_vars:
    raise ValueError("No variables provided.")
  converted_grads_and_vars = []
  for g, v in grads_and_vars:
    if g is not None:
      try:
        # Convert the grad to Tensor or IndexedSlices if necessary.
        g = ops.convert_to_tensor_or_indexed_slices(g)
      except TypeError:
        raise TypeError(
            "Gradient must be convertible to a Tensor"
            " or IndexedSlices, or None: %s" % g)
      if not isinstance(g, (ops.Tensor, ops.IndexedSlices)):
        raise TypeError(
            "Gradient must be a Tensor, IndexedSlices, or None: %s" % g)
    p = _get_processor(v)  # _RefVariableProcessor
    converted_grads_and_vars.append((g, v, p))
  # v._ref() = Tensor("weights/Variable:0", shape=(784, 10), dtype=float32_ref,
  #                   device=/job:ps/task:0)
  # converted_grads_and_vars =
  #   ((<tf.Tensor 'train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0' shape=(784, 10) dtype=float32>,
  #     <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>,
  #     <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f6798012410>),
  #    (<tf.Tensor 'train/gradients/softmax/Add_grad/tuple/control_dependency_1:0' shape=(10,) dtype=float32>,
  #     <tf.Variable 'biases/Variable:0' shape=(10,) dtype=float32_ref>,
  #     <tensorflow.python.training.optimizer._RefVariableProcessor object at 0x7f67980124d0>))

  converted_grads_and_vars = tuple(converted_grads_and_vars)
  var_list = [v for g, v, _ in converted_grads_and_vars if g is not None]
  if not var_list:
    raise ValueError("No gradients provided for any variable: %s." %
                     ([str(v) for _, _, v in converted_grads_and_vars],))
  with ops.init_scope():
    self._create_slots(var_list)
  update_ops = []
  with ops.name_scope(name, self._name) as name:
    self._prepare()
    for grad, var, processor in converted_grads_and_vars:
      if grad is None:
        continue
      # We colocate all ops created in _apply_dense or _apply_sparse
      # on the same device as the variable.
      # TODO(apassos): figure out how to get the variable name here.
      if context.executing_eagerly() or isinstance(
          var,
          resource_variable_ops.ResourceVariable) and not var._in_graph_mode:  # pylint: disable=protected-access
        scope_name = ""
      else:
        scope_name = var.op.name
        # var.op = {name: "weights/Variable", op: "VariableV2", device: "/job:ps/task:0"}
      with ops.name_scope("update_" + scope_name), ops.colocate_with(var):
        # See def update_op() at line 111: builds the update op (worker -> ps).
        update_ops.append(processor.update_op(self, grad))
    if global_step is None:
      apply_updates = self._finish(update_ops, name)
    else:
      with ops.control_dependencies([self._finish(update_ops, "update")]):
        with ops.colocate_with(global_step):
          if isinstance(global_step, resource_variable_ops.ResourceVariable):
            # TODO(apassos): the implicit read in assign_add is slow; consider
            # making it less so.
            apply_updates = resource_variable_ops.assign_add_variable_op(
                global_step.handle,
                ops.convert_to_tensor(1, dtype=global_step.dtype),
                name=name)
          else:
            apply_updates = state_ops.assign_add(global_step, 1, name=name)

    if not context.executing_eagerly():
      if isinstance(apply_updates, ops.Tensor):
        apply_updates = apply_updates.op
      train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
      if apply_updates not in train_op:
        train_op.append(apply_updates)

    return apply_updates
```
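Inside apply_gradients(), each update op is built by update_ops.append(processor.update_op(self, grad)). For a ref variable the processor is a _RefVariableProcessor, whose update_op() is: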
```python
def update_op(self, optimizer, g):
  if isinstance(g, ops.Tensor):
    # update_op = {name: "train/GradientDescent/update_weights/Variable/ApplyGradientDescent",
    #              op: "ApplyGradientDescent",
    #              input: "weights/Variable",
    #              input: "train/GradientDescent/learning_rate",
    #              input: "train/gradients/softmax/MatMul_grad/tuple/control_dependency_1",
    #              device: "/job:ps/task:0"}
    update_op = optimizer._apply_dense(g, self._v)  # pylint: disable=protected-access
    if self._v.constraint is not None:
      with ops.control_dependencies([update_op]):
        return self._v.assign(self._v.constraint(self._v))
    else:
      return update_op
  else:
    assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
                                              "tensor nor IndexedSlices.")
    if self._v.constraint is not None:
      raise RuntimeError(
          "Cannot use a constraint function on a sparse variable.")
    # pylint: disable=protected-access
    return optimizer._apply_sparse_duplicate_indices(g, self._v)
```
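The dense/sparse branch in update_op() can be sketched without TensorFlow. All names in the sketch are hypothetical:

- `IndexedSlices` is a namedtuple standing in for tf.IndexedSlices.
- `deduplicate` mimics what _apply_sparse_duplicate_indices() does before delegating to _apply_sparse(): values that share an index are summed, so each row is updated once.
- `update_op` shares only its name with the method above.

```python
from collections import namedtuple

# Hypothetical stand-in for tf.IndexedSlices: gradient rows `values`
# for the variable rows listed in `indices`.
IndexedSlices = namedtuple("IndexedSlices", ["values", "indices"])


def deduplicate(slices):
    """Sum the values of repeated indices, keeping first-seen index order."""
    summed = {}
    for idx, val in zip(slices.indices, slices.values):
        summed[idx] = summed.get(idx, 0.0) + val
    unique = list(summed)  # dicts preserve insertion order (Python 3.7+)
    return IndexedSlices(values=[summed[i] for i in unique], indices=unique)


def update_op(var, g, lr):
    """Dispatch on the gradient type, like _RefVariableProcessor.update_op()."""
    if isinstance(g, IndexedSlices):
        g = deduplicate(g)
        for idx, val in zip(g.indices, g.values):
            var[idx] -= lr * val  # sparse: only the touched rows change
    else:
        for i, val in enumerate(g):
            var[i] -= lr * val    # dense: every element changes
    return var
```

A sparse gradient `IndexedSlices(values=[1.0, 2.0, 4.0], indices=[2, 0, 2])` is first collapsed to rows `[2, 0]` with values `[5.0, 2.0]`. Row 1 of the variable is left untouched.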
update_op() calls the optimizer's _apply_dense() method. Since the optimizer here is a GradientDescentOptimizer, this resolves to GradientDescentOptimizer._apply_dense():
```python
def _apply_dense(self, grad, var):
  return training_ops.apply_gradient_descent(
      var,  # <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>
      math_ops.cast(self._learning_rate_tensor, var.dtype.base_dtype),  # <tf.Tensor 'train/GradientDescent/learning_rate:0' shape=() dtype=float32>
      grad,  # Tensor("train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0", shape=(784, 10), dtype=float32, device=/job:worker/task:0)
      use_locking=self._use_locking).op  # use_locking is False here
```
That in turn calls apply_gradient_descent(), the generated Python wrapper for the ApplyGradientDescent op:
```python
def apply_gradient_descent(var, alpha, delta, use_locking=False, name=None):
  r"""Update '*var' by subtracting 'alpha' * 'delta' from it.

  Args:
    var: A mutable `Tensor`. Must be one of the following types: `float32`,
      `float64`, `int32`, `uint8`, `int16`, `int8`, `complex64`, `int64`,
      `qint8`, `quint8`, `qint32`, `bfloat16`, `uint16`, `complex128`,
      `half`, `uint32`, `uint64`.
      Should be from a Variable().
    alpha: A `Tensor`. Must have the same type as `var`.
      Scaling factor. Must be a scalar.
    delta: A `Tensor`. Must have the same type as `var`. The change.
    use_locking: An optional `bool`. Defaults to `False`.
      If `True`, the subtraction will be protected by a lock;
      otherwise the behavior is undefined, but may exhibit less contention.
    name: A name for the operation (optional).

  Returns:
    A mutable `Tensor`. Has the same type as `var`.
  """
  _ctx = _context._context
  if _ctx is None or not _ctx._eager_context.is_eager:
    if use_locking is None:
      use_locking = False
    use_locking = _execute.make_bool(use_locking, "use_locking")
    _, _, _op = _op_def_lib._apply_op_helper(
        "ApplyGradientDescent", var=var, alpha=alpha, delta=delta,
        use_locking=use_locking, name=name)
    _result = _op.outputs[:]
    _inputs_flat = _op.inputs
    _attrs = ("T", _op.get_attr("T"), "use_locking",
              _op.get_attr("use_locking"))
    _execute.record_gradient(
        "ApplyGradientDescent", _inputs_flat, _attrs, _result, name)
    _result, = _result
    return _result

  else:
    raise RuntimeError("apply_gradient_descent op does not support eager execution. Arg 'out' is a ref.")
```
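The docstring's semantics (subtract alpha * delta from var, optionally under a lock) can be sketched in pure Python. `RefVariable`, this `apply_gradient_descent` and `hammer` are hypothetical stand-ins, not the generated TensorFlow wrapper. The per-variable lock plays the role of `use_locking=True`.

```python
import threading


class RefVariable:
    """Hypothetical mutable 'variable': a list of floats plus its own lock."""

    def __init__(self, values):
        self.values = list(values)
        self.lock = threading.Lock()


def _subtract(var, alpha, delta):
    # Element-wise var -= alpha * delta (a read-modify-write per element).
    for i, d in enumerate(delta):
        var.values[i] -= alpha * d


def apply_gradient_descent(var, alpha, delta, use_locking=False):
    """Hypothetical mirror of ApplyGradientDescent: var -= alpha * delta."""
    if len(delta) != len(var.values):
        raise ValueError("delta must have the same shape as var")
    if use_locking:
        with var.lock:  # serialize concurrent updates to this variable
            _subtract(var, alpha, delta)
    else:
        _subtract(var, alpha, delta)  # no lock: concurrent callers may interleave
    return var.values


def hammer(var, num_threads=8, iters=100, use_locking=True):
    """Have several threads apply the same update concurrently."""
    def work():
        for _ in range(iters):
            apply_gradient_descent(var, 1.0, [1.0] * len(var.values), use_locking)

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return var.values
```

With the lock, 8 threads × 100 updates of −1.0 always take `[1000.0, 1000.0]` to `[200.0, 200.0]`. Without it, the final value is not guaranteed, which matches the docstring's "behavior is undefined".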
apply_gradient_descent() then calls _op_def_lib._apply_op_helper() (OpDefLibrary._apply_op_helper()):
```python
# keywords = {'var': <tf.Variable 'weights/Variable:0' shape=(784, 10) dtype=float32_ref>,
#             'alpha': <tf.Tensor 'train/GradientDescent/learning_rate:0' shape=() dtype=float32>,
#             'use_locking': False,
#             'delta': <tf.Tensor 'train/gradients/softmax/MatMul_grad/tuple/control_dependency_1:0' shape=(784, 10) dtype=float32>}
def _apply_op_helper(self, op_type_name, name=None, **keywords):
  """Implementation of apply_op that returns output_structure, op."""
  op_info = self._ops.get(op_type_name, None)
  if op_info is None:
    raise RuntimeError("Unrecognized Op name " + op_type_name)
  op_def = op_info.op_def

  # Fill in the list of default types for all "type" attrs. This
  # will be used to choose a preferred dtype to convert to in the
  # absence of input type information.
  #
  # TODO(b/31302892): Currently the defaults don't work in the right
  # way if you have two inputs, one of whose type resolution depends
  # on the other. Handling this will require restructuring this code
  # significantly.
  default_type_attr_map = {}
  for attr_def in op_def.attr:
    if attr_def.type != "type":
      continue
    key = attr_def.name
    if attr_def.HasField("default_value"):
      default_type_attr_map[key] = dtypes.as_dtype(
          attr_def.default_value.type)

  # Requires that op_def has passed validation (using the C++
  # ValidateOpDef() from ../framework/op_def_util.h).
  attrs = {}
  inputs = []
  input_types = []
  with g.as_default(), ops.name_scope(name) as scope:
    # keywords: same values as shown above the def.
    ...
    # NOTE(mrry): We add an explicit colocation constraint between
    # the newly created op and any of its reference-typed inputs.
    must_colocate_inputs = [val for arg, val in zip(op_def.input_arg, inputs)
                            if arg.is_ref]
    with _MaybeColocateWith(must_colocate_inputs):
      # Add Op to graph
      op = g.create_op(op_type_name, inputs, output_types, name=scope,
                       input_types=input_types, attrs=attr_protos,
                       op_def=op_def)
    return output_structure, op_def.is_stateful, op
```
This method is fairly long; in the end it calls g.create_op(), i.e. Graph.create_op():
```python
def create_op(
    self,
    op_type,
    inputs,
    dtypes,  # pylint: disable=redefined-outer-name
    input_types=None,
    name=None,
    attrs=None,
    op_def=None,
    compute_shapes=True,
    compute_device=True):
  """Creates an `Operation` in this graph.

  This is a low-level interface for creating an `Operation`. Most
  programs will not call this method directly, and instead use the
  Python op constructors, such as `tf.constant()`, which add ops to
  the default graph.

  Args:
    op_type: The `Operation` type to create. This corresponds to the
      `OpDef.name` field for the proto that defines the operation.
    inputs: A list of `Tensor` objects that will be inputs to the `Operation`.
    dtypes: A list of `DType` objects that will be the types of the tensors
      that the operation produces.
    input_types: (Optional.) A list of `DType`s that will be the types of
      the tensors that the operation consumes. By default, uses the base
      `DType` of each input in `inputs`. Operations that expect
      reference-typed inputs must specify `input_types` explicitly.
    name: (Optional.) A string name for the operation. If not specified, a
      name is generated based on `op_type`.
    attrs: (Optional.) A dictionary where the key is the attribute name (a
      string) and the value is the respective `attr` attribute of the
      `NodeDef` proto that will represent the operation (an `AttrValue`
      proto).
    op_def: (Optional.) The `OpDef` proto that describes the `op_type` that
      the operation will have.
    compute_shapes: (Optional.) Deprecated. Has no effect (shapes are always
      computed).
    compute_device: (Optional.) If True, device functions will be executed
      to compute the device property of the Operation.

  Raises:
    TypeError: if any of the inputs is not a `Tensor`.
    ValueError: if colocation conflicts with existing device assignment.

  Returns:
    An `Operation` object.
  """
  del compute_shapes

  self._check_not_finalized()
  for idx, a in enumerate(inputs):
    if not isinstance(a, Tensor):
      raise TypeError("Input #%d is not a tensor: %s" % (idx, a))
  if name is None:
    name = op_type
  # If a names ends with a '/' it is a "name scope" and we use it as-is,
  # after removing the trailing '/'.
  if name and name[-1] == "/":
    name = _name_from_scope_name(name)
  else:
    name = self.unique_name(name)

  node_def = _NodeDef(op_type, name, device=None, attrs=attrs)

  input_ops = set([t.op for t in inputs])
  control_inputs = self._control_dependencies_for_inputs(input_ops)
  # _create_op_helper mutates the new Operation. `_mutation_lock` ensures a
  # Session.run call cannot occur between creating and mutating the op.
  with self._mutation_lock():
    ret = Operation(
        node_def,
        self,
        inputs=inputs,
        output_types=dtypes,
        control_inputs=control_inputs,
        input_types=input_types,
        original_op=self._default_original_op,
        op_def=op_def)
    self._create_op_helper(ret, compute_device=compute_device)
  return ret
```
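The naming rules in create_op() can be sketched as follows. `NameRegistry` is a hypothetical, simplified model of Graph.unique_name(); it ignores the current name-scope prefix that the real method adds. _apply_op_helper() passes `name=scope`, which is a string ending in '/'. The new op therefore takes the scope name as-is, which is how names such as "train/GradientDescent/update_weights/Variable/ApplyGradientDescent" arise.

```python
class NameRegistry:
    """Hypothetical model of Graph.unique_name() plus create_op()'s '/' rule."""

    def __init__(self):
        self._names_in_use = {}  # lower-cased name -> use count

    def unique_name(self, name):
        key = name.lower()
        i = self._names_in_use.get(key, 0)
        self._names_in_use[key] = i + 1
        if i == 0:
            return name  # first use: keep the name unchanged
        # Otherwise append _1, _2, ... skipping suffixes already taken.
        base_key = key
        while key in self._names_in_use:
            key = "%s_%d" % (base_key, i)
            i += 1
        self._names_in_use[key] = 1
        return "%s_%d" % (name, i - 1)

    def op_name(self, name):
        # A name ending in '/' is a name scope: use it as-is minus the slash.
        if name and name[-1] == "/":
            return name[:-1]
        return self.unique_name(name)
```

Creating three ops named "weights/Variable" yields "weights/Variable", "weights/Variable_1" and "weights/Variable_2". A scope name like "update/ApplyGradientDescent/" becomes "update/ApplyGradientDescent".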
create_op() then calls _create_op_helper():
```python
def _create_op_helper(self, op, compute_device=True):
  """Common logic for creating an op in this graph."""
  # Apply any additional attributes requested. Do not overwrite any existing
  # attributes.
  for key, value in self._attr_scope_map.items():
    try:
      op.get_attr(key)
    except ValueError:
      if callable(value):
        value = value(op.node_def)
        if not isinstance(value, (type(None), attr_value_pb2.AttrValue)):
          raise TypeError(
              "Callable for scope map key '%s' must return either None or "
              "an AttrValue protocol buffer; but it returned: %s" %
              (key, value))
      if value:
        op._set_attr(key, value)  # pylint: disable=protected-access

  # Apply a kernel label if one has been specified for this op type.
  try:
    kernel_label = self._op_to_kernel_label_map[op.type]
    op._set_attr("_kernel",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(kernel_label)))
  except KeyError:
    pass

  # Apply the overriding op type for gradients if one has been specified for
  # this op type.
  try:
    mapped_op_type = self._gradient_override_map[op.type]
    op._set_attr("_gradient_op_type",  # pylint: disable=protected-access
                 attr_value_pb2.AttrValue(s=compat.as_bytes(mapped_op_type)))
  except KeyError:
    pass

  self._record_op_seen_by_control_dependencies(op)

  if compute_device:
    self._apply_device_functions(op)

  if self._colocation_stack:
    all_colocation_groups = []
    for colocation_op in self._colocation_stack:
      all_colocation_groups.extend(colocation_op.colocation_groups())
      if colocation_op.device:
        # Make this device match the device of the colocated op, to provide
        # consistency between the device and the colocation property.
        if (op.device and pydev.canonical_name(op.device) !=
            pydev.canonical_name(colocation_op.device)):
          logging.warning("Tried to colocate %s with an op %s that had "
                          "a different device: %s vs %s. Postponing "
                          "error-checking until all devices are assigned.",
                          op.name, colocation_op.name, op.device,
                          colocation_op.device)
        else:
          op._set_device(colocation_op.device)  # pylint: disable=protected-access

    all_colocation_groups = sorted(set(all_colocation_groups))
    # pylint: disable=protected-access
    op._set_attr("_class", attr_value_pb2.AttrValue(
        list=attr_value_pb2.AttrValue.ListValue(s=all_colocation_groups)))
    # pylint: enable=protected-access

  # Sets "container" attribute if
  # (1) self._container is not None
  # (2) "is_stateful" is set in OpDef
  # (3) "container" attribute is in OpDef
  # (4) "container" attribute is None
  if self._container and op.op_def.is_stateful:
    try:
      container_attr = op.get_attr("container")
    except ValueError:
      # "container" attribute is not in OpDef
      pass
    else:
      if not container_attr:
        op._set_attr("container", attr_value_pb2.AttrValue(  # pylint: disable=protected-access
            s=compat.as_bytes(self._container)))
```
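The colocation branch above explains where the update op ends up. In the earlier steps, apply_gradients() wraps the update in ops.colocate_with(var), and _apply_op_helper() colocates the new op with its ref-typed `var` input. So ApplyGradientDescent inherits the variable's PS device. The sketch below is a hypothetical simplification:

- `Op` and `apply_colocation` are made up for illustration;
- device strings are compared with plain equality instead of pydev.canonical_name();
- colocation groups are plain strings rather than bytes.

```python
import logging


class Op:
    """Hypothetical stand-in for tf.Operation: a name, a device and attrs."""

    def __init__(self, name, device=""):
        self.name = name
        self.device = device
        self.attrs = {}

    def colocation_groups(self):
        # Simplified: use the "_class" attr if set, otherwise the default
        # group "loc:@<own name>".
        return self.attrs.get("_class") or ["loc:@" + self.name]


def apply_colocation(op, colocation_stack):
    """Simplified mirror of the colocation branch of _create_op_helper()."""
    if not colocation_stack:
        return op
    groups = []
    for colocation_op in colocation_stack:
        groups.extend(colocation_op.colocation_groups())
        if colocation_op.device:
            if op.device and op.device != colocation_op.device:
                # Like TF, only warn here and postpone error-checking.
                logging.warning("Tried to colocate %s with %s: %s vs %s",
                                op.name, colocation_op.name,
                                op.device, colocation_op.device)
            else:
                op.device = colocation_op.device  # follow the colocated op
    op.attrs["_class"] = sorted(set(groups))
    return op
```

An update op created with no device and colocated with `Op("weights/Variable", "/job:ps/task:0")` ends up on "/job:ps/task:0", with `_class == ["loc:@weights/Variable"]`.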
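It contains this piece of logic:

```python
if compute_device:
  self._apply_device_functions(op)
```

compute_device is True here (the default, passed down from create_op()), so it goes on to call _apply_device_functions(op):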
```python
def _apply_device_functions(self, op):
  """Applies the current device function stack to the given operation."""
  # Apply any device functions in reverse order, so that the most recently
  # pushed function has the first chance to apply a device to the op.
  # We apply here because the result can depend on the Operation's
  # signature, which is computed in the Operation constructor.
  for device_function in reversed(self._device_function_stack):
    if device_function is None:
      break
    op._set_device(device_function(op))  # pylint: disable=protected-access
```
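This method is where the op is assigned a device. The placement policy is the one installed by replica_device_setter(), typically via with tf.device(tf.train.replica_device_setter(...)). That policy places the Variable ops on the PS tasks, and ApplyGradientDescent is colocated with its ref-typed var input. As a result, the update op runs on the PS, while the gradient tensor is produced on the worker and transferred to the PS.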
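The sketch below models a device function stack with a simplified, round-robin replica_device_setter. The names `Graph`, `simple_replica_device_setter` and the dict-based ops are hypothetical. The real replica_device_setter works on DeviceSpec objects, merges partial device strings, and sends more op types to the PS than the two listed here. The sketch only shows the shape of the logic: variables land on PS tasks in turn, while compute ops land on the worker.

```python
import itertools

# Two of the op types replica_device_setter() sends to PS tasks by default.
PS_OPS = {"Variable", "VariableV2"}


class Graph:
    """Hypothetical model of a graph's device function stack."""

    def __init__(self):
        self._device_function_stack = []

    def push_device_function(self, fn):
        self._device_function_stack.append(fn)

    def apply_device_functions(self, op):
        # Same loop shape as _apply_device_functions(): newest function
        # first, and a None entry stops the walk.
        for device_function in reversed(self._device_function_stack):
            if device_function is None:
                break
            op["device"] = device_function(op)
        return op


def simple_replica_device_setter(ps_tasks, worker_device="/job:worker"):
    """Hypothetical simplification of tf.train.replica_device_setter()."""
    next_task = itertools.cycle(range(ps_tasks))  # round-robin over PS tasks

    def device_function(op):
        if op.get("device"):
            return op["device"]  # keep an explicit placement
        if op["type"] in PS_OPS:
            return "/job:ps/task:%d" % next(next_task)
        return worker_device

    return device_function
```

With two PS tasks, three VariableV2 ops go to tasks 0, 1 and 0, and a MatMul goes to "/job:worker". Pushing None onto the stack disables placement for ops created under it.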
References:
[1] http://jcf94.com/2018/01/13/2018-01-13-tfunpacking/ (session.run())
[2] http://jcf94.com/2018/01/23/2018-01-23-tfunpacking2/ (TF dataflow model and automatic differentiation)
[3] http://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/ (graph and node)
[4] http://jcf94.com/2018/03/07/2018-03-07-tfunpacking4/ (device)
[5] http://jcf94.com/2018/03/09/2018-03-09-tfunpacking5/ (distributed)
[6] https://www.tensorflow.org/deploy/distributed (distributed tensorflow)
[7] https://stackoverflow.com/questions/43147435/how-does-asynchronous-training-work-in-distributed-tensorflow (asynchronous training in distributed tensorflow)