CNN Variational Autoencoder
PyTorch CNN VAE Sample Code Explained
This post walks through the code for a CNN-based Variational Autoencoder (VAE) trained on the MNIST dataset.
Complete sample code
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
# Hyperparameters
BATCH_SIZE = 128
EPOCHS = 10
LEARNING_RATE = 1e-3
LATENT_DIM = 20  # Dimensionality of the latent variable
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Prepare the dataset
transform = transforms.Compose([
transforms.ToTensor(),
])
train_dataset = datasets.MNIST('~/.pytorch/data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Define the VAE model
class VAE(nn.Module):
def __init__(self, latent_dim=20):
super(VAE, self).__init__()
        # Encoder (image → latent variable)
self.encoder = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1), # 28x28 -> 14x14
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1), # 14x14 -> 7x7
nn.ReLU(),
nn.Flatten(), # 64*7*7 = 3136
)
        # Output the mean and log-variance of the latent variable
self.fc_mu = nn.Linear(64*7*7, latent_dim)
self.fc_logvar = nn.Linear(64*7*7, latent_dim)
        # Decoder (latent variable → image)
self.decoder_input = nn.Linear(latent_dim, 64*7*7)
self.decoder = nn.Sequential(
nn.Unflatten(1, (64, 7, 7)), # 3136 -> 64x7x7
nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1), # 7x7 -> 14x14
nn.ReLU(),
nn.ConvTranspose2d(32, 1, kernel_size=3, stride=2, padding=1, output_padding=1), # 14x14 -> 28x28
            nn.Sigmoid(),  # Squash outputs into the range [0, 1]
)
def encode(self, x):
"""エンコーダー部分"""
h = self.encoder(x)
mu = self.fc_mu(h)
logvar = self.fc_logvar(h)
return mu, logvar
def reparameterize(self, mu, logvar):
"""再パラメータ化トリック: z = μ + σ * ε"""
        std = torch.exp(0.5 * logvar)  # Standard deviation
        eps = torch.randn_like(std)  # Sample from a standard normal distribution
z = mu + eps * std
return z
def decode(self, z):
"""デコーダー部分"""
h = self.decoder_input(z)
reconstruction = self.decoder(h)
return reconstruction
def forward(self, x):
"""順伝播"""
mu, logvar = self.encode(x)
z = self.reparameterize(mu, logvar)
reconstruction = self.decode(z)
return reconstruction, mu, logvar
# Define the loss function
def vae_loss(recon_x, x, mu, logvar):
"""
    VAE loss = reconstruction error + KL divergence
"""
    # Reconstruction error (binary cross-entropy)
recon_loss = F.binary_cross_entropy(recon_x, x, reduction='sum')
    # KL divergence
    # KL(N(μ,σ²) || N(0,1)) = -0.5 * Σ(1 + log(σ²) - μ² - σ²)
kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return recon_loss + kl_divergence
# Initialize the model
model = VAE(latent_dim=LATENT_DIM).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Training loop
def train(epoch):
model.train()
train_loss = 0
for batch_idx, (data, _) in enumerate(train_loader):
data = data.to(device)
optimizer.zero_grad()
        # Forward pass
recon_batch, mu, logvar = model(data)
        # Compute the loss
loss = vae_loss(recon_batch, data, mu, logvar)
        # Backpropagation
loss.backward()
train_loss += loss.item()
optimizer.step()
if batch_idx % 100 == 0:
print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] '
f'Loss: {loss.item() / len(data):.4f}')
avg_loss = train_loss / len(train_loader.dataset)
print(f'====> Epoch: {epoch} Average loss: {avg_loss:.4f}')
# Run training
for epoch in range(1, EPOCHS + 1):
train(epoch)
# Visualize generated images
model.eval()
with torch.no_grad():
    # Generate images from random samples of the prior
z = torch.randn(64, LATENT_DIM).to(device)
sample = model.decode(z).cpu()
    # Display the images
fig, axes = plt.subplots(8, 8, figsize=(10, 10))
for i, ax in enumerate(axes.flat):
ax.imshow(sample[i].squeeze(), cmap='gray')
ax.axis('off')
plt.tight_layout()
plt.savefig('vae_generated_samples.png')
plt.show()
# Visualize reconstructions
with torch.no_grad():
data, _ = next(iter(train_loader))
data = data[:8].to(device)
recon, _, _ = model(data)
    # Compare originals with reconstructions
fig, axes = plt.subplots(2, 8, figsize=(15, 4))
for i in range(8):
    # Original image
    axes[0, i].imshow(data[i].cpu().squeeze(), cmap='gray')
    # Reconstructed image
    axes[1, i].imshow(recon[i].cpu().squeeze(), cmap='gray')
    # Remove ticks instead of calling axis('off'), which would also hide the row labels
    for ax in (axes[0, i], axes[1, i]):
        ax.set_xticks([])
        ax.set_yticks([])
axes[0, 0].set_ylabel('Original', size=20)
axes[1, 0].set_ylabel('Reconstructed', size=20)
plt.tight_layout()
plt.savefig('vae_reconstruction.png')
plt.show()
Key components explained
1. Encoder
self.encoder = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
nn.ReLU(),
nn.Flatten(),
)
- Convolutional layers extract features from the input image
- The spatial size shrinks 28×28 → 14×14 → 7×7
- Two linear heads then output the mean (μ) and the log-variance (logvar); see the shape check below
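As a quick sanity check, you can push a dummy batch through the encoder and confirm these shapes. This is a minimal sketch, assuming the VAE class from the listing above has been defined:
# Dummy batch of 4 MNIST-sized images through the encoder
vae = VAE(latent_dim=20)
dummy = torch.randn(4, 1, 28, 28)
mu, logvar = vae.encode(dummy)
print(mu.shape, logvar.shape)  # torch.Size([4, 20]) torch.Size([4, 20])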
2. Reparameterization trick
def reparameterize(self, mu, logvar):
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
z = mu + eps * std
return z
- Purpose: let gradients propagate through the stochastic sampling step
- Draw ε ~ N(0,1) and transform it as z = μ + σε
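The payoff is that mu and logvar stay inside the autograd graph even though z is random. A standalone sketch:
import torch
# Gradients flow through the reparameterized sample z = μ + σ * ε
mu = torch.zeros(3, requires_grad=True)
logvar = torch.zeros(3, requires_grad=True)
std = torch.exp(0.5 * logvar)
z = mu + torch.randn_like(std) * std
z.sum().backward()
print(mu.grad is not None, logvar.grad is not None)  # True True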
3. Decoder
self.decoder = nn.Sequential(
nn.ConvTranspose2d(64, 32, ...),
nn.ConvTranspose2d(32, 1, ...),
nn.Sigmoid(),
)
- Reconstructs the image from the latent variable
- ConvTranspose2d (transposed convolution) upsamples the feature maps
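For these stride-2 transposed convolutions, the output size is (H_in - 1) * stride - 2 * padding + kernel_size + output_padding, which is exactly what doubles 7 → 14 → 28. A standalone check:
import torch
import torch.nn as nn
# (7 - 1) * 2 - 2 * 1 + 3 + 1 = 14: each layer doubles the feature map
up = nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1)
print(up(torch.randn(1, 64, 7, 7)).shape)  # torch.Size([1, 32, 14, 14])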
4. VAE loss function
loss = reconstruction error + KL divergence
- Reconstruction error: the difference between the original and reconstructed images
- KL divergence: a regularization term that pulls the latent distribution toward N(0,1)
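The closed-form KL term used above can be cross-checked against torch.distributions; this standalone sketch should print True:
import torch
from torch.distributions import Normal, kl_divergence
mu, logvar = torch.randn(5), torch.randn(5)
closed_form = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
reference = kl_divergence(Normal(mu, torch.exp(0.5 * logvar)), Normal(0.0, 1.0)).sum()
print(torch.allclose(closed_form, reference))  # True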
Running the code
Running this script trains for 10 epochs and saves two figures: vae_generated_samples.png (64 digits decoded from random latent vectors) and vae_reconstruction.png (originals in the top row, reconstructions below).
Generative Adversarial Networks (GANs) in PyTorch
A detailed walkthrough of the PyTorch + GAN + MNIST sample code
Complete sample code
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# Hyperparameters
latent_dim = 100  # Dimensionality of the latent space
img_size = 28  # Image size
channels = 1  # Number of channels (grayscale)
batch_size = 128
learning_rate = 0.0002
num_epochs = 50
beta1 = 0.5  # β1 parameter for the Adam optimizer
# Data preparation
transform = transforms.Compose([
transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])  # Normalize to [-1, 1]
])
train_dataset = datasets.MNIST(
root='./data',
train=True,
download=True,
transform=transform
)
dataloader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=2
)
# Generator definition
class Generator(nn.Module):
def __init__(self):
super(Generator, self).__init__()
        # Network that generates images from noise
self.model = nn.Sequential(
            # Input: a latent_dim-dimensional noise vector
nn.Linear(latent_dim, 256),
nn.LeakyReLU(0.2, inplace=True),
nn.Linear(256, 512),
nn.BatchNorm1d(512),
nn.LeakyReLU(0.2, inplace=True),
nn.Linear(512, 1024),
nn.BatchNorm1d(1024),
nn.LeakyReLU(0.2, inplace=True),
            # Output: 28*28 = 784 dimensions
nn.Linear(1024, img_size * img_size * channels),
            nn.Tanh()  # Outputs in the range [-1, 1]
)
def forward(self, z):
img = self.model(z)
img = img.view(img.size(0), channels, img_size, img_size)
return img
# Discriminator definition
class Discriminator(nn.Module):
def __init__(self):
super(Discriminator, self).__init__()
        # Network that judges whether an image is real or fake
self.model = nn.Sequential(
            # Input: a flattened 28*28 = 784-dimensional image
nn.Linear(img_size * img_size * channels, 512),
nn.LeakyReLU(0.2, inplace=True),
nn.Linear(512, 256),
nn.LeakyReLU(0.2, inplace=True),
            # Output: probability that the image is real
nn.Linear(256, 1),
nn.Sigmoid()
)
def forward(self, img):
img_flat = img.view(img.size(0), -1)
validity = self.model(img_flat)
return validity
# Instantiate the models
generator = Generator().to(device)
discriminator = Discriminator().to(device)
# Loss function
adversarial_loss = nn.BCELoss()
# Optimizers
optimizer_G = optim.Adam(generator.parameters(), lr=learning_rate, betas=(beta1, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=learning_rate, betas=(beta1, 0.999))
# Training loop
print("Starting training...")
for epoch in range(num_epochs):
for i, (real_imgs, _) in enumerate(dataloader):
        # Prepare the batch and labels
batch_size_current = real_imgs.size(0)
real_imgs = real_imgs.to(device)
        # Labels for real and fake
real_labels = torch.ones(batch_size_current, 1).to(device)
fake_labels = torch.zeros(batch_size_current, 1).to(device)
# ---------------------
        # Train the Discriminator
# ---------------------
optimizer_D.zero_grad()
        # Loss on real images
real_output = discriminator(real_imgs)
d_loss_real = adversarial_loss(real_output, real_labels)
        # Generate fake images
z = torch.randn(batch_size_current, latent_dim).to(device)
fake_imgs = generator(z)
        # Loss on fake images
fake_output = discriminator(fake_imgs.detach())
d_loss_fake = adversarial_loss(fake_output, fake_labels)
        # Total loss
d_loss = d_loss_real + d_loss_fake
d_loss.backward()
optimizer_D.step()
# -----------------
        # Train the Generator
# -----------------
optimizer_G.zero_grad()
        # The Generator wants the Discriminator to classify its fakes as real
fake_output = discriminator(fake_imgs)
g_loss = adversarial_loss(fake_output, real_labels)
g_loss.backward()
optimizer_G.step()
        # Progress report
if i % 100 == 0:
print(f"[Epoch {epoch}/{num_epochs}] [Batch {i}/{len(dataloader)}] "
f"[D loss: {d_loss.item():.4f}] [G loss: {g_loss.item():.4f}]")
    # Save a grid of generated images every 5 epochs
if epoch % 5 == 0:
with torch.no_grad():
z = torch.randn(16, latent_dim).to(device)
generated_imgs = generator(z).cpu()
fig, axes = plt.subplots(4, 4, figsize=(8, 8))
for idx, ax in enumerate(axes.flat):
img = generated_imgs[idx].squeeze().numpy()
img = (img + 1) / 2 # [-1, 1] -> [0, 1]
ax.imshow(img, cmap='gray')
ax.axis('off')
plt.suptitle(f'Epoch {epoch}')
plt.tight_layout()
plt.savefig(f'generated_epoch_{epoch}.png')
plt.close()
print("学習完了!")
# Display the final generated images
with torch.no_grad():
z = torch.randn(25, latent_dim).to(device)
generated_imgs = generator(z).cpu()
fig, axes = plt.subplots(5, 5, figsize=(10, 10))
for idx, ax in enumerate(axes.flat):
img = generated_imgs[idx].squeeze().numpy()
img = (img + 1) / 2
ax.imshow(img, cmap='gray')
ax.axis('off')
    plt.suptitle('Final generated images')
plt.tight_layout()
plt.savefig('final_generated_images.png')
plt.show()
Detailed explanation
1. The core idea of a GAN
Generator                     Discriminator
    ↓                              ↓
generates fake images         judges real vs. fake
    ↓                              ↓
     the two learn by competing with each other
2. Data normalization
transforms.Normalize([0.5], [0.5])
- Normalizes MNIST images to the range [-1, 1]
- Formula: (x - 0.5) / 0.5
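A one-line check (standalone sketch) confirms the endpoints map as expected:
import torch
x = torch.tensor([0.0, 0.5, 1.0])  # the output range of ToTensor()
print((x - 0.5) / 0.5)             # tensor([-1., 0., 1.])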
3. The Generator's role
Input: random noise (100 dimensions)
↓
Fully connected layers + activations
↓
Dimensions expand step by step
↓
Output: a 28×28 image (784 dimensions)
Key points:
- The final nn.Tanh() matches the [-1, 1] range of the normalized training data
- BatchNorm1d stabilizes training of the deeper layers
- LeakyReLU(0.2) keeps gradients flowing even for negative activations
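To verify the wiring, this sketch (assuming the Generator class and latent_dim from the listing above) pushes a noise batch through and checks the output shape and value range:
# Noise in, images out: shape and range check
z = torch.randn(4, latent_dim)
imgs = Generator()(z)
print(imgs.shape)  # torch.Size([4, 1, 28, 28])
print(imgs.min().item() >= -1.0 and imgs.max().item() <= 1.0)  # True (thanks to Tanh)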
Vision Transformer (ViT) Image Classification
PyTorch Vision Transformer image classification sample code explained
This post implements and explains a Vision Transformer.
1. Basic implementation
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# ===== Patch embedding layer =====
class PatchEmbedding(nn.Module):
"""画像をパッチに分割し、埋め込みベクトルに変換"""
def __init__(self, img_size=224, patch_size=16, in_channels=3, embed_dim=768):
super().__init__()
self.img_size = img_size
self.patch_size = patch_size
self.n_patches = (img_size // patch_size) ** 2
        # Implement patch embedding as a strided convolution
self.proj = nn.Conv2d(
in_channels,
embed_dim,
kernel_size=patch_size,
stride=patch_size
)
def forward(self, x):
# x: (B, C, H, W) → (B, embed_dim, n_patches**0.5, n_patches**0.5)
x = self.proj(x)
# (B, embed_dim, H', W') → (B, embed_dim, n_patches)
x = x.flatten(2)
# (B, embed_dim, n_patches) → (B, n_patches, embed_dim)
x = x.transpose(1, 2)
return x
# ===== Multi-Head Attention =====
class MultiHeadAttention(nn.Module):
def __init__(self, embed_dim=768, num_heads=12, dropout=0.1):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.scale = self.head_dim ** -0.5
        # Linear projections producing Query, Key, and Value
self.qkv = nn.Linear(embed_dim, embed_dim * 3)
self.proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
B, N, C = x.shape
        # Compute Q, K, V
qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim)
qkv = qkv.permute(2, 0, 3, 1, 4) # (3, B, num_heads, N, head_dim)
q, k, v = qkv[0], qkv[1], qkv[2]
        # Attention scores
attn = (q @ k.transpose(-2, -1)) * self.scale # (B, num_heads, N, N)
attn = attn.softmax(dim=-1)
attn = self.dropout(attn)
        # Weighted sum of the values, then merge the heads
x = (attn @ v).transpose(1, 2).reshape(B, N, C)
x = self.proj(x)
x = self.dropout(x)
return x
# ===== MLP (Feed Forward Network) =====
class MLP(nn.Module):
def __init__(self, embed_dim=768, mlp_ratio=4.0, dropout=0.1):
super().__init__()
hidden_dim = int(embed_dim * mlp_ratio)
self.fc1 = nn.Linear(embed_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, embed_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
x = self.fc1(x)
x = F.gelu(x)
x = self.dropout(x)
x = self.fc2(x)
x = self.dropout(x)
return x
# ===== Transformer Block =====
class TransformerBlock(nn.Module):
def __init__(self, embed_dim=768, num_heads=12, mlp_ratio=4.0, dropout=0.1):
super().__init__()
self.norm1 = nn.LayerNorm(embed_dim)
self.attn = MultiHeadAttention(embed_dim, num_heads, dropout)
self.norm2 = nn.LayerNorm(embed_dim)
self.mlp = MLP(embed_dim, mlp_ratio, dropout)
def forward(self, x):
        # Pre-norm residual structure
x = x + self.attn(self.norm1(x))
x = x + self.mlp(self.norm2(x))
return x
# ===== Vision Transformer =====
class VisionTransformer(nn.Module):
def __init__(
self,
img_size=224,
patch_size=16,
in_channels=3,
num_classes=10,
embed_dim=768,
depth=12,
num_heads=12,
mlp_ratio=4.0,
dropout=0.1
):
super().__init__()
        # Patch embedding
self.patch_embed = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
num_patches = self.patch_embed.n_patches
        # CLS token (a special token used for classification)
self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        # Positional embedding
self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
self.pos_drop = nn.Dropout(dropout)
# Transformer Blocks
self.blocks = nn.ModuleList([
TransformerBlock(embed_dim, num_heads, mlp_ratio, dropout)
for _ in range(depth)
])
        # Classification head
self.norm = nn.LayerNorm(embed_dim)
self.head = nn.Linear(embed_dim, num_classes)
        # Weight initialization
nn.init.trunc_normal_(self.pos_embed, std=0.02)
nn.init.trunc_normal_(self.cls_token, std=0.02)
def forward(self, x):
B = x.shape[0]
        # Patch embedding
x = self.patch_embed(x) # (B, n_patches, embed_dim)
        # Prepend the CLS token
cls_tokens = self.cls_token.expand(B, -1, -1) # (B, 1, embed_dim)
x = torch.cat([cls_tokens, x], dim=1) # (B, n_patches+1, embed_dim)
        # Add positional embeddings
x = x + self.pos_embed
x = self.pos_drop(x)
        # Pass through the Transformer blocks
for block in self.blocks:
x = block(x)
        # Final normalization
x = self.norm(x)
        # Classify using only the CLS token
cls_token_final = x[:, 0]
logits = self.head(cls_token_final)
return logits
2. Training code
# ===== Data preparation =====
def get_dataloaders(batch_size=32):
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
train_dataset = datasets.CIFAR10(
root='./data',
train=True,
download=True,
transform=transform
)
test_dataset = datasets.CIFAR10(
root='./data',
train=False,
download=True,
transform=transform
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
return train_loader, test_loader
# ===== Training function =====
def train_one_epoch(model, dataloader, criterion, optimizer, device):
model.train()
running_loss = 0.0
correct = 0
total = 0
for images, labels in dataloader:
images, labels = images.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
epoch_loss = running_loss / len(dataloader)
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
# ===== Evaluation function =====
def evaluate(model, dataloader, criterion, device):
model.eval()
running_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for images, labels in dataloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
loss = criterion(outputs, labels)
running_loss += loss.item()
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
epoch_loss = running_loss / len(dataloader)
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
# ===== Main =====
def main():
    # Hyperparameters
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_epochs = 50
batch_size = 64
learning_rate = 3e-4
    # Build the model (a small configuration)
model = VisionTransformer(
img_size=224,
patch_size=16,
in_channels=3,
num_classes=10,
        embed_dim=384,  # smaller than ViT-Base
        depth=6,        # shallower than ViT-Base
num_heads=6,
mlp_ratio=4.0,
dropout=0.1
).to(device)
print(f"モデルパラメータ数: {sum(p.numel() for p in model.parameters()):,}")
    # Data loaders
train_loader, test_loader = get_dataloaders(batch_size)
    # Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.05)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)
    # Training loop
for epoch in range(num_epochs):
train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, device)
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
scheduler.step()
print(f"Epoch [{epoch+1}/{num_epochs}]")
print(f" Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
print(f" Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")
    # Save the model
torch.save(model.state_dict(), 'vit_model.pth')
print("訓練完了!")
if __name__ == "__main__":
main()
3. Using a pretrained model
# The easy way: use torchvision's pretrained ViT
from torchvision.models import vit_b_16, ViT_B_16_Weights
def use_pretrained_vit():
    # Load the pretrained model
weights = ViT_B_16_Weights.IMAGENET1K_V1
model = vit_b_16(weights=weights)
    # Replace the head for fine-tuning
num_classes = 10 # CIFAR-10
model.heads = nn.Linear(model.hidden_dim, num_classes)
    # Grab the matching preprocessing transforms
preprocess = weights.transforms()
return model, preprocess
# Usage example
model, preprocess = use_pretrained_vit()
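Inference then follows the usual eval/no_grad pattern. The sketch below feeds a blank placeholder image just to demonstrate the shapes; replace it with a real photo in practice:
from PIL import Image
model.eval()
img = Image.new('RGB', (224, 224))    # placeholder image, for shape-checking only
batch = preprocess(img).unsqueeze(0)  # (1, 3, 224, 224)
with torch.no_grad():
    logits = model(batch)
print(logits.shape)  # torch.Size([1, 10]) after the head replacement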
4. Execution results
$ python vision-transformer-classification.py
Number of model parameters: 11,022,730
Epoch [1/50]
Train Loss: 1.7205, Train Acc: 36.19%
Test Loss: 1.5259, Test Acc: 44.82%
Epoch [2/50]
Train Loss: 1.4613, Train Acc: 46.57%
Test Loss: 1.3900, Test Acc: 49.07%
...
Epoch [46/50]
Train Loss: 0.0055, Train Acc: 99.86%
Test Loss: 1.6752, Test Acc: 74.55%
Epoch [47/50]
Train Loss: 0.0057, Train Acc: 99.82%
Test Loss: 1.6772, Test Acc: 74.43%
Epoch [48/50]
Train Loss: 0.0044, Train Acc: 99.89%
Test Loss: 1.6697, Test Acc: 74.76%
Epoch [49/50]
Train Loss: 0.0041, Train Acc: 99.90%
Test Loss: 1.6655, Test Acc: 74.78%
Epoch [50/50]
Train Loss: 0.0043, Train Acc: 99.89%
Test Loss: 1.6649, Test Acc: 74.76%
Training complete!
Key components explained
- Patch embedding: splits the image into patches (e.g., 16×16) and vectorizes them
- CLS token: a special token used for classification
- Positional embedding: learns where each patch sits in the image
- Transformer: self-attention captures relationships across the whole image
- Classification head: produces the final prediction from the CLS token
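These pieces can be sanity-checked end to end with a dummy batch. A minimal sketch, assuming the classes from section 1:
vit = VisionTransformer(embed_dim=384, depth=6, num_heads=6)
x = torch.randn(2, 3, 224, 224)
print(vit.patch_embed(x).shape)  # torch.Size([2, 196, 384]): (224/16)^2 = 196 patches
print(vit(x).shape)              # torch.Size([2, 10]): one logit per CIFAR-10 class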
With this code you can run image classification on CIFAR-10!
Image Classification with PyTorch Lightning
PyTorch Lightning image classification sample code explained
This post walks through a complete image classification example built with PyTorch Lightning.
Complete sample code
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
# 1. Model definition
class ImageClassifier(pl.LightningModule):
def __init__(self, num_classes=10, learning_rate=1e-3):
super().__init__()
        self.save_hyperparameters()  # Automatically saves the hyperparameters
        # A simple CNN
self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
self.pool = nn.MaxPool2d(2, 2)
self.fc1 = nn.Linear(128 * 4 * 4, 512)
self.fc2 = nn.Linear(512, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
        # Forward pass
x = self.pool(F.relu(self.conv1(x))) # 32x32 -> 16x16
x = self.pool(F.relu(self.conv2(x))) # 16x16 -> 8x8
x = self.pool(F.relu(self.conv3(x))) # 8x8 -> 4x4
x = x.view(-1, 128 * 4 * 4)
x = self.dropout(F.relu(self.fc1(x)))
x = self.fc2(x)
return x
def training_step(self, batch, batch_idx):
        # One training step
x, y = batch
logits = self(x)
loss = F.cross_entropy(logits, y)
        # Compute accuracy
preds = torch.argmax(logits, dim=1)
acc = (preds == y).float().mean()
        # Log the metrics
self.log('train_loss', loss, prog_bar=True)
self.log('train_acc', acc, prog_bar=True)
return loss
def validation_step(self, batch, batch_idx):
        # One validation step
x, y = batch
logits = self(x)
loss = F.cross_entropy(logits, y)
preds = torch.argmax(logits, dim=1)
acc = (preds == y).float().mean()
self.log('val_loss', loss, prog_bar=True)
self.log('val_acc', acc, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
        # One test step
x, y = batch
logits = self(x)
loss = F.cross_entropy(logits, y)
preds = torch.argmax(logits, dim=1)
acc = (preds == y).float().mean()
self.log('test_loss', loss)
self.log('test_acc', acc)
return loss
def configure_optimizers(self):
        # Configure the optimizer and LR scheduler
optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=3
)
return {
'optimizer': optimizer,
'lr_scheduler': {
'scheduler': scheduler,
'monitor': 'val_loss'
}
}
# 2. Data module definition
class CIFAR10DataModule(pl.LightningDataModule):
def __init__(self, data_dir='./data', batch_size=32, num_workers=4):
super().__init__()
self.data_dir = data_dir
self.batch_size = batch_size
self.num_workers = num_workers
        # Data preprocessing
self.transform_train = transforms.Compose([
transforms.RandomHorizontalFlip(),
transforms.RandomCrop(32, padding=4),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
self.transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
def prepare_data(self):
        # Download the data (runs only once)
datasets.CIFAR10(self.data_dir, train=True, download=True)
datasets.CIFAR10(self.data_dir, train=False, download=True)
def setup(self, stage=None):
        # Set up the datasets
if stage == 'fit' or stage is None:
cifar_full = datasets.CIFAR10(
self.data_dir, train=True, transform=self.transform_train
)
            # Split into training and validation sets
self.cifar_train, self.cifar_val = random_split(
cifar_full, [45000, 5000]
)
if stage == 'test' or stage is None:
self.cifar_test = datasets.CIFAR10(
self.data_dir, train=False, transform=self.transform_test
)
def train_dataloader(self):
return DataLoader(
self.cifar_train,
batch_size=self.batch_size,
shuffle=True,
num_workers=self.num_workers
)
def val_dataloader(self):
return DataLoader(
self.cifar_val,
batch_size=self.batch_size,
num_workers=self.num_workers
)
def test_dataloader(self):
return DataLoader(
self.cifar_test,
batch_size=self.batch_size,
num_workers=self.num_workers
)
# 3. Run training
def main():
    # Create the data module
dm = CIFAR10DataModule(batch_size=64, num_workers=4)
    # Create the model
model = ImageClassifier(num_classes=10, learning_rate=1e-3)
    # Configure callbacks
checkpoint_callback = ModelCheckpoint(
monitor='val_loss',
dirpath='checkpoints/',
filename='cifar10-{epoch:02d}-{val_loss:.2f}',
save_top_k=3,
mode='min'
)
early_stop_callback = EarlyStopping(
monitor='val_loss',
patience=5,
mode='min'
)
    # Configure the Trainer
trainer = pl.Trainer(
max_epochs=20,
        accelerator='auto',  # Automatically selects GPU/CPU
devices=1,
callbacks=[checkpoint_callback, early_stop_callback],
log_every_n_steps=10
)
    # Run training
trainer.fit(model, dm)
    # Run testing
trainer.test(model, dm)
if __name__ == '__main__':
main()
Key components explained
1. LightningModule (model definition)
- __init__: defines the model's layers
- forward: the forward pass
- training_step: per-batch training logic (loss computation, logging)
- validation_step: per-batch validation logic
- configure_optimizers: optimizer and scheduler setup
2. LightningDataModule (data management)
- prepare_data: downloads the data
- setup: builds and splits the datasets
- train/val/test_dataloader: provide the respective data loaders
3. Trainer (training management)
- Centralizes epoch count, device selection, callbacks, and logging (a checkpoint-reloading sketch follows below)
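After trainer.fit finishes, the best checkpoint saved by ModelCheckpoint can be reloaded for inference. load_from_checkpoint works here because save_hyperparameters() was called in __init__; a minimal sketch, meant to run inside main() after training:
# Reload the best model found during training
best_model = ImageClassifier.load_from_checkpoint(
    checkpoint_callback.best_model_path, map_location='cpu'
)
best_model.eval()
with torch.no_grad():
    logits = best_model(torch.randn(1, 3, 32, 32))  # dummy CIFAR-10-shaped input
print(logits.argmax(dim=1))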
4. Execution results
$ python pytorch-lightning-cnn-classification.py
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
| Name | Type | Params | Mode
----------------------------------------------
0 | conv1 | Conv2d | 896 | train
1 | conv2 | Conv2d | 18.5 K | train
2 | conv3 | Conv2d | 73.9 K | train
3 | pool | MaxPool2d | 0 | train
4 | fc1 | Linear | 1.0 M | train
5 | fc2 | Linear | 5.1 K | train
6 | dropout | Dropout | 0 | train
----------------------------------------------
1.1 M Trainable params
0 Non-trainable params
1.1 M Total params
4.590 Total estimated model params size (MB)
7 Modules in train mode
0 Modules in eval mode
Epoch 19: 100%|███████████████████████████████████| 704/704 [00:03<00:00, 195.75it/s, v_num=0, train_loss=0.0607, train_acc=1.000, val_loss=0.604, val_acc=0.790]
`Trainer.fit` stopped: `max_epochs=20` reached.
Epoch 19: 100%|███████████████████████████████████| 704/704 [00:03<00:00, 193.97it/s, v_num=0, train_loss=0.0607, train_acc=1.000, val_loss=0.604, val_acc=0.790]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Testing DataLoader 0: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████| 157/157 [00:00<00:00, 413.72it/s]
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Test metric ┃ DataLoader 0 ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ test_acc │ 0.8090999722480774 │
│ test_loss │ 0.566657304763794 │
└───────────────────────────┴───────────────────────────┘
Benefits of PyTorch Lightning
- Organized code: no need to write the training loop yourself
- Easy GPU support: just set accelerator='auto'
- Better reproducibility: hyperparameters are saved automatically
- Painless logging: metrics are recorded to TensorBoard and similar tools automatically
Run this code as-is and CIFAR-10 image classification just works!
MNIST Digit Classification with PyTorch + CNN
PyTorch + CNN MNIST digit classification explained
This post explains, step by step, a CNN that classifies MNIST handwritten digits.
Complete code
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# 1. Define the CNN model
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
        # Convolutional layers
self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1) # 28x28x1 → 28x28x32
self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1) # 14x14x32 → 14x14x64
        # Pooling layer
        self.pool = nn.MaxPool2d(2, 2)  # Halves the spatial size
        # Fully connected layers
self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)  # 10-class classification
        # Dropout
self.dropout = nn.Dropout(0.5)
def forward(self, x):
        # Convolution + ReLU + pooling
x = self.pool(torch.relu(self.conv1(x))) # 28x28x32 → 14x14x32
x = self.pool(torch.relu(self.conv2(x))) # 14x14x64 → 7x7x64
        # Flatten
x = x.view(-1, 64 * 7 * 7)
        # Fully connected layers
x = torch.relu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
# 2. Prepare the data
transform = transforms.Compose([
    transforms.ToTensor(),  # Convert PIL images to tensors
    transforms.Normalize((0.5,), (0.5,))  # Normalize (mean, std)
])
# Download and load the datasets
train_dataset = datasets.MNIST(root='~/.pytorch/data', train=True,
download=True, transform=transform)
test_dataset = datasets.MNIST(root='~/.pytorch/data', train=False,
download=True, transform=transform)
# Create the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
# 3. Set up the model, loss function, and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Check whether MPS (Apple Silicon) is available
if torch.backends.mps.is_available():
device = torch.device("mps")
model = CNN().to(device)
criterion = nn.CrossEntropyLoss()  # Loss function for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 4. Training function
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()  # Switch to training mode
total_loss = 0
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
        # Zero out the gradients
optimizer.zero_grad()
        # Forward pass
output = model(data)
        # Compute the loss
loss = criterion(output, target)
        # Backpropagation
loss.backward()
        # Update the parameters
optimizer.step()
total_loss += loss.item()
if batch_idx % 100 == 0:
print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
return total_loss / len(train_loader)
# 5. Test function
def test(model, device, test_loader):
    model.eval()  # Switch to evaluation mode
correct = 0
total = 0
    with torch.no_grad():  # Disable gradient computation
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
            # Predict the class with the highest score
_, predicted = torch.max(output.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')
return accuracy
# 6. Run training
epochs = 5
for epoch in range(1, epochs + 1):
train_loss = train(model, device, train_loader, optimizer, criterion, epoch)
print(f'Average Loss: {train_loss:.4f}')
test(model, device, test_loader)
print('-' * 60)
# Save the model
torch.save(model.state_dict(), 'mnist_cnn.pth')
Key components explained
1. CNN model structure
Input (1x28x28)
↓
Conv2d (32 filters) → ReLU → MaxPool → (32x14x14)
↓
Conv2d (64 filters) → ReLU → MaxPool → (64x7x7)
↓
Flatten → (3136)
↓
FC (128) → ReLU → Dropout
↓
FC (10) → output
2. Key parameters
- kernel_size=3: a 3×3 convolution filter
- padding=1: preserves the spatial size (see the sketch below)
- MaxPool2d(2,2): takes the maximum over each 2×2 region
- Dropout(0.5): guards against overfitting
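The size arithmetic behind these comments is H_out = (H_in + 2*padding - kernel_size) / stride + 1; a standalone sketch confirms that kernel_size=3 with padding=1 preserves 28×28 and MaxPool2d(2,2) halves it:
import torch
import torch.nn as nn
x = torch.randn(1, 1, 28, 28)
conv = nn.Conv2d(1, 32, kernel_size=3, padding=1)  # (28 + 2*1 - 3) / 1 + 1 = 28
pool = nn.MaxPool2d(2, 2)                          # 28 / 2 = 14
print(conv(x).shape)        # torch.Size([1, 32, 28, 28])
print(pool(conv(x)).shape)  # torch.Size([1, 32, 14, 14])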
3. Execution results
$ python3 mnist+cnn+classification.py
Epoch: 1, Batch: 0, Loss: 2.3096
Epoch: 1, Batch: 100, Loss: 0.3755
Epoch: 1, Batch: 200, Loss: 0.4064
Epoch: 1, Batch: 300, Loss: 0.2006
Epoch: 1, Batch: 400, Loss: 0.1787
Epoch: 1, Batch: 500, Loss: 0.0984
Epoch: 1, Batch: 600, Loss: 0.1776
Epoch: 1, Batch: 700, Loss: 0.2356
Epoch: 1, Batch: 800, Loss: 0.1497
Epoch: 1, Batch: 900, Loss: 0.1300
Average Loss: 0.2377
Test Accuracy: 98.61%
------------------------------------------------------------
Epoch: 2, Batch: 0, Loss: 0.1457
Epoch: 2, Batch: 100, Loss: 0.1019
Epoch: 2, Batch: 200, Loss: 0.0407
Epoch: 2, Batch: 300, Loss: 0.0687
Epoch: 2, Batch: 400, Loss: 0.0562
Epoch: 2, Batch: 500, Loss: 0.0583
Epoch: 2, Batch: 600, Loss: 0.0361
Epoch: 2, Batch: 700, Loss: 0.0554
Epoch: 2, Batch: 800, Loss: 0.0757
Epoch: 2, Batch: 900, Loss: 0.2820
Average Loss: 0.0859
Test Accuracy: 98.98%
------------------------------------------------------------
Epoch: 3, Batch: 0, Loss: 0.0496
Epoch: 3, Batch: 100, Loss: 0.1323
Epoch: 3, Batch: 200, Loss: 0.0146
Epoch: 3, Batch: 300, Loss: 0.0297
Epoch: 3, Batch: 400, Loss: 0.0217
Epoch: 3, Batch: 500, Loss: 0.0470
Epoch: 3, Batch: 600, Loss: 0.0499
Epoch: 3, Batch: 700, Loss: 0.0439
Epoch: 3, Batch: 800, Loss: 0.0967
Epoch: 3, Batch: 900, Loss: 0.0390
Average Loss: 0.0642
Test Accuracy: 99.00%
------------------------------------------------------------
Epoch: 4, Batch: 0, Loss: 0.0106
Epoch: 4, Batch: 100, Loss: 0.0114
Epoch: 4, Batch: 200, Loss: 0.0156
Epoch: 4, Batch: 300, Loss: 0.0550
Epoch: 4, Batch: 400, Loss: 0.0288
Epoch: 4, Batch: 500, Loss: 0.0282
Epoch: 4, Batch: 600, Loss: 0.1245
Epoch: 4, Batch: 700, Loss: 0.0610
Epoch: 4, Batch: 800, Loss: 0.0127
Epoch: 4, Batch: 900, Loss: 0.0365
Average Loss: 0.0516
Test Accuracy: 99.12%
------------------------------------------------------------
Epoch: 5, Batch: 0, Loss: 0.0130
Epoch: 5, Batch: 100, Loss: 0.0412
Epoch: 5, Batch: 200, Loss: 0.0173
Epoch: 5, Batch: 300, Loss: 0.0064
Epoch: 5, Batch: 400, Loss: 0.0599
Epoch: 5, Batch: 500, Loss: 0.0848
Epoch: 5, Batch: 600, Loss: 0.0542
Epoch: 5, Batch: 700, Loss: 0.0545
Epoch: 5, Batch: 800, Loss: 0.0207
Epoch: 5, Batch: 900, Loss: 0.0759
Average Loss: 0.0424
Test Accuracy: 99.20%
------------------------------------------------------------
This code reaches 99.20% test accuracy!