Pytorch实现Fashion-mnist分类任务全过程
作者:LGDDDDDD 发布时间:2023-07-14 05:12:47
标签:Pytorch,Fashion-mnist,分类任务
数据概况
Fashion-mnist
经典的MNIST数据集包含了大量的手写数字。十几年来,来自机器学习、机器视觉、人工智能、深度学习领域的研究员们把这个数据集作为衡量算法的基准之一。
你会在很多的会议,期刊的论文中发现这个数据集的身影。实际上,MNIST数据集已经成为算法作者的必测的数据集之一。
类别标注
在Fashion-mnist数据集中,每个训练样本都按照以下类别进行了标注:
数据处理
对输入进行归一化
归一化时需要统一进行 x = (x - mean) / std
train_trans = transforms.Compose([
transforms.RandomCrop(28, padding=2),#数据增强
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize
])
test_trans = transforms.Compose([
transforms.ToTensor(),
normalize
])
mnist_train = torchvision.datasets.FashionMNIST(root='../data',train=True,download=True,transform=train_trans)
mnist_test = torchvision.datasets.FashionMNIST(root='../data',train=False,download=True,transform=test_trans)
train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True)
test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False)
# 求整个数据集的均值
temp_sum = 0
cnt = 0
for X, y in train_iter:
if y.shape[0] != batch_size:
break # 最后一个batch不足batch_size,这里就忽略了
channel_mean = torch.mean(X, dim=(0,2,3)) # 按channel求均值(不过这里只有1个channel)
cnt += 1 # cnt记录的是batch的个数,不是图像
temp_sum += channel_mean[0].item()
dataset_global_mean = temp_sum / cnt
print('整个数据集的像素均值:{}'.format(dataset_global_mean))
# 求整个数据集的标准差
cnt = 0
temp_sum = 0
for X, y in train_iter:
if y.shape[0] != batch_size:
break # 最后一个batch不足batch_size,这里就忽略了
residual = (X - dataset_global_mean) ** 2
channel_var_mean = torch.mean(residual, dim=(0,2,3))
cnt += 1 # cnt记录的是batch的个数,不是图像
temp_sum += math.sqrt(channel_var_mean[0].item())
dataset_global_std = temp_sum / cnt
print('整个数据集的像素标准差:{}'.format(dataset_global_std))
整个数据集的像素均值:0.2860366729433025
整个数据集的像素标准差:0.35288708155778725
数据增强
加入随机裁剪和翻转
============================ step 1/6 数据 ============================
batch_size = 64
normalize = transforms.Normalize(mean=[0.286], std=[0.352])#对像素值归一化
train_trans = transforms.Compose([
transforms.RandomCrop(28, padding=2),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize
])
test_trans = transforms.Compose([
transforms.ToTensor(),
normalize
])
mnist_train = torchvision.datasets.FashionMNIST(root='../data',train=True,download=True,transform=train_trans)
mnist_test = torchvision.datasets.FashionMNIST(root='../data',train=False,download=True,transform=test_trans)
train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True)
test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False)
定义Resnet网络
class GlobalAvgPool2d(nn.Module):
"""
全局平均池化层
可通过将普通的平均池化的窗口形状设置成输入的高和宽实现
"""
def __init__(self):
super(GlobalAvgPool2d, self).__init__()
def forward(self, x):
return F.avg_pool2d(x, kernel_size=x.size()[2:])
class FlattenLayer(torch.nn.Module):
def __init__(self):
super(FlattenLayer, self).__init__()
def forward(self, x): # x shape: (batch, *, *, ...)
return x.view(x.shape[0], -1)
class Residual(nn.Module):
def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
"""
use_1×1conv: 是否使用额外的1x1卷积层来修改通道数
stride: 卷积层的步幅, resnet使用步长为2的卷积来替代pooling的作用,是个很赞的idea
"""
super(Residual, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(out_channels)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, X):
Y = F.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
if self.conv3:
X = self.conv3(X)
return F.relu(Y + X)
def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
'''
resnet block
num_residuals: 当前block包含多少个残差块
first_block: 是否为第一个block
一个resnet block由num_residuals个残差块组成
其中第一个残差块起到了通道数的转换和pooling的作用
后面的若干残差块就是完成正常的特征提取
'''
if first_block:
assert in_channels == out_channels # 第一个模块的输出通道数同输入通道数一致
blk = []
for i in range(num_residuals):
if i == 0 and not first_block:
blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
else:
blk.append(Residual(out_channels, out_channels))
return nn.Sequential(*blk)
# 定义resnet模型结构
net = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1), # TODO: 缩小感受野, 缩channel
nn.BatchNorm2d(32),
nn.ReLU())
# nn.ReLU(),
# nn.MaxPool2d(kernel_size=2, stride=2)) # TODO:去掉maxpool缩小感受野
# 然后是连续4个block
net.add_module("resnet_block1", resnet_block(32, 32, 2, first_block=True)) # TODO: channel统一减半
net.add_module("resnet_block2", resnet_block(32, 64, 2))
net.add_module("resnet_block3", resnet_block(64, 128, 2))
net.add_module("resnet_block4", resnet_block(128, 256, 2))
# global average pooling
net.add_module("global_avg_pool", GlobalAvgPool2d())
# fc layer
net.add_module("fc", nn.Sequential(FlattenLayer(), nn.Linear(256, 10)))
训练与测试
def evaluate_accuracy(data_iter, net, device=None):
#评估模型在测试集的准确率
if device is None and isinstance(net, torch.nn.Module):
# 如果没指定device就使用net的device
device = list(net.parameters())[0].device
net.eval()
acc_sum, n = 0.0, 0
with torch.no_grad():
for X, y in data_iter:
acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
n += y.shape[0]
net.train() # 改回训练模式
return acc_sum / n
def train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs):
net = net.to(device)
print("training on ", device)
loss = torch.nn.CrossEntropyLoss()
best_test_acc = 0
for epoch in range(num_epochs):
train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
for X, y in train_iter:
X = X.to(device)
y = y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_l_sum += l.cpu().item()
train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
n += y.shape[0]
batch_count += 1
test_acc = evaluate_accuracy(test_iter, net)
print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
% (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
if test_acc > best_test_acc:
print('find best! save at model/best.pth')
best_test_acc = test_acc
torch.save(net.state_dict(), 'model/best.pth')
lr, num_epochs = 0.01, 10
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs)
完整代码
import os
import sys
import time
import torch
from torch import nn, optim
import torch.nn.functional as F
import torchvision
from torchvision import transforms
class GlobalAvgPool2d(nn.Module):
"""
全局平均池化层
可通过将普通的平均池化的窗口形状设置成输入的高和宽实现
"""
def __init__(self):
super(GlobalAvgPool2d, self).__init__()
def forward(self, x):
return F.avg_pool2d(x, kernel_size=x.size()[2:])
class FlattenLayer(torch.nn.Module):
def __init__(self):
super(FlattenLayer, self).__init__()
def forward(self, x): # x shape: (batch, *, *, ...)
return x.view(x.shape[0], -1)
class Residual(nn.Module):
def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
"""
use_1×1conv: 是否使用额外的1x1卷积层来修改通道数
stride: 卷积层的步幅, resnet使用步长为2的卷积来替代pooling的作用,是个很赞的idea
"""
super(Residual, self).__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(out_channels)
self.bn2 = nn.BatchNorm2d(out_channels)
def forward(self, X):
Y = F.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
if self.conv3:
X = self.conv3(X)
return F.relu(Y + X)
def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
'''
resnet block
num_residuals: 当前block包含多少个残差块
first_block: 是否为第一个block
一个resnet block由num_residuals个残差块组成
其中第一个残差块起到了通道数的转换和pooling的作用
后面的若干残差块就是完成正常的特征提取
'''
if first_block:
assert in_channels == out_channels # 第一个模块的输出通道数同输入通道数一致
blk = []
for i in range(num_residuals):
if i == 0 and not first_block:
blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
else:
blk.append(Residual(out_channels, out_channels))
return nn.Sequential(*blk)
# 定义resnet模型结构
net = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1), # TODO: 缩小感受野, 缩channel
nn.BatchNorm2d(32),
nn.ReLU())
# nn.ReLU(),
# nn.MaxPool2d(kernel_size=2, stride=2)) # TODO:去掉maxpool缩小感受野
# 然后是连续4个block
net.add_module("resnet_block1", resnet_block(32, 32, 2, first_block=True)) # TODO: channel统一减半
net.add_module("resnet_block2", resnet_block(32, 64, 2))
net.add_module("resnet_block3", resnet_block(64, 128, 2))
net.add_module("resnet_block4", resnet_block(128, 256, 2))
# global average pooling
net.add_module("global_avg_pool", GlobalAvgPool2d())
# fc layer
net.add_module("fc", nn.Sequential(FlattenLayer(), nn.Linear(256, 10)))
def load_data_fashion_mnist(batch_size, root='../data'):
"""Download the fashion mnist dataset and then load into memory."""
normalize = transforms.Normalize(mean=[0.28], std=[0.35])
train_augs = transforms.Compose([
transforms.RandomCrop(28, padding=2),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize
])
test_augs = transforms.Compose([
transforms.ToTensor(),
normalize
])
mnist_train = torchvision.datasets.FashionMNIST(root=root, train=True, download=True, transform=train_augs)
mnist_test = torchvision.datasets.FashionMNIST(root=root, train=False, download=True, transform=test_augs)
if sys.platform.startswith('win'):
num_workers = 0 # 0表示不用额外的进程来加速读取数据
else:
num_workers = 4
train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False, num_workers=num_workers)
return train_iter, test_iter
print('训练...')
batch_size = 64
train_iter, test_iter = load_data_fashion_mnist(batch_size, root='../data')
def evaluate_accuracy(data_iter, net, device=None):
if device is None and isinstance(net, torch.nn.Module):
# 如果没指定device就使用net的device
device = list(net.parameters())[0].device
net.eval()
acc_sum, n = 0.0, 0
with torch.no_grad():
for X, y in data_iter:
acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
n += y.shape[0]
net.train() # 改回训练模式
return acc_sum / n
def train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs, lr, lr_period, lr_decay):
net = net.to(device)
print("training on ", device)
loss = torch.nn.CrossEntropyLoss()
best_test_acc = 0
for epoch in range(num_epochs):
train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
if epoch > 0 and epoch % lr_period == 0: # 每lr_period个epoch,学习率衰减一次
lr = lr * lr_decay
for param_group in optimizer.param_groups:
param_group['lr'] = lr
for X, y in train_iter:
X = X.to(device)
y = y.to(device)
y_hat = net(X)
l = loss(y_hat, y)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_l_sum += l.cpu().item()
train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
n += y.shape[0]
batch_count += 1
test_acc = evaluate_accuracy(test_iter, net)
print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
% (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
if test_acc > best_test_acc:
print('find best! save at model/best.pth')
best_test_acc = test_acc
torch.save(net.state_dict(), 'model/best.pth')
# utils.save_model({
# 'arch': args.model,
# 'state_dict': net.state_dict()
# }, 'saved-models/{}-run-{}.pth.tar'.format(args.model, run))
lr, num_epochs, lr_period, lr_decay = 0.01, 50, 5, 0.1
#optimizer = optim.Adam(net.parameters(), lr=lr)
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs, lr, lr_period, lr_decay)
print('加载最优模型')
net.load_state_dict(torch.load('model/best.pth'))
net = net.to(device)
print('inference测试集')
net.eval()
id = 0
preds_list = []
with torch.no_grad():
for X, y in test_iter:
batch_pred = list(net(X.to(device)).argmax(dim=1).cpu().numpy())
for y_pred in batch_pred:
preds_list.append((id, y_pred))
id += 1
print('生成测试集评估文件')
with open('result.csv', 'w') as f:
f.write('ID,Prediction\n')
for id, pred in preds_list:
f.write('{},{}\n'.format(id, pred))
来源:https://blog.csdn.net/weixin_43289424/article/details/104598221
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 1.抽象类抽象类机制中总是要定义一个公共的基类,而将特定的细节留给继承者来实现。通过抽象概念,可以在开发项目中创建扩展性很好的架构。任何一个
- slice 可以用来获取数组片段,它返回新数组,不会修改原数组。除了正常用法,slice 经常用来将 array-like 对象转换为 tr
- 内容摘要:下面是虚机维护中,经常碰到的一些ASP程序中的数据库调用的错误,现收集整理如下:1.不能打开注册表关键字(8007000e);2.
- 目的:了解常用的ORM框架;使用SQLObject框架操作MySQL数据库。面试题:在Python语言中有哪些常用的ORM框架,它们有什么区
- WTForms 是用于web开发的灵活的表单验证和呈现库,它可以与您选择的任何web框架和模板引擎一起工作,并支持数据验证、CSRF保护、国
- 在为一个项目添加权限时,遇到一个问题,就是为项目所有的url设置权限,但是一个一个手动输入太麻烦了,所以考虑用代码获取到一个项目所有的url
- 前言本文使用Mysql8.0的特新实现递归查询,文中给出了详细的实例代码,下面话不多说了,来一起看看详细的介绍吧Mysql8.0递归查询用法
- 这篇文章主要介绍了python多进程间通信代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可
- 具体解决方案如下: 一、IE和FF下document.body对象的clientHeight,offsetHeight,scrollHeig
- 本文实例讲述了PHP中round()函数对浮点数进行四舍五入的方法。分享给大家供大家参考。具体方法如下:语法:round(x,prec)参数
- 有的时候我们在使用pycharm编辑python,需要导入各种各样的包,这些包是不能直接使用的,需要先进行安装。否则就会出现模块导入错误。下
- 同伪类的方式类似,伪元素通过对插人到文档中的虚构元素进行触发,从而达到某种效果。在CSS1里,有两个伪元素,即:first-letter和f
- matplotlib简介如果你在大学里参加过数学建模竞赛或者是用过MATLAB的话,相比会对这一款软件中的画图功能印象深刻。MATLAB可以
- python里dict(字典)怎么变成list(列表)?说明:列表不可以转换为字典1、转换后的列表为无序列表a = {'a'
- SQL Server 获取数据的总记录数,有两种方式:1.先分页获取数据,然后再查询一遍数据库获取到总数量2.使用count(1) over
- 这篇文章主要介绍了django 简单实现登录验证给你,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友
- 相比于range,list等简易单词,enumerate仅凭外形都不太让人愿意用。事实上,enumerate还是很好用的。enumerate
- 一、先进行剪切操作圆形区域占图片可能不多,多余的部分不要。看下图。只要纽扣电池内部和少许的边缘部分,其余黑色背景部分不需要。先沿着纽扣电池的
- 几个常用装饰器pytest.ini 配置文件 例子:[pytest]addopts = -v -s --html=py_test/scrip
- Pygame的mixer 模块可以依据命令播放一个或多个声音,并且也可以将这些声音混合在一起。而获得声音需要四个步骤:一、启动mixer进程