딥러닝/딥러닝: 시계열 데이터
LSTM 종합 코드 정리
qordnswnd123
2025. 1. 28. 21:17
1. 모듈 임포트
import numpy as np
import pandas as pd
import torch
import random
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
2. 함수 정의
# Select the compute device: prefer an available CUDA GPU, otherwise fall back to CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
def set_seed(seed_value):
    """Seed every RNG used by this script so runs are reproducible.

    Covers Python's `random`, NumPy, and PyTorch (CPU and, when a GPU
    is present, all CUDA devices), and pins cuDNN to its deterministic
    kernels.
    """
    random.seed(seed_value)        # Python stdlib RNG
    np.random.seed(seed_value)     # NumPy RNG
    torch.manual_seed(seed_value)  # PyTorch CPU RNG
    if not torch.cuda.is_available():
        return
    # GPU-side seeding plus deterministic cuDNN behavior.
    torch.cuda.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
# Sequence dataset builder
def build_sequence_dataset(df, seq_length, target_col='temperature'):
    """Slice a time-ordered DataFrame into supervised learning sequences.

    Each sample is `seq_length` consecutive rows of every feature column,
    and its label is the value of `target_col` at the row immediately
    after the window (one-step-ahead prediction).

    Args:
        df: time-ordered DataFrame; every column is used as model input.
        seq_length: number of past rows per input sequence.
        target_col: column to predict one step ahead. Defaults to
            'temperature', which preserves the original behavior; the
            parameter generalizes the function to other targets.

    Returns:
        (X, y): X with shape (num_samples, seq_length, num_features) and
        y with shape (num_samples,). Both are empty when the frame has
        fewer than seq_length + 1 rows.
    """
    sequences = []
    labels = []
    for start in range(len(df) - seq_length):
        window = df.iloc[start:start + seq_length].values   # input sequence
        label = df.iloc[start + seq_length][target_col]     # next-step target
        sequences.append(window)
        labels.append(label)
    return np.array(sequences), np.array(labels)
class LSTMModel(nn.Module):
    """Many-to-one LSTM regressor.

    Reads a (batch, seq_len, features) sequence and predicts a single
    vector from the hidden state at the final time step.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True -> inputs/outputs are (batch, seq_len, features).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Linear head that maps the last hidden state to the output size.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        n_samples = x.size(0)
        hidden, cell = self.init_hidden(n_samples, x.device)
        lstm_out, _ = self.lstm(x, (hidden, cell))
        # Only the final time step feeds the regression head.
        last_step = lstm_out[:, -1, :]
        return self.fc(last_step)

    def init_hidden(self, batch_size, device):
        """Return fresh all-zero hidden and cell states on `device`."""
        state_shape = (self.num_layers, batch_size, self.hidden_size)
        return (torch.zeros(*state_shape, device=device),
                torch.zeros(*state_shape, device=device))
def train(model, train_loader, optimizer, criterion):
    """Run one training epoch and return the mean per-batch loss.

    Batches are moved to the device the model's parameters live on,
    instead of relying on the module-level `device` global. This keeps
    the function correct even when a caller builds a model without
    moving it to the global device (as the checkpoint-restore section
    of this script does).

    Args:
        model: the network to optimize (any nn.Module with parameters).
        train_loader: DataLoader yielding (data, target) batches.
        optimizer: optimizer updating `model`'s parameters.
        criterion: loss function, e.g. nn.MSELoss().

    Returns:
        float: total loss divided by the number of batches.
    """
    model.train()  # enable training-mode behavior (dropout, batchnorm, ...)
    model_device = next(model.parameters()).device
    total_loss = 0
    for data, target in train_loader:
        data, target = data.to(model_device), target.to(model_device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)
def validate_model(model, test_loader, criterion):
    """Evaluate `model` on `test_loader` without updating weights.

    Batches follow the device of the model's parameters rather than the
    module-level `device` global, so evaluation also works for a model
    restored on CPU while the global points at CUDA. Assumes targets
    are shaped (batch, 1), as built by this script's TensorDatasets.

    Args:
        model: trained network to evaluate.
        test_loader: DataLoader yielding (data, target) batches.
        criterion: loss function, e.g. nn.MSELoss().

    Returns:
        (avg_loss, actuals, predictions): mean per-batch loss plus flat
        Python lists of ground-truth and predicted scalar values.
    """
    model.eval()  # disable training-mode behavior
    model_device = next(model.parameters()).device
    total_loss = 0
    actuals = []
    predictions = []
    with torch.no_grad():  # no gradients needed during evaluation
        for data, target in test_loader:
            data, target = data.to(model_device), target.to(model_device)
            output = model(data)
            loss = criterion(output, target.view(-1, 1))
            total_loss += loss.item()
            # Collect flat lists for later plotting / inspection.
            actuals.extend(target.squeeze(1).tolist())
            predictions.extend(output.squeeze(1).tolist())
    avg_loss = total_loss / len(test_loader)
    return avg_loss, actuals, predictions
3. 데이터 로드
##########################################################
# Data loading and feature selection
##########################################################
# Columns used as model inputs; 'temperature' also serves as the label.
features = ['humidity','rainfall','wspeed','temperature']
df_weather = pd.read_csv('sample_weather_data.csv')[features]
4. 모델 학습과 검증
##########################################################
# Parameter settings
##########################################################
seed_value = 42
set_seed(seed_value)  # seed every RNG (python / numpy / torch) for reproducibility

# Model parameters
num_output = 1                # predict a single value (next-day temperature)
num_hidden = 10               # LSTM hidden-state size
num_features = len(features)  # one input channel per selected weather column

# Hyperparameters
seq_length = 6  # predict the next day from the previous 6 days of data
batch_size = 32
learning_rate = 0.01

##########################################################
# Standardize features (zero mean, unit variance)
##########################################################
scaler = StandardScaler()
weather_scaled_arr = scaler.fit_transform(df_weather)
# Back to a DataFrame (the scaler returns a numpy array).
# NOTE(review): the scaler is fit on the FULL dataset before the
# train/test split, so test statistics leak into scaling — confirm
# whether this is acceptable for this tutorial setting.
df_weather_scaled = pd.DataFrame(weather_scaled_arr, columns=features)

##########################################################
# Build the sequence dataset
##########################################################
sequence_dataX, sequence_dataY = build_sequence_dataset(df_weather_scaled, seq_length)

##########################################################
# Train / test split
##########################################################
# shuffle=False keeps chronological order: the last 20% of the series
# becomes the test set.
train_X, test_X, train_Y, test_Y = train_test_split(
    sequence_dataX, sequence_dataY, test_size=0.2, shuffle = False
)

##########################################################
# Convert to tensors
##########################################################
train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_Y_tensor = torch.tensor(train_Y.reshape(-1, 1), dtype=torch.float32)  # (N, 1) to match model output
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_Y_tensor = torch.tensor(test_Y.reshape(-1, 1), dtype=torch.float32)

# Wrap tensors in TensorDatasets
train_dataset = TensorDataset(train_X_tensor, train_Y_tensor)
test_dataset = TensorDataset(test_X_tensor, test_Y_tensor)

# DataLoaders: shuffle only the training batches
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

##########################################################
# Model, optimizer, and loss
##########################################################
model_lstm = LSTMModel(input_size=num_features, hidden_size=num_hidden, num_layers=1, output_size=num_output).to(device)
optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
criterion_lstm = nn.MSELoss()

##########################################################
# Training loop
##########################################################
# Per-epoch average losses, used by the plots in the next section
train_loss_lst = []
test_loss_lst = []

max_epochs = 200

for epoch in range(max_epochs):
    # One full pass over the training data, then evaluate on the test set
    train_loss = train(model_lstm, train_loader, optimizer_lstm, criterion_lstm)
    test_loss, actuals, predictions = validate_model(model_lstm, test_loader, criterion_lstm)
    train_loss_lst.append(train_loss)  # record losses for plotting
    test_loss_lst.append(test_loss)
    # Log progress every 10th epoch
    if (epoch+1) % 10 == 0:
        print(f"epoch {epoch+1}: train loss(mse) = {train_loss:.4f} test loss(mse) = {test_loss:.4f}")

print(f"학습 완료 : 총 {epoch+1} epoch")
epoch 10: train loss(mse) = 0.0467 test loss(mse) = 0.0563
epoch 20: train loss(mse) = 0.0447 test loss(mse) = 0.0498
epoch 30: train loss(mse) = 0.0385 test loss(mse) = 0.0541
epoch 40: train loss(mse) = 0.0348 test loss(mse) = 0.0533
epoch 50: train loss(mse) = 0.0322 test loss(mse) = 0.0515
epoch 60: train loss(mse) = 0.0334 test loss(mse) = 0.0609
epoch 70: train loss(mse) = 0.0252 test loss(mse) = 0.0568
epoch 80: train loss(mse) = 0.0233 test loss(mse) = 0.0537
epoch 90: train loss(mse) = 0.0237 test loss(mse) = 0.0613
epoch 100: train loss(mse) = 0.0201 test loss(mse) = 0.0566
epoch 110: train loss(mse) = 0.0175 test loss(mse) = 0.0648
epoch 120: train loss(mse) = 0.0170 test loss(mse) = 0.0672
epoch 130: train loss(mse) = 0.0170 test loss(mse) = 0.0704
epoch 140: train loss(mse) = 0.0171 test loss(mse) = 0.0734
epoch 150: train loss(mse) = 0.0173 test loss(mse) = 0.0785
epoch 160: train loss(mse) = 0.0115 test loss(mse) = 0.0890
epoch 170: train loss(mse) = 0.0116 test loss(mse) = 0.0904
epoch 180: train loss(mse) = 0.0101 test loss(mse) = 0.0988
epoch 190: train loss(mse) = 0.0089 test loss(mse) = 0.0954
epoch 200: train loss(mse) = 0.0084 test loss(mse) = 0.0975
학습 완료 : 총 200 epoch
5. epoch에 따른 학습-검증 데이터 손실 시각화
import matplotlib.pyplot as plt

# Two stacked panels: training curves on top, predictions below.
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(8, 8))
loss_ax, pred_ax = axes

# Panel 1: per-epoch loss for both splits.
loss_ax.plot(train_loss_lst, marker='o', linestyle='-', color='blue', label='Train Loss')
loss_ax.plot(test_loss_lst, marker='o', linestyle='-', color='red', label='Test Loss')
loss_ax.set_title('Epoch vs. Train/Valid Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.grid(True)
loss_ax.legend()

# Panel 2: ground truth vs model predictions on the test split.
pred_ax.plot(actuals, label='Actual Values', color='blue')
pred_ax.plot(predictions, label='Predicted Values', color='red', alpha=0.5)
pred_ax.set_title('Actual vs Predicted Values')
pred_ax.set_xlabel('Sample Number')
pred_ax.set_ylabel('Temperature')
pred_ax.legend()

# Final layout and display.
plt.tight_layout()
plt.show()
6. 모델 및 모델 파라미터, 실험환경 저장
# Constructor kwargs needed to rebuild LSTMModel from the checkpoint.
model_params = {
    'input_size': num_features,
    'hidden_size': num_hidden,
    'num_layers': 1,
    'output_size': num_output,
}

# Training-environment settings saved alongside the weights.
training_params = {
    'batch_size': batch_size,
    'sequence_length': seq_length,
    'learning_rate': learning_rate,
    'num_epochs': max_epochs,
    'seed': seed_value,
}

# NOTE(review): filename looks like a typo ('checkpoin' vs 'checkpoint');
# kept byte-for-byte to preserve behavior.
filepath = 'my_lstm_checkpoin_001.pth'

checkpoint_payload = {
    'model_state_dict': model_lstm.state_dict(),
    'optimizer_state_dict': optimizer_lstm.state_dict(),
    'model_params': model_params,        # model construction parameters
    'training_params': training_params,  # training environment parameters
}
torch.save(checkpoint_payload, filepath)
7. 저장된 체크포인트로부터 모델과 옵티마이저 복원, 검증 데이터에 대한 예측
##########################################################
# Restore model + optimizer from the checkpoint and re-validate
##########################################################
# map_location makes a checkpoint saved on GPU loadable on a CPU-only
# machine; the original call without it would fail in that case.
checkpoint = torch.load(filepath, map_location=device)
model_params = checkpoint['model_params']
training_params = checkpoint['training_params']

# Rebuild the model and move it to the active device. The original code
# left the restored model on CPU, which crashes validate_model when the
# batches are sent to a CUDA device.
model_lstm = LSTMModel(**model_params).to(device)
model_lstm.load_state_dict(checkpoint['model_state_dict'])

# Recreate the optimizer with the saved learning rate, then restore its state.
optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=training_params['learning_rate'])
optimizer_lstm.load_state_dict(checkpoint['optimizer_state_dict'])

# Run validation with the restored weights and show a sample of results.
test_loss, actuals, predictions = validate_model(model_lstm, test_loader, criterion_lstm)
print(f"테스트 손실: {test_loss:.4f}")
print("Actuals Sample:", actuals[:10])
print("Predictions Sample:", predictions[:10])

# Plot actual vs predicted values (in standardized units).
plt.figure(figsize=(8, 4))
plt.plot(actuals, label='Actual Values', color='blue')
plt.plot(predictions, label='Predicted Values', color='red', alpha=0.5)
plt.title('Actual vs Predicted Values')
plt.xlabel('Sample Number')
plt.ylabel('Temperature')
plt.legend()
plt.show()
테스트 손실: 0.0975
Actuals Sample: [1.079803228378296, 1.04249107837677, 1.0704751014709473, 1.2570358514785767, 1.145099401473999, 1.3130040168762207, 1.3596442937850952, 1.2290517091751099, 1.1637555360794067, 1.1264433860778809]
Predictions Sample: [0.9366627931594849, 1.0627059936523438, 1.0069472789764404, 1.0358588695526123, 1.2203190326690674, 1.1728713512420654, 1.3894400596618652, 0.4047238230705261, 0.7450530529022217, 0.8612487316131592]