Tensorflow 笔记 RNN 预测时间序列

2017-02-10 13:22:33 +08:00
 LittleUqeer

最近重看了一遍大作手回忆录,就想能否让机器识别一下 M 顶、 W 底这种相当主观的技术分析。

假设各种技术分析图形可以看作是各种技术分析因子在一段时间内的相对比例变化,

这样可以认为将技术分析因子进行标准化之后不会损失太多的信息。

将因子随交易时间推进的变换看作时间序列 in_length 表示时间序列的长度,

in_width 看作时间序列的宽度也就是并列的多种技术分析因子数值。

不考虑时间推进因素的影响可以使用 CNN 等非时间序列 DNN 处理。

考虑时间推进对因子变化有影响,此时刻因子变化和股价涨跌受到前面状态的影响的时间序列可以使用 HMM 或 RNN 处理。

HMM 常见的假设为状态 i_t 只受到状态 i_{t-1}时刻的影响, RNN 则泛泛认为可以使用训练集来学习到时间序列的非线性关系。

本帖对于 RNN 结构进行简单探索。

定长时间序列

训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度,并且 RNN 模型可以自动从 T_{i-m}到 T_i 时间序列学习到这种预测关系。

输入数据格式[批次,步长,多因子] 其中步长表示从 T_{i-m}到 T_i 时间序列

class.fit(trainX, trainY)训练模型

clf.pred_prob(trainX) 预测返回概率矩阵

clf.pred_signal(trainX) 预测返回标签

trainX 输入格式 [row, in_length, in_width]

trainY 输入格式 [row]

batch_size=128 喂入批次大小

display_step=5 显示步长

layer_units_num=2000 隐藏层单元数目

training_epoch=100 训练次数

class test_1(object):
    def __init__(self,
                batch_size = 128,
                learning_rate = 0.001,
                training_epoch = 10,
                display_step = 5,
                layer_units_num = 100):
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.training_epoch = training_epoch
        self.display_step = display_step
        self.layer_units_num = layer_units_num
        
    def dense_to_one_hot(self,labels_dense):
        """标签 转换 one hot 编码
        输入 labels_dense 必须为非负数
        2016-11-21
        """
        num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
        raws_labels = labels_dense.shape[0]
        index_offset = np.arange(raws_labels) * num_classes
        labels_one_hot = np.zeros((raws_labels, num_classes))
        labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
        return labels_one_hot  
        
    def Preprocessing(self, trainX, trainY, seed=False):
        trainY = self.dense_to_one_hot(trainY)
        self.in_length= in_length= trainX.shape[1]
        self.in_width= in_width= trainX.shape[2]
        self.out_classes= out_classes= trainY.shape[1]
        
        if seed:
            tf.set_random_seed(20170204)
        weights = {
            'out': tf.Variable(tf.truncated_normal(shape=[self.layer_units_num, out_classes], 
                                                mean=0., stddev=1., seed=None, dtype=tf.float32),
                               trainable=True, name='Weight_full_out')
        }
        
        biases = {
            'out': tf.Variable(tf.truncated_normal([out_classes]), trainable=True, name= 'Biases_full_out')
        }    
        
        self.weights = weights
        self.biases = biases        
        
        X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
        Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') 
        keep_prob = tf.placeholder(dtype= tf.float32)
        self.X = X
        self.Y = Y
        self.keep_prob = keep_prob  
        
    def Network(self):
        
        with tf.name_scope('layer_1'):            
            monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, 
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)            
            monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)

        
        with tf.name_scope('layer_2'):        
            monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
        
        with tf.name_scope('layer_3'):                  
            monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
        
        with tf.name_scope('layer_Final'):
            monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, 
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
        
        with tf.name_scope('Layer_Structure_Combination'):
            layer_units_num = self.layer_units_num
            Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
                                                state_is_tuple = True)    
        self.Layers = Layers
        return Layers
        
    def Model(self):
        X = self.X
        keep_prob = self.keep_prob
        
        X = tf.transpose(X, [1, 0, 2])
        X = tf.reshape(X, [-1,self.in_width])
        X = tf.split(split_dim=0, num_split=self.in_length, value=X)
        
        with tf.name_scope('layer_1'):            
            monolayer_1 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num, 
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)            
            monolayer_1 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_1, output_keep_prob= keep_prob)

        
        with tf.name_scope('layer_2'):        
            monolayer_2 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            monolayer_2 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_2, output_keep_prob= keep_prob)
        
        with tf.name_scope('layer_3'):                  
            monolayer_3 = tf.nn.rnn_cell.BasicLSTMCell(num_units= self.layer_units_num,
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
            monolayer_3 = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer_3, output_keep_prob= keep_prob)
        
        with tf.name_scope('layer_Final'):
            monolayer_final = tf.nn.rnn_cell.BasicLSTMCell(num_units=self.layer_units_num, 
                                                       forget_bias=1., state_is_tuple=True, activation=tf.tanh)
        
        with tf.name_scope('Layer_Structure_Combination'):
            layer_units_num = self.layer_units_num
            Layers = tf.nn.rnn_cell.MultiRNNCell(cells=[monolayer_1,monolayer_2,monolayer_3,monolayer_final],
                                                state_is_tuple = True)    
        
        outputs,_ = tf.nn.rnn(cell=monolayer_final, inputs=X, dtype=tf.float32)
        output = outputs[-1]

        return tf.nn.bias_add(value= tf.matmul(output, self.weights['out']), bias= self.biases['out'])  
        
    def train(self, trainX, trainY, seed=False):
        self.sess = tf.InteractiveSession()
        
        self.Preprocessing(trainX, trainY, seed)
        
        tmp = self.Model()
        
        self.predict = tf.nn.softmax(tmp)
        
        self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
        
        optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
        grads_and_vars = optimizer.compute_gradients(self.cost)
        for i, (grid, var) in enumerate(grads_and_vars):
            if grid != None:
                grid = tf.clip_by_value(grid, -1., 1.)
                grads_and_vars[i] = (grid, var)
        optimizer = optimizer.apply_gradients(grads_and_vars)
        self.optimizer = optimizer
        
        self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
        accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
        self.accuracy = accuracy
        
        #self.init = tf.global_variables_initializer()   
        self.init = tf.initialize_all_variables()
    def fit(self,trainX, trainY, dropout = 0.3, seed=True):
        self.train(trainX, trainY, seed=True)
        sess = self.sess
        sess.run(self.init)
        batch_size = self.batch_size
        trainY = self.dense_to_one_hot(trainY)
        for ep in range(self.training_epoch):
            for i in range(int(len(trainX)/batch_size)+1):
                if i < int(len(trainX)/batch_size)+1:
                    batch_x = trainX[i*batch_size : (i+1)*batch_size]
                    batch_y = trainY[i*batch_size : (i+1)*batch_size]
                elif i== int(len(trainX)/batch_size)+1:
                    batch_x = trainX[-batch_size:]
                    batch_y = trainY[-batch_size:]
                sess.run(self.optimizer, feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
            if ep%self.display_step==0:                
                loss, acc = sess.run([self.cost,self.accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
                print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(acc))
        self.sess= sess
        print("Optimization Finished!") 
    
    def pred_prob(self, testX):
        sess = self.sess
        batch_size = self.batch_size
        trainX = testX
        predict_output = np.zeros([1,self.out_classes])
        for i in range(int(len(trainX)/batch_size)+1):
            if i < int(len(trainX)/batch_size)+1:
                batch_x = trainX[i*batch_size : (i+1)*batch_size]
                batch_y = trainY[i*batch_size : (i+1)*batch_size]
            elif i== int(len(trainX)/batch_size)+1:
                batch_x = trainX[-batch_size:]
                batch_y = trainY[-batch_size:]
            tp = sess.run(self.predict,feed_dict={self.X:batch_x, self.keep_prob:1.})
            predict_output = np.row_stack([predict_output, tp])
        predict_output = np.delete(predict_output, obj=0, axis=0)
        return predict_output
    
    def pred(self, testX):
        pred_prob = self.pred_prob(testX)
        return np.argmax(pred_prob, axis=1)

变长时间序列

训练 RNN 模型定长序列,假设在 T_i 交易日可以通过前 m 交易日技术分析走势预判后第 n 交易日股价涨跌幅度, RNN 模型可以从 T_{i-m}到 T_i 时间序列学习到这种预测关系,但是无法训练得到准确的稀疏权重。对应 T 交易日进行预判的时候考虑前 2 周走势或者前 2 个月的走势,这个时候模型无法隐式学习,需要指定训练集为变长序列对应涨跌标签。

输入 对于变长时间序列 设格式为

[batch_size, real_length, in_width]

其中 real_length 表示变长时间序列输入步长,为一个变化值,使用 LSTM 进行预测的时候将输入数据修改格式为

batch_size, in_length, in_width

其中 in_length 使用 real_length 中的最大步长值,空余部分使用 0 填充

修改原始稠密矩阵为稀疏矩阵统一格式进行输入。

输出裁剪 由于输入 tensor 为稀疏矩阵,则对应的 RNN 网络计算得到的矩阵为稀疏矩阵(对于全为 0 的填充稀疏部分进行数值优化的时候梯度为 0 实际不变化)

将拓扑结构图得到的矩阵进行裁剪,使得输出 tensor 格式从

[batch_size, in_length, layer_units_num]

转换为

[batch_size, 1, layer_units_num]

softmax [batch_size, out_classes]

这里 1 表示这里对输入步长取实际输入步长最后一步。

import functools
from functools import reduce  
import numpy as np
import tensorflow as tf

def lazy_property(function):
    attribute = '_' + function.__name__

    @property
    @functools.wraps(function)
    def wrapper(self):
        if not hasattr(self, attribute):
            setattr(self, attribute, function(self))
        return getattr(self, attribute)
    return wrapper

class test(object):
    def __init__(self,
                batch_size = 128,
                learning_rate = 0.001,
                error = .01,
                display_step = 5,
                layer_units_num = 200):
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.error = error
        self.display_step = display_step
        self.layer_units_num = layer_units_num
        
    def dense_to_one_hot(self,labels_dense):
        """标签 转换 one hot 编码
        输入 labels_dense 必须为非负数
        2016-11-21
        """
        num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
        raws_labels = labels_dense.shape[0]
        index_offset = np.arange(raws_labels) * num_classes
        labels_one_hot = np.zeros((raws_labels, num_classes))
        labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
        return labels_one_hot  
    
    # 获得权重和偏置
    @staticmethod
    def _weight_and_bias(in_size, out_size):
        weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
        bias = tf.constant(0.1, shape=[out_size])
        return tf.Variable(weight), tf.Variable(bias)
    
    @lazy_property
    def length(self):
        dense_sign = tf.sign(tf.reduce_max(tf.abs(self.X),reduction_indices=2))
        length = tf.reduce_sum(input_tensor=dense_sign, reduction_indices=1)
        length = tf.cast(length, tf.int32)
        return length
    
    @staticmethod
    def _final_relevant(output, length):
        # length 输入时间序列的实际长度
        # in_length 表示输入时间序列长度
        # max_length 表示最大时间序列长度,也就是稀疏矩阵 最大时间序列长度
        batch_size = tf.shape(output)[0]
        max_length = int(output.get_shape()[1])
        output_size = int(output.get_shape()[2])
        index = tf.range(start=0, limit=batch_size)*max_length + (length-1) # 这里使用 max_length 开创间隔,使用 length-1 表示实际位置,最后一个输出的位置
        flat = tf.reshape(output, [-1,output_size]) # 将输出展平, batch_size*length in_width
        relevant = tf.gather(flat, index) # 根据实际长度选出最后一个输出 output 状态使用
        return relevant  
    
    def Preprocessing(self, trainX, trainY):
        self.in_length= in_length= trainX.shape[1]
        self.in_width= in_width= trainX.shape[2]
        self.out_classes= out_classes= trainY.shape[1]
        
        self.X = tf.placeholder(dtype=tf.float32, shape=[None, in_length, in_width], name='trainX') # 批次,时间序列,多因子
        self.Y = tf.placeholder(dtype= tf.float32, shape=[None, out_classes], name='trainY') 
        self.keep_prob = tf.placeholder(dtype= tf.float32)
    
    def str2float(self,s):  
        def char2num(s):  
            return {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}[s]  
        n = s.index('.')  
        return reduce(lambda x,y:x*10+y,map(char2num,s[:n]+s[n+1:]))/(10**n)  
    
    def Interface(self):        
        # 4 层 GRU 结构描述
        monolayer = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
        monolayer = tf.nn.rnn_cell.DropoutWrapper(cell=monolayer, output_keep_prob=self.keep_prob)
        monolayer_final = tf.nn.rnn_cell.GRUCell(num_units= self.layer_units_num)
        layers = tf.nn.rnn_cell.MultiRNNCell([monolayer]*3+[monolayer_final])
        # 激活 注意 in_length 表示输入序列步长, length 表示实际步长
        output,_ = tf.nn.dynamic_rnn(cell= layers, inputs= self.X, dtype= tf.float32, sequence_length= self.length)        
        output = self._final_relevant(output, self.length)
        
        weights, biases = self._weight_and_bias(self.layer_units_num, self.out_classes)        
        Prediction = tf.nn.bias_add(tf.matmul(output, weights),biases)
        return Prediction
    
    def Graph(self, trainX, trainY):
        try:
            tf.InteractiveSession.close()
        except:
            pass
        self.sess = tf.InteractiveSession()
        tf.get_default_session()
        self.Preprocessing(trainX, trainY)
        tmp = self.Interface()
        
        self.pred = tf.nn.softmax(tmp)
        self.cost = tf.reduce_sum(tf.nn.softmax_cross_entropy_with_logits(tmp, self.Y))
        
        optimizer = tf.train.AdamOptimizer(learning_rate= self.learning_rate) # 0 设置训练器
        grads_and_vars = optimizer.compute_gradients(self.cost)
        for i, (grid, var) in enumerate(grads_and_vars):
            if grid != None:
                grid = tf.clip_by_value(grid, -1., 1.)
                grads_and_vars[i] = (grid, var)
        optimizer = optimizer.apply_gradients(grads_and_vars)
        self.optimizer = optimizer
        
        self.correct_pred = tf.equal(tf.argmax(tmp,1), tf.argmax(self.Y,1))
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
        #self.init = tf.global_variables_initializer()
        self.init = tf.initialize_all_variables()
    def fit(self, trainX, trainY, dropout= 0.618):
        # 对标签 one_hot 编码
        trainY = self.dense_to_one_hot(trainY)
        
        self.Graph(trainX, trainY)
        self.sess.run(self.init)
        batch_size = self.batch_size
        sig =10.
        ep = 0
        while (sig > self.error):
            for i in range(int(len(trainX)/batch_size)+1):
                if i < int(len(trainX)/batch_size)+1:
                    batch_x = trainX[i*batch_size : (i+1)*batch_size]
                    batch_y = trainY[i*batch_size : (i+1)*batch_size]
                elif i== int(len(trainX)/batch_size)+1:
                    batch_x = trainX[-batch_size:]
                    batch_y = trainY[-batch_size:]
                self.sess.run(self.optimizer,feed_dict={self.X:batch_x, self.Y:batch_y, self.keep_prob:(1.-dropout)})
            sig = self.sess.run(self.accuracy, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
            if ep%self.display_step==0:                
                loss = self.sess.run(self.cost, feed_dict={self.X:trainX, self.Y:trainY, self.keep_prob:1.})
                print (str(ep)+"th "+'Epoch Loss = {:.5f}'.format(loss)+" Training Accuracy={:.5f}".format(sig))
            ep += 1
        print("Optimization Finished!")                
        
    def pred_prob(self, testX):
        batch_size = self.batch_size
        trainX = testX
        predict_output = np.zeros([1,self.out_classes])
        for i in range(int(len(trainX)/batch_size)+1):
            if i < int(len(trainX)/batch_size)+1:
                batch_x = trainX[i*batch_size : (i+1)*batch_size]
                batch_y = trainY[i*batch_size : (i+1)*batch_size]
            elif i== int(len(trainX)/batch_size)+1:
                batch_x = trainX[-batch_size:]
                batch_y = trainY[-batch_size:]
            tp = self.sess.run(self.pred, feed_dict={self.X:batch_x, self.keep_prob:1.})
            predict_output = np.row_stack([predict_output, tp])
        predict_output = np.delete(predict_output, obj=0, axis=0)
        return predict_output
    
    def pred_signal(self, testX):
        pred_prob = self.pred_prob(testX)
        return np.argmax(pred_prob, axis=1)

因限制原因,图片与源代码戳这里: https://uqer.io/community/share/589d3cc2c1e3cc00567fdbea

17958 次点击
所在节点    TensorFlow
3 条回复
tomleader0828
2017-02-10 14:17:33 +08:00
mark
pming1
2017-02-10 14:19:37 +08:00
不明觉厉
pathbox
2017-02-10 21:54:54 +08:00
在考虑要不要学 Python

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/339544

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX