인공지능/딥러닝

[딥러닝 파이토치 교과서] ResNet 용어 정리 및 코드 분석

M.랄라 2023. 5. 30. 13:19
  • 병목블록 (bottleneck block) 
    • 1X1 합성곱층의 채널 수를 조절하면서 차원을 줄였다 늘리것을 병목과 같다고 하여 병목블록 이라고 함
    • 교통에서 차선이 3개에서 1개로 줄을 때 병목이 생긴다고 하는데, 이런 원리(?)를 차용한듯 싶음
  • 아이덴티티 매핑 (Identity mapping = Shortcut = Skip Connection)
    • 세 가지 모두 같은 말로 아래 그림의 + 기호를 의미함
    • 입력 x가 어떤 함수를 통과하더라도 다시 x라는 형태(값이 아님)로 출력되도록 하는 것
    • stride의 크기를 조절하며 downsample(다운샘플)을 하기도 함

  • 프로젝션 숏컷 (projection-shortcut) = 합성곱 블록
    • 입력의 차원을 출력에 맞추어 변경해야 하는 것
    • 위의 숏컷을 블록단위로 묶은 것을 projection-shortcut이라고 함

 


주요 코드 분석

class BasicBlock(nn.Module):    
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()                
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, 
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, 
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None        
        self.downsample = downsample
        
    def forward(self, x):       
        i = x       
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)        
        x = self.conv2(x)
        x = self.bn2(x)
        
        if self.downsample is not None:
            i = self.downsample(i)
                        
        x += i
        x = self.relu(x)
        
        return x
  • downsample 부분은 Kernel_size=1 이기 때문에 1x1 COnvolution 레이어를 생성함. 1x1 컨볼루션은 입력이미지의 채널 수를 변경하지 않고 해상도를 줄이는 역할을 하므로 다운샘플링이 되는 것이다. 
  • x += i는 Residual Connection을 수행하기 위한 코드임. 이전 레이어의 출력인 x에 이전 잔차인 i를 더하면 입력과 출력의 차이를 학습에 활용한다
class Bottleneck(nn.Module):    
    expansion = 4
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()    
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = stride, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)        
        self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn3 = nn.BatchNorm2d(self.expansion * out_channels)        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(self.expansion * out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None            
        self.downsample = downsample
        
    def forward(self, x):        
        i = x        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)        
        x = self.conv3(x)
        x = self.bn3(x)
                
        if self.downsample is not None:
            i = self.downsample(i)
            
        x += i
        x = self.relu(x)
    
        return x
  • Bottleneck에서 expnasion=4를 계속 곱해주는 이유는 네트워크의 표현능력을 증가시키기 위해서임. Bottleneck에서 수행하는 3x3 컨볼루션 레이어는 입력 채널의 크기를 줄이기 때문에 중간 레이어에서 발생한 정보 손실을 최소화 하기 때문에 1x1 컨볼루션을 수행해 입력 채널을 확장함. 입력채널을 확장하는 것은 네트워크가 더 깊은 표현을 학습하고 더 복잡한 특징을 모델링할 수 있게 함. 
class ResNet(nn.Module):
  def __init__(self, config, output_dim, zero_init_residual=False):
    super().__init__()

    block, n_blocks, channels = config #ResNet을 호출할 때 넘겨준 config값 저장
    self.in_channels = channels[0]
    assert len(n_blocks) == len(channels) == 4 #블록크기 = 채널크기 = 4

    self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(self.in_channels)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
    self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride=2)
    self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride=2)
    self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride=2)

    self.avgpool = nn.AdaptiveAvgPool2d((1,1))
    self.fc = nn.Linear(self.in_channels, output_dim)

    if zero_init_residual:
      for m in self.modules():
        if isinstance(m, Bottleneck):
          nn.init.constant_(m.bn3.weight, 0)
        elif isinstance(m, Bottleneck):  
          nn.init.constant_(m.bn2.weight, 0)

  def get_resnet_layer(self, block, n_blocks, channels, stride=1): # 블록을 추가하기 위한 함수
    layers = []
    if self.in_channels != block.expansion * channels:
      downsample = True
    else:
      downsample = False
    
    layers.append(block(self.in_channels, channels, stride, downsample))
    for i in range(1, n_blocks): #n_blocks만큼 계층 추가
      layers.append(block(block.expansion*channels, channels))
    self.in_channels = block.expansion * channels
    return nn.Sequential(*layers)

  def forward(self, x):        
      x = self.conv1(x)
      x = self.bn1(x)
      x = self.relu(x)
      x = self.maxpool(x)
      x = self.layer1(x)
      x = self.layer2(x)
      x = self.layer3(x)
      x = self.layer4(x)        
      x = self.avgpool(x)
      h = x.view(x.shape[0], -1)
      x = self.fc(h)        
      return x, h
  • n_blocks는 ResNet 모델의 각 레이어에 속한 블록(block)의 개수를 나타낸다. ResNet은 여러 개의 블록으로 구성된 신경망 구조이다. 현재 모델에서는 BasicBlock, Bottleneck에 해당한다.
resnet18_config = ResNetConfig(block=BasicBlock, n_blocks=[2,2,2,2], channels=[64,128,256,512])
resnet34_config = ResNetConfig(block=BasicBlock, n_blocks=[3,4,6,3], channels=[64,128,256,512])
resnet50_config = ResNetConfig(block=Bottleneck, n_blocks=[3,4,6,3], channels=[64,128,256,512])
resnet101_config = ResNetConfig(block=Bottleneck, n_blocks=[3,4,23,3], channels=[64,128,256,512])
resnet152_config = ResNetConfig(block=Bottleneck, n_blocks=[3,8,36,3], channels=[64,128,256,512])
  • resnet18_config = ResNetConfig(block=BasicBlock, n_blocks=[2,2,2,2], channels=[64,128,256,512]) 의 경우에는 4개의 레이어에([2,2,2,2]하면 4개이니까) 각각 BasicBlock이 2개씩 쌓인다는 의미이다.
  • assert len(n_blocks) == len(channels) == 4 에서 하필 4인 이유는 우리가 n_blocks를 [2,2,2,2]처럼 4개의 레이어로 넣어줄 것이기 때문이다. 각 레이어마다 블록의 개수와 채널의 개수를 지정해줘야 되기 때문에 두개가 같지 않으면 인풋이 잘못 들어온 것이므로 당연히 에러를 내줘야 한다.
def calculate_topk_accuracy(y_pred, y, k = 2):
    with torch.no_grad(): 
        batch_size = y.shape[0]
        _, top_pred = y_pred.topk(k, 1)
        top_pred = top_pred.t()
        correct = top_pred.eq(y.view(1, -1).expand_as(top_pred))
        correct_1 = correct[:1].reshape(-1).float().sum(0, keepdim = True)
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim = True)
        acc_1 = correct_1 / batch_size
        acc_k = correct_k / batch_size
    return acc_1, acc_k
  • 이 함수는 가장 높은 k개의 top accuracy를 반환하는 함수이다.
  • torch.no_grad()는 모델의 파라미터가 업데이트 되지 않고 모델이 입력에 대한 예측을 만들기만 한다는 뜻이다. 따라서 모델을 평가할 때 사용하여 모델의 파라미터가 업데이트 되지 않도록 한다. (기본적으로 파이토치는 모델이 웨이트를 계속 업데이트 하도록 되어있음)
  • y_pred.topk(k,1)은 y_pred에서 가장 큰 k개의 값을 선택한다는 뜻이다. 1은 dim에서 행을 의미함. 따라서 행을 기준으로 가장 큰 두개의 값을 선택한다는 뜻임. 첫 번째 반환값은 y_pred에서 가장 큰 2개의 값의 텐서이다. 두 번째 반환값은 해당 값들의 인덱스이다. 따라서 top_pred는 가장 큰 두 개의 값에 대한 인덱스 값을 갖게된다.
  • 그 뽑은 인덱스에 대해서 t()를 해주고 (행과 열을 바꾸는 연산임), 그것을  실제 레이블인 y에 대해서 얼마나 예측이 맞는지를 구햊주는 것임
def train(model, iterator, optimizer, criterion, device):    
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    
    model.train()    
    for (x, y) in iterator:        
        x = x.to(device)
        y = y.to(device)
            
        optimizer.zero_grad()                
        y_pred = model(x)  

        loss = criterion(y_pred[0], y) 
        
        acc_1, acc_5 = calculate_topk_accuracy(y_pred[0], y)    
        loss.backward()        
        optimizer.step()        
        
        epoch_loss += loss.item()
        epoch_acc_1 += acc_1.item()
        epoch_acc_5 += acc_5.item()
        
    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)        
    return epoch_loss, epoch_acc_1, epoch_acc_5
  • y_pred[0]을 가져오는 이유는 class ResNet.forward()에서 x, h를 return 하기 때문. forward()함수의 코드를 보면 마지막에 x = self.fc(h) 를 해주는데 이 결과 값이 바로 분류모델에서 예측된 레이블에 대한 확률 분포 값이다. (이부분이 잘 이해가 안된다면 마지막에 딥러닝에서 fully connected layer가 오는 이유를 공부하시길 추천드린다.) 따라서 확률분포값인 x만 가져오기 위해서 y_pred[0]을 가져온다.

[테스트]

더보기
  • resnet50_config = ResNetConfig(block=Bottleneck, n_blocks=[3,4,6,3], channels=[64,64,256,512]) 으로 변경을하면 에러가 나는데, 이때의 x와 i의 shape를 각각 찍어보면 아래와 같이 뜬다.
class Bottleneck(nn.Module):    
    expansion = 4
    
    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()    
        print('channel은?')
        print(in_channels)
        print(out_channels)
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = stride, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)        
        self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn3 = nn.BatchNorm2d(self.expansion * out_channels)        
        self.relu = nn.ReLU(inplace = True)
        
        if downsample:
            conv = nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size = 1, 
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(self.expansion * out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None            
        self.downsample = downsample
        
    def forward(self, x):  



        i = x        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)        
        x = self.conv3(x)
        x = self.bn3(x)
        
        
        print(self)
        print('x의 shape')
        print(x.shape)        
        if self.downsample is not None:
            i = self.downsample(i)
        print(x.shape) 

        print('i의 shape')
        print(i.shape)

        x += i
        x = self.relu(x)
    
        return x
  • x의 shape가 32, 256, 56, 56인 것은 ResNet의 처음 이미지 input이 224x224이며 ResNet모델 중간에 있는 MaxPool2d와 AdaptiveAvgPool2d 거치면서 사이즈가 각각 1/2씩 줄어들기 때문이다.  224 -> 112 -> 56
downsample 계산
64
4
64
downsample 계산
256
4
64
downsample 계산
256
4
256
downsample 계산
1024
4
512
===========================layer===========================
self.channel은
x의 shape
torch.Size([32, 256, 56, 56])
Sequential(
  (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)
i의 shape
torch.Size([32, 256, 56, 56])
self.channel은
x의 shape
torch.Size([32, 256, 56, 56])
None
i의 shape
torch.Size([32, 256, 56, 56])
self.channel은
x의 shape
torch.Size([32, 256, 56, 56])
None
i의 shape
torch.Size([32, 256, 56, 56])
===========================layer===========================
self.channel은
x의 shape
torch.Size([32, 256, 28, 28])
None
i의 shape
torch.Size([32, 256, 56, 56])
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-209-7ff42ab80a34> in <cell line: 14>()
     15     start_time = time.monotonic()
     16 
---> 17     train_loss, train_acc_1, train_acc_5 = train(model, train_iterator, optimizer, criterion, device)
     18     valid_loss, valid_acc_1, valid_acc_5 = evaluate(model, valid_iterator, criterion, device)
     19 

6 frames
<ipython-input-201-bb00d6a86280> in forward(self, x)
     45         print(i.shape)
     46 
---> 47         x += i
     48         x = self.relu(x)
     49 

RuntimeError: The size of tensor a (28) must match the size of tensor b (56) at non-singleton dimension 3
  • 에러가 나는 원인은, 다운샘플링이 진행되어야 하는데 진행되지 않았기 때문! 다운샘플링이 실행되는 조건은 다음과 같다.
if self.in_channels != block.expansion * channels:
      downsample = True
    else:
      downsample = False
  • 그러나 두 번째 레이어에서의 in_channels = 256, block.expansion = 4, channels =64 이기 므로 256 == 64*4이므로 다운샘플링이 진행되지 않아 아래와 같이 찍히게 되고,i에 오는 것이 downsample되지 않아 에러가 발생하는 것이다! 
x의 shape
torch.Size([32, 256, 28, 28])
None
i의 shape
torch.Size([32, 256, 56, 56])