
参考:https://blog.csdn.net/weixin_44493291/article/details/111932895
感谢Chaossll提供的基本思路!
每次训练都要从头写一遍完整的流程,比较麻烦;实际上,除了网络结构需要修改,其余部分大同小异。
因此可以把整个流程梳理成一个固定的模板。
这里针对图像分类任务,给出训练框架。
默认图像数据集按 ImageNet 的目录格式组织(即 torchvision 的 ImageFolder 格式:每个类别对应一个子文件夹)。
- 准备数据
- 设计模型
- 设置Loss和Optimizer
- 训练和测试
- (推理)
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
# Dataset roots, read with ImageFolder (one sub-directory per class).
# Both are template placeholders and must be filled in before running.
train_dir = ''
test_dir = ''
batch_size = 64
# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Per-channel normalisation: (x - 0.5) / 0.5 for each of the three channels.
normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5])

# Training pipeline: random crop + horizontal flip as augmentation.
train_dataset = datasets.ImageFolder(
    train_dir,
    transforms.Compose([
        transforms.RandomResizedCrop((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]))
# Evaluation pipeline: deterministic resize only, no augmentation.
test_dataset = datasets.ImageFolder(
    test_dir,
    transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ]))

train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True)
# The accuracy computed over the whole test set does not depend on sample
# order, so the evaluation loader is left unshuffled.
test_loader = DataLoader(test_dataset,
                         batch_size=batch_size,
                         shuffle=False)
设计模型/导入模型
import torch
import torchvision
# Build a ResNet-34 with torchvision's default constructor arguments.
# To match a dataset with a different number of classes, pass
# ``num_classes=...`` (e.g. 10) to the constructor.
net = torchvision.models.resnet34()
# Move the parameters to the selected device, then dump the architecture.
net = net.to(device)
print(net)
设置Loss和Optimizer
# Learning rate; also embedded in checkpoint file names by the training loop.
lr = 0.01
# Cross-entropy over the raw network outputs and the integer class targets.
criterion = torch.nn.CrossEntropyLoss()
# Plain SGD with momentum over every parameter of ``net``.
optimizer = optim.SGD(params=net.parameters(), lr=lr, momentum=0.5)
训练和测试
def train(epoch):
    """Train ``net`` for one pass over ``train_loader``.

    Relies on the module-level ``net``, ``train_loader``, ``optimizer``,
    ``criterion`` and ``device``.

    Every 30 batches the mean loss of those 30 batches and the running
    accuracy since the start of the epoch are printed; a summary line is
    printed once the epoch is finished.

    Args:
        epoch: Zero-based epoch index, used only for the printed labels.

    Returns:
        Fraction of training samples classified correctly during this epoch.
    """
    net.train()
    window_loss = 0.0  # loss accumulated since the last progress print
    n_correct = 0
    n_seen = 0
    # Count batches from 1 so the progress test needs no "+ 1".
    for step, (inputs, target) in enumerate(train_loader, 1):
        inputs = inputs.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, target)

        # Predicted class = index of the largest output along dim 1.
        predicted = torch.max(outputs.data, dim=1)[1]
        n_seen += target.size(0)
        n_correct += (predicted == target).sum().item()

        loss.backward()
        optimizer.step()

        window_loss += loss.item()
        if step % 30 == 0:
            print('[%d, %5d] loss: %.3f acc: %.3f' % (epoch + 1, step, window_loss / 30, n_correct / n_seen))
            window_loss = 0.0

    print('[%d] Accuracy on train set: %d %% [%d/%d]' % (epoch + 1, 100 * n_correct / n_seen, n_correct, n_seen))
    return n_correct / n_seen
def test(epoch=None):
    """Evaluate ``net`` on ``test_loader`` and return the accuracy.

    Relies on the module-level ``net``, ``test_loader`` and ``device``.

    Args:
        epoch: Zero-based epoch index, used only to label the printed
            summary. When omitted, the module-level ``epoch`` variable
            (the training loop counter) is used if it exists, otherwise 0.
            This keeps existing ``test()`` calls inside the training loop
            working while no longer raising ``NameError`` when the function
            is called outside such a loop.

    Returns:
        Fraction of test samples classified correctly.
    """
    if epoch is None:
        # Backward compatibility with callers that rely on the global
        # loop variable instead of passing the epoch explicitly.
        epoch = globals().get('epoch', 0)

    net.eval()
    correct = 0
    total = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            outputs = net(inputs)
            # Predicted class = index of the largest output along dim 1.
            _, predicted = torch.max(outputs.data, dim=1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    print('[%d] Accuracy on test set: %d %% [%d/%d]' % (epoch+1, 100 * correct / total, correct, total))
    return correct / total
import os

# Optional checkpoint to resume from; leave empty to start from scratch.
checkpoint = ''
if checkpoint:
    # map_location lets a checkpoint saved on a GPU be restored when the
    # script has fallen back to the CPU.
    net.load_state_dict(torch.load(checkpoint, map_location=device))

# Label used in the log and checkpoint file names only; keep it in sync
# with the architecture that was actually built.
net_name = 'res18'
log_dir = './log/'
# open(..., 'a') does not create missing directories, so make sure it exists.
os.makedirs(log_dir, exist_ok=True)
txt_name = log_dir + net_name + 'log.txt'

train_acc_list = []
test_acc_list = []
print("===================================Start Training===================================")
# NOTE: the loop variable must stay a module-level name called ``epoch``;
# ``test()`` is called without arguments and reads it for its printout.
for epoch in range(100):
    train_acc = train(epoch)
    test_acc = test()
    # Append one "train_acc test_acc" line per epoch to the log file.
    with open(txt_name, 'a') as f:
        f.write(str(train_acc) + ' ' + str(test_acc) + '\n')
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    # Save a checkpoint every 30 epochs. The original referenced an
    # undefined name ``batchsize`` here, which raised NameError on the
    # first save; the configured variable is ``batch_size``.
    if (epoch+1) % 30 == 0:
        torch.save(net.state_dict(), '{}-{}-{}-{}-{:.3f}-{:.3f}.pt'.format(net_name, epoch+1, batch_size, lr, train_acc, test_acc))
print("=================================Training Finished==================================")
print(train_acc_list)
print(test_acc_list)
推理代码
from model.resnet import MyNet
from model.resnet import ResBlock
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
# Image to classify and the trained weights to load.
img_dir = 'autodl-tmp/RAF/test/1/test_0008_aligned.jpg'
checkpoint = 'autodl-tmp/ckpt/res18-100-64-0.01-0.814-0.899.pt'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Same preprocessing as the evaluation pipeline used in training:
# resize to 32x32, convert to a tensor, normalise each channel.
normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5])
transform = transforms.Compose([transforms.Resize((32, 32)),
                                transforms.ToTensor(),
                                normalize])

from PIL import Image
# Force three channels: the 3-channel Normalize above fails on grayscale
# or RGBA files otherwise.
img = Image.open(img_dir).convert('RGB')
img_t = transform(img)
# Add the batch dimension the network expects: (C, H, W) -> (1, C, H, W).
batch = torch.unsqueeze(img_t, 0)
batch = batch.to(device)

net = MyNet(ResBlock)
net.to(device)
if checkpoint:
    # map_location lets GPU-saved weights load on a CPU-only machine.
    net.load_state_dict(torch.load(checkpoint, map_location=device))
# Put the network in evaluation mode so that any layers that behave
# differently during training (e.g. batch norm, dropout) use their
# inference behaviour.
net.eval()

# Gradients are not needed for a forward-only prediction.
with torch.no_grad():
    out = net(batch)
_, predicted = torch.max(out.data, dim=1)
index = predicted.item()
# Class names indexed by the predicted class id.
cls = ['surprise', 'fear', 'disgust', 'happiness', 'sadness', 'anger', 'neutral']
print(cls[index])
完整代码
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
# ---- Run configuration -------------------------------------------------
train_dir = 'autodl-tmp/RAF/train'
test_dir = 'autodl-tmp/RAF/test'
# Weights to resume from; set to '' to train from scratch.
checkpoint = 'autodl-tmp/ckpt/res18-100-64-0.01-0.814-0.899.pt'
# Label used for the output directory, log file and checkpoint names.
net_name = 'res18_3'
batch_size = 64
epochs = 30
lr = 0.001
save_dir = './autodl-tmp/ckpt/' + net_name + '/'
# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# ---- Data --------------------------------------------------------------
# Per-channel normalisation: (x - 0.5) / 0.5 for each of the three channels.
normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5])

# Training pipeline: random crop + horizontal flip as augmentation.
train_dataset = datasets.ImageFolder(
    train_dir,
    transforms.Compose([
        transforms.RandomResizedCrop((32, 32)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]))
# Evaluation pipeline: deterministic resize only, no augmentation.
test_dataset = datasets.ImageFolder(
    test_dir,
    transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        normalize,
    ]))

train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True)
# The accuracy computed over the whole test set does not depend on sample
# order, so the evaluation loader is left unshuffled.
test_loader = DataLoader(test_dataset,
                         batch_size=batch_size,
                         shuffle=False)
from model.resnet import MyNet
from model.resnet import ResBlock
# Instantiate the project-local network and move it to the selected device.
net = MyNet(ResBlock)
net.to(device)

# Cross-entropy loss, optimised with SGD + momentum over all parameters.
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(params=net.parameters(), lr=lr, momentum=0.5)
def train(epoch):
    """Run a single training epoch and report the training accuracy.

    Uses the module-level ``net``, ``train_loader``, ``optimizer``,
    ``criterion`` and ``device``. Progress (mean loss of the last 30
    batches and accuracy so far in the epoch) is printed every 30 batches,
    followed by one summary line at the end of the epoch.

    Args:
        epoch: Zero-based epoch index, used only for the printed labels.

    Returns:
        Number of correctly classified training samples divided by the
        number of samples seen in this epoch.
    """
    net.train()
    loss_since_report = 0.0
    hits = 0
    seen = 0
    for batch_no, batch in enumerate(train_loader, start=1):
        images = batch[0].to(device)
        labels = batch[1].to(device)

        # forward + backward + update
        optimizer.zero_grad()
        logits = net(images)
        # Predicted class = index of the largest output along dim 1.
        predicted = torch.max(logits.data, dim=1)[1]
        seen += labels.size(0)
        hits += (predicted == labels).sum().item()

        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()
        loss_since_report += batch_loss.item()

        if batch_no % 30 != 0:
            continue
        print('[%d, %3d] loss: %.3f acc: %.3f' % (epoch + 1, batch_no, loss_since_report / 30, hits / seen))
        loss_since_report = 0.0

    print('[%d] Accuracy on train set: %d %% [%d/%d]' % (epoch + 1, 100 * hits / seen, hits, seen))
    return hits / seen
def test(epoch=None):
    """Evaluate ``net`` on ``test_loader`` and return the accuracy.

    Relies on the module-level ``net``, ``test_loader`` and ``device``.

    Args:
        epoch: Zero-based epoch index, used only to label the printed
            summary. When omitted, the module-level ``epoch`` variable
            (the training loop counter) is used if it exists, otherwise 0.
            This keeps existing ``test()`` calls inside the training loop
            working while no longer raising ``NameError`` when the function
            is called outside such a loop.

    Returns:
        Fraction of test samples classified correctly.
    """
    if epoch is None:
        # Backward compatibility with callers that rely on the global
        # loop variable instead of passing the epoch explicitly.
        epoch = globals().get('epoch', 0)

    net.eval()
    correct = 0
    total = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for inputs, target in test_loader:
            inputs, target = inputs.to(device), target.to(device)
            outputs = net(inputs)
            # Predicted class = index of the largest output along dim 1.
            _, predicted = torch.max(outputs.data, dim=1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    print('[%d] Accuracy on test set: %d %% [%d/%d]' % (epoch+1, 100 * correct / total, correct, total))
    return correct / total
if checkpoint:
    # map_location lets a checkpoint saved on a GPU be restored when the
    # script has fallen back to the CPU.
    net.load_state_dict(torch.load(checkpoint, map_location=device))

txt_name = save_dir + net_name + '_log.txt'
import os
# Create the output directory (and parents) if needed; exist_ok avoids the
# separate existence check and the race between checking and creating.
os.makedirs(save_dir, exist_ok=True)

train_acc_list = []
test_acc_list = []
print("===================================Start Training===================================")
# NOTE: the loop variable must stay a module-level name called ``epoch``;
# ``test()`` is called without arguments and reads it for its printout.
for epoch in range(epochs):
    train_acc = train(epoch)
    test_acc = test()
    # Append one "train_acc test_acc" line per epoch to the log file.
    with open(txt_name, 'a') as f:
        f.write(str(train_acc) + ' ' + str(test_acc) + '\n')
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    # Save a checkpoint every 10 epochs; the file name records the run
    # settings and the accuracies reached at that point.
    if (epoch+1) % 10 == 0:
        torch.save(net.state_dict(), save_dir+'{}-{}-{}-{}-{:.3f}-{:.3f}.pt'.format(net_name, epoch+1, batch_size, lr, train_acc, test_acc))
print("=================================Training Finished==================================")
print(train_acc_list)
print(test_acc_list)
pywebio推理
from model.resnet import MyNet
from model.resnet import ResBlock
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
from pywebio.input import *
from pywebio.output import *
from pywebio.pin import *
from pywebio import start_server
from PIL import Image
from io import BytesIO
# Trained weights served by the web app.
checkpoint = 'model/res18-100-64-0.01-0.814-0.899.pt'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preprocessing applied to every uploaded image: resize to 32x32, convert
# to a tensor, then normalise each channel as (x - 0.5) / 0.5.
normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5])
transform = transforms.Compose([transforms.Resize((32, 32)),
                                transforms.ToTensor(),
                                normalize])

net = MyNet(ResBlock)
net.to(device)
if checkpoint:
    # map_location lets GPU-saved weights load on a CPU-only server.
    net.load_state_dict(torch.load(checkpoint, map_location=device))
# The app only performs inference: switch to evaluation mode once so that
# any layers that behave differently during training (e.g. batch norm,
# dropout) use their inference behaviour.
net.eval()
def mypage():
    """PyWebIO page: classify one uploaded image and show the class name.

    Asks the user for an image file, preprocesses it with the module-level
    ``transform``, runs it through the module-level ``net`` on ``device``
    and writes the predicted class name to the page.
    """
    img = file_upload("Select a image:", accept="image/*", placeholder='请上传图片')
    if img is None:
        # The upload is not marked as required, so the form can be
        # submitted without a file; report it instead of crashing on
        # ``img['content']``.
        put_text('No image uploaded.')
        return

    # Decode the uploaded bytes and force three channels: the 3-channel
    # normalisation fails on grayscale or RGBA images otherwise.
    img = Image.open(BytesIO(img['content'])).convert('RGB')
    img_t = transform(img)
    # Add the batch dimension: (C, H, W) -> (1, C, H, W).
    batch = torch.unsqueeze(img_t, 0)
    batch = batch.to(device)

    # Make sure the network is in evaluation mode (idempotent) and skip
    # gradient tracking, which is not needed for a prediction.
    net.eval()
    with torch.no_grad():
        out = net(batch)
    _, predicted = torch.max(out.data, dim=1)
    index = predicted.item()
    # Class names indexed by the predicted class id.
    cls = ['surprise', 'fear', 'disgust', 'happiness', 'sadness', 'anger', 'neutral']
    put_text(str(cls[index]))
if __name__ == '__main__':
    # Serve ``mypage`` as the only application, with debug mode on and
    # without opening a browser or enabling remote access.
    server_options = dict(
        debug=True,
        auto_open_webbrowser=False,
        remote_access=False,
    )
    start_server(applications=[mypage], **server_options)
如果碰到报错 RuntimeError: This event loop is already running:
查阅资料后发现,所使用的 Python 编辑器 Spyder 连接着 IPython 内核,而 IPython 内核本身已经运行在一个事件循环上;asyncio 不允许嵌套事件循环,因此会出现上述错误信息。
解决办法是在代码开头加上:
# Workaround for "RuntimeError: This event loop is already running" when the
# script is started from an environment that already runs an event loop
# (e.g. Spyder's IPython kernel): allow the loop to be re-entered.
import nest_asyncio
nest_asyncio.apply()
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)