순환 신경망 구현
순환 신경망을 구현여 텍스트를 분류하는 실습을 해보자.
IMDB 데이터 세트는 인터넷 영화 데이터 베이스(Internet Movie Database)에서 수집한 영화 리뷰 데이터이다.
순환 신경망으로 이 리뷰들이 긍정적인지 부정적인지 판별해보자.
텐서플로에서 IMDB 데이터 세트 불러오기
import numpy as np
from tensorflow.keras.datasets import imdb
(x_train_all, y_train_all), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=100)
load_data() 함수를 이용하여 데이터를 불러온다.
skip_top 매개변수에는 가장 많이 등장한 단어들 중 건너뛸 단어의 개수를 지정할 수 있다.
예를 들어 a, the, is 등과 같은 단어들은 영화 리뷰에 많이 등장하지만 분석에 유용하지 않으므로 건너뛴다.
num_words 매개변수는 훈련에 사용할 단어의 개수를 지정한다.
print(x_train_all.shape, y_train_all.shape)
훈련 세트 샘플 확인
훈련 세트의 샘플을 확인해보자.
print(x_train_all[0])
영단어가 아닌 정수 형태로 저장되어 있다.
이 정수들은 영단어를 고유한 정수에 일대일 대응한 것이다. BoW(Bag of Word) 혹은 어휘 사전이라고 부른다.
가장 많이 등장하는 영단어 20개를 건너뛰고 100개의 단어만 선택했기 때문에 사전에 없는 영단어가 많다.
숫자 2는 어휘 사전에 없는 단어이다. 추가로 0과 1은 각각 패딩과 글의 시작을 나타내는 데 사용한다.
따라서 이를 제외시켜야 한다.
for i in range(len(x_train_all)):
x_train_all[i] = [w for w in x_train_all[i] if w > 2]
print(x_train_all[0])
어휘 사전 내려받기
훈련 세트를 쉽게 이해할 수 있도록 영단어로 바꿔보자.
정수를 영단어로 바꾸려면 어휘사전을 내려받아 일대일로 다시 맵핑하면 된다.
어휘 사전은 get_word_index() 함수로 내려받을 수 있다.
word_to_index = imdb.get_word_index()
word_to_index['movie']
movie라는 영단어는 17이라는 정수에 대응되어 있음을 알 수 있다.
index_to_word = {word_to_index[k]: k for k in word_to_index}
for w in x_train_all[0]:
print(index_to_word[w - 3], end=' ')
3 이상부터 영단어를 의미하므로 3을 뺀 값을 어휘 사전의 인덱스로 사용해야 한다.
훈련 샘플의 길이를 확인해보자.
훈련 세트의 입력 데이터는 넘파이 배열이 아니라 파이썬 리스트이다.
각 리뷰들의 길이가 달라 샘플의 길이가 다르다.
print(len(x_train_all[0]), len(x_train_all[1]))
타깃은 어떻게 되어있을지 살펴보자.
print(y_train_all[:10])
1은 긍정 0은 부정을 의미한다.
데이터 전처리
검증세트를 준비하고 샘플의 길이를 맞추는 과정을 진행해야 원활한 학습이 가능하다.
np.random.seed(42)
random_index = np.random.permutation(25000)
x_train = x_train_all[[random_index[:20000]]]
y_train = y_train_all[[random_index[:20000]]]
x_val = x_train_all[[random_index[20000:]]]
y_val = y_train_all[[random_index[20000:]]]
index를 섞어 훈련 세트와 검증 세트를 각각 20000, 5000 개로 나누었다.
이제 샘플의 길이를 맞추자.
일정 길이가 넘으면 샘플을 잘라버리고 길이가 모자라면 0으로 채우는 방식으로 길이를 조절하자.
이때 0을 왼쪽에 추가해야 하는데 오른쪽에 0을 추가하면 이후 샘플이 순환 신경망에 주입될 때 0 이 마지막에 주입되므로 모델의 성능이 좋지 않을 것이다.
from tensorflow.keras.preprocessing import sequence
maxlen=100
x_train_seq = sequence.pad_sequences(x_train, maxlen=maxlen)
x_val_seq = sequence.pad_sequences(x_val, maxlen=maxlen)
print(x_train_seq.shape, x_val_seq.shape)
샘플의 길이를 변경한 훈련 세트의 첫 번째 샘플을 확인하면 왼쪽에 0이 채워져 있다.
print(x_train_seq[0])
샘플 원-핫 인코딩하기
from tensorflow.keras.utils import to_categorical
x_train_onehot = to_categorical(x_train_seq)
x_val_onehot = to_categorical(x_val_seq)
print(x_train_onehot.shape)
샘플을 100개의 단어로 제한했지만 x_train_onehot의 크기를 확인해 보면 760MB에 근접하다.
메모리가 많이 필요하다.
print(x_train_onehot.nbytes)
모델 만들기
MinBatchNetwork 클래스를 기반으로 순환 신경망을 파이썬으로 직접 구현해보자.
__init__() 메소드 수정
class RecurrentNetwork:
def __init(self, n_cells=10, batch_size=32, learning_rate=0.1):
self.n_cells = n_cells #셀 개수
self.batch_size = batch_size #배치 크기
self.w1h = None #은닉 상태에 대한 가중치
self.w1x = None #입력에 대한 가중치
self.b1 = None #순환층의 절편
self.w2 = None #순환층의 가중치
self.b2 = None #출력층의 절편
self.h = None #순환층의 활성화 출력
self.losses = [] #훈련 손실
self.val_losses = [] #검증 손실
self.lr = learning_rate #학습률
은닉층의 개수 대신 셀 개수를 입력받는다.
그리고 셀에 필요한 가중치 w1h, w1x를 선언한다. 타임 스텝을 거슬러 그레이디언트를 전파하려면 순환층의 활성화 출력을 모두 가지고 있어야 하므로 변수 h를 선언한다.
init_weights() 메서드 수정
def init_weights(self, n_features, n_clasees):
orth_init = tf.initializers.Orthgonal()
glorot_init = tf.initializers.GlorotUniform()
self.w1h = orth_init((self.n_celss, self.n_cells)).numpy() #(셀 개수, 셀 개수)
self.w1x = glorot_init((n_features, self.n_cells)).numpy() #(특성 개수, 셀 개수)
self.b1 = np.zeros(self.n_cells) #은닉층의 크기
self.w2 = glorot_init((self.n_cells, n_classes)).numpy() #(셀 개수, 클래스 개수)
self.b2 = np.zeros(n_classes)
가중치 초기화의 중요성은 이전에 배웠다.
순환 신경망에서는 직교 행렬 초기화(orthgonal initialization)를 사용한다. 직교 행렬 초기화는 순환 셀에서 은닉 상태를 위한 가중치가 반복해서 곱해질 때 너무 커지거나 작아지지 않도록 만들어 준다.
정방향 계산
def forpass(self, x):
self.h = [np.zeros((x.shape[0], self.n_cells))] #은닉 상태를 초기화
seq = np.swapaxes(x, 0, 1)
for x in seq:
z1 = np.dot(x, self.w1x) + np.dot(self.h[-1], self.w1h) + self.b1
h = np.tanh(z1) #활성화 함수 적용
self.h.append(h) #역전파를 위한 은닉 상태 저장
z2 = np.dot(h, self.w2) + self.b2 #출력층의 선형식 계산
return z2
각 타임 스텝의 은닉 상태를 저장하기 위한 변수 h를 초기화한다. 이때 은닉 상태의 크기는 (샘플 개수, 셀 개수)이다.
역전파 과정을 진행할 때 이전 타임 스텝의 은닉 상태를 사용한다.
첫 번째 타임 스텝의 이전 은닉 상태는 없으므로 변수 h의 첫 번째 요소에 0으로 채워진 배열을 추가한다.
그런 다음 넘파이의 swapaxes() 함수를 사용하여 입력 x의 첫 번째 배치 차원과 두 번째 타입 스텝 차원을 바꾼다.
정방향 계산을 할 때는 한 샘플의 모든 타입 스텝을 처리하고 그다음에 샘플을 처리하는 방식이 아니다.
미니 배치 안에 있는 모든 샘플의 첫 번째 타임 스텝을 한 번에 처리하고 두 번째 타입 스텝을 한 번에 처리해야 한다.
마지막으로 정방향 계산을 수행하고 계산된 은닉 상태는 변수 h에 순서대로 추가한다.
역방향 계산
모든 샘플의 타임 스텝을 한 번에 처리하기 위해 배치 차원과 타임 스텝 차원을 바꾼다.
err_to_cell 변수에 저장되는 값은 Z_1에 대하여 손실 함수를 미분한 도함수의 결괏값이다.
def backprop(self, x, err):
m = len(x) #샘플 개수
#출력층의 가중치와 절편에 대한 그레이디언트 계산
w2_grad = np.dot(self.h[-1].T, err) / m
b2_grad = np.sum(err) / m
#배치 차원과 타임 스텝 차원 바꾸기
seq = np.swapaxes(x, 0, 1)
w1h_grad = w1x_grad = b1_grad = 0
#셀 지전까지 그레이디언트 계산
err_to_cell = np.dot(err, self.w2.T) * (1 - self.h[-1]**2)
#모든 타임 스텝을 거슬러 가며 그레이디언트 전파
for x, h in zip(seq[::-1][:10], self.h[:-1][::-1][:10]):
w1h_grad += np.dot(h.T, err_to_cell)
w1x_grad += np.dot(x.T, err_to_cell)
b1_grad += np.sum(err_to_cell, axis=0)
#이전 타임 스텝의 셀 직전까지 그레이디언트 계산
err_to_cell = np.dot(err_to_cell, self.w1h) * (1 - h**2)
w1h_grad /= m
w1x_grad /= m
b1_grad /= m
return w1h_grad, w1x_grad, b1_grad, w2_grad, b2_grad
for문에서 슬라이싱 연산을 수행했다. 그레이디언트는 마지막 타임 스텝부터 거꾸로 적용해야 하므로 seq [::-1]을 사용했다. 은닉 상태를 저장한 h 변수의 마지막 항목은 for문 이전에 err_to_cell 변수를 계산하기 위해 사용했기 때문에 이를 제외하고 self.h [:-1][::-1]와 같이 거꾸로 뒤집었다. seq 넘파이 배열과 self.h 리스트를 거꾸로 뒤집은 다음 모든 타임 스텝을 거슬러 올라가지 않는다. 딱 10개의 타임 스텝만 거슬러 진행한다.
순환 신경망은 타임 스텝을 거슬러 올라가며 그레이디언트를 전파할 때 동일한 가중치를 반복적으로 곱한다.
이로 인해 그레이디언트가 너무 커지거나 작아지는 문제가 발생하기 쉽다. 이를 방지하기 위해 그레이디언트를 전파하는 타임 스텝의 수를 제한해야 하는데 이를 TBPTT(Truncated Backpropagation Through Time)라고 한다.
그다음 W_1h의 그레이디언트(w1h_grad)를 구하기 위해 Z_1에 대한 손실 함수의 미분 값(err_to_cell)에 다음 식을 곱한다.
err_to_cell에 동일한 형태를 반복해서 곱함으로써 w1h_grad를 구하는 식을 H와 err_to_cell만의 곱으로 단순화했다.
나머지 메서드를 수정해 완성시켜 보자.
class RecurrentNetwork(MinibatchNetwork):
def __init__(self, n_cells=10, batch_size=32, learning_rate=0.1):
self.n_cells = n_cells #셀 개수
self.batch_size = batch_size #배치 크기
self.w1h = None #은닉 상태에 대한 가중치
self.w1x = None #입력에 대한 가중치
self.b1 = None #순환층의 절편
self.w2 = None #순환층의 가중치
self.b2 = None #출력층의 절편
self.h = None #순환층의 활성화 출력
self.losses = [] #훈련 손실
self.val_losses = [] #검증 손실
self.lr = learning_rate #학습률
def init_weights(self, n_features, n_classes):
orth_init = tf.initializers.Orthogonal()
glorot_init = tf.initializers.GlorotUniform()
self.w1h = orth_init((self.n_cells, self.n_cells)).numpy() #(셀 개수, 셀 개수)
self.w1x = glorot_init((n_features, self.n_cells)).numpy() #(특성 개수, 셀 개수)
self.b1 = np.zeros(self.n_cells) #은닉층의 크기
self.w2 = glorot_init((self.n_cells, n_classes)).numpy() #(셀 개수, 클래스 개수)
self.b2 = np.zeros(n_classes)
def forpass(self, x):
self.h = [np.zeros((x.shape[0], self.n_cells))] #은닉 상태를 초기화
seq = np.swapaxes(x, 0, 1)
for x in seq:
z1 = np.dot(x, self.w1x) + np.dot(self.h[-1], self.w1h) + self.b1
h = np.tanh(z1) #활성화 함수 적용
self.h.append(h) #역전파를 위한 은닉 상태 저장
z2 = np.dot(h, self.w2) + self.b2 #출력층의 선형식 계산
return z2
def backprop(self, x, err):
m = len(x) #샘플 개수
#출력층의 가중치와 절편에 대한 그레이디언트 계산
w2_grad = np.dot(self.h[-1].T, err) / m
b2_grad = np.sum(err) / m
#배치 차원과 타임 스텝 차원 바꾸기
seq = np.swapaxes(x, 0, 1)
w1h_grad = w1x_grad = b1_grad = 0
#셀 지전까지 그레이디언트 계산
err_to_cell = np.dot(err, self.w2.T) * (1 - self.h[-1]**2)
#모든 타임 스텝을 거슬러 가며 그레이디언트 전파
for x, h in zip(seq[::-1][:10], self.h[:-1][::-1][:10]):
w1h_grad += np.dot(h.T, err_to_cell)
w1x_grad += np.dot(x.T, err_to_cell)
b1_grad += np.sum(err_to_cell, axis=0)
#이전 타임 스텝의 셀 직전까지 그레이디언트 계산
err_to_cell = np.dot(err_to_cell, self.w1h) * (1 - h**2)
w1h_grad /= m
w1x_grad /= m
b1_grad /= m
return w1h_grad, w1x_grad, b1_grad, w2_grad, b2_grad
def fit(self, x, y, epochs=100, x_val=None, y_val=None):
y = y.reshape(-1,1)
y_val = y_val.reshape(-1,1) #타깃 열 벡터로 변환
np.random.seed(42)
self.init_weights(x.shape[2], y.shape[1]) #은닉층과 출력층 가중치 초기화
for i in range(epochs):
print('에포크', i, end=' ')
batch_losses = []
for x_batch, y_batch in self.gen_batch(x, y):
print('.', end=' ')
a = self.training(x_batch, y_batch)
a = np.clip(a, 1e-10,1-1e-10) #클리핑
loss = np.mean(-(y_batch*np.log(a) + (1-y_batch)*np.log(1-a)))
batch_losses.append(loss)
print( )
self.losses.append(np.mean(batch_losses))
self.update_val_loss(x_val, y_val) #검증 손실 계산
def training(self, x, y):
m = len(x)
z = self.forpass(x) #정방향 계산
a = self.activation(z) #활성화 함수
err = -(y - a) #오차 계산
w1h_grad, w1x_grad, b1_grad, w2_grad, b2_grad = self.backprop(x, err)
#셀의 가중치와 절편을 업데이트
self.w1h -= self.lr * w1h_grad
self.w1x -= self.lr * w1x_grad
self.b1 -= self.lr * b1_grad
#출력층 가중치 절편 업데이트
self.w2 -= self.lr * w2_grad
self.b2 -= self.lr * b2_grad
return a
def update_val_loss(self, x_val, y_val):
z = self.forpass(x_val) #정방향 계산
a = self.activation(z) #활성화 함수
a = np.clip(a, 1e-10,1-1e-10) #클리핑
val_loss = np.mean(-(y_val*np.log(a) + (1-y_val)*np.log(1-a)))
self.val_losses.append(val_loss)
훈련 및 테스트
rn = RecurrentNetwork(n_cells=32, batch_size=32, learning_rate=0.01)
rn.fit(x_train_onehot, y_train, epochs=20, x_val=x_val_onehot, y_val=y_val)
plt.plot(rn.losses)
plt.plot(rn.val_losses)
plt.ylabel("loss")
plt.xlabel("iteration")
plt.legend(['train_loss', 'val_loss'])
plt.show()
rn.score(x_val_onehot, y_val)
성능이 그렇게 좋지는 않다.