#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
from numpy import linalg as LA
class Activation:
    """Base class for activation functions.

    Both methods are placeholders that raise ``NotImplementedError``;
    concrete activations override them.
    """

    _UNIMPLEMENTED_MSG = "Should have implemented this"

    def f(self, x, **args):
        """Forward pass: map the pre-activation ``x`` to the activation.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._UNIMPLEMENTED_MSG)

    def grad(self, y, dy):
        """Backward pass: turn the gradient ``dy`` w.r.t. the output ``y``
        of :meth:`f` into the gradient w.r.t. the input of :meth:`f`.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(self._UNIMPLEMENTED_MSG)
class Linear(Activation):
    """Identity activation: values and gradients pass through untouched."""

    def f(self, x):
        """Return ``x`` itself (the same object, no copy)."""
        return x

    def grad(self, y, dy):
        """Return the upstream gradient ``dy`` unchanged; ``y`` is unused."""
        return dy
class Sigmoid(Activation):
    """Logistic sigmoid activation, ``1 / (1 + exp(-x))``."""

    def f(self, x):
        """Return the sigmoid of ``x``.

        The value is computed as ``exp(-log(1 + exp(-x)))`` through
        ``np.logaddexp``, so the intermediate ``exp(-x)`` is never formed
        explicitly and large negative inputs do not trigger an overflow.
        """
        # logaddexp(0, -x) == log(exp(0) + exp(-x)) == log(1 + exp(-x))
        return np.exp(-np.logaddexp(0, -x))

    def grad(self, y, dy):
        """Return the gradient w.r.t. the input of :meth:`f`.

        Args:
            y: output of :meth:`f`.
            dy: gradient w.r.t. ``y``.
        """
        # d sigmoid / dx expressed through the output: y * (1 - y).
        return y*(1-y)*dy
class Relu(Activation):
    """Rectified linear unit, ``max(x, 0)``."""

    def f(self, x):
        """Return ``x`` with every negative entry replaced by zero.

        ``np.maximum`` is used instead of ``x*(x>0)`` so that ``-inf``
        maps to ``0`` (the product form yields ``-inf*0 == nan``) and
        negative inputs give ``0.0`` rather than ``-0.0``.
        """
        return np.maximum(x, 0)

    def grad(self, y, dy):
        """Return the gradient w.r.t. the input of :meth:`f`.

        Args:
            y: output of :meth:`f`.
            dy: gradient w.r.t. ``y``.
        """
        # The gradient only flows where the unit was active (output > 0).
        return dy*(y>0)
class Softmax(Activation):
    """Softmax activation normalising along one axis."""

    def f(self, x, axis=1):
        """Return the softmax of ``x`` along ``axis``.

        The maximum along ``axis`` is subtracted first, which leaves the
        result unchanged but keeps ``exp`` from overflowing.
        """
        shifted = x-np.max(x, axis=axis, keepdims=True)
        exps = np.exp(shifted)  # computed once and reused for the sum
        return exps/np.sum(exps, axis=axis, keepdims=True)

    def grad(self, y, dy, axis=1):
        """Return the gradient w.r.t. the input of :meth:`f`.

        This is the softmax Jacobian-vector product
        ``y * (dy - sum(y * dy))``, valid for any upstream gradient.
        For the cross-entropy gradient ``dy = -(1/N) * t / y`` with
        targets ``t`` summing to one per row it reduces to
        ``y/N + y*dy``, i.e. ``(y - t) / N``.

        Args:
            y: output of :meth:`f`.
            dy: gradient w.r.t. ``y``.
            axis: axis along which :meth:`f` normalised; defaults to the
                same value as the default of :meth:`f`.
        """
        weighted = y*dy
        return weighted-y*np.sum(weighted, axis=axis, keepdims=True)
class Dense:
    """Fully connected layer computing ``activation(x . w + b)``.

    A layer is used in four steps: ``initialize_parameter`` creates ``w``
    and ``b``, ``initialize_optimizer`` sets up the optimizer state,
    ``forward``/``backward`` run one pass, and ``update_parameter``
    applies the gradients stored by ``backward``.
    """

    # Activation names accepted by ``__init__`` and the classes they select.
    activation_map = {
        'relu': Relu,
        'softmax': Softmax,
        'sigmoid': Sigmoid,
        'linear': Linear,
    }

    def __init__(self, output_dim, input_dim=0, activation='relu'):
        """Create the layer.

        Args:
            output_dim: number of output units.
            input_dim: number of input features (default 0).
            activation: key of ``activation_map``.

        Raises:
            Exception: if ``activation`` is not a key of ``activation_map``.
        """
        self.output_dim = output_dim
        self.input_dim = input_dim
        if activation in self.activation_map:
            self.activation = self.activation_map[activation]()
        else:
            raise Exception('activation %s not implemented' % activation)

    def initialize_parameter(self):
        """(Re)create the weights and the bias.

        ``w`` has shape ``(input_dim, output_dim)`` and is drawn from a
        standard normal scaled by ``sqrt(6 / (input_dim + output_dim))``;
        ``b`` is a ``(1, output_dim)`` row of zeros.
        """
        scale = np.sqrt(6/(self.input_dim+self.output_dim))
        self.w = np.random.randn(self.input_dim, self.output_dim)*scale
        self.b = np.zeros((1, self.output_dim))

    def initialize_optimizer(self, optimizer, l_rate):
        """Store the optimizer name and learning rate and reset its state.

        Raises:
            Exception: if ``optimizer`` is not ``'adam'``.
        """
        self.optimizer = optimizer
        self.l_rate = l_rate
        if self.optimizer == 'adam':
            # t: step counter; s_*: first-moment and r_*: second-moment
            # running averages for the weights and the bias.
            self.t, self.s_w, self.r_w, self.s_b, self.r_b = 0, 0, 0, 0, 0
            # Decay rates of the two moments and the denominator offset.
            self.rho1, self.rho2, self.delta = 0.9, 0.999, 1e-8
        else:
            raise Exception('optimizer %s not implemented' % self.optimizer)

    def forward(self, x):
        """Run the forward pass and cache ``x``, ``h`` and ``a`` on the layer.

        Returns:
            The activation ``a = activation.f(x . w + b)``.
        """
        self.x = x
        self.h = np.dot(self.x, self.w)+self.b
        self.a = self.activation.f(self.h)
        return self.a

    def backward(self, da):
        """Run the backward pass using the values cached by ``forward``.

        Stores ``dh``, ``dw``, ``db`` and ``dx`` on the layer.

        Args:
            da: gradient of the loss w.r.t. this layer's output ``a``.

        Returns:
            ``dx``, the gradient w.r.t. this layer's input.
        """
        self.da = da
        self.dh = self.activation.grad(self.a, self.da)
        self.dw = np.dot(self.x.T, self.dh)
        # Plain sum over the batch, on the same scale as ``dw``: any
        # per-sample averaging is already part of the incoming ``da``,
        # so dividing by the batch size again would shrink the bias
        # gradient by an extra factor of 1/N.
        self.db = np.sum(self.dh, axis=0, keepdims=True)
        self.dx = np.dot(self.dh, self.w.T)
        return self.dx

    def _adam_step(self, grad, s, r):
        """Advance one pair of Adam moments and return ``(s, r, step)``.

        ``step`` is the amount to subtract from the parameter; ``self.t``
        must already hold the current (1-based) step count.
        """
        s = self.rho1*s+(1-self.rho1)*grad
        r = self.rho2*r+(1-self.rho2)*(grad**2)
        # Bias correction for the zero-initialised moments.
        s_hat = s/(1-self.rho1**self.t)
        r_hat = r/(1-self.rho2**self.t)
        step = self.l_rate*s_hat/(np.sqrt(r_hat)+self.delta)
        return s, r, step

    def update_parameter(self):
        """Apply one optimizer step to ``w`` and ``b`` using ``dw``/``db``.

        Raises:
            Exception: if the configured optimizer is not ``'adam'``.
        """
        if self.optimizer == 'adam':
            self.t = self.t+1
            self.s_w, self.r_w, step_w = self._adam_step(
                self.dw, self.s_w, self.r_w)
            self.w = self.w-step_w
            self.s_b, self.r_b, step_b = self._adam_step(
                self.db, self.s_b, self.r_b)
            self.b = self.b-step_b
        else:
            raise Exception('optimizer %s not implemented' % self.optimizer)
class Sequential:
    """Linear stack of layers trained with mini-batch gradient descent.

    Layers must provide ``input_dim``/``output_dim`` attributes and the
    methods ``initialize_parameter``, ``initialize_optimizer``,
    ``forward``, ``backward`` and ``update_parameter``.
    """

    # Floor applied to predictions before ``log`` / division in the
    # cross-entropy loss, so an exact 0 cannot produce nan or inf.
    _EPS = 1e-12

    def __init__(self):
        self.layers = []
        self.loss = 'categorical_crossentropy'
        self.optimizer = 'adam'

    def add(self, layer):
        """Append ``layer`` to the end of the stack."""
        self.layers.append(layer)

    def compile(self, loss='categorical_crossentropy', optimizer='adam', l_rate=0.001):
        """Select loss and optimizer and wire the layers together.

        Every layer after the first gets its ``input_dim`` set to the
        ``output_dim`` of the layer before it; then each layer's
        optimizer state is initialised with ``optimizer`` and ``l_rate``.
        """
        self.loss = loss
        self.optimizer = optimizer
        for previous, current in zip(self.layers, self.layers[1:]):
            current.input_dim = previous.output_dim
        for layer in self.layers:
            layer.initialize_optimizer(optimizer, l_rate)

    def forward_propagation(self, x, y):
        """Run ``x`` through all layers and evaluate the loss against ``y``.

        Returns:
            ``(a, loss)`` where ``a`` is the output of the last layer, or
            ``x`` itself when the model has no layers.

        Raises:
            Exception: if ``self.loss`` is not a supported loss name.
        """
        a = x  # defined even when there are no layers
        for layer in self.layers:
            a = layer.forward(a)
        if self.loss == 'categorical_crossentropy':
            loss = -(1/y.shape[0])*np.sum(np.log(np.maximum(a, self._EPS))*y)
        elif self.loss == 'mse':
            loss = 0.5*(1/y.shape[0])*np.square(LA.norm(a-y))
        else:
            raise Exception('loss %s not implemented' % self.loss)
        return a, loss

    def backward_propagation(self, a, y):
        """Back-propagate the loss gradient and update every layer.

        Args:
            a: model output returned by ``forward_propagation``.
            y: targets for the same samples.

        Raises:
            Exception: if ``self.loss`` is not a supported loss name.
        """
        if self.loss == 'categorical_crossentropy':
            da = -(1/y.shape[0])*(y/np.maximum(a, self._EPS))
        elif self.loss == 'mse':
            da = (1/y.shape[0])*(a-y)
        else:
            raise Exception('loss %s not implemented' % self.loss)
        for layer in reversed(self.layers):
            # ``backward`` uses the layer's current weights, so the
            # update has to come after it.
            da = layer.backward(da)
            layer.update_parameter()

    def fit(self, x, y, epochs=10, batch_size=200):
        """Train on ``(x, y)`` and print the full-data loss after each epoch.

        All layer parameters are re-initialised first. Each epoch walks
        over the data in order in chunks of ``batch_size`` rows; a final
        shorter chunk is also used, so no samples are skipped and a
        data set smaller than ``batch_size`` still gets one update per
        epoch.
        """
        for layer in self.layers:
            layer.initialize_parameter()
        sample_count = x.shape[0]
        for epoch in range(epochs):
            for start in range(0, sample_count, batch_size):
                stop = start+batch_size
                x_batch, y_batch = x[start:stop], y[start:stop]
                a, _ = self.forward_propagation(x_batch, y_batch)
                self.backward_propagation(a, y_batch)
            _, loss = self.forward_propagation(x, y)
            print("epoch %d/%d: loss %f" % (epoch+1, epochs, loss))

    def print_parameters(self):
        """Print ``w`` and ``b`` of every layer, numbered from 1."""
        for idx, layer in enumerate(self.layers):
            print('layer %d parameters:' % (idx+1))
            print(layer.w, layer.b)
# A simple linear regression demo
if __name__ == '__main__':
    # Ground truth the model should recover: y = x . [1, 2, 3]^T + 5.
    true_w = np.array([[1.0], [2.0], [3.0]])
    true_b = 5

    # 300 samples with 3 widely spread features, plus small Gaussian noise
    # on the targets.
    features = np.random.randn(300, 3)*100
    noise = np.random.randn(300, 1)*0.1
    targets = np.dot(features, true_w)+noise+true_b

    # A single linear unit trained with MSE is plain linear regression.
    regressor = Sequential()
    regressor.add(Dense(1, input_dim=3, activation='linear'))
    regressor.compile(loss='mse', optimizer='adam')
    regressor.fit(features, targets, epochs=5000, batch_size=100)

    # The printed weights and bias should end up close to true_w / true_b.
    regressor.print_parameters()
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.