Hyunjun

add python test directory

......@@ -13,9 +13,15 @@ python 코드에서 가중치를 pickle 파일에 저장하는데 c++에서는 pickle 파일을 읽는
pickletools라는 라이브러리가 있지만 에러가 많고, 자바, 파이썬, C++ 여러 언어를 지원해 무겁기 때문에 속도를 올리기 위한 컨버팅 작업에는 적절하지 않음
그래서, 파라미터를 저장하는 부분을 csv로 바꿔 C++에서 그 파일로 읽기위해 params.pkl을 params.csv로 바꾸는 코드를 추가함
5. params.pkl->params.txt
입력처리할 때 csv파일로 읽으면 속도가 느림
이 또한 속도를 올리기 위한 컨버팅 작업의 목적에 맞지 않기 때문에 params.pkl 파일을 csv 파일이 아닌 txt 파일로 바꿈
6. python test 코드 추가
test하는 부분만 골라내기 위해 python test 코드를 추가(test.py), simple_convnet 내용 추가
\ No newline at end of file
test하는 부분만 골라내기 위해 python test 코드를 추가(test.py), simple_convnet 내용 추가
7. python test 폴더 추가
python test 폴더에는 test에 필요하지 않은 train 부분을 삭제함
\ No newline at end of file
......
No preview for this file type
No preview for this file type
# coding: utf-8
try:
import urllib.request
except ImportError:
raise ImportError('You should use Python 3.x')
import os.path
import gzip
import pickle
import os
import numpy as np
key_file = {
'train':'cifar10-train.gz',
'test':'cifar10-test.gz'
}
dataset_dir = os.path.dirname(os.path.abspath('/Users/HyeonJun/Desktop/simple_convnet/dataset'))
save_file = dataset_dir + "/cifar10.pkl"
train_num = 50000
test_num = 10000
img_dim = (3, 32, 32)
img_size = 3072
def _load_label(file_name):
file_path = dataset_dir + "/" + file_name
print("Converting " + file_name + " to NumPy Array ...")
with gzip.open(file_path, 'rb') as f:
labels = np.frombuffer(f.read(), np.uint8, offset=0)
labels = labels.reshape(-1, img_size+1)
labels = labels.T
print("Done")
return labels[0]
def _load_img(file_name):
file_path = dataset_dir + "/" + file_name
print("Converting " + file_name + " to NumPy Array ...")
with gzip.open(file_path, 'rb') as f:
data = np.frombuffer(f.read(), np.uint8, offset=0)
data = data.reshape(-1, img_size+1)
data = np.delete(data, 0, 1)
print("Done")
return data
def _convert_numpy():
dataset = {}
dataset['train_img'] = _load_img(key_file['train'])
dataset['train_label'] = _load_label(key_file['train'])
dataset['test_img'] = _load_img(key_file['test'])
dataset['test_label'] = _load_label(key_file['test'])
return dataset
def init_cifar10():
dataset = _convert_numpy()
print("Creating pickle file ...")
with open(save_file, 'wb') as f:
pickle.dump(dataset, f, -1)
print("Done!")
def _change_one_hot_label(X):
T = np.zeros((X.size, 10))
for idx, row in enumerate(T):
row[X[idx]] = 1
return T
def load_cifar10(normalize=True, flatten=True, one_hot_label=False):
"""CIFAR-10データセットの読み込み
Parameters
----------
normalize : 画像のピクセル値を0.0~1.0に正規化する
one_hot_label :
one_hot_labelがTrueの場合、ラベルはone-hot配列として返す
one-hot配列とは、たとえば[0,0,1,0,0,0,0,0,0,0]のような配列
flatten : 画像を一次元配列に平にするかどうか
Returns
-------
(訓練画像, 訓練ラベル), (テスト画像, テストラベル)
"""
if not os.path.exists(save_file):
init_cifar10()
with open(save_file, 'rb') as f:
dataset = pickle.load(f)
if normalize:
for key in ('train_img', 'test_img'):
dataset[key] = dataset[key].astype(np.float32)
dataset[key] /= 255.0
if one_hot_label:
dataset['train_label'] = _change_one_hot_label(dataset['train_label'])
dataset['test_label'] = _change_one_hot_label(dataset['test_label'])
if not flatten:
for key in ('train_img', 'test_img'):
dataset[key] = dataset[key].reshape(-1, 3, 32, 32)
return (dataset['train_img'], dataset['train_label']), (dataset['test_img'], dataset['test_label'])
if __name__ == '__main__':
init_cifar10()
# coding: utf-8
#import cupy as cp
import numpy as cp
import numpy as np
def cross_entropy_error(y, t):
if y.ndim == 1:
t = t.reshape(1, t.size)
y = y.reshape(1, y.size)
# 教師データがone-hot-vectorの場合、正解ラベルのインデックスに変換
if t.size == y.size:
t = t.argmax(axis=1)
batch_size = y.shape[0]
return -cp.sum(cp.log(y[cp.arange(batch_size), t])) / batch_size
# coding: utf-8
#import cupy as cp
import numpy as cp
import numpy as np
from functions import *
from util import im2col, col2im, DW_im2col
class Relu:
def __init__(self):
self.mask = None
def forward(self, x):
self.mask = (x <= 0)
out = x.copy()
out[self.mask] = 0
return out
def backward(self, dout):
dout[self.mask] = 0
dx = dout
return dx
class SoftmaxWithLoss:
def __init__(self):
self.loss = None
self.y = None # softmaxの出力
self.t = None # 教師データ
def forward(self, x, t):
self.t = t
self.y = softmax(x)
self.loss = cross_entropy_error(self.y, self.t)
return self.loss
def backward(self, dout=1):
batch_size = self.t.shape[0]
if self.t.size == self.y.size: # 教師データがone-hot-vectorの場合
dx = (self.y - self.t) / batch_size
else:
dx = self.y.copy()
dx[np.arange(batch_size), self.t] -= 1
dx = dx / batch_size
return dx
class LightNormalization:
"""
"""
def __init__(self, momentum=0.9, running_mean=None, running_var=None):
self.momentum = momentum
self.input_shape = None # Conv層の場合は4次元、全結合層の場合は2次元
# テスト時に使用する平均と分散
self.running_mean = running_mean
self.running_var = running_var
# backward時に使用する中間データ
self.batch_size = None
self.xc = None
self.std = None
def forward(self, x, train_flg=True):
self.input_shape = x.shape
if x.ndim == 2:
N, D = x.shape
x = x.reshape(N, D, 1, 1)
x = x.transpose(0, 2, 3, 1)
out = self.__forward(x, train_flg)
out = out.transpose(0, 3, 1, 2)
return out.reshape(*self.input_shape)
def __forward(self, x, train_flg):
if self.running_mean is None:
N, H, W, C = x.shape
self.running_mean = cp.zeros(C, dtype=np.float32)
self.running_var = cp.zeros(C, dtype=np.float32)
if train_flg:
mu = x.mean(axis=(0, 1, 2))
xc = x - mu
var = cp.mean(xc**2, axis=(0, 1, 2), dtype=np.float32)
std = cp.sqrt(var + 10e-7, dtype=np.float32)
xn = xc / std
self.batch_size = x.shape[0]
self.xc = xc
self.xn = xn
self.std = std
self.running_mean = self.momentum * self.running_mean + (1-self.momentum) * mu
self.running_var = self.momentum * self.running_var + (1-self.momentum) * var
else:
xc = x - self.running_mean
xn = xc / ((cp.sqrt(self.running_var + 10e-7, dtype=np.float32)))
out = xn
return out
def backward(self, dout):
if dout.ndim == 2:
N, D = dout.shape
dout = dout.reshape(N, D, 1, 1)
dout = dout.transpose(0, 2, 3, 1)
dx = self.__backward(dout)
dx = dx.transpose(0, 3, 1, 2)
dx = dx.reshape(*self.input_shape)
return dx
def __backward(self, dout):
dxn = dout
dxc = dxn / self.std
dstd = -cp.sum((dxn * self.xc) / (self.std * self.std), axis=0)
dvar = 0.5 * dstd / self.std
dxc += (2.0 / self.batch_size) * self.xc * dvar
dmu = cp.sum(dxc, axis=0)
dx = dxc - dmu / self.batch_size
return dx
class Convolution:
def __init__(self, W, stride=1, pad=0):
self.W = W
self.stride = stride
self.pad = pad
self.x = None
self.col = None
self.col_W = None
self.dW = None
def forward(self, x):
FN, C, FH, FW = self.W.shape
N, C, H, W = x.shape
out_h = 1 + int((H + 2*self.pad - FH) / self.stride)
out_w = 1 + int((W + 2*self.pad - FW) / self.stride)
col = im2col(x, FH, FW, self.stride, self.pad)
col_W = self.W.reshape(FN, -1).T
out = cp.dot(col, col_W)
out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
self.x = x
self.col = col
self.col_W = col_W
return out
def backward(self, dout):
FN, C, FH, FW = self.W.shape
dout = dout.transpose(0,2,3,1).reshape(-1, FN)
self.dW = cp.dot(self.col.T, dout)
self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)
dcol = cp.dot(dout, self.col_W.T)
dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)
return dx
class Pooling:
def __init__(self, pool_h, pool_w, stride=1, pad=0):
self.pool_h = pool_h
self.pool_w = pool_w
self.stride = stride
self.pad = pad
self.x = None
self.arg_max = None
def forward(self, x):
N, C, H, W = x.shape
out_h = int(1 + (H - self.pool_h) / self.stride)
out_w = int(1 + (W - self.pool_w) / self.stride)
col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
col = col.reshape(-1, self.pool_h*self.pool_w)
arg_max = cp.argmax(col, axis=1)
out = cp.array(cp.max(col, axis=1), dtype=np.float32)
out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)
self.x = x
self.arg_max = arg_max
return out
def backward(self, dout):
dout = dout.transpose(0, 2, 3, 1)
pool_size = self.pool_h * self.pool_w
dmax = cp.zeros((dout.size, pool_size), dtype=np.float32)
dmax[cp.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
dmax = dmax.reshape(dout.shape + (pool_size,))
dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)
return dx
class DW_Convolution:
def __init__(self, W, stride=1, pad=0):
self.W = W
self.stride = stride
self.pad = pad
self.x = None
self.col = None
self.col_W = None
self.dW = None
self.db = None
def forward(self, x):
FN, C, FH, FW = self.W.shape
N, C, H, W = x.shape
out_h = 1 + int((H + 2*self.pad - FH) / self.stride)
out_w = 1 + int((W + 2*self.pad - FW) / self.stride)
col = DW_im2col(x, FH, FW, self.stride, self.pad)
col_W = self.W.reshape(FN, -1).T
outlist = []
outlist = np.zeros((FN, N*H*W, 1))
for count in range(FN):
outlist[count] = np.dot(col[count, :, :], col_W[:, count]).reshape(-1,1)
out = outlist.transpose(1,0,2)
out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)
self.x = x
self.col = col
self.col_W = col_W
return out
def backward(self, dout):
FN, C, FH, FW = self.W.shape
N, XC, H, W = dout.shape
dout = dout.transpose(0,2,3,1).reshape(-1, FN)
dW_list = np.zeros((FN, FH*FW))
dcol_list = np.zeros((N * H * W, FN, FH * FW))
for count in range(FN):
dW_list[count] = np.dot(self.col[count].transpose(1,0), dout[:, count])
dcol_list[:,count,:] = np.dot(dout[:,count].reshape(-1,1), self.col_W.T[count,:].reshape(1,-1))
self.dW = dW_list
self.dW = self.dW.reshape(FN, C, FH, FW)
dcol = dcol_list
dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)
return dx
class Affine:
def __init__(self, W):
self.W =W
# self.b = b
self.x = None
self.original_x_shape = None
#self.dW = None
# self.db = None
def forward(self, x):
self.original_x_shape = x.shape
x = x.reshape(x.shape[0], -1)
self.x = x
out = cp.dot(self.x, self.W) #+ self.b
return out
def backward(self, dout):
dx = cp.dot(dout, self.W.T)
self.dW = cp.dot(self.x.T, dout)
# self.db = cp.sum(dout, axis=0)
dx = dx.reshape(*self.original_x_shape) # return dx
No preview for this file type
import sys, os
sys.path.append(os.pardir)
import pickle
import numpy as cp
import numpy as np
from collections import OrderedDict
from layers import *
class SimpleConvNet:
def __init__(self, input_dim=(3, 32, 32),
conv_param={'filter_num':(32, 32, 64), 'filter_size':3, 'pad':1, 'stride':1},
hidden_size=512, output_size=10, weight_init_std=0.01, pretrained=False):
filter_num = conv_param['filter_num']
filter_size = conv_param['filter_size']
filter_pad = conv_param['pad']
filter_stride = conv_param['stride']
input_size = input_dim[1]
conv_output_size = (input_size - filter_size + 2*filter_pad) / filter_stride + 1
conv_data_size = int(filter_num[0] * conv_output_size * conv_output_size )
pool1_output_size = int(filter_num[1] * (conv_output_size/2) * (conv_output_size/2))
pool2_output_size = int(filter_num[2] * (conv_output_size/4) * (conv_output_size/4))
pool3_output_size = int(filter_num[2] * (conv_output_size/8) * (conv_output_size/8))
self.params = {}
if pretrained:
weights = self.load_weights()
self.params['W1'] = cp.array(weights['W1'], dtype=np.float32)
self.params['W2'] = cp.array(weights['W2'], dtype=np.float32)
self.params['W3'] = cp.array(weights['W3'], dtype=np.float32)
self.params['W4'] = cp.array(weights['W4'], dtype=np.float32)
self.params['W5'] = cp.array(weights['W5'], dtype=np.float32)
self.params['W6'] = cp.array(weights['W6'], dtype=np.float32)
self.params['W7'] = cp.array(weights['W7'], dtype=np.float32)
else:
self.params['W1'] = cp.array( weight_init_std * \
cp.random.randn(filter_num[0], input_dim[0], filter_size, filter_size), dtype=np.float32)
self.params['W2'] = cp.array( weight_init_std * \
cp.random.randn(filter_num[1], filter_num[0], 1, 1), dtype=np.float32)
self.params['W3'] = cp.array( weight_init_std * \
cp.random.randn(filter_num[1], 1, filter_size, filter_size), dtype=np.float32)
self.params['W4'] = cp.array( weight_init_std * \
cp.random.randn(filter_num[2], filter_num[1], 1, 1), dtype=np.float32)
self.params['W5'] = cp.array( weight_init_std * \
cp.random.randn(filter_num[2], 1, filter_size, filter_size), dtype=np.float32)
self.params['W6'] = cp.array( weight_init_std * \
cp.random.randn(pool3_output_size, hidden_size), dtype=np.float32)
self.params['W7'] = cp.array( weight_init_std * \
cp.random.randn(hidden_size, output_size), dtype=np.float32)
self.layers = OrderedDict()
self.layers['Conv1'] = Convolution(self.params['W1'],
conv_param['stride'], conv_param['pad'])
self.layers['LightNorm1'] = LightNormalization()
self.layers['Relu1'] = Relu()
self.layers['Pool1'] = Pooling(pool_h=2, pool_w=2, stride=2)
self.layers['Conv2'] = Convolution(self.params['W2'],
1, 0)
self.layers['LightNorm2'] = LightNormalization()
self.layers['Relu2'] = Relu()
self.layers['Conv3'] = DW_Convolution(self.params['W3'],
conv_param['stride'], conv_param['pad'])
self.layers['LightNorm3'] = LightNormalization()
self.layers['Relu3'] = Relu()
self.layers['Pool2'] = Pooling(pool_h=2, pool_w=2, stride=2)
self.layers['Conv4'] = Convolution(self.params['W4'],
1, 0)
self.layers['LightNorm4'] = LightNormalization()
self.layers['Relu4'] = Relu()
self.layers['Conv5'] = DW_Convolution(self.params['W5'],
conv_param['stride'], conv_param['pad'])
self.layers['LightNorm5'] = LightNormalization()
self.layers['Relu5'] = Relu()
self.layers['Pool3'] = Pooling(pool_h=2, pool_w=2, stride=2)
self.layers['Affine4'] = Affine(self.params['W6'])
self.layers['LightNorm6'] = LightNormalization()
self.layers['Relu6'] = Relu()
self.layers['Affine5'] = Affine(self.params['W7'])
self.last_layer = SoftmaxWithLoss()
def predict(self, x):
for layer in self.layers.values():
x = layer.forward(x)
return x
def accuracy(self, x, t, batch_size=100):
if t.ndim != 1 : t = np.argmax(t, axis=1)
acc = 0.0
for i in range(int(x.shape[0] / batch_size)):
tx = x[i*batch_size:(i+1)*batch_size]
tt = t[i*batch_size:(i+1)*batch_size]
y = self.predict(tx)
y = np.argmax(y, axis=1)
print("answer : ", tt)
print("predict : ", y)
acc += np.sum(y == tt) #numpy
return acc / x.shape[0]
def gradient(self, x, t):
self.loss(x, t)
dout = 1
dout = self.last_layer.backward(dout)
layers = list(self.layers.values())
layers.reverse()
for layer in layers:
dout = layer.backward(dout)
grads = {}
grads['W1'] = self.layers['Conv1'].dW
grads['W2'] = self.layers['Conv2'].dW
grads['W3'] = self.layers['Conv3'].dW
grads['W4'] = self.layers['Conv4'].dW
grads['W5'] = self.layers['Conv5'].dW
grads['W6'] = self.layers['Affine4'].dW
grads['W7'] = self.layers['Affine5'].dW
return grads
# ���� ����ġ �ҷ����� / SimpleconvNet�� pretrained ���� �߰��� : True�� ����ġ �о� ����
def load_weights(self, file_name='params.pkl'):
weights = []
with open(file_name, 'rb') as f:
weights = pickle.load(f)
return weights
from simple_convnet4 import *
from dataset.cifar10 import load_cifar10
def batch_(data, lbl, pre, size = 100):
return data[pre: pre+size], lbl[pre: pre+size]
network = SimpleConvNet(input_dim=(3,32,32),
conv_param = {'filter_num': (32, 32, 64), 'filter_size': 3, 'pad': 1, 'stride': 1},
hidden_size=512, output_size=10, weight_init_std=0.01, pretrained=True)
(x_train, t_train), (x_test, t_test) = load_cifar10(flatten=False)
print("Length of test data: ",len(x_test))
batch_size = 100
epoch = int(len(x_test) / batch_size)
acc = 0
for i in range(epoch):
t_img, t_lbl = batch_(x_test, t_test, i*batch_size, batch_size)
t = network.accuracy(t_img, t_lbl, batch_size)
acc += t * batch_size
print("Accuracy : ",str(acc / len(x_test)*100),'%')
# coding: utf-8
#import cupy as cp
import numpy as cp
import numpy as np
def DW_im2col(input_data, filter_h, filter_w, stride=1, pad=0):
"""다수의 이미지를 입력받아 2차원 배열로 변환한다(평탄화).
Parameters
----------
input_data : 4차원 배열 형태의 입력 데이터(이미지 수, 채널 수, 높이, 너비)
filter_h : 필터의 높이
filter_w : 필터의 너비
stride : 스트라이드
pad : 패딩
Returns
-------
col : 2차원 배열
"""
N, C, H, W = input_data.shape
out_h = (H + 2 * pad - filter_h) // stride + 1
out_w = (W + 2 * pad - filter_w) // stride + 1
img = np.pad(input_data, [(0, 0), (0, 0), (pad, pad), (pad, pad)], 'constant')
col = np.zeros((N, C, filter_h, filter_w, out_h, out_w))
for y in range(filter_h):
y_max = y + stride * out_h
for x in range(filter_w):
x_max = x + stride * out_w
col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
col = col.transpose(1, 0, 4, 5, 2, 3).reshape(C, N * out_h * out_w, -1)
return col
def im2col(input_data, filter_h, filter_w, stride=1, pad=0):
"""
Parameters
----------
input_data : (データ数, チャンネル, 高さ, 幅)の4次元配列からなる入力データ
filter_h : フィルターの高さ
filter_w : フィルターの幅
stride : ストライド
pad : パディング
Returns
-------
col : 2次元配列
"""
N, C, H, W = input_data.shape
out_h = (H + 2*pad - filter_h)//stride + 1
out_w = (W + 2*pad - filter_w)//stride + 1
img = cp.pad(input_data, [(0,0), (0,0), (pad, pad), (pad, pad)], 'constant')
col = cp.zeros((N, C, filter_h, filter_w, out_h, out_w), dtype=np.float32)
for y in range(filter_h):
y_max = y + stride*out_h
for x in range(filter_w):
x_max = x + stride*out_w
col[:, :, y, x, :, :] = img[:, :, y:y_max:stride, x:x_max:stride]
col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N*out_h*out_w, -1)
return col
def col2im(col, input_shape, filter_h, filter_w, stride=1, pad=0):
"""
Parameters
----------
col :
input_shape : 入力データの形状(例:(10, 1, 28, 28))
filter_h :
filter_w
stride
pad
Returns
-------
"""
N, C, H, W = input_shape
out_h = (H + 2*pad - filter_h)//stride + 1
out_w = (W + 2*pad - filter_w)//stride + 1
col = col.reshape(N, out_h, out_w, C, filter_h, filter_w).transpose(0, 3, 4, 5, 1, 2)
img = cp.zeros((N, C, H + 2*pad + stride - 1, W + 2*pad + stride - 1), dtype=np.float32)
for y in range(filter_h):
y_max = y + stride*out_h
for x in range(filter_w):
x_max = x + stride*out_w
img[:, :, y:y_max:stride, x:x_max:stride] += col[:, :, y, x, :, :]
return img[:, :, pad:H + pad, pad:W + pad]