PyTorch (5) Transfer Learning

This post is a summary of notes from the official tutorial at https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html; it is essentially me parroting that material.

Transfer learning, in short, means reusing a model that someone else has already trained. There are two common approaches: train only the fully connected layer, or retrain the whole network. Partial training works because a well-trained model has usually already learned good features, so we only need to fine-tune the classification part.

1. packages:

The following packages are used:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader
import numpy as np
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import matplotlib.pyplot as plt
from time import time
import os
import copy

2. Processing data:

The data is split into a training set and a validation set. The input image size is 224x224, and images are randomly flipped during training.

Normalize standardizes each channel with (x - mean) / std, so the values are no longer confined to [0, 1]. The mean and standard deviation are determined by the data distribution (here the ImageNet statistics), which is why they are not simply 0.5. A small numerical check follows the transform definitions below.

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
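
As a quick numerical check of what Normalize does per channel (a minimal sketch; the all-0.5 input is made up for illustration):

import torch
from torchvision import transforms

# a single 3x1x1 "image" with every channel equal to 0.5, already in [0, 1]
x = torch.full((3, 1, 1), 0.5)

norm = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
y = norm(x)  # (x - mean) / std, applied per channel
print(y.flatten())  # roughly tensor([0.0655, 0.1964, 0.4178]); 0.5 does not map to 0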

ImageFolder turns the images into datasets; its class names are taken from the image folder names. The datasets are then wrapped in DataLoaders.

data_dir = "../../data/hymenoptera_data"
image_datasets = {x: ImageFolder(os.path.join(data_dir, x),
                                 data_transforms[x])
                  for x in ["train", "val"]}
dataloaders = {x: DataLoader(image_datasets[x], batch_size=4,
                             shuffle=True, num_workers=2)
               for x in ["train", "val"]}
dataset_sizes = {x: len(image_datasets[x]) for x in ["train", "val"]}
class_names = image_datasets["train"].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
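
A quick sanity check of what we just built (a minimal sketch; the printed numbers assume the hymenoptera_data set from the tutorial, with ants/ and bees/ subfolders under train/ and val/):

if __name__ == "__main__":
    print(dataset_sizes)   # roughly {'train': 244, 'val': 153} for the tutorial data
    print(class_names)     # ['ants', 'bees'], taken from the folder names

    inputs, labels = next(iter(dataloaders["train"]))  # fetch one batch
    print(inputs.shape)    # torch.Size([4, 3, 224, 224]): 4 RGB images of 224x224
    print(labels)          # e.g. tensor([0, 1, 1, 0])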

3. Visualizing images:

Rearrange the image channels into (1, 2, 0) order (CHW to HWC for matplotlib) and un-normalize each channel back to [0, 1]. Because floating-point arithmetic has limited precision, a few values may fall slightly outside [0, 1] after the inverse transform; clip pushes those values back into the interval.

def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)  # values outside the interval are clipped to the interval edges
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(5)  # pause a bit so that plots are updated
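
A usage sketch, as in the tutorial: grab one batch of training data and show it as a single grid image.

from torchvision.utils import make_grid

if __name__ == "__main__":
    inputs, classes = next(iter(dataloaders['train']))  # one batch of training images
    grid = make_grid(inputs)                            # stitch the batch into one image
    imshow(grid, title=[class_names[c] for c in classes])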

Next comes a function to display the model's predictions; no gradients are computed while inspecting results. torch.max returns both the maximum values and their indices, but only the indices are used here.

def visualize_model(model, num_images=6):
    was_training = model.training  # save the state

    model.eval()  # Sets the module in evaluation mode
    images_so_far = 0
    plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')
                ax.set_title('Predicted: {}, Ans:{}'.format(class_names[preds[j]],
                                                            class_names[labels.tolist()[j]]))
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)  # restore the state
                    return
        model.train(mode=was_training)   # restore the state

4. Training:

The training loop is mostly the same as before; the notable addition is a scheduler that decays the learning rate as training progresses.

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            # step the LR scheduler once per epoch, after the optimizer updates
            # (this ordering is required since PyTorch 1.1)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
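
To see what the StepLR schedule used in the next section does to the learning rate, here is a small standalone sketch (the dummy parameter exists only so the optimizer has something to hold; it is not part of the training script):

import torch
import torch.optim as optim
from torch.optim import lr_scheduler

param = torch.zeros(1, requires_grad=True)  # dummy parameter
opt = optim.SGD([param], lr=0.001, momentum=0.9)
sched = lr_scheduler.StepLR(opt, step_size=7, gamma=0.1)

for epoch in range(21):
    opt.step()                               # stands in for a real training epoch
    sched.step()                             # advance the schedule once per epoch
    print(epoch, opt.param_groups[0]['lr'])
# the learning rate drops by a factor of 10 every 7 scheduler steps:
# 0.001 -> 0.0001 -> 1e-05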

5. main:

We use resnet18 and change the number of output classes; the example is a two-class problem, so the output layer is changed to 2. We also define a scheduler that gradually lowers the learning rate during training, trying to find the best model.

if __name__ == "__main__":
    # train whole model
    model_ft = models.resnet18(pretrained=True)
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, 2)

    model_ft = model_ft.to(device)

    criterion = nn.CrossEntropyLoss()

    # Observe that all parameters are being optimized
    optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

    model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,
                           num_epochs=25)

    visualize_model(model_ft)
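
For reference, what gets replaced here is resnet18's original classifier, a single Linear layer mapping 512 features to the 1000 ImageNet classes (a small inspection sketch):

if __name__ == "__main__":
    backbone = models.resnet18(pretrained=True)
    print(backbone.fc)               # Linear(in_features=512, out_features=1000, bias=True)
    print(backbone.fc.in_features)   # 512, the num_ftrs used above

    backbone.fc = nn.Linear(backbone.fc.in_features, 2)
    print(backbone.fc)               # Linear(in_features=512, out_features=2, bias=True)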

The other example freezes the whole model and trains only the output layer. Newly constructed nn modules have requires_grad=True by default, so we do not need to turn gradients back on for the new fc layer. Everything else is the same as above. (A small check of the trainable parameters follows the code below.)

if __name__ == "__main__":
    # tran final layer
    model_conv = models.resnet18(pretrained=True)
    for param in model_conv.parameters():
        param.requires_grad = False

    # Parameters of newly constructed modules have requires_grad=True by default
    num_ftrs = model_conv.fc.in_features
    model_conv.fc = nn.Linear(num_ftrs, 2)

    model_conv = model_conv.to(device)

    criterion = nn.CrossEntropyLoss()

    # Observe that only parameters of final layer are being optimized
    optimizer_conv = optim.SGD(model_conv.fc.parameters(), lr=0.001, momentum=0.9)

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=7, gamma=0.1)

    model_conv = train_model(model_conv, criterion, optimizer_conv,
                             exp_lr_scheduler, num_epochs=25)

    visualize_model(model_conv)
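
To confirm that only the new fc layer will be updated after freezing, count the parameters that still require gradients (a minimal sketch; count_trainable is a made-up helper name):

def count_trainable(model):
    # number of parameters that will receive gradients
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

if __name__ == "__main__":
    frozen = models.resnet18(pretrained=True)
    for p in frozen.parameters():
        p.requires_grad = False
    frozen.fc = nn.Linear(frozen.fc.in_features, 2)  # new layer: requires_grad=True by default

    print(count_trainable(frozen))                      # 1026 = 512 * 2 weights + 2 biases
    print(sum(p.numel() for p in frozen.parameters()))  # roughly 11.2 million in total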

