【金融】【pytorch】使用深度学习预测期货收盘价涨跌——全连接神经网络模型构建与训练
摘要：【金融】【pytorch】使用深度学习预测期货收盘价涨跌——全连接神经网络模型构建与训练。本文给出评价指标函数（get_accuracy、get_recall、get_specificity、get_precision）的实现、全连接神经网络 ANN 的定义，以及逐数据段训练与测试的代码（完整代码见下文）。
·
【金融】【pytorch】使用深度学习预测期货收盘价涨跌——全连接神经网络模型构建与训练
模型构建与训练
def get_accuracy(SR, GT, threshold=0.5):
    """Return the fraction of elements where prediction and target agree.

    Args:
        SR: tensor of prediction scores; an element counts as positive when
            it is strictly greater than ``threshold``.
        GT: tensor of targets; an element counts as positive when it equals
            ``torch.max(GT)``.
        threshold: cut-off applied to ``SR``.

    Returns:
        A Python float: number of matching elements divided by the number of
        elements in ``SR``.
    """
    SR = SR > threshold
    # NOTE: positives are defined relative to the maximum of GT, so a GT
    # without any positive label (e.g. all zeros) is treated as all-positive.
    GT = GT == torch.max(GT)
    corr = torch.sum(SR == GT)
    # numel() counts every element regardless of rank, so 1-D, 2-D and N-D
    # inputs are all normalised correctly.
    tensor_size = SR.numel()
    acc = float(corr) / float(tensor_size)
    return acc
def get_recall(SR,GT,threshold=0.5):
    """Recall (sensitivity): TP / (TP + FN + 1e-6).

    ``SR`` is binarised with ``SR > threshold``; ``GT`` is binarised by
    comparing each element with ``torch.max(GT)``. The 1e-6 term keeps the
    division defined when there are no positive targets.
    """
    predicted = SR > threshold
    actual = GT == torch.max(GT)
    # True positives: predicted positive and actually positive.
    true_pos = float(torch.sum(predicted & actual))
    # False negatives: predicted negative but actually positive.
    false_neg = float(torch.sum(~predicted & actual))
    return true_pos / (true_pos + false_neg + 1e-6)
def get_specificity(SR,GT,threshold=0.5):
    """Specificity: TN / (TN + FP + 1e-6).

    ``SR`` is binarised with ``SR > threshold``; ``GT`` is binarised by
    comparing each element with ``torch.max(GT)``. The 1e-6 term keeps the
    division defined when there are no negative targets.
    """
    predicted = SR > threshold
    negatives = ~(GT == torch.max(GT))
    # True negatives: predicted negative and actually negative.
    true_neg = float(torch.sum(~predicted & negatives))
    # False positives: predicted positive but actually negative.
    false_pos = float(torch.sum(predicted & negatives))
    return true_neg / (true_neg + false_pos + 1e-6)
def get_precision(SR,GT,threshold=0.5):
    """Precision: TP / (TP + FP + 1e-6).

    ``SR`` is binarised with ``SR > threshold``; ``GT`` is binarised by
    comparing each element with ``torch.max(GT)``. The 1e-6 term keeps the
    division defined when nothing is predicted positive.
    """
    predicted = SR > threshold
    actual = GT == torch.max(GT)
    # True positives: predicted positive and actually positive.
    true_pos = float(torch.sum(predicted & actual))
    # False positives: predicted positive but actually negative.
    false_pos = float(torch.sum(predicted & ~actual))
    return true_pos / (true_pos + false_pos + 1e-6)
# Size of the second dimension of miData; ANN reads this module-level value
# to size its input layers.
input_size = miData.shape[1]
# Empty loss log.
loss_history = list()
class ANN(nn.Module):
    """Fully connected network that folds a sequence of feature rows into logits.

    The input ``x`` is indexed as ``x[t, :, :]`` for ``t = 0 .. steps - 1``;
    every slice must have ``in_features`` columns. Step 0 goes through ``l1``.
    Each later step ``t`` first passes the running hidden state through
    ``f<t>``, concatenates it with ``x[t]`` along dim 1, and projects the
    result back to ``hidden`` units with ``l<t+1>``. ``f`` maps the final
    hidden state to ``n_classes`` outputs. LeakyReLU follows every linear
    layer, including the output layer.

    Args:
        in_features: number of features per step. ``None`` (the default)
            reads the module-level ``input_size``, so ``ANN()`` keeps working.
        hidden: width of every hidden layer.
        steps: number of leading slices of ``x`` consumed by ``forward``.
        n_classes: size of the output layer.

    Raises:
        ValueError: if ``steps`` is smaller than 1.
    """

    def __init__(self, in_features=None, hidden=50, steps=10, n_classes=2):
        super(ANN, self).__init__()
        if in_features is None:
            # Backward-compatible default: fall back to the module global.
            in_features = input_size
        if steps < 1:
            raise ValueError("steps must be >= 1, got {}".format(steps))
        self.steps = steps

        # Creation order is l1, s, l2..lN, f1..f(N-1), f and the attribute
        # names are fixed, so state_dict keys ("l1.weight", ..., "f.bias"),
        # parameter order and seeded initialisation stay stable for the
        # default arguments (saved checkpoints keep loading).
        self.l1 = nn.Linear(in_features, hidden)
        self.s = nn.LeakyReLU()
        for idx in range(2, steps + 1):
            setattr(self, "l{}".format(idx), nn.Linear(hidden + in_features, hidden))
        for idx in range(1, steps):
            setattr(self, "f{}".format(idx), nn.Linear(hidden, hidden))
        self.f = nn.Linear(hidden, n_classes)

    def forward(self, x):
        """Run the first ``steps`` slices of ``x`` through the network.

        Returns a tensor with ``n_classes`` columns, one row per row of
        ``x[t]``.
        """
        state = self.s(self.l1(x[0, :, :]))
        for t in range(1, self.steps):
            # Refine the running state, then merge in the next step's features.
            state = self.s(getattr(self, "f{}".format(t))(state))
            state = torch.cat((state, x[t, :, :]), dim=1)
            state = self.s(getattr(self, "l{}".format(t + 1))(state))
        # The activation is applied to the output layer as well.
        return self.s(self.f(state))
# Model, loss and optimiser shared by every data set processed below.
ann = ANN()
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ann.parameters(), lr=1e-3)

# Test-loss log plus one list per evaluation metric.
loss_history = list()
accuracy_his, precision_his = list(), list()
recall_his, specificity_his = list(), list()

# Empty tensors that the test results are concatenated onto.
all_y, all_p = torch.tensor([]), torch.tensor([])
import os  # needed only here, to create the checkpoint directory

# Create the checkpoint directory up front so torch.save() cannot fail after
# a full training run.
os.makedirs("main_models", exist_ok=True)
# Built once and reused for every test batch (probabilities for the ROC curve).
softMax_func = nn.Softmax(dim=1)

# One iteration per data set k: train on rows [train_ptr[k], test_ptr[k]),
# then test on rows up to end_ptr[k]. `ann` is not re-created inside this
# loop, so each data set continues from the previously trained weights.
for k in range(len(end_ptr)):
    # Stop at the first data set whose end index reaches past the data.
    if end_ptr[k] >= len(miData):
        break
    train_start, test_start, test_end = train_ptr[k], test_ptr[k], end_ptr[k]
    trainLoaderX, trainLoaderY = create_dataset(miData[train_start:test_start,:], yData[train_start:test_start], 10)
    # The test slice starts 10 rows before test_start.
    # NOTE(review): presumably so the first test window has a full 10-row
    # history - confirm against create_Test_dataset.
    testLoaderX, testLoaderY = create_Test_dataset(miData[test_start-10:test_end,:], yData[test_start-10:test_end], 10)
    print('\nDataSet No.{} data row {}-{}-{}'.format(k, train_start, test_start, test_end))

    # ---- Training ----
    loss_sum_flag = 10  # lowest summed epoch loss so far (initial bar: 10)
    fall_cnt = 0  # number of epochs that beat the previous best loss
    train_len = len(trainLoaderX)
    train_loss_his = []
    for epoch in range(0, 1000):
        loss_sum_item = 0
        for i, var_x in enumerate(trainLoaderX):
            var_y = trainLoaderY[i]
            out = ann(var_x)
            # Only the last entry of var_y is used as the target.
            loss = loss_func(out.view(-1, 2), var_y[-1].view(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_sum_item += loss.item()
        if loss_sum_item < loss_sum_flag:
            loss_sum_flag = loss_sum_item
            fall_cnt += 1
            # Full progress line on every 100th improvement, '>' otherwise.
            if fall_cnt % 100 == 0:
                print('\nDataSet Train Epoch:{}, Avg Loss:{:.10f}'.format(epoch, loss_sum_item/train_len))
            else:
                print('>',end='')
        else:
            # '-' marks an epoch that did not improve on the best loss.
            print('-',end='')
        train_loss_his.append(loss_sum_item)
    plt.plot(train_loss_his)
    plt.show()
    torch.save(obj=ann.state_dict(), f="main_models/ANN_k"+str(k)+".pth")

    # ---- Testing ----
    print('Test')
    test_y = torch.tensor([])
    test_p = torch.tensor([])
    softm_p = torch.tensor([])
    # Evaluation needs no gradients; no_grad() keeps the accumulated
    # softm_p / all_p tensors from holding on to autograd graphs.
    with torch.no_grad():
        for i, var_x in enumerate(testLoaderX):
            var_y = testLoaderY[i]
            out = ann(var_x)
            loss = loss_func(out.view(-1, 2), var_y[-1].view(-1))
            # Take the last entry, since batch_size is not 1.
            test_y = torch.cat((test_y, var_y[-1]), 0)
            loss_history.append(loss.item())
            ind_p = torch.max(out, dim = 1)
            # Class probabilities, used for computing the ROC curve.
            out_p = softMax_func(out)
            softm_p = torch.cat((softm_p, out_p), 0)
            # Predicted class index as a column vector.
            test_p = torch.cat((test_p, ind_p.indices.view(-1, 1)), 0)
    all_y = torch.cat((all_y, test_y), 0)
    all_p = torch.cat((all_p, softm_p), 0)
    # Prints prediction + 10 * target per sample, flattened.
    print((test_p + test_y*10).view(-1))

    # ---- Metrics for this data set ----
    Acc = get_accuracy(test_p, test_y)
    accuracy_his.append(Acc)
    print('------------- DataSet:{}, Accuracy:{:.5f} -------------'.format(k, Acc))
    Pc = get_precision(test_p, test_y)
    precision_his.append(Pc)
    print('------------- DataSet:{}, Precision:{:.5f} -------------'.format(k, Pc))
    Recall = get_recall(test_p, test_y)
    recall_his.append(Recall)
    print('------------- DataSet:{}, Recall:{:.5f} -------------'.format(k, Recall))
    Sp = get_specificity(test_p, test_y)
    specificity_his.append(Sp)
    print('------------- DataSet:{}, Specificity:{:.5f} -------------'.format(k, Sp))

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 AI 帮你完成复杂任务”。
更多推荐
所有评论(0)