-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrain_util.py
139 lines (123 loc) · 4.58 KB
/
train_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
# coding=utf-8
'''
* @File : train_util.py
* @Time : 2020/03/02 00:27:45
* @Author : Hanielxx
* @Version : 1.0
* @Desc : Training, evaluation and prediction helpers (train, evaluate, predict_samples)
'''
import scipy
import torch
import numpy as np
import torch.utils.data as tud
# Pick the default compute device once, at import time: use the GPU when
# PyTorch reports a usable CUDA runtime, otherwise stay on the CPU.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
def train(model, iterator, optimizer, criterion, device='cpu'):
    '''
    Desc:
        Run one training epoch over ``iterator`` and return the mean batch loss.
    Args:
        model: torch.nn.Module -- model to train; it is called as ``model(x)``
            and must return a pair whose first element is the predictions
        iterator -- iterable yielding ``(x, y)`` batches (e.g. a DataLoader);
            it does not need to support ``len()``
        optimizer -- optimizer that updates the model parameters
        criterion -- loss function, called as ``criterion(flat_predictions, y)``
        device -- accepted for backward compatibility only. Batches are moved
            to the device that holds the model's parameters, because that is
            the only placement for which the forward pass can succeed.
    Returns:
        loss: float -- mean loss over the batches of this epoch, or 0.0 when
            ``iterator`` yields no batches
    '''
    # Put the model into training mode (enables dropout, batch-norm updates).
    model.train()

    # Follow the model's own device; fall back to the module-wide default for
    # objects that expose no parameters.
    try:
        run_device = next(model.parameters()).device
    except (StopIteration, AttributeError):
        run_device = DEVICE

    epoch_loss = 0.0
    num_batches = 0
    for x, y in iterator:
        x, y = x.to(run_device), y.to(run_device)

        # Gradients accumulate by default, so clear them before each step.
        optimizer.zero_grad()
        predictions, _ = model(x)

        # The criterion receives one flat prediction per target value.
        loss = criterion(predictions.flatten(), y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Count batches ourselves so that empty or length-less iterables work.
    if num_batches == 0:
        return 0.0
    return epoch_loss / num_batches
def evaluate(model, iterator, criterion, device='cpu'):
    '''
    Desc:
        Evaluate the model on ``iterator`` without gradient tracking and return
        the mean batch loss.
    Args:
        model: torch.nn.Module -- model to validate; it is called as
            ``model(x)`` and must return a pair whose first element is the
            predictions
        iterator -- iterable yielding ``(x, y)`` batches (e.g. a DataLoader);
            it does not need to support ``len()``
        criterion -- loss function, called as ``criterion(flat_predictions, y)``
        device -- accepted for backward compatibility only. Batches are moved
            to the device that holds the model's parameters, because that is
            the only placement for which the forward pass can succeed.
    Returns:
        loss: float -- mean loss over the batches, or 0.0 when ``iterator``
            yields no batches
    '''
    # Put the model into evaluation mode (disables dropout, freezes batch-norm).
    model.eval()

    # Follow the model's own device; fall back to the module-wide default for
    # objects that expose no parameters.
    try:
        run_device = next(model.parameters()).device
    except (StopIteration, AttributeError):
        run_device = DEVICE

    epoch_loss = 0.0
    num_batches = 0
    # No gradients and no back-propagation are needed for validation.
    with torch.no_grad():
        for x, y in iterator:
            x, y = x.to(run_device), y.to(run_device)
            predictions, _ = model(x)
            loss = criterion(predictions.flatten(), y)
            epoch_loss += loss.item()
            num_batches += 1

    # Count batches ourselves so that empty or length-less iterables work.
    if num_batches == 0:
        return 0.0
    return epoch_loss / num_batches
def predict_samples(model, data=None, batch_size=64):
    '''
    Desc:
        Run the trained model on a set of samples and collect its outputs.
    Args:
        model -- trained model; it is called as ``model(x)`` and must return a
            ``(predictions, energy)`` pair of tensors
        data: list, np.ndarray or tud.DataLoader -- samples to predict
            list / ndarray: already-encoded index sequences, indexed as a 2-D
                array (one row per sample); only predictions are returned
            tud.DataLoader: yields ``(x, y)`` batches of samples and labels;
                predictions and the true labels are returned
        batch_size: int -- rows per forward pass for list / ndarray input
            (a DataLoader keeps its own batching); defaults to 64
    Returns:
        list / ndarray input: ``(pre, energies)``
        DataLoader input: ``(pre, freq, energies)``
            pre: ndarray(n, ) -- flattened predictions
            freq: ndarray(n, ) -- flattened true labels
            energies: list -- second model output, one entry per batch; CPU
                tensors for list / ndarray input, ndarrays for DataLoader input
    Raises:
        ValueError -- if ``data`` is None or ``batch_size`` is smaller than 1
        TypeError -- if ``data`` is none of the supported container types
    '''
    if data is None:
        raise ValueError("data为分割好的待预测的文本序列,不可为None")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Put the model into evaluation mode before predicting.
    model.eval()

    # Inputs must live where the model's parameters live; fall back to the
    # module-wide default for objects that expose no parameters.
    try:
        run_device = next(model.parameters()).device
    except (StopIteration, AttributeError):
        run_device = DEVICE

    # Encoded sequences given as a list or ndarray: slice into batches here.
    if isinstance(data, (list, np.ndarray)):
        inputs = torch.tensor(np.array(data)).long().to(run_device)
        # Seeding with an empty float64 array keeps the result dtype and makes
        # the final concatenate valid even when there are no samples.
        pre_chunks = [np.empty(0)]
        energies = []
        with torch.no_grad():
            for start in range(0, inputs.shape[0], batch_size):
                batch_pre, energy = model(inputs[start:start + batch_size, :])
                energies.append(energy.detach().cpu())
                pre_chunks.append(batch_pre.detach().cpu().numpy().flatten())
        return np.concatenate(pre_chunks), energies

    # A DataLoader already batches and also carries the true labels.
    if isinstance(data, tud.DataLoader):
        pre_chunks = [np.empty(0)]
        label_chunks = [np.empty(0)]
        energies = []
        with torch.no_grad():
            for x, y in data:
                prediction, energy = model(x.to(run_device))
                pre_chunks.append(prediction.detach().cpu().numpy().flatten())
                energies.append(energy.detach().cpu().numpy())
                label_chunks.append(y.detach().cpu().numpy().flatten())
        return np.concatenate(pre_chunks), np.concatenate(label_chunks), energies

    raise TypeError(
        "data must be a list, numpy.ndarray or torch.utils.data.DataLoader, "
        "got " + type(data).__name__
    )