最近重看了一遍大作手回忆录,就想能否让机器识别一下 M 顶、 W 底这种相当主观的技术分析。
假设各种技术分析图形可以看作是各种技术分析因子在一段时间内的相对比例变化,
这样可以认为将技术分析因子进行标准化之后不会损失太多的信息。
将因子随交易时间推进的变换看作时间序列 in_length 表示时间序列的长度,
in_width 看作时间序列的宽度也就是并列的多种技术分析因子数值。
不考虑时间推进因素的影响可以使用 CNN 等非时间序列 DNN 处理。
考虑时间推进对因子变化有影响,此时刻因子变化和股价涨跌受到前面状态的影响的时间序列可以使用 HMM 或 RNN 处理。
HMM 常见的假设为状态 i_t 只受到状态 i_{t-1}时刻的影响, RNN 则泛泛认为可以使用训练集来学习到时间序列的非线性关系。
本帖对于 RNN 结构进行简单探索。
训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度,并且 RNN 模型可以自动从 T_{i-m}到 T_i 时间序列学习到这种预测关系。
输入数据格式[批次,步长,多因子] 其中步长表示从 T_{i-m}到 T_i 时间序列
class.fit(trainX, trainY)训练模型
clf.pred_prob(trainX) 预测返回概率矩阵
clf.pred_signal(trainX) 预测返回标签
trainX 输入格式 [row, in_length, in_width]
trainY 输入格式 [row]
batch_size=128 喂入批次大小
display_step=5 显示步长
layer_units_num=2000 隐藏层单元数目
training_epoch=100 训练次数
class test_1(object):
def __init__(self,
batch_size = 128,
learning_rate = 0.001,
training_epoch = 10,
display_step = 5,
layer_units_num = 100):
self.batch_size = batch_size
self.learning_rate = learning_rate
self.training_epoch = training_epoch
self.display_step = display_step
self.layer_units_num = layer_units_num
def dense_to_one_hot(self,labels_dense):
"""标签 转换 one hot 编码
输入 labels_dense 必须为非负数
2016-11-21
"""
num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
raws_labels = labels_dense.shape[0]
index_offset = np.arange(raws_labels) * num_classes
labels_one_hot = np.zeros((raws_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
def Preprocessing(self, trainX, trainY, seed=False):
trainY = self.dense_to_one_hot(trainY)
self.in_length= in_length= trainX.shape[1]
self.in_width= in_width= trainX.shape[2]
self.out_classes= out_classes= trainY.shape[1]
if seed:
tf.set_random_seed(20170204)
weights = {
'out': tf.Variable(tf.truncated_normal(shape=[self.layer_units_num, out_classes],
mean=0., stddev=1., seed=None, dtype=tf.float32),
trainable=True, name='Weight_full_out')
}
biases = {
'out': tf.Variable(tf.truncated_normal([out_classes]), trainable=True, name= 'Biases_full_out')
}
self.weights = weights
self.biases = biases
X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY')
keep_prob = tf.placeholder(dtype= tf.float32)
self.X = X
self.Y = Y
self.keep_prob = keep_prob
def Network(self):
with tf.name_scope('layer_1'):
monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)
with tf.name_scope('layer_2'):
monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
with tf.name_scope('layer_3'):
monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
with tf.name_scope('layer_Final'):
monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
with tf.name_scope('Layer_Structure_Combination'):
layer_units_num = self.layer_units_num
Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
state_is_tuple = True)
self.Layers = Layers
return Layers
def Model(self):
X = self.X
keep_prob = self.keep_prob
X = tf.transpose(X, [1, 0, 2])
X = tf.reshape(X, [-1,self.in_width])
X = tf.split(split_dim=0, num_split=self.in_length, value=X)
with tf.name_scope('layer_1'):
monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)
with tf.name_scope('layer_2'):
monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
with tf.name_scope('layer_3'):
monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
with tf.name_scope('layer_Final'):
monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num,
forget_bias=1., state_is_tuple=True, activation=tf.tanh)
with tf.name_scope('Layer_Structure_Combination'):
layer_units_num = self.layer_units_num
Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
state_is_tuple = True)
outputs,_ = tf.nn.rnn(cell=monolayer_final, inputs=X, dtype=tf.float32)
output = outputs[-1]
return tf.nn.bias_add(value= tf.matmul(output, self.weights['out']), bias= self.biases['out'])
def train(self, trainX, trainY, seed=False):
self.sess = tf.InteractiveSession()
self.Preprocessing(trainX, trainY, seed)
tmp = self.Model()
self.predict = tf.nn.softmax(tmp)
self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
grads_and_vars = optimizer.compute_gradients(self.cost)
for i, (grid, var) in enumerate(grads_and_vars):
if grid != None:
grid = tf.clip_by_value(grid, -1., 1.)
grads_and_vars[i] = (grid, var)
optimizer = optimizer.apply_gradients(grads_and_vars)
self.optimizer = optimizer
self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
self.accuracy = accuracy
#self.init = tf.global_variables_initializer()
self.init = tf.initialize_all_variables()
def fit(self,trainX, trainY, dropout = 0.3, seed=True):
self.train(trainX, trainY, seed=True)
sess = self.sess
sess.run(self.init)
batch_size = self.batch_size
trainY = self.dense_to_one_hot(trainY)
for ep in range(self.training_epoch):
for i in range(int(len(trainX)/batch_size)+1):
if i < int(len(trainX)/batch_size)+1:
batch_x = trainX[i*batch_size : (i+1)*batch_size]
batch_y = trainY[i*batch_size : (i+1)*batch_size]
elif i== int(len(trainX)/batch_size)+1:
batch_x = trainX[-batch_size:]
batch_y = trainY[-batch_size:]
sess.run(self.optimizer, feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
if ep%self.display_step==0:
loss, acc = sess.run([self.cost,self.accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(acc))
self.sess= sess
print("Optimization Finished!")
def pred_prob(self, testX):
sess = self.sess
batch_size = self.batch_size
trainX = testX
predict_output = np.zeros([1,self.out_classes])
for i in range(int(len(trainX)/batch_size)+1):
if i < int(len(trainX)/batch_size)+1:
batch_x = trainX[i*batch_size : (i+1)*batch_size]
batch_y = trainY[i*batch_size : (i+1)*batch_size]
elif i== int(len(trainX)/batch_size)+1:
batch_x = trainX[-batch_size:]
batch_y = trainY[-batch_size:]
tp = sess.run(self.predict,feed_dict={self.X:batch_x, self.keep_prob:1.})
predict_output = np.row_stack([predict_output, tp])
predict_output = np.delete(predict_output, obj=0, axis=0)
return predict_output
def pred(self, testX):
pred_prob = self.pred_prob(testX)
return np.argmax(pred_prob, axis=1)
训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度, RNN 模型可以从 T_{i-m}到 T_i 时间序列学习到这种预测关系,但是无法训练得到准确的稀疏权重。对应 T 交易日进行预判的时候考虑前 2 周走势或者前 2 个月的走势,这个时候模型无法隐式学习,需要指定训练集为变长序列对应涨跌标签。
输入 对于变长时间序列 设格式为
[batch_size, real_length, in_width]
其中 real_length 表示变长时间序列输入步长,为一个变化值,使用 LSTM 进行预测的时候将输入数据修改格式为
batch_size, in_length, in_width
其中 in_length 使用 real_length 中的最大步长值,空余部分使用 0 填充
修改原始稠密矩阵为稀疏矩阵统一格式进行输入。
输出裁剪 由于输入 tensor 为稀疏矩阵,则对应的 RNN 网络计算得到的矩阵为稀疏矩阵(对于全为 0 的填充稀疏部分进行数值优化的时候梯度为 0 实际不变化)
将拓扑结构图得到的矩阵进行裁剪,使得输出 tensor 格式从
[batch_size, in_length, layer_units_num]
转换为
[batch_size, 1, layer_units_num]
softmax [batch_size, out_classes]
这里 1 表示这里对输入步长取实际输入步长最后一步。
import functools
from functools import reduce
import numpy as np
import tensorflow as tf
def lazy_property(function):
attribute = '_' + function.__name__
@property
@functools.wraps(function)
def wrapper(self):
if not hasattr(self, attribute):
setattr(self, attribute, function(self))
return getattr(self, attribute)
return wrapper
class test(object):
def __init__(self,
batch_size = 128,
learning_rate = 0.001,
error = .01,
display_step = 5,
layer_units_num = 200):
self.batch_size = batch_size
self.learning_rate = learning_rate
self.error = error
self.display_step = display_step
self.layer_units_num = layer_units_num
def dense_to_one_hot(self,labels_dense):
"""标签 转换 one hot 编码
输入 labels_dense 必须为非负数
2016-11-21
"""
num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
raws_labels = labels_dense.shape[0]
index_offset = np.arange(raws_labels) * num_classes
labels_one_hot = np.zeros((raws_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
# 获得权重和偏置
@staticmethod
def _weight_and_bias(in_size, out_size):
weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
bias = tf.constant(0.1, shape=[out_size])
return tf.Variable(weight), tf.Variable(bias)
@lazy_property
def length(self):
dense_sign = tf.sign(tf.reduce_max(tf.abs(self.X),reduction_indices=2))
length = tf.reduce_sum(input_tensor=dense_sign, reduction_indices=1)
length = tf.cast(length, tf.int32)
return length
@staticmethod
def _final_relevant(output, length):
# length 输入时间序列的实际长度
# in_length 表示输入时间序列长度
# max_length 表示最大时间序列长度,也就是稀疏矩阵 最大时间序列长度
batch_size = tf.shape(output)[0]
max_length = int(output.get_shape()[1])
output_size = int(output.get_shape()[2])
index = tf.range(start=0, limit=batch_size)*max_length + (length-1) # 这里使用 max_length 开创间隔,使用 length-1 表示实际位置,最后一个输出的位置
flat = tf.reshape(output, [-1,output_size]) # 将输出展平, batch_size*length in_width
relevant = tf.gather(flat, index) # 根据实际长度选出最后一个输出 output 状态使用
return relevant
def Preprocessing(self, trainX, trainY):
self.in_length= in_length= trainX.shape[1]
self.in_width= in_width= trainX.shape[2]
self.out_classes= out_classes= trainY.shape[1]
self.X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
self.Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY')
self.keep_prob = tf.placeholder(dtype= tf.float32)
def str2float(self,s):
def char2num(s):
return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[s]
n = s.index('.')
return reduce(lambda x,y:x*10+y,map(char2num,s[:n]+s[n+1:]))/(10**n)
def Interface(self):
# 4 层 GRU 结构描述
monolayer = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
monolayer = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer, output_keep_prob=self.keep_prob)
monolayer_final = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
layers = tf.nn.rnn_cell.MultiRNNCell([monolayer]*3+[monolayer_final])
# 激活 注意 in_length 表示输入序列步长, length 表示实际步长
output,_ = tf.nn.dynamic_rnn(cell= layers, inputs= self.X, dtype= tf.float32, sequence_length= self.length)
output = self._final_relevant(output, self.length)
weights, biases = self._weight_and_bias(self.layer_units_num, self.out_classes)
Prediction = tf.nn.bias_add(tf.matmul(output, weights),biases)
return Prediction
def Graph(self, trainX, trainY):
try:
tf.InteractiveSession.close()
except:
pass
self.sess = tf.InteractiveSession()
tf.get_default_session()
self.Preprocessing(trainX, trainY)
tmp = self.Interface()
self.pred = tf.nn.softmax(tmp)
self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
grads_and_vars = optimizer.compute_gradients(self.cost)
for i, (grid, var) in enumerate(grads_and_vars):
if grid != None:
grid = tf.clip_by_value(grid, -1., 1.)
grads_and_vars[i] = (grid, var)
optimizer = optimizer.apply_gradients(grads_and_vars)
self.optimizer = optimizer
self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
#self.init = tf.global_variables_initializer()
self.init = tf.initialize_all_variables()
def fit(self, trainX, trainY, dropout= 0.618):
# 对标签 one_hot 编码
trainY = self.dense_to_one_hot(trainY)
self.Graph(trainX, trainY)
self.sess.run(self.init)
batch_size = self.batch_size
sig =10.
ep = 0
while (sig > self.error):
for i in range(int(len(trainX)/batch_size)+1):
if i < int(len(trainX)/batch_size)+1:
batch_x = trainX[i*batch_size : (i+1)*batch_size]
batch_y = trainY[i*batch_size : (i+1)*batch_size]
elif i== int(len(trainX)/batch_size)+1:
batch_x = trainX[-batch_size:]
batch_y = trainY[-batch_size:]
self.sess.run(self.optimizer,feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
sig = self.sess.run(self.accuracy, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
if ep%self.display_step==0:
loss = self.sess.run(self.cost, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(sig))
ep += 1
print("Optimization Finished!")
def pred_prob(self, testX):
batch_size = self.batch_size
trainX = testX
predict_output = np.zeros([1,self.out_classes])
for i in range(int(len(trainX)/batch_size)+1):
if i < int(len(trainX)/batch_size)+1:
batch_x = trainX[i*batch_size : (i+1)*batch_size]
batch_y = trainY[i*batch_size : (i+1)*batch_size]
elif i== int(len(trainX)/batch_size)+1:
batch_x = trainX[-batch_size:]
batch_y = trainY[-batch_size:]
tp = self.sess.run(self.pred, feed_dict={self.X:batch_x, self.keep_prob:1.})
predict_output = np.row_stack([predict_output, tp])
predict_output = np.delete(predict_output, obj=0, axis=0)
return predict_output
def pred_signal(self, testX):
pred_prob = self.pred_prob(testX)
return np.argmax(pred_prob, axis=1)
因限制原因,图片与源代码戳这里: https://uqer.io/community/share/589d3cc2c1e3cc00567fdbea
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.