PyTorch (6-3) Distributed Applications with PyTorch (Single Machine, Multiple GPUs)
This post follows the official tutorial at https://pytorch.org/tutorials/intermediate/dist_tuto.html. That example may be a bit dated, so I have modified a few parts.
1. Packages:
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from math import ceil
from random import Random
from torch.multiprocessing import Process
from torchvision import datasets, transforms
2. Partition:
Split the dataset into multiple chunks; later we set the number of chunks equal to the number of GPUs.
Setting the number of chunks equal to the number of GPUs has two benefits:
- every chunk holds the same amount of data
- every GPU fetches its chunk according to its own GPU index (rank)
class Partition(object):
    """ Dataset-like object, but only access a subset of it. """

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):
    """ Partitions a dataset into different chunks. """

    def __init__(self, data, sizes, seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)
        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])
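As a quick illustration (my own toy example, not part of the tutorial), the sketch below partitions a tiny list across two ranks, showing how each rank picks its chunk by its own index:

# Hypothetical usage sketch: split a toy "dataset" of 8 samples across 2 ranks.
toy_data = list(range(8))                  # stands in for a real Dataset
world_size = 2                             # assume 2 processes / GPUs
sizes = [1.0 / world_size] * world_size    # equal-sized chunks

partitioner = DataPartitioner(toy_data, sizes)
for rank in range(world_size):
    chunk = partitioner.use(rank)          # each rank indexes by its own id
    print(rank, [chunk[i] for i in range(len(chunk))])
# Each rank sees a disjoint, shuffled subset of 4 samples.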
The data is divided evenly among the ranks. 128 is the global batch-size constant: this batch is split across the ranks, so each rank gets 128 / world_size samples per step while the base remains 128.
def partition_dataset():
    """ Partitioning MNIST """
    dataset = datasets.MNIST(
        '../../data/MNIST',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ]))
    size = dist.get_world_size()
    bsz = int(128 / float(size))
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(
        partition, batch_size=bsz, shuffle=True)
    return train_set, bsz
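As a quick sanity check of the batch arithmetic (assuming a world size of 2):

# With the 128 batch constant and 2 ranks, each rank trains on 64 samples per step;
# the effective global batch size is still 128 (64 * 2 ranks).
world_size = 2
bsz = int(128 / float(world_size))
print(bsz)  # 64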
3. ConvNet:
A simple ConvNet.
class Net(nn.Module):
    """ Network architecture. """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return x
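The hard-coded 320 in fc1 comes from the feature-map size after the two conv/pool stages. A quick shape check (my own sanity check, not from the tutorial) confirms it for MNIST's 1x28x28 inputs:

# 28 -> conv(5x5) 24 -> pool 12 -> conv(5x5) 8 -> pool 4, with 20 channels: 20 * 4 * 4 = 320.
net = Net()
dummy = torch.zeros(1, 1, 28, 28)
feats = F.max_pool2d(net.conv2(F.max_pool2d(net.conv1(dummy), 2)), 2)
print(feats.shape)  # torch.Size([1, 20, 4, 4])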
4. Distributed Application:
Next, each GPU computes its own gradients and then all replicas apply the same (averaged) update at the same time. The procedure is shown below.
We normally place each model replica on a different GPU, but they can also all live on the same GPU (which has little practical value).
def run(rank, size):
    """ Distributed Synchronous SGD Example """
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()

    # [start-20180912-cooper-mod] #
    model = Net()
    model = model.cuda(rank)
    # model = model.cuda(0)
    # [end-20180912-cooper-mod] #

    # [start-20180912-cooper-add] #
    criterion = nn.CrossEntropyLoss()
    # [end-20180912-cooper-add] #

    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    num_batches = ceil(len(train_set.dataset) / float(bsz))

    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            # [start-20180912-cooper-mod] #
            # data, target = torch.tensor(data), torch.tensor(target)
            data, target = data.cuda(rank), target.cuda(rank)
            # data, target = data.cuda(0), target.cuda(0)
            # [end-20180912-cooper-mod] #

            optimizer.zero_grad()
            output = model(data)

            # [start-20180912-cooper-mod] #
            loss = criterion(output, target)
            epoch_loss += loss.item()
            loss.backward()
            # [end-20180912-cooper-mod] #

            average_gradients(model)
            optimizer.step()
        print("Rank: {:2}, epoch: {:2}, loss: {:.3}".format(
            dist.get_rank(), epoch, epoch_loss / num_batches))
After summing and averaging the gradients across all ranks, the optimizer updates every tensor.
def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0)
        param.grad.data /= size
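On newer PyTorch versions the same averaging is usually written against the current collective API (dist.ReduceOp, default process group); a sketch under that assumption:

def average_gradients_v2(model):
    """ Gradient averaging with the newer ReduceOp API (assumes a recent PyTorch). """
    size = float(dist.get_world_size())
    for param in model.parameters():
        # Sum this parameter's gradient across all ranks, then divide to get the mean.
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size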
5. Initialization and Running:
According to the official documentation at https://pytorch.org/docs/stable/distributed.html, the chosen backend determines which tensor types can be transferred by the collective operations.
Here we use gloo:
def init_processes(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
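If the tensors being all-reduced live on the GPU, the nccl backend is usually the better fit; a minimal variant of the same initializer under that assumption (the function name here is mine) could look like:

def init_processes_nccl(rank, size, fn, backend='nccl'):
    """ Same initialization, but with the NCCL backend for CUDA tensors. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)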
if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()