lstm_basic.py 3.37 KB
Newer Older
Larkin Heintzman's avatar
Larkin Heintzman committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
import numpy as np
from keras.layers import Bidirectional
from keras.utils import np_utils
from keras.preprocessing.sequence import pad_sequences
import keras
from keras import losses
from model_arches import lstm_model

def build_input_data(samples = 100, time_steps = 5, features = 10):

    # slice up in_data to time_step-sized chunks
    idx = np.arange(time_steps)[None, :] + np.arange(samples+time_steps)[:, None]
    chunk = np.round(np.random.rand(np.max(idx)+1, features),1)
    in_data = chunk[idx[:-1,-1]].reshape(-1,1,1)

    out_idx = idx[1:,-1]
    out_data = chunk[out_idx]
    # out_data = np.vstack([out_data, 0])

    return in_data, out_data

def make_categorical(x_data, y_data):
    num_values = len(np.unique(np.concatenate([x_data, np.reshape(y_data,(-1,1,1))])))
    x_data_categorical = keras.utils.to_categorical((num_values-1)*x_data, num_values)
    y_data_categorical = keras.utils.to_categorical((num_values-1)*y_data, num_values)
    return [x_data_categorical, y_data_categorical]


if __name__ == "__main__":
    
    # fix random seed for reproducibility
    np.random.seed(7)

    train_ratio = 0.80 # percentage of data to be used for training
    eval_ratio = 0.25 # percentage of test data to be used for evaluation
    total_samples = np.int(20)
    categorical = False
    time_len = 1

    [x_dat_1, y_dat_1] = build_input_data(samples = total_samples, time_steps = time_len, features = 1)
    [x_dat_2, y_dat_2] = build_input_data(samples = total_samples, time_steps = time_len, features = 1)

    if categorical:
        [x_dat_1, y_dat_1] = make_categorical(x_dat_1, y_dat_1)
        [x_dat_2, y_dat_2] = make_categorical(x_dat_2, y_dat_2)
    x_dat = [x_dat_1,x_dat_2]
    y_dat = [y_dat_1,y_dat_2]

    # create basic lstm train_model
    batch_size = 1
    epoch_num = 750
    feature_num = x_dat[0].shape[2]

    train_model = lstm_model(batch_size = batch_size, time_steps = time_len, features = feature_num)
    predict_model = lstm_model(batch_size = 1, time_steps = time_len, features = feature_num)

    print("fit train_model to training data")

    # train_model.fit(train_x, train_y, batch_size=batch_size, epochs=epoch_num, shuffle = False, verbose = 2)
    for e in range(epoch_num):
        train_model.fit(x_dat[0], y_dat[0], batch_size=batch_size, epochs=1, shuffle = False, verbose = 2)
        train_model.reset_states()
        # train_model.fit(x_dat[1], y_dat[1], batch_size=batch_size, epochs=1, shuffle = False, verbose = 0)
        # train_model.reset_states()
        # train_model.fit(x_dat[1], y_dat[1], batch_size=batch_size, epochs=1, shuffle = False, verbose = 2)
        # train_model.reset_states()
        print("epoch " + str(e))

    print("moving weights to prediction model")
    weights = train_model.get_weights()
    predict_model.set_weights(weights)

    print("generating predictions for samples")
    pred = [[],[]]
    for i in range(total_samples):
        pred[0].append(predict_model.predict(np.reshape(x_dat[0][i],(1,time_len,feature_num))))
        # pred[1].append(predict_model.predict(np.reshape(x_dat[1][i],(1,time_len,feature_num))))

    for (x,p,y) in zip(x_dat[0], pred[0], y_dat[0]):
        if categorical:
            print("x = {:2.1f}, y = {:2.1f}, yp = {:2.1f}".format(np.argmax(x), np.argmax(y), np.argmax(p)))
        else:
            print("x = {:2.1f}, y = {:2.1f}, yp = {:2.1f}".format(x.item(), y.item(), p.item()))
    print("done.")