【Pytorch】基础训练框架

【Pytorch】基础训练框架,第1张

参考:https://blog.csdn.net/weixin_44493291/article/details/111932895
感谢Chaossll提供的基本思路!

起因

每次训练都要写完整的流程,比较麻烦,实际上,除了网络要修改,其他部分大同小异。


我们可以梳理成一定格式。


适用范围

这里针对图像分类任务,给出训练框架。


默认图像数据集的格式是ImageNet。


四大模块
  • 准备数据
  • 设计模型
  • 设置Loss和Optimizer
  • 训练和测试
  • (推理)
准备数据
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

# ---- Data preparation ----
# Assumes an ImageNet-style layout: <root>/<class_name>/<image files>,
# which is exactly what torchvision.datasets.ImageFolder consumes.
train_dir=''  # TODO: path to the training-set root (empty template; fill in before running)
test_dir=''   # TODO: path to the test-set root
batch_size = 64
# Prefer the first CUDA device when available, otherwise fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Map each RGB channel from [0, 1] to [-1, 1] ((x - 0.5) / 0.5).
normalize = transforms.Normalize(mean=[0.5,0.5,0.5],
                                 std=[0.5,0.5,0.5])
# Training pipeline: random resized crop + horizontal flip as augmentation.
train_dataset = datasets.ImageFolder(train_dir,
                                     transforms.Compose([
                                         transforms.RandomResizedCrop((224, 224)),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.ToTensor(),
                                         normalize]))
# Test pipeline: deterministic resize only — no augmentation at eval time.
test_dataset = datasets.ImageFolder(test_dir,
                                    transforms.Compose([
                                        transforms.Resize((224, 224)),
                                        transforms.ToTensor(),
                                        normalize]))
train_loader = DataLoader(train_dataset,
               batch_size=batch_size,
               shuffle=True)
# NOTE(review): shuffling the test loader does not affect accuracy but is
# unnecessary; kept as-is here to preserve behavior.
test_loader = DataLoader(test_dataset,
               batch_size=batch_size,
               shuffle=True)
设计模型/导入模型
import torch
import torchvision
# num_classes=10  (example: pass num_classes=... to the model constructor
# when the dataset does not have ImageNet's default 1000 classes)
net = torchvision.models.resnet34()  # randomly initialized — no pretrained weights requested
net.to(device)

print(net)  # dump the layer structure as a quick sanity check
设置Loss和Optimizer
# Cross-entropy over raw logits (the model must NOT apply softmax itself),
# optimized with plain SGD plus momentum.
lr=0.01
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.5)
训练和测试
def train(epoch):
    """Run one full pass over train_loader and return the epoch accuracy.

    Uses the module-level net / train_loader / criterion / optimizer / device.
    Prints a running-loss line every 30 mini-batches and a summary at the end.
    """
    net.train()
    running_loss = 0.0
    hits = 0
    seen = 0
    for step, (inputs, target) in enumerate(train_loader):
        inputs = inputs.to(device)
        target = target.to(device)
        # Standard step: clear grads, forward, loss, backward, update.
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        # Accumulate statistics for periodic logging.
        predicted = outputs.data.argmax(dim=1)
        hits += (predicted == target).sum().item()
        seen += target.size(0)
        running_loss += loss.item()
        if (step + 1) % 30 == 0:
            # Mean loss over the last 30 mini-batches, then reset the window.
            print('[%d, %5d]  loss: %.3f  acc: %.3f' % (epoch + 1, step + 1, running_loss / 30, hits / seen))
            running_loss = 0.0
    print('[%d] Accuracy on train set: %d %% [%d/%d]' % (epoch+1, 100 * hits / seen, hits, seen))
    return hits / seen

def test():
    """Evaluate net on test_loader and return the overall accuracy.

    NOTE(review): the summary line reads the module-level ``epoch`` set by the
    training loop; calling test() before that loop runs raises NameError.
    """
    net.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            predicted = net(inputs).data.argmax(dim=1)
            hits += (predicted == target).sum().item()
            seen += target.size(0)
    print('[%d] Accuracy on test set: %d %% [%d/%d]' % (epoch+1, 100 * hits / seen, hits, seen))
    return hits / seen

# ---- Training driver ----
import os

checkpoint=''  # optional path to a saved state_dict to resume from ('' = fresh start)
if checkpoint!='':
    net.load_state_dict(torch.load(checkpoint))
net_name='res18'  # NOTE(review): the model built above is resnet34 — confirm this tag
txt_name = './log/' + net_name + 'log.txt'
# fix: the append-open below fails with FileNotFoundError if ./log is missing.
os.makedirs('./log', exist_ok=True)


train_acc_list=[]
test_acc_list=[]
print("===================================Start Training===================================")
# `epoch` intentionally stays module-level: test() reads it for its log line.
for epoch in range(100):
    train_acc=train(epoch)
    test_acc=test()
    # Append per-epoch accuracies immediately so a crash loses at most one epoch.
    with open(txt_name, 'a') as f:
            f.write(str(train_acc) + ' '+str(test_acc) + '\n')
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    if (epoch+1)%30 == 0:
        # fix: original referenced the undefined name `batchsize` (NameError at
        # epoch 30); the variable defined in the data section is `batch_size`.
        torch.save(net.state_dict(),'{}-{}-{}-{}-{:.3f}-{:.3f}.pt'.format(net_name,epoch+1,batch_size,lr,train_acc,test_acc))
print("=================================Training Finished==================================")
print(train_acc_list)
print(test_acc_list)
推理代码
from model.resnet import MyNet
from model.resnet import ResBlock
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

# ---- Single-image inference ----
img_dir='autodl-tmp/RAF/test/1/test_0008_aligned.jpg'
checkpoint='autodl-tmp/ckpt/res18-100-64-0.01-0.814-0.899.pt'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
#net_name='res18_3'

# Must mirror the test-time pipeline used in training (32x32, same stats).
normalize = transforms.Normalize(mean=[0.5,0.5,0.5],
                                 std=[0.5,0.5,0.5])
transform=transforms.Compose([transforms.Resize((32, 32)),
                          transforms.ToTensor(),
                          normalize])
from PIL import Image
# fix: force 3-channel RGB — a grayscale or RGBA file would otherwise break
# the 3-channel Normalize (and the conv stem).
img = Image.open(img_dir).convert('RGB')
img_t=transform(img)
batch=torch.unsqueeze(img_t,0)  # add batch dimension: (C,H,W) -> (1,C,H,W)
batch=batch.to(device)

net=MyNet(ResBlock)
net.to(device)
if checkpoint!='':
    net.load_state_dict(torch.load(checkpoint))

# fix: switch to eval mode (freezes BatchNorm/Dropout) and disable autograd;
# the original ran the forward pass in train mode, where BatchNorm on a
# batch of one produces unreliable output.
net.eval()
with torch.no_grad():
    out=net(batch)
_, predicted = torch.max(out.data, dim=1)
index=predicted.item()
# RAF-DB emotion labels, in the index order the model was trained with.
cls=['surprise','fear','disgust','happiness','sadness','anger','neutral']
print(cls[index])
完整代码
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

# ---- Configuration ----
train_dir='autodl-tmp/RAF/train'   # ImageFolder roots: one sub-directory per class
test_dir='autodl-tmp/RAF/test'
checkpoint='autodl-tmp/ckpt/res18-100-64-0.01-0.814-0.899.pt'  # '' = train from scratch
net_name='res18_3'

batch_size = 64
epochs=30
lr=0.001

save_dir='./autodl-tmp/ckpt/'+net_name+'/'  # checkpoints and the text log live here
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Map each RGB channel from [0, 1] to [-1, 1].
normalize = transforms.Normalize(mean=[0.5,0.5,0.5],
                                 std=[0.5,0.5,0.5])
# Train-time augmentation: random resized crop + horizontal flip at 32x32.
train_dataset = datasets.ImageFolder(train_dir,
                                     transforms.Compose([
                                         transforms.RandomResizedCrop((32, 32)),
                                         transforms.RandomHorizontalFlip(),
                                         transforms.ToTensor(),
                                         normalize]))
# Eval-time: deterministic resize only.
test_dataset = datasets.ImageFolder(test_dir,
                                    transforms.Compose([
                                        transforms.Resize((32, 32)),
                                        transforms.ToTensor(),
                                        normalize]))
train_loader = DataLoader(train_dataset,
               batch_size=batch_size,
               shuffle=True)
test_loader = DataLoader(test_dataset,
               batch_size=batch_size,
               shuffle=True)

# Custom ResNet from the local project (not torchvision's resnet).
from model.resnet import MyNet
from model.resnet import ResBlock
net=MyNet(ResBlock)
net.to(device)

criterion = torch.nn.CrossEntropyLoss()  # expects raw logits from the model
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.5)


def train(epoch):
    """One pass over train_loader; prints running stats, returns accuracy.

    Relies on module-level net, train_loader, criterion, optimizer, device.
    """
    net.train()
    seen, hits = 0, 0
    running_loss = 0.0
    for idx, batch in enumerate(train_loader):
        inputs = batch[0].to(device)
        target = batch[1].to(device)
        # Zero grads, forward, loss, backward, parameter update.
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        hits += (outputs.data.argmax(dim=1) == target).sum().item()
        seen += target.size(0)
        running_loss += loss.item()
        if (idx + 1) % 30 == 0:
            # Mean loss over the last 30 mini-batches, then reset the window.
            print('[%d, %3d]  loss: %.3f  acc: %.3f' % (epoch + 1, idx + 1, running_loss / 30, hits / seen))
            running_loss = 0.0
    print('[%d] Accuracy on train set: %d %% [%d/%d]' % (epoch+1, 100 * hits / seen, hits, seen))
    return hits / seen

def test():
    """Evaluate net on test_loader; return accuracy in [0, 1].

    NOTE(review): the print below reads the module-level ``epoch`` bound by
    the training loop — calling test() standalone raises NameError.
    """
    net.eval()
    hits, seen = 0, 0
    with torch.no_grad():  # no gradients needed for evaluation
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            predicted = net(inputs).data.argmax(dim=1)
            seen += target.size(0)
            hits += (predicted == target).sum().item()
    print('[%d] Accuracy on test set: %d %% [%d/%d]' % (epoch+1, 100 * hits / seen, hits, seen))
    return hits / seen


if checkpoint!='':
    net.load_state_dict(torch.load(checkpoint))

txt_name = save_dir + net_name + '_log.txt'
import os
if not os.path.exists(save_dir):
    os.makedirs(save_dir)

train_acc_list=[]
test_acc_list=[]
print("===================================Start Training===================================")
for epoch in range(epochs):
    train_acc=train(epoch)
    test_acc=test()
    with open(txt_name, 'a') as f:
            f.write(str(train_acc) + ' '+str(test_acc) + '\n')
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    if (epoch+1)%10 == 0:
        torch.save(net.state_dict(),save_dir+'{}-{}-{}-{}-{:.3f}-{:.3f}.pt'.format(net_name,epoch+1,batch_size,lr,train_acc,test_acc))
print("=================================Training Finished==================================")
print(train_acc_list)
print(test_acc_list)
pywebio推理
from model.resnet import MyNet
from model.resnet import ResBlock
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim

from pywebio.input import *
from pywebio.output import *
from pywebio.pin import *
from pywebio import start_server
from PIL import Image
from io import BytesIO

checkpoint='model/res18-100-64-0.01-0.814-0.899.pt'  # trained weights bundled with the app
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Same 32x32 resize and normalization the model was trained with.
normalize = transforms.Normalize(mean=[0.5,0.5,0.5],
                                 std=[0.5,0.5,0.5])
transform=transforms.Compose([transforms.Resize((32, 32)),
                          transforms.ToTensor(),
                          normalize])
# Build the model once at import time and load its weights;
# every request handled by mypage() reuses this instance.
net=MyNet(ResBlock)
net.to(device)
if checkpoint!='':
    net.load_state_dict(torch.load(checkpoint))
def mypage():
    """PyWebIO page: upload one face image, print the predicted emotion label."""
    img = file_upload("Select a image:", accept="image/*",placeholder='请上传图片')
    # fix: force 3-channel RGB — RGBA (common for PNG uploads) or grayscale
    # images would otherwise break the 3-channel Normalize.
    img=Image.open(BytesIO(img['content'])).convert('RGB')
    img_t=transform(img)
    batch=torch.unsqueeze(img_t,0)  # (C,H,W) -> (1,C,H,W)
    batch=batch.to(device)
    # fix: run inference in eval mode with autograd disabled; the original
    # left the net in train mode, where BatchNorm on a batch of one
    # produces unreliable predictions.
    net.eval()
    with torch.no_grad():
        out=net(batch)
    _, predicted = torch.max(out.data, dim=1)
    index=predicted.item()
    # RAF-DB emotion labels in the model's output-index order.
    cls=['surprise','fear','disgust','happiness','sadness','anger','neutral']
    put_text(str(cls[index]))
    
if __name__ == '__main__':
    # Serve the page with PyWebIO's built-in server (default port 8080);
    # remote access and auto browser launch are deliberately disabled.
    start_server(
        applications=[mypage,],
        debug=True,
        auto_open_webbrowser=False,
        remote_access=False,
        )

如果碰到RuntimeError: This event loop is already running
查阅资料后,发现所使用的 Python 编辑器为 Spyder,其连接着 IPython 内核,而 IPython 内核本身在事件循环上运行,而 asyncio 不允许嵌套其事件循环,因此会出现如上图的错误信息。



因此需要加上:

import nest_asyncio
nest_asyncio.apply()

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/langs/577638.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-04-11
下一篇 2022-04-11

发表评论

登录后才能评论

评论列表(0条)

    保存