分布式训练框架Horovod初步学习
|
|
简介 Horovod 是 TensorFlow、Keras、PyTorch 和 Apache MXNet 的分布式训练框架。Horovod 的目标是使分布式深度学习快速且易于使用。 简单来说就是为这些框架提供分布式支持,比如有一个需求,由于数据量过大(千万级),想要在128个GPU上运行,以便于快速得到结果,这时候就可以用horovod,只需要简单改不多的代码,就可以将原来在单GPU上跑的模型,并行跑在128个GPU上。 安装 安装CMake:https://cmake.org/install/ 如果您安装了 PyPI 中的 TensorFlow,请确保Tensorflow已安装 或安装了g++-4.8.5``g++-4.9 如果您从PyPI:https://pypi.org/project/torch 安装了 PyTorch,请确保已安装了g++-4.9 如果已安装来自Conda 的任一包,请确保已安装 Conda 中的gxx_-64包 安装 pip horovod 在 CPU 上运行: $ pip install horovod 要使用 NCCL 在 GPU 上运行: $ HOROVOD_GPU_OPERATIONS=NCCL pip install horovod 有关使用 GPU 支持安装 Horovod 的更多详细信息,请阅读GPU 上的 Horovod。 名词解释 rank: 表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。 local_rank: 进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。 allreduce: 累加所有数据,并同步到所有节点的操作 allgather: 收集所有数据,并同步到所有节点的操作,完成后每个节点都包含所有节点的数据,并且这些数据单独存在 broadcast: 将数据(需要由根节点确认)从一个节点传播到其他所有节点的操作 使用指南 添加以下步骤: hvd.init()用于初始化horovod 将每个GPU固定给单个进程处理,以避免资源竞争。 每个进程设置为一个GPU,通过设置local rank参数,服务器上的第一个进程将分配第一个 GPU,第二个进程将分配第二个 GPU,等等 if torch.cuda.is_available(): torch.cuda.set_device(hvd.local_rank()) 根据线程个数缩放学习率 将优化器包装在hvd.DistributedOptimizer中。 分布式优化器将梯度计算委托给原始优化器,使用allduce或allgather来平均梯度,然后应用这些平均梯度。 将初始变量的状态从rank 0广播至其他进程。需要保证初始化的一致性。 修改权重保存部分源码,只通过worker 0保存权重,防止由于多线程操作导致的冲突。 Tensorflow + Horovod 这里Tensorflow是1.x版本,更加稳定一些,以下是一个修改示例。 import tensorflow as tf import horovod.tensorflow as hvd # Initialize Horovod hvd.init() # Pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.visible_device_list = str(hvd.local_rank()) # Build model... loss = ... opt = tf.train.AdagradOptimizer(0.01 * hvd.size()) # Add Horovod Distributed Optimizer opt = hvd.DistributedOptimizer(opt) # Add hook to broadcast variables from rank 0 to all other processes during # initialization. hooks = [hvd.BroadcastGlobalVariablesHook(0)] # Make training operation train_op = opt.minimize(loss) # Save checkpoints only on worker 0 to prevent other workers from corrupting them. checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir, config=config, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Perform synchronous training. mon_sess.run(train_op) 完整代码如下: import os import errno import tensorflow as tf import horovod.tensorflow as hvd import numpy as np import argparse from tensorflow import keras layers = tf.layers tf.logging.set_verbosity(tf.logging.INFO) # Training settings parser = argparse.ArgumentParser(description='Tensorflow MNIST Example') parser.add_argument('--use-adasum', action='store_true', default=False, help='use adasum algorithm to do reduction') parser.add_argument('--gradient-predivide-factor', type=float, default=1.0, help='apply gradient predivide factor in optimizer (default: 1.0)') args = parser.parse_args() def conv_model(feature, target, mode): """2-layer convolution model.""" # Convert the target to a one-hot tensor of shape (batch_size, 10) and # with a on-value of 1 for each one-hot vector of length 10. target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0) # Reshape feature to 4d tensor with 2nd and 3rd dimensions being # image width and height final dimension being the number of color channels. feature = tf.reshape(feature, [-1, 28, 28, 1]) # First conv layer will compute 32 features for each 5x5 patch with tf.variable_scope('conv_layer1'): h_conv1 = layers.conv2d(feature, 32, kernel_size=[5, 5], activation=tf.nn.relu, padding="SAME") h_pool1 = tf.nn.max_pool( h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') # Second conv layer will compute 64 features for each 5x5 patch. with tf.variable_scope('conv_layer2'): h_conv2 = layers.conv2d(h_pool1, 64, kernel_size=[5, 5], activation=tf.nn.relu, padding="SAME") h_pool2 = tf.nn.max_pool( h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME') # reshape tensor into a batch of vectors h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64]) # Densely connected layer with 1024 neurons. h_fc1 = layers.dropout( layers.dense(h_pool2_flat, 1024, activation=tf.nn.relu), rate=0.5, training=mode == tf.estimator.ModeKeys.TRAIN) # Compute logits (1 per class) and compute loss. logits = layers.dense(h_fc1, 10, activation=None) loss = tf.losses.softmax_cross_entropy(target, logits) return tf.argmax(logits, 1), loss def train_input_generator(x_train, y_train, batch_size=64): assert len(x_train) == len(y_train) while True: p = np.random.permutation(len(x_train)) x_train, y_train = x_train[p], y_train[p] index = 0 while index <= len(x_train) - batch_size: yield x_train[index:index + batch_size], \ y_train[index:index + batch_size], index += batch_size def main(_): # Horovod: initialize Horovod. hvd.init() # Keras automatically creates a cache directory in ~/.keras/datasets for # storing the downloaded MNIST data. This creates a race # condition among the workers that share the same filesystem. If the # directory already exists by the time this worker gets around to creating # it, ignore the resulting exception and continue. cache_dir = os.path.join(os.path.expanduser('~'), '.keras', 'datasets') if not os.path.exists(cache_dir): try: os.mkdir(cache_dir) except OSError as e: if e.errno == errno.EEXIST and os.path.isdir(cache_dir): pass else: raise # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = \ keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank()) # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it # into (-1, 784) to feed into our network. Also, need to normalize the # features between 0 and 1. x_train = np.reshape(x_train, (-1, 784)) / 255.0 x_test = np.reshape(x_test, (-1, 784)) / 255.0 # Build model... with tf.name_scope('input'): image = tf.placeholder(tf.float32, [None, 784], name='image') label = tf.placeholder(tf.float32, [None], name='label') predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN) lr_scaler = hvd.size() # By default, Adasum doesn't need scaling when increasing batch size. If used with NCCL, # scale lr by local_size if args.use_adasum: lr_scaler = hvd.local_size() if hvd.nccl_built() else 1 # Horovod: adjust learning rate based on lr_scaler. opt = tf.train.AdamOptimizer(0.001 * lr_scaler) # Horovod: add Horovod Distributed Optimizer. opt = hvd.DistributedOptimizer(opt, op=hvd.Adasum if args.use_adasum else hvd.Average, gradient_predivide_factor=args.gradient_predivide_factor) global_step = tf.train.get_or_create_global_step() train_op = opt.minimize(loss, global_step=global_step) hooks = [ # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states # from rank 0 to all other processes. This is necessary to ensure consistent # initialization of all workers when training is started with random weights # or restored from a checkpoint. hvd.BroadcastGlobalVariablesHook(0), # Horovod: adjust number of steps based on number of GPUs. tf.train.StopAtStepHook(last_step=20000 // hvd.size()), tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss}, every_n_iter=10), ] # Horovod: pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(hvd.local_rank()) # Horovod: save checkpoints only on worker 0 to prevent other workers from # corrupting them. checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None training_batch_generator = train_input_generator(x_train, y_train, batch_size=100) # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir, hooks=hooks, config=config) as mon_sess: while not mon_sess.should_stop(): # Run a training step synchronously. image_, label_ = next(training_batch_generator) mon_sess.run(train_op, feed_dict={image: image_, label: label_}) if __name__ == "__main__": tf.app.run() PyTorch + Horovod 运行hvd.init() 将每个 GPU 固定到单个进程。 每个进程的典型设置为一个 GPU,请将此设置为本地排名。服务器上的第一个进程将分配第一个 GPU,第二个进程将分配第二个 GPU,等等。 if torch.cuda.is_available(): torch.cuda.set_device(hvd.local_rank()) 按线程数缩放学习率。 同步分布式培训中的有效批次大小按工作人员数量进行缩放。学习率的提高弥补了批次大小的增加。 将优化器包装在hvd.DistributedOptimizer 分布式优化器将梯度计算委托给原始优化器,使用allduce或all 聚集来平均梯度,然后应用这些平均梯度。 将初始变量状态从排名 0 广播到所有其他进程: hvd.broadcast_parameters(model.state_dict(), root_rank=0) hvd.broadcast_optimizer_state(optimizer, root_rank=0) 在使用随机权重开始训练或从检查点恢复训练时,这对于确保所有工作人员的一致初始化是必要的。 修改代码以仅保存工作线程 0 上的检查点,以防止其他工作人员损坏它们。 通过使用 保护模型检查点代码,实现此目的。hvd.rank() != 0 import torch import horovod.torch as hvd # Initialize Horovod hvd.init() # Pin GPU to be used to process local rank (one GPU per process) torch.cuda.set_device(hvd.local_rank()) # Define dataset... train_dataset = ... # Partition dataset among workers using DistributedSampler train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=hvd.size(), rank=hvd.rank()) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler) # Build model... model = ... model.cuda() optimizer = optim.SGD(model.parameters()) # Add Horovod Distributed Optimizer optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters()) # Broadcast parameters from rank 0 to all other processes. hvd.broadcast_parameters(model.state_dict(), root_rank=0) for epoch in range(100): for batch_idx, (data, target) in enumerate(train_loader): optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() if batch_idx % args.log_interval == 0: print('Train Epoch: {} [{}/{}]\tLoss: {}'.format( epoch, batch_idx * len(data), len(train_sampler), loss.item())) 完整代码: import argparse import torch.multiprocessing as mp import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms import torch.utils.data.distributed import horovod.torch as hvd # Training settings parser = argparse.ArgumentParser(description='PyTorch MNIST Example') parser.add_argument('--batch-size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)') parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', help='input batch size for testing (default: 1000)') parser.add_argument('--epochs', type=int, default=10, metavar='N', help='number of epochs to train (default: 10)') parser.add_argument('--lr', type=float, default=0.01, metavar='LR', help='learning rate (default: 0.01)') parser.add_argument('--momentum', type=float, default=0.5, metavar='M', help='SGD momentum (default: 0.5)') parser.add_argument('--no-cuda', action='store_true', default=False, help='disables CUDA training') parser.add_argument('--seed', type=int, default=42, metavar='S', help='random seed (default: 42)') parser.add_argument('--log-interval', type=int, default=10, metavar='N', help='how many batches to wait before logging training status') parser.add_argument('--fp16-allreduce', action='store_true', default=False, help='use fp16 compression during allreduce') parser.add_argument('--use-adasum', action='store_true', default=False, help='use adasum algorithm to do reduction') parser.add_argument('--gradient-predivide-factor', type=float, default=1.0, help='apply gradient predivide factor in optimizer (default: 1.0)') class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(10, 20, kernel_size=5) self.conv2_drop = nn.Dropout2d() self.fc1 = nn.Linear(320, 50) self.fc2 = nn.Linear(50, 10) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 320) x = F.relu(self.fc1(x)) x = F.dropout(x, training=self.training) x = self.fc2(x) return F.log_softmax(x) def train(epoch): model.train() # Horovod: set epoch to sampler for shuffling. train_sampler.set_epoch(epoch) for batch_idx, (data, target) in enumerate(train_loader): if args.cuda: data, target = data.cuda(), target.cuda() optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() if batch_idx % args.log_interval == 0: # Horovod: use train_sampler to determine the number of examples in # this worker's partition. print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_sampler), 100. * batch_idx / len(train_loader), loss.item())) def metric_average(val, name): tensor = torch.tensor(val) avg_tensor = hvd.allreduce(tensor, name=name) return avg_tensor.item() def test(): model.eval() test_loss = 0. test_accuracy = 0. for data, target in test_loader: if args.cuda: data, target = data.cuda(), target.cuda() output = model(data) # sum up batch loss test_loss += F.nll_loss(output, target, size_average=False).item() # get the index of the max log-probability pred = output.data.max(1, keepdim=True)[1] test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum() # Horovod: use test_sampler to determine the number of examples in # this worker's partition. test_loss /= len(test_sampler) test_accuracy /= len(test_sampler) # Horovod: average metric values across workers. test_loss = metric_average(test_loss, 'avg_loss') test_accuracy = metric_average(test_accuracy, 'avg_accuracy') # Horovod: print output only on first rank. if hvd.rank() == 0: print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format( test_loss, 100. * test_accuracy)) if __name__ == '__main__': args = parser.parse_args() args.cuda = not args.no_cuda and torch.cuda.is_available() # Horovod: initialize library. hvd.init() torch.manual_seed(args.seed) if args.cuda: # Horovod: pin GPU to local rank. torch.cuda.set_device(hvd.local_rank()) torch.cuda.manual_seed(args.seed) # Horovod: limit # of CPU threads to be used per worker. torch.set_num_threads(1) kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {} # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent # issues with Infiniband implementations that are not fork-safe if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and mp._supports_context and 'forkserver' in mp.get_all_start_methods()): kwargs['multiprocessing_context'] = 'forkserver' train_dataset = \ datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])) # Horovod: use DistributedSampler to partition the training data. train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=hvd.size(), rank=hvd.rank()) train_loader = torch.utils.data.DataLoader( train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs) test_dataset = \ datasets.MNIST('data-%d' % hvd.rank(), train=False, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])) # Horovod: use DistributedSampler to partition the test data. test_sampler = torch.utils.data.distributed.DistributedSampler( test_dataset, num_replicas=hvd.size(), rank=hvd.rank()) test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size, sampler=test_sampler, **kwargs) model = Net() # By default, Adasum doesn't need scaling up learning rate. lr_scaler = hvd.size() if not args.use_adasum else 1 if args.cuda: # Move model to GPU. model.cuda() # If using GPU Adasum allreduce, scale learning rate by local_size. if args.use_adasum and hvd.nccl_built(): lr_scaler = hvd.local_size() # Horovod: scale learning rate by lr_scaler. optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler, momentum=args.momentum) # Horovod: broadcast parameters & optimizer state. hvd.broadcast_parameters(model.state_dict(), root_rank=0) hvd.broadcast_optimizer_state(optimizer, root_rank=0) # Horovod: (optional) compression algorithm. compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none # Horovod: wrap optimizer with DistributedOptimizer. optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters(), compression=compression, op=hvd.Adasum if args.use_adasum else hvd.Average, gradient_predivide_factor=args.gradient_predivide_factor) for epoch in range(1, args.epochs + 1): train(epoch) test() 训练命令 开始训练,指定worker个数: # run training with 4 GPUs on a single machine $ horovodrun -np 4 python train.py # run training with 8 GPUs on two machines (4 GPUs each) $ horovodrun -np 8 -H hostname1:4,hostname2:4 python train.py 参考 https://github.com/horovod/horovod 声明:文章收集于网络,版权归原作者所有,为传播信息而发,如有侵权,请联系小编删除,谢谢! 欢迎加入本站公开兴趣群 商业智能与数据分析群 兴趣范围包括:各种让数据产生价值的办法,实际应用案例分享与讨论,分析工具,ETL工具,数据仓库,数据挖掘工具,报表系统等全方位知识 QQ群:81035754 |
时间:2020-12-12 22:21 来源: 转发量:次
声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。