딥러닝/딥러닝: 시계열 데이터

LSTM 종합 코드 정리

qordnswnd123 2025. 1. 28. 21:17

1. 모듈 임포트

import numpy as np
import pandas as pd

import torch
import random
import torch.nn as nn

from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import matplotlib.pyplot as plt

2. 함수 정의

# Device selection: use the CUDA GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Reproducibility helper: seed every RNG this script touches.
def set_seed(seed_value):
    """Seed Python, NumPy and PyTorch RNGs so runs are repeatable.

    When a CUDA device is present, also seeds all GPU generators and
    forces cuDNN into deterministic mode (disabling the benchmark
    autotuner, which can pick different kernels per run).
    """
    random.seed(seed_value)        # Python's built-in generator
    np.random.seed(seed_value)     # NumPy's legacy global generator
    torch.manual_seed(seed_value)  # PyTorch CPU generator

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)  # covers multi-GPU setups
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


# Sliding-window dataset builder for the LSTM.
def build_sequence_dataset(df, seq_length):
    """Turn *df* into overlapping (window, label) pairs.

    Each consecutive run of ``seq_length`` rows becomes one input sample;
    the 'temperature' value of the row immediately after the window is
    its regression label.

    Returns a tuple of numpy arrays shaped
    ``(num_samples, seq_length, num_features)`` and ``(num_samples,)``.
    """
    num_samples = len(df) - seq_length
    windows = [df.iloc[start:start + seq_length].values
               for start in range(num_samples)]
    labels = [df.iloc[start + seq_length]['temperature']
              for start in range(num_samples)]
    return np.array(windows), np.array(labels)


class LSTMModel(nn.Module):
    """LSTM regressor: maps (batch, seq_len, input_size) to (batch, output_size)."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True -> inputs arrive as (batch, seq_len, input_size)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Linear head projecting the last hidden activation to the target size
        self.fc = nn.Linear(hidden_size, output_size)

    def init_hidden(self, batch_size, device):
        """Return zero-initialised (h0, c0) state tensors on *device*."""
        state_shape = (self.num_layers, batch_size, self.hidden_size)
        h0 = torch.zeros(*state_shape, device=device)
        c0 = torch.zeros(*state_shape, device=device)
        return h0, c0

    def forward(self, x):
        initial_state = self.init_hidden(x.size(0), x.device)
        lstm_out, _ = self.lstm(x, initial_state)
        # Only the final time step's output feeds the regression head.
        return self.fc(lstm_out[:, -1, :])


def train(model, train_loader, optimizer, criterion):
    """Run one optimisation epoch over *train_loader*.

    Moves each batch to the module-level global ``device``, does the usual
    zero_grad / forward / backward / step cycle, and returns the mean
    per-batch loss for the epoch.
    """
    model.train()  # enable training-mode behaviour (e.g. dropout, if any)
    running_loss = 0.0
    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        optimizer.zero_grad()
        loss = criterion(model(batch_x), batch_y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)


def validate_model(model, test_loader, criterion):
    """Evaluate *model* on *test_loader* without gradient tracking.

    Returns ``(avg_loss, actuals, predictions)`` where the last two are
    flat Python lists collected across all batches. Targets are expected
    as (batch, 1) tensors; relies on the module-level global ``device``.
    """
    model.eval()  # evaluation-mode behaviour
    loss_sum = 0.0
    y_true = []
    y_pred = []

    with torch.no_grad():  # no autograd bookkeeping during evaluation
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            out = model(batch_x)
            loss_sum += criterion(out, batch_y.view(-1, 1)).item()

            # Flatten (batch, 1) tensors into plain lists for plotting later
            y_true.extend(batch_y.squeeze(1).tolist())
            y_pred.extend(out.squeeze(1).tolist())

    return loss_sum / len(test_loader), y_true, y_pred

3. 데이터 로드

##########################################################
#  Data load
##########################################################
df_weather = pd.read_csv('sample_weather_data.csv')

##########################################################
#  Feature selection: three inputs plus 'temperature',
#  which also serves as the prediction target
##########################################################
features = ['humidity','rainfall','wspeed','temperature']
df_weather = df_weather[features]

4. 모델 학습과 검증

##########################################################
#  Parameter setup
##########################################################
seed_value = 42
set_seed(seed_value)  # seed every RNG (helper defined above) for reproducibility

# model parameters
num_output = 1  # single regression target: next-day temperature
num_hidden = 10  # LSTM hidden state size
num_features = len(features)


# hyper parameters
seq_length = 6  # predict the next day's temperature from the previous 6 days
batch_size = 32
learning_rate = 0.01


##########################################################
# Standardise all columns (zero mean, unit variance)
##########################################################
scaler = StandardScaler()
weather_scaled_arr = scaler.fit_transform(df_weather)

# Back to a DataFrame (the scaler returns a numpy array)
df_weather_scaled = pd.DataFrame(weather_scaled_arr, columns=features)

##########################################################
# Build sliding-window sequence data
##########################################################
sequence_dataX, sequence_dataY = build_sequence_dataset(df_weather_scaled, seq_length)

##########################################################
# Train - test split
##########################################################

# shuffle=False preserves chronological order: the test set is the most
# recent 20% of the series, as is proper for time-series evaluation.
train_X, test_X, train_Y, test_Y = train_test_split(
    sequence_dataX, sequence_dataY, test_size=0.2, shuffle = False
)

##########################################################
# Convert arrays to tensors
##########################################################

train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_Y_tensor = torch.tensor(train_Y.reshape(-1, 1), dtype=torch.float32)  # (N,) -> (N, 1) to match model output
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_Y_tensor = torch.tensor(test_Y.reshape(-1, 1), dtype=torch.float32)

# Pair each sequence with its label
train_dataset = TensorDataset(train_X_tensor, train_Y_tensor)
test_dataset = TensorDataset(test_X_tensor, test_Y_tensor)

# DataLoader setup (shuffle only the training batches)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


##########################################################
# Model instantiation
##########################################################

model_lstm = LSTMModel(input_size=num_features, hidden_size=num_hidden, num_layers=1, output_size=num_output).to(device)

optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
criterion_lstm = nn.MSELoss()  # mean squared error for regression

##########################################################
# Training loop
##########################################################

# Per-epoch average losses, kept for the plots below
train_loss_lst = []
test_loss_lst = []

max_epochs = 200
for epoch in range(max_epochs):
    train_loss = train(model_lstm, train_loader, optimizer_lstm, criterion_lstm)
    # NOTE(review): validating every epoch also leaves `actuals`/`predictions`
    # from the final epoch available for plotting afterwards.
    test_loss, actuals, predictions = validate_model(model_lstm, test_loader, criterion_lstm)

    train_loss_lst.append(train_loss)  # record losses
    test_loss_lst.append(test_loss)  # record losses

    # Progress report every 10 epochs
    if (epoch+1) % 10 == 0:
        print(f"epoch {epoch+1}: train loss(mse) = {train_loss:.4f}  test loss(mse) = {test_loss:.4f}")

print(f"학습 완료 : 총 {epoch+1} epoch")
epoch 10: train loss(mse) = 0.0467  test loss(mse) = 0.0563
epoch 20: train loss(mse) = 0.0447  test loss(mse) = 0.0498
epoch 30: train loss(mse) = 0.0385  test loss(mse) = 0.0541
epoch 40: train loss(mse) = 0.0348  test loss(mse) = 0.0533
epoch 50: train loss(mse) = 0.0322  test loss(mse) = 0.0515
epoch 60: train loss(mse) = 0.0334  test loss(mse) = 0.0609
epoch 70: train loss(mse) = 0.0252  test loss(mse) = 0.0568
epoch 80: train loss(mse) = 0.0233  test loss(mse) = 0.0537
epoch 90: train loss(mse) = 0.0237  test loss(mse) = 0.0613
epoch 100: train loss(mse) = 0.0201  test loss(mse) = 0.0566
epoch 110: train loss(mse) = 0.0175  test loss(mse) = 0.0648
epoch 120: train loss(mse) = 0.0170  test loss(mse) = 0.0672
epoch 130: train loss(mse) = 0.0170  test loss(mse) = 0.0704
epoch 140: train loss(mse) = 0.0171  test loss(mse) = 0.0734
epoch 150: train loss(mse) = 0.0173  test loss(mse) = 0.0785
epoch 160: train loss(mse) = 0.0115  test loss(mse) = 0.0890
epoch 170: train loss(mse) = 0.0116  test loss(mse) = 0.0904
epoch 180: train loss(mse) = 0.0101  test loss(mse) = 0.0988
epoch 190: train loss(mse) = 0.0089  test loss(mse) = 0.0954
epoch 200: train loss(mse) = 0.0084  test loss(mse) = 0.0975
학습 완료 : 총 200 epoch

5. epoch에 따른 학습-검증 데이터 손실 시각화

import matplotlib.pyplot as plt  # NOTE(review): redundant — already imported at the top of the file

# Two stacked subplots: loss curves on top, predictions vs. actuals below
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(8, 8))  # nrows=2 gives two rows of subplots

# First plot: per-epoch train/test loss
axes[0].plot(train_loss_lst, marker='o', linestyle='-', color='blue', label='Train Loss')
axes[0].plot(test_loss_lst, marker='o', linestyle='-', color='red', label='Test Loss')
axes[0].set_title('Epoch vs. Train/Valid Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].grid(True)
axes[0].legend()

# Second plot: actual vs. predicted values from the final epoch's validation
axes[1].plot(actuals, label='Actual Values', color='blue')
axes[1].plot(predictions, label='Predicted Values', color='red', alpha=0.5)
axes[1].set_title('Actual vs Predicted Values')
axes[1].set_xlabel('Sample Number')
axes[1].set_ylabel('Temperature')
axes[1].legend()

# Adjust the overall layout so the subplots don't overlap
plt.tight_layout()
plt.show()

6. 모델 및 모델 파라미터, 실험환경 저장

# Constructor arguments needed to rebuild the model from the checkpoint
model_params  = {
    'input_size' :num_features,
    'hidden_size' : num_hidden,
    'num_layers': 1,
    'output_size': num_output
}

# Training hyperparameters, recorded for experiment tracking
training_params = {
    'batch_size' : batch_size,
    'sequence_length':seq_length,
    'learning_rate': learning_rate,
    'num_epochs': max_epochs,
    'seed':seed_value,
}

# NOTE(review): 'checkpoin' looks like a typo for 'checkpoint'; kept as-is
# because the restore step below loads this exact path.
filepath = 'my_lstm_checkpoin_001.pth' 

torch.save({
    'model_state_dict': model_lstm.state_dict(),
    'optimizer_state_dict': optimizer_lstm.state_dict(),
    'model_params': model_params,  # model constructor arguments
    'training_params': training_params  # training environment parameters
}, filepath)

7. 저장된 체크포인트로부터 모델과 옵티마이저 복원, 검증 데이터에 대한 예측

# Restore the checkpoint onto the current device. map_location makes this
# robust when the checkpoint was saved on a different device (e.g. a GPU
# checkpoint loaded on a CPU-only machine).
checkpoint = torch.load(filepath, map_location=device)

model_params = checkpoint['model_params']
training_params = checkpoint['training_params']

# Rebuild the model from the saved constructor arguments and move it to the
# same device validate_model() sends batches to. (Bug fix: without .to(device)
# a CUDA run would crash with a CPU-model / GPU-data mismatch.)
model_lstm = LSTMModel(**model_params).to(device)
model_lstm.load_state_dict(checkpoint['model_state_dict'])

# Recreate the optimizer with the saved learning rate, then restore its state
optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=training_params['learning_rate'])
optimizer_lstm.load_state_dict(checkpoint['optimizer_state_dict'])

# Run validation with the restored model and report results
test_loss, actuals, predictions = validate_model(model_lstm, test_loader, criterion_lstm)
print(f"테스트 손실: {test_loss:.4f}")
print("Actuals Sample:", actuals[:10])
print("Predictions Sample:", predictions[:10])


# Plot actual vs. predicted values
plt.figure(figsize=(8, 4))
plt.plot(actuals, label='Actual Values', color='blue')
plt.plot(predictions, label='Predicted Values', color='red', alpha=0.5)
plt.title('Actual vs Predicted Values')
plt.xlabel('Sample Number')
plt.ylabel('Temperature')
plt.legend()
plt.show()
테스트 손실: 0.0975
Actuals Sample: [1.079803228378296, 1.04249107837677, 1.0704751014709473, 1.2570358514785767, 1.145099401473999, 1.3130040168762207, 1.3596442937850952, 1.2290517091751099, 1.1637555360794067, 1.1264433860778809]
Predictions Sample: [0.9366627931594849, 1.0627059936523438, 1.0069472789764404, 1.0358588695526123, 1.2203190326690674, 1.1728713512420654, 1.3894400596618652, 0.4047238230705261, 0.7450530529022217, 0.8612487316131592]