import os import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, Dataset from torchvision import models, transforms from PIL import Image from safetensors.torch import save_file class CatDogDataset(Dataset): def __init__(self, root_dir, transform=None): self.root_dir = root_dir self.transform = transform self.image_paths = [] self.labels = [] for filename in os.listdir(root_dir): if 'cat' in filename: self.image_paths.append(os.path.join(root_dir, filename)) self.labels.append(0) # cat elif 'dog' in filename: self.image_paths.append(os.path.join(root_dir, filename)) self.labels.append(1) # dog def __len__(self): return len(self.image_paths) def __getitem__(self, idx): img_path = self.image_paths[idx] image = Image.open(img_path).convert('RGB') label = self.labels[idx] if self.transform: image = self.transform(image) return image, label # 数据预处理 data_transforms = { 'train': transforms.Compose([ transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]), 'val': transforms.Compose([ transforms.Resize(256), transforms.CenterCrop(224), transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]), } data_dir = 'dog-cat' image_datasets = {x: CatDogDataset(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']} dataloaders = {x: DataLoader(image_datasets[x], batch_size=32, shuffle=True, num_workers=4) for x in ['train', 'val']} dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']} class_names = ['cat', 'dog'] device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # 加载预训练的 ResNet-50 模型并进行微调 model_ft = models.resnet50(weights=models.ResNet50_Weights.DEFAULT) num_ftrs = model_ft.fc.in_features model_ft.fc = nn.Linear(num_ftrs, len(class_names)) model_ft = model_ft.to(device) criterion = nn.CrossEntropyLoss() # 优化器 optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9) # 学习率调度器 exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) # 训练模型 def train_model(model, criterion, optimizer, scheduler, num_epochs=25): best_model_wts = model.state_dict() best_acc = 0.0 for epoch in range(num_epochs): print(f'Epoch {epoch}/{num_epochs - 1}') print('-' * 10) # 每个epoch都有训练和验证阶段 for phase in ['train', 'val']: if phase == 'train': model.train() # 设置模型为训练模式 else: model.eval() # 设置模型为评估模式 running_loss = 0.0 running_corrects = 0 # 遍历数据 for inputs, labels in dataloaders[phase]: inputs = inputs.to(device) labels = labels.to(device) # 清零参数梯度 optimizer.zero_grad() # 前向传播 with torch.set_grad_enabled(phase == 'train'): outputs = model(inputs) _, preds = torch.max(outputs, 1) loss = criterion(outputs, labels) # 只有在训练阶段才反向传播+优化 if phase == 'train': loss.backward() optimizer.step() # 统计 running_loss += loss.item() * inputs.size(0) running_corrects += torch.sum(preds == labels.data) if phase == 'train': scheduler.step() epoch_loss = running_loss / dataset_sizes[phase] epoch_acc = running_corrects.double() / dataset_sizes[phase] print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}') # 深拷贝模型 if phase == 'val' and epoch_acc > best_acc: best_acc = epoch_acc best_model_wts = model.state_dict() print() print(f'Best val Acc: {best_acc:4f}') # 加载最佳模型权重 model.load_state_dict(best_model_wts) return model # 训练和评估模型 model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=25) # 保存模型为 .pt 文件 torch.save(model_ft.state_dict(), 'pytorch_model.bin') metadata = {"format": "pt"} # 保存模型为 .safetensors 文件 save_file(model_ft.state_dict(), 'model.safetensors', metadata=metadata)