Nickolas Discolll

Summary

This text discusses the use of advanced machine learning techniques, specifically convolutional neural networks (CNNs), to predict stock market movements.

Abstract

The stock market is a complex entity, and accurately predicting its movements is a significant challenge. The text introduces a project called "Going Deeper with Convolutional Neural Network for Stock Market Prediction" that uses advanced CNNs and a robust methodology to predict stock market trends. The project uses candlestick charts as the primary input to several models: the neural networks DeepCNN, ResNet 50, VGG16, and VGG19, and the classical classifiers Random Forest and KNN. The data for this project is gathered from Yahoo! Finance and consists of time series data. The dataset includes data from 50 leading companies in the Taiwan 0050.TW index and the top 10 companies from the Indonesia Stock Exchange. The methodology involves preparing the environment, converting OHLC stock market data to candlestick charts, training the DeepCNN model, and evaluating the performance of the models using metrics such as Accuracy, Specificity, Sensitivity, MCC (Matthews Correlation Coefficient), and F1 Score.

Opinions

  • The text expresses the opinion that predicting stock market movements is a complex and challenging task.
  • The project introduced in the text aims to tackle this challenge by leveraging advanced machine learning techniques, specifically CNNs.
  • The use of candlestick charts as the primary input model for the neural networks is a notable approach in this project.
  • The dataset used in the project is diverse and comprehensive, including data from leading companies in Taiwan and Indonesia.
  • The methodology involves a robust process of preparing the environment, converting data, training the model, and evaluating performance.
  • The project aims to predict whether the stock market price will rise in the near future.
  • The performance of the models is evaluated using several metrics, including Accuracy, Specificity, Sensitivity, MCC, and F1 Score.
Photo by Maxim Hopman on Unsplash

A Deep Dive into Convolutional Neural Networks for Financial Forecasting

Leveraging Advanced Machine Learning Techniques to Decipher Market Trends and Movements

The stock market is an ever-changing and complex entity, and accurately predicting its movements is a challenge that has fascinated and perplexed analysts for decades. “Going Deeper with Convolutional Neural Network for Stock Market Prediction” is a groundbreaking repository that tackles this challenge head-on. This project aims to predict whether the stock market price will rise in the near future by leveraging the power of advanced convolutional neural networks (CNNs) and a robust methodology.

Download the source code and dataset from my Substack.

Data Collection

The data for this project is meticulously gathered from Yahoo! Finance, focusing on time series data for a comprehensive analysis. It includes data from 50 leading companies in the Taiwan 0050.TW index and the top 10 companies from the Indonesia Stock Exchange. This diverse dataset ensures a broad and inclusive approach to stock market prediction.

Methodology

Our approach uses candlestick chart images as the input to several models, covering both convolutional neural networks and classical classifiers. The models employed in this project include:

  • DeepCNN
  • ResNet 50
  • VGG16
  • VGG19
  • Random Forest
  • KNN

These models are chosen for their proven effectiveness in pattern recognition and predictive analysis, especially in complex datasets like stock markets.

Usage

Prepare Environment

We recommend using a virtual environment for optimal results:

  1. Create a virtual environment: python3 -m venv .env
  2. Ensure you’re running Python 3.5
  3. Install required packages: pip install -U -r requirements.txt

Prepare Dataset

Convert OHLCV (Open/High/Low/Close/Volume) stock market data to Candlestick charts:

  1. Run binary preprocessing: python run_binary_preprocessing.py <ticker> <tradingdays> <windows>
     • Example: python run_binary_preprocessing.py 2880.TW 20 50
  2. Generate the dataset: python generatedata.py <pathdir> <origindir> <destinationdir>
     • Example: python generatedata.py dataset 20_50/2880.TW dataset_2880TW_20_50
  3. Remove the alpha channel from images:
     • Navigate to the dataset directory: cd /dataset/dataset_2880TW_20_50
     • Execute: find . -name "*.png" -exec convert "{}" -alpha off "{}" \;

Training

Train the DeepCNN model:

  • Command: python myDeepCNN.py -i <datasetdir> -e <numberofepoch> -d <dimensionsize> -b <batchsize> -o <outputresultreport>
  • Example: python myDeepCNN.py -i dataset/dataset_2880TW_20_50 -e 50 -d 50 -b 8 -o outputresult.txt

Performance Evaluation

Evaluate the performance of the models using metrics such as Accuracy, Specificity, Sensitivity, MCC (Matthews Correlation Coefficient), and F1 Score.
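As a rough illustration of how these metrics follow from the four confusion-matrix counts, here is a minimal sketch. The helper name binary_metrics and the sample counts are made up for this example and are not part of the repository, and returning 0.0 for a zero denominator is an assumption of this sketch.

```python
import math


def binary_metrics(tp, tn, fp, fn):
    """Compute common binary-classification metrics from raw counts.

    Hypothetical helper for illustration; not taken from the repository.
    """
    def ratio(num, den):
        # Assumption of this sketch: report 0.0 when a denominator is zero.
        return num / den if den else 0.0

    sensitivity = ratio(tp, tp + fn)   # true positive rate (recall)
    specificity = ratio(tn, tn + fp)   # true negative rate
    precision = ratio(tp, tp + fp)
    accuracy = ratio(tp + tn, tp + tn + fp + fn)
    f1 = ratio(2 * precision * sensitivity, precision + sensitivity)
    mcc = ratio(tp * tn - fp * fn,
                math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)))
    return {
        'accuracy': accuracy,
        'specificity': specificity,
        'sensitivity': sensitivity,
        'mcc': mcc,
        'f1': f1,
    }


# Made-up counts, only to show the call.
for name, value in binary_metrics(tp=40, tn=30, fp=10, fn=20).items():
    print('{}: {:.3f}'.format(name, value))
```

MCC uses all four counts, so it stays informative when the two classes are unbalanced, which is one reason to report it next to accuracy.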

Let’s start coding:

myVgg19.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    sample_count = len(y)
    train_size = sample_count
    print('train size : {}'.format(train_size))
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This Python code snippet is used to build a dataset for machine learning models. The function takes in two parameters, data_directory and img_width, which represent the directory where the data is stored and the desired width of the images in the dataset, respectively. Inside the function, a call is made to another dataset function from the dataset module, which returns three variables: X, y, and tags. X represents the features or independent variables of the dataset, y represents the labels or dependent variables, and tags is a list of the different categories or classes in the dataset. The number of classes is determined by taking the length of the tags list. The length of the y variable is also stored as sample_count. The feature variable is set to X, and the labels are converted to categorical using the np_utils.to_categorical function, which converts the labels from integers to one-hot encoded vectors. The final result of the function is a tuple containing the features, labels, and the number of classes in the dataset. This function is useful for quickly preparing a dataset for training and testing machine learning models.
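To make the one-hot step concrete, the sketch below shows what such an encoding produces for integer labels. It is a pure-Python stand-in written for this article, not the Keras implementation, and the name to_one_hot is hypothetical.

```python
def to_one_hot(labels, nb_classes):
    """Turn integer class labels into one-hot rows.

    Illustrative stand-in for a one-hot encoder; not the Keras code.
    """
    rows = []
    for label in labels:
        row = [0.0] * nb_classes   # start with all zeros
        row[label] = 1.0           # mark the position of the class
        rows.append(row)
    return rows


# Two classes: 0 (price does not rise) and 1 (price rises).
print(to_one_hot([0, 1, 1, 0], 2))
```

Each label becomes a row with a single 1.0 in the column of its class, which is the shape a softmax output layer with nb_classes units expects.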

def build_model(SHAPE, nb_classes, bn_axis, seed=None):
    if seed:
        np.random.seed(seed)
    input_layer = Input(shape=SHAPE)
    x = Conv2D(64, (3, 3), activation='relu', padding='same', name='block1_conv1')(input_layer)
    x = Conv2D(64, (3, 3), activation='relu', padding='same', name='block1_conv2')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block1_pool')(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same', name='block2_conv1')(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same', name='block2_conv2')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block2_pool')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv1')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv2')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv3')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv4')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block3_pool')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv1')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv2')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv3')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv4')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block4_pool')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv1')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv2')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv3')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv4')(x)
    x = Flatten(name='flatten')(x)
    x = Dense(4096, activation='relu', name='fc1')(x)
    x = Dense(4096, activation='relu', name='fc2')(x)
    x = Dense(nb_classes, activation='softmax', name='predictions')(x)
    model = Model(input_layer, x)
    return model

This function builds a convolutional neural network (CNN) for image classification, following the VGG19 layout. It accepts the input shape, the number of classes, a batch-normalization axis (bn_axis, which the body never uses), and an optional seed; when a seed is given, it seeds NumPy's random number generator. The network consists of five blocks of 3×3 convolutional layers with ReLU activations and 'same' padding: two layers each in blocks 1 and 2, and four layers each in blocks 3 to 5. The filter count grows from 64 to 128, 256, and finally 512 as the network deepens, which lets later layers respond to more complex patterns. A 2×2 max pooling layer follows each of the first four blocks and halves the width and height of the feature maps; unlike the standard VGG19, this version has no pooling layer after block 5. The output is then flattened and passed through two fully connected (dense) layers of 4,096 units each, followed by a softmax layer with nb_classes outputs that produces the class probabilities. The last lines wrap the input and output layers in a Model object and return it, ready to be compiled and trained.
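To see what the four pooling layers do to a small candlestick image, the arithmetic can be sketched as below. The helper flattened_size is hypothetical, and the sketch assumes the Keras defaults visible in the code: 'same' padding on the convolutions (size preserved) and 'valid' padding on the 2×2, stride-2 pooling layers (size halved, rounded down).

```python
def flattened_size(input_side, pool_count=4, last_filters=512):
    """Return (final feature-map side, length of the flattened vector)."""
    side = input_side
    for _ in range(pool_count):
        # A 2x2 pool with stride 2 and 'valid' padding halves the side,
        # rounding down when it is odd.
        side //= 2
    return side, side * side * last_filters


for dim in (48, 50):
    side, flat = flattened_size(dim)
    fc1_params = flat * 4096 + 4096  # weights plus biases of the first dense layer
    print('input {0}x{0}: final map {1}x{1}, flatten {2}, fc1 parameters {3}'.format(
        dim, side, flat, fc1_params))
```

Both the default dimension of 48 and the 50 used in the usage example end at a 3×3 map, so the flattened vector has 4,608 values and the first dense layer alone holds about 18.9 million parameters.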

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=48)
    parser.add_argument('-c', '--channel', help='a image channel', type=int, default=3)
    parser.add_argument('-e', '--epochs', help='num of epochs', type=int, default=10)
    parser.add_argument('-b', '--batch_size', help='num of batch_size', type=int, default=64)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    (img_width, img_height) = (args.dimension, args.dimension)
    channel = args.channel
    epochs = args.epochs
    batch_size = args.batch_size
    SHAPE = (img_width, img_height, channel)
    bn_axis = 3 if K.image_dim_ordering() == 'tf' else 1
    data_directory = args.input
    period_name = data_directory.split('/')
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    model = build_model(SHAPE, nb_classes, bn_axis)
    model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs)
    model.save('{}epochs_{}batch_vgg19_model_{}.h5'.format(epochs, batch_size, data_directory.replace('/', '_')), overwrite=True)
    predicted = model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specitivity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    f_output = open(args.output, 'a')
    f_output.write('=======\n')
    f_output.write('{}epochs_{}batch_vgg19\n'.format(epochs, batch_size))
    f_output.write('TN: {}\n'.format(tn))
    f_output.write('FN: {}\n'.format(fn))
    f_output.write('TP: {}\n'.format(tp))
    f_output.write('FP: {}\n'.format(fp))
    f_output.write('TPR: {}\n'.format(TPR))
    f_output.write('FPR: {}\n'.format(FPR))
    f_output.write('accuracy: {}\n'.format(accuracy))
    f_output.write('specitivity: {}\n'.format(specitivity))
    f_output.write('sensitivity : {}\n'.format(sensitivity))
    f_output.write('mcc : {}\n'.format(mcc))
    f_output.write('{}'.format(report))
    f_output.write('=======\n')
    f_output.close()
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

The main function ties everything together. It parses the command-line arguments (input directory, image dimension, number of channels, epochs, batch size, and output file) and loads the training and test sets from the train and test subdirectories of the input directory; the split already exists on disk, so nothing is split at run time. It then builds the VGG19 model, compiles it with the Adam optimizer (learning rate 0.0001) and categorical cross-entropy loss, trains it on the training data, and saves it to an .h5 file. The trained model then predicts the test set, and the predictions are compared with the true labels in a confusion matrix. From the four counts (TN, FN, TP, FP) the script computes TPR, FPR, accuracy, specificity, sensitivity, and MCC. Note that any count equal to zero is replaced by 1 to avoid division by zero, which slightly distorts the reported numbers in those cases. Finally, the metrics and the classification report are appended to the output file, and the total run time is printed.
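The indexing used above (cm[0][0] for TN, cm[1][1] for TP, and so on) relies on the confusion matrix having true classes as rows and predicted classes as columns. The pure-Python sketch below reproduces that layout on made-up probabilities; the helper names are hypothetical and the code only imitates the scikit-learn layout rather than calling the library.

```python
def argmax(row):
    """Index of the largest value in a list."""
    return max(range(len(row)), key=row.__getitem__)


def confusion_counts(y_true, y_pred):
    """2x2 matrix with true classes as rows and predicted classes as columns."""
    cm = [[0, 0], [0, 0]]
    for t, p in zip(y_true, y_pred):
        cm[t][p] += 1
    return cm


# Made-up softmax outputs and one-hot test labels for five samples.
predicted = [[0.9, 0.1], [0.2, 0.8], [0.6, 0.4], [0.3, 0.7], [0.1, 0.9]]
y_test = [[1, 0], [0, 1], [0, 1], [1, 0], [0, 1]]

y_pred = [argmax(row) for row in predicted]
y_true = [argmax(row) for row in y_test]
cm = confusion_counts(y_true, y_pred)

tn, fp = cm[0][0], cm[0][1]
fn, tp = cm[1][0], cm[1][1]
print(cm, 'TN', tn, 'FP', fp, 'FN', fn, 'TP', tp)
```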

randomforest.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This code snippet defines a function called build_dataset that takes two arguments: data_directory and img_width. It then calls a function called dataset that is imported from another module, passing in the data_directory and img_width as arguments. The output of the dataset function is assigned to three variables — X, y and tags. The value of tags is used to determine the number of classes in the data, which is assigned to the variable nb_classes. The variable X is then assigned to a new variable called feature, while the variable y is converted to categorical labels using the np_utils.to_categorical function, which takes in the y variable and the number of classes. The resulting categorical labels are assigned to the variable label. Finally, the function returns a tuple containing the feature, label and nb_classes variables. This code snippet is likely used to create a dataset for machine learning or deep learning models, where the data is organized into features and labels.

def random_forest_classifier(features, target):
    
    clf = RandomForestClassifier()
    clf.fit(features, target)
    return clf

This function creates and trains a random forest classifier with the scikit-learn library. It takes two arguments, features and target, which hold the input data and the corresponding labels. It first creates a RandomForestClassifier with default settings, an ensemble model that combines the predictions of many decision trees. It then fits the model to the provided data and returns the trained classifier.

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=50)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    data_directory = args.input
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    trained_model = random_forest_classifier(X_train, Y_train)
    joblib.dump(trained_model, 'randomforest_model_{}.pkl'.format(data_directory.replace('/', '_')))
    predicted = trained_model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specitivity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    f_output = open(args.output, 'a')
    f_output.write('=======\n')
    f_output.write('randomforest_model_{}\n'.format(data_directory.replace('/', '_')))
    f_output.write('TN: {}\n'.format(tn))
    f_output.write('FN: {}\n'.format(fn))
    f_output.write('TP: {}\n'.format(tp))
    f_output.write('FP: {}\n'.format(fp))
    f_output.write('TPR: {}\n'.format(TPR))
    f_output.write('FPR: {}\n'.format(FPR))
    f_output.write('accuracy: {}\n'.format(accuracy))
    f_output.write('specitivity: {}\n'.format(specitivity))
    f_output.write('sensitivity : {}\n'.format(sensitivity))
    f_output.write('mcc : {}\n'.format(mcc))
    f_output.write('{}'.format(report))
    f_output.write('=======\n')
    f_output.close()
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

The main function runs the random forest experiment end to end. It uses argparse to read the input directory, image dimension, and output file, loads the training and test sets, and trains a model with the random_forest_classifier function. The model is saved with joblib and then used to predict classes for the test set. From the resulting confusion matrix the script calculates the true positive rate (TPR), false positive rate (FPR), accuracy, specificity, sensitivity, and Matthews Correlation Coefficient (MCC), replacing any zero count with 1 as the VGG19 script does. Finally, it appends the metrics and the classification report to the output file and prints the duration of the run.

myVgg16.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    sample_count = len(y)
    train_size = sample_count
    print('train size : {}'.format(train_size))
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This build_dataset function is the same as the one in myVgg19.py. It takes a data directory and an image width, calls dataset.dataset with them, and stores the results in X, y, and tags. X holds the image data, y holds the label of each sample, and tags lists the possible labels, so the number of classes is the length of tags. The number of samples is stored in sample_count, copied to train_size, and printed. Finally, np_utils.to_categorical, a Keras utility (not part of NumPy), converts the integer labels to one-hot vectors, and the function returns the features, the labels, and the number of classes.

def build_model(SHAPE, nb_classes, bn_axis, seed=None):
    if seed:
        np.random.seed(seed)
    input_layer = Input(shape=SHAPE)
    x = Conv2D(64, (3, 3), activation='relu', padding='same', name='block1_conv1')(input_layer)
    x = Conv2D(64, (3, 3), activation='relu', padding='same', name='block1_conv2')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block1_pool')(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same', name='block2_conv1')(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same', name='block2_conv2')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block2_pool')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv1')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv2')(x)
    x = Conv2D(256, (3, 3), activation='relu', padding='same', name='block3_conv3')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block3_pool')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv1')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv2')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block4_conv3')(x)
    x = MaxPooling2D((2, 2), strides=(2, 2), name='block4_pool')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv1')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv2')(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same', name='block5_conv3')(x)
    x = Flatten(name='flatten')(x)
    x = Dense(4096, activation='relu', name='fc1')(x)
    x = Dense(4096, activation='relu', name='fc2')(x)
    x = Dense(nb_classes, activation='softmax', name='predictions')(x)
    model = Model(input_layer, x)
    return model

This build_model function takes four parameters: SHAPE, nb_classes, bn_axis, and seed. If a seed is provided, the NumPy random seed is set to that value; bn_axis is accepted but not used. After the input layer, the network stacks five blocks of convolutional layers. Every convolution uses a 3×3 kernel, ReLU activation, and 'same' padding; what changes from block to block is the number of filters (64, 128, 256, 512, 512) and the number of layers (2, 2, 3, 3, 3). A 2×2 max pooling layer after each of the first four blocks halves the width and height of the feature maps, which reduces the computation and the size of the flattened vector fed to the dense layers. The flattened features pass through two fully connected layers of 4,096 units each, and a final softmax layer whose size is set by nb_classes produces the class probabilities. The model is then created from the input and output layers and returned. The only difference from the VGG19 version is that blocks 3 to 5 have three convolutional layers instead of four.
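The two architectures differ only in how many convolutions each block contains, which a short calculation makes visible. The sketch below reads the per-block counts off the two build_model listings and assumes a 3-channel input; the helper names are hypothetical.

```python
FILTERS_PER_BLOCK = [64, 128, 256, 512, 512]
VGG16_CONVS = [2, 2, 3, 3, 3]   # convolutions per block in myVgg16.py
VGG19_CONVS = [2, 2, 4, 4, 4]   # convolutions per block in myVgg19.py


def weight_layers(convs_per_block, dense_layers=3):
    """Layers with trainable weights: all convolutions plus the dense layers."""
    return sum(convs_per_block) + dense_layers


def conv_parameters(convs_per_block, in_channels=3, kernel=3):
    """Total weights and biases in the convolutional part of the network."""
    total = 0
    channels = in_channels
    for filters, count in zip(FILTERS_PER_BLOCK, convs_per_block):
        for _ in range(count):
            # kernel*kernel weights per input/output channel pair, plus one bias per filter
            total += kernel * kernel * channels * filters + filters
            channels = filters
    return total


for name, convs in (('VGG16', VGG16_CONVS), ('VGG19', VGG19_CONVS)):
    print(name, weight_layers(convs), 'weight layers,',
          conv_parameters(convs), 'convolutional parameters')
```

The counts of 16 and 19 weight layers are where the names VGG16 and VGG19 come from.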

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=48)
    parser.add_argument('-c', '--channel', help='a image channel', type=int, default=3)
    parser.add_argument('-e', '--epochs', help='num of epochs', type=int, default=10)
    parser.add_argument('-b', '--batch_size', help='num of batch_size', type=int, default=64)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    (img_width, img_height) = (args.dimension, args.dimension)
    channel = args.channel
    epochs = args.epochs
    batch_size = args.batch_size
    SHAPE = (img_width, img_height, channel)
    bn_axis = 3 if K.image_dim_ordering() == 'tf' else 1
    data_directory = args.input
    period_name = data_directory.split('/')
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    model = build_model(SHAPE, nb_classes, bn_axis)
    model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs)
    model.save('{}epochs_{}batch_vgg16_model_{}.h5'.format(epochs, batch_size, data_directory.replace('/', '_')), overwrite=True)
    predicted = model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specitivity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    f_output = open(args.output, 'a')
    f_output.write('=======\n')
    f_output.write('{}epochs_{}batch_vgg16\n'.format(epochs, batch_size))
    f_output.write('TN: {}\n'.format(tn))
    f_output.write('FN: {}\n'.format(fn))
    f_output.write('TP: {}\n'.format(tp))
    f_output.write('FP: {}\n'.format(fp))
    f_output.write('TPR: {}\n'.format(TPR))
    f_output.write('FPR: {}\n'.format(FPR))
    f_output.write('accuracy: {}\n'.format(accuracy))
    f_output.write('specitivity: {}\n'.format(specitivity))
    f_output.write('sensitivity : {}\n'.format(sensitivity))
    f_output.write('mcc : {}\n'.format(mcc))
    f_output.write('{}'.format(report))
    f_output.write('=======\n')
    f_output.close()
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

The main function mirrors the one in myVgg19.py. It records the start and end times with time.monotonic and uses argparse to define the command-line arguments: input directory, image dimension, channel count, number of epochs, batch size, and output file name. These arguments are parsed and stored in variables. The code then builds the model with Keras, compiles it with the Adam optimizer and categorical cross-entropy loss, and trains it with the specified number of epochs and batch size. Once trained, the model is saved and used to predict the test set. The results are evaluated with accuracy, sensitivity, specificity, and the Matthews Correlation Coefficient (MCC), and then written to the output file.

preproccess_binclass.py

def isnan(value):
    try:
        import math
        return math.isnan(float(value))
    except:
        return False

This code defines a function called isnan that reports whether a value is NaN (not a number). It converts the value to a float and passes it to math.isnan, returning True only when the result is NaN. If the conversion or the check raises any exception, for example because the value is a non-numeric string or None, the function returns False. A False result therefore means only that the value is not NaN, not that it is a valid number. The function is useful for spotting missing values during data cleaning.
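A few calls show how the function behaves on different inputs. The version below is a lightly adapted sketch: the import is moved to module level and the bare except is narrowed to TypeError and ValueError, which is a choice made for this example rather than the repository's code.

```python
import math


def isnan(value):
    """Return True only if the value converts to a float that is NaN."""
    try:
        return math.isnan(float(value))
    except (TypeError, ValueError):
        # Values that cannot be converted (None, 'abc', ...) are not NaN.
        return False


for sample in (float('nan'), 'NaN', 3.14, '42', 'abc', None):
    print(repr(sample), '->', isnan(sample))
```

Note that 'abc' and None both give False even though neither is a usable number, so this check alone does not validate input.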

def removeOutput(finput):
    if Path(finput).is_file():
        os.remove(finput)

This code snippet defines a function called removeOutput that takes in a parameter called finput. The function checks if the input finput is a file using the is_file method of the Path class. If it is a file, then it uses the os.remove method to delete the file from the system. This function can be used to easily remove a file from the system by passing in its file path as the finput parameter.

def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='a csv file of stock data', required=True)
    parser.add_argument('-l', '--seq_len', help='num of sequence length', default=20)
    parser.add_argument('-lf', '--label_file', help='a label_file')
    parser.add_argument('-d', '--dimension', help='a dimension value', type=int, default=48)
    parser.add_argument('-t', '--dataset_type', help='training or testing datasets')
    parser.add_argument('-m', '--mode', help='mode of preprocessing data', required=True)
    parser.add_argument('-v', '--use_volume', help='combine with volume.', default=False)
    args = parser.parse_args()
    if args.mode == 'ohlc2cs':
        ohlc2cs(args.input, args.seq_len, args.dataset_type, args.dimension, args.use_volume)
    if args.mode == 'createLabel':
        createLabel(args.input, args.seq_len)
    if args.mode == 'img2dt':
        image2dataset(args.input, args.label_file)
    if args.mode == 'countImg':
        countImage(args.input)

This code defines the main function of the preprocessing script and sets up an argument parser for its command-line options. The parser is created with ArgumentDefaultsHelpFormatter, so the help text shows each default value. Arguments are added with add_argument, which sets their names, help messages, and whether they are required or have a default. Only --dimension declares a type, so any other value supplied on the command line, including --seq_len and --use_volume, arrives as a string. After parse_args stores the inputs in args, the function dispatches on the mode argument: ohlc2cs converts OHLC data to candlestick charts, createLabel creates labels, img2dt sorts images into a labelled dataset, and countImg counts images.
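The effect of leaving out type= can be checked with a cut-down parser. The sketch below copies three of the arguments from the listing and parses hand-written argument lists; the build_parser wrapper and the sample values are assumptions made for the demonstration.

```python
import argparse


def build_parser():
    """Cut-down copy of the preprocessing parser, for demonstration only."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-l', '--seq_len', default=20)               # no type given
    parser.add_argument('-d', '--dimension', type=int, default=48)   # converted to int
    parser.add_argument('-v', '--use_volume', default=False)         # no type given
    return parser


defaults = build_parser().parse_args([])
supplied = build_parser().parse_args(['-l', '20', '-d', '50', '-v', 'False'])

print(type(defaults.seq_len).__name__, type(supplied.seq_len).__name__)
print(type(supplied.dimension).__name__)
print(repr(supplied.use_volume), bool(supplied.use_volume))
```

With no arguments, seq_len keeps its integer default, but a value passed on the command line stays a string, which is why the script wraps it in int() later. Passing -v False produces the non-empty string 'False', which is truthy in Python.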

def image2dataset(input, label_file):
    label_dict = {}
    with open(label_file) as f:
        for line in f:
            (key, val) = line.split(',')
            label_dict[key] = val.rstrip()
    path = '{}/{}'.format(os.getcwd(), input)
    print(path)
    for filename in os.listdir(path):
        if filename != '':
            for (k, v) in label_dict.items():
                splitname = filename.split('_')
                (f, e) = os.path.splitext(filename)
                newname = '{}_{}'.format(splitname[0], splitname[1])
                if newname == k:
                    new_name = '{}{}.png'.format(v, f)
                    os.rename('{}/{}'.format(path, filename), '{}/{}'.format(path, new_name))
                    break
    folders = ['1', '0']
    for folder in folders:
        if not os.path.exists('{}/classes/{}'.format(path, folder)):
            os.makedirs('{}/classes/{}'.format(path, folder))
    for filename in os.listdir(path):
        if filename != '':
            if filename[:1] == '1':
                move('{}/{}'.format(path, filename), '{}/classes/1/{}'.format(path, filename))
            elif filename[:1] == '0':
                move('{}/{}'.format(path, filename), '{}/classes/0/{}'.format(path, filename))

This function sorts the generated images into class folders so they can be used as a classification dataset. The input parameter names the directory containing the images, and label_file names the file holding one key,label pair per line. The function first reads the label file into a dictionary that maps image keys to labels. It then loops over the files in the input directory, rebuilds each file's key from the first two underscore-separated parts of its name, and, when the key is found in the dictionary, renames the file so that the label becomes its first character. In a second pass it creates the classes/1 and classes/0 folders if needed and moves every file whose name starts with 1 or 0 into the matching folder.
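
The same idea can be sketched with a direct dictionary lookup instead of scanning every label for every file. This is an illustrative variant under a simplifying assumption that differs from the listing: each image's file name without extension is taken to be exactly the key in the label file. The names read_labels and sort_by_label are hypothetical.

```python
import os
import shutil


def read_labels(label_file):
    """Parse 'key,label' lines into a dict, skipping blank lines."""
    labels = {}
    with open(label_file) as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            key, label = line.split(',', 1)
            labels[key] = label
    return labels


def sort_by_label(image_dir, label_file):
    """Move each labelled PNG into image_dir/classes/<label>/.

    Assumes the image stem equals the key in the label file; images
    without a label are left in place. Returns the number of files moved.
    """
    labels = read_labels(label_file)
    moved = 0
    for filename in sorted(os.listdir(image_dir)):
        stem, ext = os.path.splitext(filename)
        if ext.lower() != '.png' or stem not in labels:
            continue
        target_dir = os.path.join(image_dir, 'classes', labels[stem])
        os.makedirs(target_dir, exist_ok=True)
        shutil.move(os.path.join(image_dir, filename),
                    os.path.join(target_dir, filename))
        moved += 1
    return moved
```

Moving the file straight into its class folder avoids the intermediate rename, at the cost of not encoding the label in the file name.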

def createLabel(fname, seq_len):
    print('Creating label . . .')
    filename = fname.split('/')
    removeOutput('{}_label_{}.txt'.format(filename[1][:-4], seq_len))
    df = pd.read_csv(fname, parse_dates=True, index_col=0)
    df = df.fillna(0)
    df.reset_index(inplace=True)
    df['Date'] = df['Date'].map(mdates.date2num)
    for i in range(0, len(df)):
        c = df.loc[i:i + int(seq_len), :]
        starting = 0
        endvalue = 0
        label = ''
        if len(c) == int(seq_len) + 1:
            starting = c['Close'].iloc[-2]
            endvalue = c['Close'].iloc[-1]
            if endvalue > starting:
                label = 1
            else:
                label = 0
            with open('{}_label_{}.txt'.format(filename[1][:-4], seq_len), 'a') as the_file:
                the_file.write('{}-{},{}'.format(filename[1][:-4], i, label))
                the_file.write('\n')
    print('Create label finished.')

This function creates the label file for a given CSV file and sequence length. It takes two arguments, the file name and the sequence length, and starts by printing a progress message and removing any existing output file, whose name is built from the input file name and the sequence length. It then reads the CSV with pandas and converts the dates to numbers. For each starting row it slices seq_len + 1 rows and compares the last two closing prices in that slice: the close on the final row of the seq_len-row window and the close on the row after it. If the later close is higher, the label is 1; otherwise it is 0. Each label is appended to the output file together with the file name and the starting index. When the loop ends, a completion message is printed. In short, the label records whether the stock's closing price rose on the row that follows each window.
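
The labelling rule can be sketched on a plain list of closing prices. This is a minimal illustration of the comparison described above, not the project's code; the function name make_labels and the sample prices are made up for the example.

```python
def make_labels(closes, seq_len):
    """Return (window_start, label) pairs for a list of closing prices.

    Window i covers rows i .. i + seq_len - 1. Its label is 1 when the
    close on the following row is higher than the last close in the
    window, and 0 otherwise (a tie counts as 0).
    """
    labels = []
    for i in range(len(closes) - seq_len):
        last_in_window = closes[i + seq_len - 1]
        next_close = closes[i + seq_len]
        labels.append((i, 1 if next_close > last_in_window else 0))
    return labels


closes = [10.0, 11.0, 10.5, 10.5, 12.0]
print(make_labels(closes, seq_len=3))  # [(0, 0), (1, 1)]
```

With five prices and a window of three, only two windows have a following row, so only two labels are produced.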

def countImage(input):
    num_file = sum([len(files) for (r, d, files) in os.walk(input)])
    num_dir = sum([len(d) for (r, d, files) in os.walk(input)])
    print('num of files : {}\nnum of dir : {}'.format(num_file, num_dir))

This Python snippet uses the os module to count the files and directories under the given input location. os.walk traverses the location recursively and yields, for each directory, its path, its subdirectories, and its files. The two list comprehensions collect the number of files and the number of subdirectories found at each step, and sum adds them up to give the totals for the whole tree, so the tree is walked twice. The result is printed with string formatting. The function is handy for checking how many images a preprocessing step produced or for spotting changes in a directory's contents.
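
For large image folders, both totals can be collected in a single traversal. The sketch below is an equivalent formulation, with count_entries as a hypothetical name; it returns the counts instead of printing them so they can be reused.

```python
import os


def count_entries(root):
    """Count files and directories below root in one os.walk pass."""
    num_files = 0
    num_dirs = 0
    for _dirpath, dirnames, filenames in os.walk(root):
        num_dirs += len(dirnames)
        num_files += len(filenames)
    return num_files, num_dirs
```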

def ohlc2cs(fname, seq_len, dataset_type, dimension, use_volume):
    print('Converting ohlc to candlestick')
    symbol = fname.split('_')[0]
    symbol = symbol.split('/')[1]
    print(symbol)
    path = '{}'.format(os.getcwd())
    if not os.path.exists('{}/dataset/{}_{}/{}/{}'.format(path, seq_len, dimension, symbol, dataset_type)):
        os.makedirs('{}/dataset/{}_{}/{}/{}'.format(path, seq_len, dimension, symbol, dataset_type))
    df = pd.read_csv(fname, parse_dates=True, index_col=0)
    df = df.fillna(0)
    plt.style.use('dark_background')
    df.reset_index(inplace=True)
    df['Date'] = df['Date'].map(mdates.date2num)
    for i in range(0, len(df)):
        c = df.loc[i:i + int(seq_len) - 1, :]
        if len(c) == int(seq_len):
            my_dpi = 96
            fig = plt.figure(figsize=(dimension / my_dpi, dimension / my_dpi), dpi=my_dpi)
            ax1 = fig.add_subplot(1, 1, 1)
            candlestick2_ochl(ax1, c['Open'], c['Close'], c['High'], c['Low'], width=1, colorup='
            ax1.grid(False)
            ax1.set_xticklabels([])
            ax1.set_yticklabels([])
            ax1.xaxis.set_visible(False)
            ax1.yaxis.set_visible(False)
            ax1.axis('off')
            if use_volume:
                ax2 = ax1.twinx()
                bc = volume_overlay(ax2, c['Open'], c['Close'], c['Volume'], colorup='
                ax2.add_collection(bc)
                ax2.grid(False)
                ax2.set_xticklabels([])
                ax2.set_yticklabels([])
                ax2.xaxis.set_visible(False)
                ax2.yaxis.set_visible(False)
                ax2.axis('off')
            pngfile = 'dataset/{}_{}/{}/{}/{}-{}.png'.format(seq_len, dimension, symbol, dataset_type, fname[11:-4], i)
            fig.savefig(pngfile, pad_inches=0, transparent=False)
            plt.close(fig)
    print('Converting ohlc to candlestick finished.')

This Python snippet converts a raw data file in OHLC (Open, High, Low, Close) format into candlestick chart images, a graphical representation of the same information. Its parameters are the file name, the sequence length, the dataset type, the image dimension, and a flag for including volume. The code first extracts the symbol of the financial instrument from the file name and prints it. It then takes the current working directory and creates the output directory for the converted dataset if it does not already exist. Next, it reads the file into a dataframe, fills missing values with zeros, sets the plot style to a dark background, resets the index, and converts the date column to a numerical format. The loop then slides a window of seq_len rows over the dataframe, starting at each row in turn. Windows that are shorter than seq_len, which happens near the end of the data, are skipped. For each complete window the code creates a square figure of dimension by dimension pixels, plots the candlesticks on a single subplot, and removes the grid, ticks, and axes so that only the chart remains. If use_volume is set, a volume overlay is drawn on a twin axis. Each figure is saved as a PNG in the output directory and closed immediately afterwards, and a message is printed once all windows have been processed.
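
The window arithmetic has a consequence that is easy to miss. If ohlc2cs and createLabel are run on the same CSV with the same seq_len, the chart loop keeps every complete window, while the label loop also needs one following row, so the final chart has no label. The sketch below only counts indices; chart_windows and labelled_windows are hypothetical helper names, and the row count of 25 is an arbitrary example.

```python
def chart_windows(n_rows, seq_len):
    """Start indices of the complete windows that get a chart.

    A window starting at i spans rows i .. i + seq_len - 1, so it is
    complete only while i + seq_len - 1 <= n_rows - 1.
    """
    return list(range(max(n_rows - seq_len + 1, 0)))


def labelled_windows(n_rows, seq_len):
    """Start indices that also have a following row to compare against."""
    return list(range(max(n_rows - seq_len, 0)))


charts = chart_windows(n_rows=25, seq_len=20)
labels = labelled_windows(n_rows=25, seq_len=20)
print(len(charts), len(labels))           # 6 5
print(sorted(set(charts) - set(labels)))  # [5]
```

An image without a matching label is simply left unsorted by the later img2dt step, so the mismatch does no harm as long as it is expected.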

generatedata.py

def cre8outputdir(pathdir, targetdir):
    if not os.path.exists('{}/{}'.format(pathdir, targetdir)):
        os.mkdir('{}/{}'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/train'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/train'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/test'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/test'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/train/0'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/train/0'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/train/1'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/train/1'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/test/0'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/test/0'.format(pathdir, targetdir))
    if not os.path.exists('{}/{}/test/1'.format(pathdir, targetdir)):
        os.mkdir('{}/{}/test/1'.format(pathdir, targetdir))

This code snippet creates a directory structure for a machine learning project. It uses the os library to check if certain directories exist within a specified path, and if they do not exist, it creates them. The directories are organized into a main target folder, with subfolders for train and test data, as well as categories for class 0 and class 1 within the training and testing folders. The code ensures that the necessary directories are created so that the project can be properly organized and data can be easily accessed for training and testing.
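
The same directory tree can be created more compactly with os.makedirs and exist_ok=True, which creates missing parent folders and does nothing when a folder already exists. This is a sketch of an alternative, not the project's code; make_output_dirs and its splits and classes parameters are hypothetical.

```python
import os


def make_output_dirs(pathdir, targetdir, splits=('train', 'test'), classes=('0', '1')):
    """Create <pathdir>/<targetdir>/<split>/<class> for every combination."""
    created = []
    for split in splits:
        for label in classes:
            path = os.path.join(pathdir, targetdir, split, label)
            os.makedirs(path, exist_ok=True)  # also creates missing parents
            created.append(path)
    return created
```

Calling the function a second time is harmless, which matters when the data generation script is rerun.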

myDeepCNN.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    sample_count = len(y)
    train_size = sample_count
    print('train size : {}'.format(train_size))
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This snippet defines a function called build_dataset, which takes a data_directory and an image width. It first calls a helper named dataset.dataset, which returns three values: X, y, and tags. That helper is not shown here; it most likely loads the images in the directory into arrays along with their class labels. The next lines compute the number of classes from tags and the number of samples from y, and print the sample count. Finally, the images are assigned to feature, and the labels are converted to one-hot (categorical) form with np_utils.to_categorical and assigned to label. The function returns feature, label, and the number of classes, giving a dataset that is ready to feed into a model.
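
To make the categorical conversion concrete, here is a small pure-Python sketch of one-hot encoding and its inverse. It illustrates the idea only and is not the Keras implementation; to_one_hot and from_one_hot are hypothetical names.

```python
def to_one_hot(labels, nb_classes):
    """Turn integer class labels into one-hot rows, e.g. 1 -> [0.0, 1.0]."""
    rows = []
    for label in labels:
        if not 0 <= label < nb_classes:
            raise ValueError("label {} outside 0..{}".format(label, nb_classes - 1))
        row = [0.0] * nb_classes
        row[label] = 1.0
        rows.append(row)
    return rows


def from_one_hot(rows):
    """Recover integer labels by taking the index of the largest entry."""
    return [row.index(max(row)) for row in rows]


encoded = to_one_hot([0, 1, 1], nb_classes=2)
print(encoded)                # [[1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]
print(from_one_hot(encoded))  # [0, 1, 1]
```

The inverse step is what np.argmax(..., axis=1) does later in the scripts when predictions and test labels are turned back into class indices.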

def build_model(SHAPE, nb_classes, bn_axis, seed=None):
    if seed:
        np.random.seed(seed)
    input_layer = Input(shape=SHAPE)
    x = Conv2D(32, (3, 3), kernel_initializer='glorot_uniform', padding='same', activation='relu')(input_layer)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(48, (3, 3), kernel_initializer='glorot_uniform', padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Conv2D(64, (3, 3), kernel_initializer='glorot_uniform', padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(96, (3, 3), kernel_initializer='glorot_uniform', padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Flatten()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    x = Dense(2, activation='softmax')(x)
    model = Model(input_layer, x)
    return model

This snippet defines a function called build_model that takes four parameters: SHAPE, nb_classes, bn_axis, and seed. The if statement at the top checks whether a seed was provided and, if so, seeds NumPy's random number generator. The following lines build the layers of a convolutional neural network (CNN). Convolutional layers extract features from the images, while MaxPooling layers halve the height and width of the resulting feature maps. The Dropout layers reduce overfitting by randomly dropping a fraction of the units during training. The Flatten layer turns the output of the previous layers into a vector, which is fed into two Dense layers for classification. The first Dense layer has 256 units and uses the ReLU activation function, while the second has 2 units and uses softmax. Note that nb_classes and bn_axis are accepted but not used in this body, so the output size is fixed at two classes. Finally, the function returns the model built from the input layer and the output layer.
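
It can help to trace how the feature map shrinks on its way to the Flatten layer. The sketch below does the arithmetic only, assuming each convolution is 3x3 with 'same' padding (so height and width are unchanged) and each 2x2 max pooling floor-halves them. The function name deepcnn_feature_shape is hypothetical, and 48 is the script's default dimension.

```python
def deepcnn_feature_shape(dimension, conv_filters=(32, 48, 64, 96)):
    """Trace (height, width, channels) through the conv/pool stack.

    Assumes 'same' padding for every 3x3 convolution (spatial size
    unchanged) and 2x2 max pooling with stride 2 (size floor-halved).
    """
    size = dimension
    channels = None
    for filters in conv_filters:
        channels = filters      # convolution changes only the channel count
        size = size // 2        # pooling halves height and width
    return size, size, channels


h, w, c = deepcnn_feature_shape(48)
print((h, w, c), h * w * c)  # (3, 3, 96) 864
```

Under these assumptions, a 48x48 input reaches the first Dense layer as a vector of 864 values.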

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=48)
    parser.add_argument('-c', '--channel', help='a image channel', type=int, default=3)
    parser.add_argument('-e', '--epochs', help='num of epochs', type=int, default=10)
    parser.add_argument('-b', '--batch_size', help='num of batch_size', type=int, default=64)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    (img_width, img_height) = (args.dimension, args.dimension)
    channel = args.channel
    epochs = args.epochs
    batch_size = args.batch_size
    SHAPE = (img_width, img_height, channel)
    bn_axis = 3 if K.image_data_format() == 'channels_last' else 1
    data_directory = args.input
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    model = build_model(SHAPE, nb_classes, bn_axis)
    model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs)
    model.save('{}epochs_{}batch_cnn_model_{}.h5'.format(epochs, batch_size, data_directory.replace('/', '_')), overwrite=True)
    predicted = model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specitivity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    f_output = open(args.output, 'a')
    f_output.write('=======\n')
    f_output.write('{}epochs_{}batch_cnn\n'.format(epochs, batch_size))
    f_output.write('TN: {}\n'.format(tn))
    f_output.write('FN: {}\n'.format(fn))
    f_output.write('TP: {}\n'.format(tp))
    f_output.write('FP: {}\n'.format(fp))
    f_output.write('TPR: {}\n'.format(TPR))
    f_output.write('FPR: {}\n'.format(FPR))
    f_output.write('accuracy: {}\n'.format(accuracy))
    f_output.write('specitivity: {}\n'.format(specitivity))
    f_output.write('sensitivity : {}\n'.format(sensitivity))
    f_output.write('mcc : {}\n'.format(mcc))
    f_output.write('{}'.format(report))
    f_output.write('=======\n')
    f_output.close()
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

This Python snippet is the main function that trains and evaluates the CNN. It first records the start time and creates an argument parser so the user can pass options on the command line: the input directory of the dataset, the image dimension and number of channels, the number of epochs and batch size for training, and the output file for the results. The code parses these arguments and assigns them to variables. Next, it loads the training and test sets and builds the model from the input shape. The model is compiled with the Adam optimizer and categorical cross-entropy loss, trained on the training set, and saved to an .h5 file. The model then predicts on the test set, and the predictions and one-hot test labels are reduced to class indices with argmax. From the resulting confusion matrix the code derives the true positive rate, false positive rate, accuracy, specificity, sensitivity, and MCC. Any count that is zero is first replaced by 1, which avoids division by zero but slightly changes the reported values in that case. The metrics and the classification report are appended to the output file, and the total duration is printed.
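
The metric formulas can be collected into one helper. The listing replaces any zero count with 1 before dividing; the sketch below takes a different approach, stated here as an assumption rather than the project's method: the counts are left untouched and a ratio with a zero denominator is reported as 0.0. The name binary_metrics and the sample counts are hypothetical, and F1 is added for completeness (the listing obtains it through classification_report).

```python
import math


def binary_metrics(tn, fp, fn, tp):
    """Compute the reported metrics from confusion-matrix counts.

    A ratio with a zero denominator is reported as 0.0 instead of
    altering the counts.
    """
    def ratio(num, den):
        return num / den if den else 0.0

    sensitivity = ratio(tp, tp + fn)   # true positive rate
    specificity = ratio(tn, tn + fp)   # 1 - false positive rate
    precision = ratio(tp, tp + fp)
    return {
        'accuracy': ratio(tp + tn, tp + tn + fp + fn),
        'sensitivity': sensitivity,
        'specificity': specificity,
        'f1': ratio(2 * precision * sensitivity, precision + sensitivity),
        'mcc': ratio(tp * tn - fp * fn,
                     math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))),
    }


metrics = binary_metrics(tn=50, fp=10, fn=5, tp=35)
print({name: round(value, 3) for name, value in metrics.items()})
```

For these sample counts the accuracy is 0.85 and the sensitivity is 0.875; MCC stays within the range -1 to 1, with 0 meaning no better than chance.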

knn.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This function builds a dataset for the KNN script. It takes two arguments: data_directory, the directory where the dataset is located, and img_width, the width of the images. It calls the dataset function with these arguments and unpacks the result into X (the image data), y (the labels), and tags (the class names). The number of classes is the length of tags. X is assigned to feature, and the labels are converted to categorical (one-hot) values with np_utils.to_categorical and assigned to label. The function returns the feature, the label, and the number of classes. It is the same as the DeepCNN version except that it does not print the sample count.

def knn_classifier(features, target):
    clf = neighbors.KNeighborsClassifier(algorithm='kd_tree')
    clf.fit(features, target)
    return clf

This snippet defines a function called knn_classifier, which takes two parameters: features and target. Inside, it creates clf, a K Nearest Neighbors classifier from scikit-learn's neighbors module. The algorithm parameter is set to kd_tree, which selects a k-d tree as the data structure for the nearest-neighbor search. The fit method then stores the training data in that tree. KNN does not learn explicit decision boundaries; at prediction time each new sample receives the majority class of its nearest training samples. Because the number of neighbors is not specified, the library default is used. Finally, the fitted classifier clf is returned so that it can make predictions on new data.
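
What the classifier does at prediction time can be sketched in a few lines of plain Python. This is a brute-force illustration of the voting rule, not scikit-learn's implementation; knn_predict and the toy data are hypothetical, and a k-d tree would find the same neighbors (ties aside) more efficiently.

```python
from collections import Counter


def knn_predict(train_features, train_labels, query, k=5):
    """Classify query by majority vote among its k nearest training points.

    Brute-force search with squared Euclidean distance; a k-d tree only
    speeds up the neighbor search, it does not change the voting rule.
    """
    distances = []
    for features, label in zip(train_features, train_labels):
        dist = sum((a - b) ** 2 for a, b in zip(features, query))
        distances.append((dist, label))
    distances.sort(key=lambda pair: pair[0])
    votes = Counter(label for _dist, label in distances[:k])
    return votes.most_common(1)[0][0]


train_x = [[0.0, 0.0], [0.1, 0.2], [0.2, 0.1], [1.0, 1.0], [0.9, 1.1], [1.1, 0.9]]
train_y = [0, 0, 0, 1, 1, 1]
print(knn_predict(train_x, train_y, [0.15, 0.15], k=3))  # 0
print(knn_predict(train_x, train_y, [0.95, 1.0], k=3))   # 1
```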

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=50)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    data_directory = args.input
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    trained_model = knn_classifier(X_train, Y_train)
    predicted = trained_model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specitivity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    f_output = open(args.output, 'a')
    f_output.write('=======\n')
    f_output.write('knn_model_{}\n'.format(data_directory.replace('/', '_')))
    f_output.write('TN: {}\n'.format(tn))
    f_output.write('FN: {}\n'.format(fn))
    f_output.write('TP: {}\n'.format(tp))
    f_output.write('FP: {}\n'.format(fp))
    f_output.write('TPR: {}\n'.format(TPR))
    f_output.write('FPR: {}\n'.format(FPR))
    f_output.write('accuracy: {}\n'.format(accuracy))
    f_output.write('specitivity: {}\n'.format(specitivity))
    f_output.write('sensitivity : {}\n'.format(sensitivity))
    f_output.write('mcc : {}\n'.format(mcc))
    f_output.write('{}'.format(report))
    f_output.write('=======\n')
    f_output.close()
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

The function first starts a timer to record the execution time of the code and then uses the argparse module to allow the user to input various command-line arguments to specify the input directory, image dimension, and output file. The build_dataset function is then called to prepare the training and testing data from the specified input directory and the specified image dimension. The number of classes in the dataset is also determined. Next, the KNN model is trained on the training data and then used to make predictions on the testing data. The predicted and actual values are then compared and a confusion matrix is created. The code then calculates various performance metrics such as true positive rate, false positive rate, accuracy, specificity, sensitivity, and Matthews correlation coefficient. These metrics are then written to the specified output file. Finally, the code calculates the total execution time and prints it. Overall, this code snippet serves as an evaluation and reporting tool for a KNN classification model on a dataset.

resnet50.py

def build_dataset(data_directory, img_width):
    (X, y, tags) = dataset.dataset(data_directory, int(img_width))
    nb_classes = len(tags)
    sample_count = len(y)
    train_size = sample_count
    print('train size : {}'.format(train_size))
    feature = X
    label = np_utils.to_categorical(y, nb_classes)
    return (feature, label, nb_classes)

This snippet defines a function called build_dataset that takes two arguments: data_directory and img_width. It unpacks X, y, and tags from the dataset function, which is called with data_directory and int(img_width). X and y contain the features and labels of the dataset respectively, while tags is a list of the class labels. The snippet then computes the number of classes, nb_classes, and the total number of samples, sample_count, and prints the latter. Finally, it converts the labels into categorical values with np_utils.to_categorical and returns the features, labels, and number of classes as a tuple. The function is identical to the one in myDeepCNN.py.

def identity_block(input_tensor, kernel_size, filters, stage, block):
    
    (filters1, filters2, filters3) = filters
    if K.image_data_format() == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    x = Conv2D(filters1, (1, 1), name=conv_name_base + '2a')(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = Activation('relu')(x)
    x = Conv2D(filters2, kernel_size, padding='same', name=conv_name_base + '2b')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = Activation('relu')(x)
    x = Conv2D(filters3, (1, 1), name=conv_name_base + '2c')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)
    x = add([x, input_tensor])
    x = Activation('relu')(x)
    return x

This snippet creates an identity block, one of the building blocks of the ResNet (Residual Network) architecture, a type of deep network commonly used for image recognition. The function takes the input tensor, the kernel size of the middle convolution, a list of three filter counts, and the stage and block labels. The filter counts are unpacked into filters1, filters2, and filters3. Next, the code checks the image data format and sets bn_axis to the channel axis. The variables conv_name_base and bn_name_base are used to give each layer a unique name derived from the stage and block. The main branch then applies a 1x1 convolution, a convolution with the given kernel size, and another 1x1 convolution, each followed by batch normalization, with ReLU activations after the first two. Finally, the branch output is added element-wise to the unchanged input tensor, and a ReLU is applied to the sum. Because the input is added as is, its shape must match the branch output, so filters3 has to equal the number of input channels. The result is returned and can feed further identity blocks or other layers in the network.
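
The residual idea itself, output = relu(F(x) + x), can be shown on a flat list of numbers. The sketch below is a toy illustration with no convolutions; identity_block_1d and the branch argument are hypothetical stand-ins for the Keras layers.

```python
def relu(values):
    """Element-wise max(0, v)."""
    return [max(0.0, v) for v in values]


def identity_block_1d(x, branch):
    """Toy residual unit on a flat list: relu(branch(x) + x).

    `branch` stands in for the conv/batch-norm stack and must return
    a list of the same length as x so the element-wise add is valid.
    """
    fx = branch(x)
    if len(fx) != len(x):
        raise ValueError("branch output must match the input shape")
    return relu([a + b for a, b in zip(fx, x)])


x = [0.5, 2.0, 0.0]
print(identity_block_1d(x, lambda v: [0.0] * len(v)))   # [0.5, 2.0, 0.0]
print(identity_block_1d(x, lambda v: [-1.0] * len(v)))  # [0.0, 1.0, 0.0]
```

When the branch outputs zeros, a non-negative input passes through unchanged, which is why such a block can fall back to an identity mapping.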

def conv_block(input_tensor, kernel_size, filters, stage, block, strides=(2, 2)):
    
    (filters1, filters2, filters3) = filters
    if K.image_data_format() == 'channels_last':
        bn_axis = 3
    else:
        bn_axis = 1
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    x = Conv2D(filters1, (1, 1), strides=strides, name=conv_name_base + '2a')(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = Activation('relu')(x)
    x = Conv2D(filters2, kernel_size, padding='same', name=conv_name_base + '2b')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = Activation('relu')(x)
    x = Conv2D(filters3, (1, 1), name=conv_name_base + '2c')(x)
    x = BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)
    shortcut = Conv2D(filters3, (1, 1), strides=strides, name=conv_name_base + '1')(input_tensor)
    shortcut = BatchNormalization(axis=bn_axis, name=bn_name_base + '1')(shortcut)
    x = add([x, shortcut])
    x = Activation('relu')(x)
    return x

This snippet defines a function called conv_block. Its inputs are input_tensor, a 4D tensor containing the input data; kernel_size, the size of the middle convolutional kernel; filters, a tuple of three integers giving the number of filters in each convolutional layer; stage, which identifies the stage of the network; and block, which distinguishes blocks within the same stage. The strides parameter defaults to (2, 2). The first few lines set the batch normalization axis according to whether the image data is in channels-last or channels-first format, so the code works with both. The rest of the function builds a residual block, the unit from which residual networks are assembled. Residual connections were introduced to make very deep networks easier to train. The main branch contains three convolutional layers, each followed by batch normalization, with ReLU activations after the first two. Unlike the identity block, the shortcut is not the raw input: it passes through a 1x1 convolution with the same strides, followed by batch normalization, so that its spatial size and channel count match the main branch. The two branches are then added and passed through a final ReLU. This makes conv_block the right choice wherever a stage changes the resolution or the number of channels.

def build_model(SHAPE, nb_classes, bn_axis, seed=None):
    if seed:
        np.random.seed(seed)
    input_layer = Input(shape=SHAPE)
    x = ZeroPadding2D((3, 3))(input_layer)
    x = Conv2D(64, (7, 7), strides=(2, 2), name='conv1')(x)
    x = BatchNormalization(axis=bn_axis, name='bn_conv1')(x)
    x = Activation('relu')(x)
    x = conv_block(x, 3, [64, 64, 256], stage=2, block='a', strides=(1, 1))
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='b')
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='c')
    x = conv_block(x, 3, [128, 128, 512], stage=3, block='a')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='b')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='c')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='d')
    x = conv_block(x, 3, [256, 256, 1024], stage=4, block='a')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='b')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='c')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='d')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='e')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='f')
    x = conv_block(x, 3, [512, 512, 2048], stage=5, block='a')
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='b')
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='c')
    x = Flatten()(x)
    x = Dense(nb_classes, activation='softmax', name='fc10')(x)
    model = Model(input_layer, x)
    return model

This snippet builds the ResNet-50 style model. The function takes the input shape, the number of classes, the batch normalization axis bn_axis and, optionally, a seed for reproducible results. The if seed statement sets NumPy's random seed when a seed is provided. The input layer is defined from the input shape, and the data is zero-padded by three pixels on each side. An initial 7x7 convolution with batch normalization and ReLU follows. Then four stages are added, each starting with one conv_block followed by two, three, five, and two identity blocks respectively. These blocks extract progressively more abstract features while the number of channels grows from 256 to 2048. After the last stage, a Flatten layer and a fully connected softmax layer with nb_classes outputs produce the class probabilities. Finally, the model is returned with the specified input and output layers.
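
The layout can be checked with a little arithmetic. The sketch below traces the spatial size through the listing, assuming conv1 is a 7x7 convolution with stride 2 applied after three pixels of zero padding, and counts the weighted layers on the main path (shortcut projections are left out, as is conventional). The helper names conv_out and resnet_trace are hypothetical, and 48 is the script's default dimension.

```python
def conv_out(size, kernel, stride, pad=0):
    """Output length of a convolution without 'same' padding."""
    return (size + 2 * pad - kernel) // stride + 1


def resnet_trace(dimension):
    """Trace spatial size and channels through the build_model layout.

    Stage tuples are (output_channels, number_of_blocks, first_stride),
    read off the conv_block / identity_block calls in the listing.
    """
    size = conv_out(dimension, kernel=7, stride=2, pad=3)  # ZeroPadding2D + conv1
    stages = [(256, 3, 1), (512, 4, 2), (1024, 6, 2), (2048, 3, 2)]
    weighted_layers = 1  # conv1
    channels = 64
    for channels, blocks, stride in stages:
        size = conv_out(size, kernel=1, stride=stride)  # 1x1 conv in conv_block
        weighted_layers += 3 * blocks                   # three convs per block
    weighted_layers += 1  # final Dense layer
    return size, channels, weighted_layers


size, channels, layers = resnet_trace(48)
print(size, channels, layers, size * size * channels)  # 3 2048 50 18432
```

Under these assumptions a 48x48 input ends as a 3x3 map with 2048 channels, and the count of 50 weighted layers matches the name of the architecture.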

def main():
    start_time = time.monotonic()
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', help='an input directory of dataset', required=True)
    parser.add_argument('-d', '--dimension', help='a image dimension', type=int, default=48)
    parser.add_argument('-c', '--channel', help='a image channel', type=int, default=3)
    parser.add_argument('-e', '--epochs', help='num of epochs', type=int, default=10)
    parser.add_argument('-b', '--batch_size', help='num of batch_size', type=int, default=64)
    parser.add_argument('-o', '--output', help='a result file', type=str, default='hasilnya.txt')
    args = parser.parse_args()
    (img_width, img_height) = (args.dimension, args.dimension)
    channel = args.channel
    epochs = args.epochs
    batch_size = args.batch_size
    SHAPE = (img_width, img_height, channel)
    bn_axis = 3 if K.image_dim_ordering() == 'tf' else 1
    data_directory = args.input
    period_name = data_directory.split('/')
    print('loading dataset')
    (X_train, Y_train, nb_classes) = build_dataset('{}/train'.format(data_directory), args.dimension)
    (X_test, Y_test, nb_classes) = build_dataset('{}/test'.format(data_directory), args.dimension)
    print('number of classes : {}'.format(nb_classes))
    model = build_model(SHAPE, nb_classes, bn_axis)
    model.compile(optimizer=Adam(lr=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, Y_train, batch_size=batch_size, epochs=epochs)
    model.save('{}epochs_{}batch_resnet50_model_{}.h5'.format(epochs, batch_size, data_directory.replace('/', '_')), overwrite=True)
    predicted = model.predict(X_test)
    y_pred = np.argmax(predicted, axis=1)
    Y_test = np.argmax(Y_test, axis=1)
    cm = confusion_matrix(Y_test, y_pred)
    report = classification_report(Y_test, y_pred)
    tn = cm[0][0]
    fn = cm[1][0]
    tp = cm[1][1]
    fp = cm[0][1]
    if tp == 0:
        tp = 1
    if tn == 0:
        tn = 1
    if fp == 0:
        fp = 1
    if fn == 0:
        fn = 1
    TPR = float(tp) / (float(tp) + float(fn))
    FPR = float(fp) / (float(fp) + float(tn))
    accuracy = round((float(tp) + float(tn)) / (float(tp) + float(fp) + float(fn) + float(tn)), 3)
    specificity = round(float(tn) / (float(tn) + float(fp)), 3)
    sensitivity = round(float(tp) / (float(tp) + float(fn)), 3)
    mcc = round((float(tp) * float(tn) - float(fp) * float(fn)) / math.sqrt((float(tp) + float(fp)) * (float(tp) + float(fn)) * (float(tn) + float(fp)) * (float(tn) + float(fn))), 3)
    with open(args.output, 'a') as f_output:
        f_output.write('=======\n')
        f_output.write('{}epochs_{}batch_resnet50\n'.format(epochs, batch_size))
        f_output.write('TN: {}\n'.format(tn))
        f_output.write('FN: {}\n'.format(fn))
        f_output.write('TP: {}\n'.format(tp))
        f_output.write('FP: {}\n'.format(fp))
        f_output.write('TPR: {}\n'.format(TPR))
        f_output.write('FPR: {}\n'.format(FPR))
        f_output.write('accuracy: {}\n'.format(accuracy))
        f_output.write('specificity: {}\n'.format(specificity))
        f_output.write('sensitivity : {}\n'.format(sensitivity))
        f_output.write('mcc : {}\n'.format(mcc))
        f_output.write('{}'.format(report))
        f_output.write('=======\n')
    end_time = time.monotonic()
    print('Duration : {}'.format(timedelta(seconds=end_time - start_time)))

The main function is the entry point of the script. It records a start time with time.monotonic and creates an ArgumentParser so that the run can be configured from the command line: the dataset directory (required), the image dimension, the number of channels, the number of epochs, the batch size, and the name of the result file. Each add_argument call supplies a help text and, where needed, a type and a default value; parse_args stores the parsed values in args. The function then loads the train and test sets with build_dataset, builds the model, compiles it with the Adam optimizer and categorical cross-entropy loss, trains it, and saves it to an .h5 file. For evaluation, it predicts on the test set, reads TN, FN, TP, and FP from the confusion matrix, and computes TPR, FPR, accuracy, specificity, sensitivity, and MCC. Any count that is zero is replaced with 1 before the ratios are computed, which avoids division by zero but shifts the reported values in that case. The results and the classification report are appended to the output file, and the elapsed time is printed as a timedelta built from the two time.monotonic readings.
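The metric formulas used in main can be collected into one small function. The following is a minimal sketch, assuming a binary problem and plain integer counts; binary_metrics is a hypothetical name. It differs from the listing in two ways: a zero denominator yields 0.0 instead of substituting 1 for zero counts, and it also returns the F1 score, which is listed among the project's evaluation metrics but is not computed in the listing above.

```python
import math


def binary_metrics(tn, fp, fn, tp):
    """Compute common binary classification metrics from confusion-matrix counts."""

    def ratio(num, den):
        # Return 0.0 instead of raising when a denominator is zero.
        return num / den if den else 0.0

    total = tn + fp + fn + tp
    mcc_den = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return {
        'accuracy': ratio(tp + tn, total),
        'specificity': ratio(tn, tn + fp),   # true negative rate
        'sensitivity': ratio(tp, tp + fn),   # true positive rate (recall)
        'mcc': ratio(tp * tn - fp * fn, mcc_den),
        'f1': ratio(2 * tp, 2 * tp + fp + fn),
    }


if __name__ == '__main__':
    for name, value in binary_metrics(tn=30, fp=10, fn=20, tp=40).items():
        print('{}: {:.3f}'.format(name, value))
```

With tn=30, fp=10, fn=20, and tp=40, this gives an accuracy of 0.7 and a specificity of 0.75.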

dataset_traditional.py

def dataset(base_dir, n):
    d = defaultdict(list)
    for (root, subdirs, files) in os.walk(base_dir):
        for filename in files:
            file_path = os.path.join(root, filename)
            assert file_path.startswith(base_dir)
            suffix = file_path[len(base_dir):]
            suffix = suffix.lstrip('/')
            label = suffix.split('/')[0]
            d[label].append(file_path)
    tags = sorted(d.keys())
    X = []
    y = []
    for (class_index, class_name) in enumerate(tags):
        filenames = d[class_name]
        for filename in filenames:
            img = scipy.misc.imread(filename)
            new_shape = img.shape[0] * img.shape[1] * 3
            img_as_array = img[:, :, :3].reshape(new_shape)
            (height, width, chan) = img.shape
            assert chan == 3
            X.append(img_as_array)
            y.append(class_index)
    X = np.array(X).astype(np.float32)
    y = np.array(y)
    return (X, y, tags)

This function takes a directory path and an integer n; n is not used in the body. It creates a defaultdict whose missing keys start as empty lists, walks the directory tree with os.walk, and appends each file path to the list for its label. The label is the first subdirectory below base_dir, so the expected layout is one folder per class. Once all paths are collected, the labels are sorted into tags, and two empty lists, X and y, are created. For each label, the function reads every image with scipy.misc.imread, keeps the first three channels, flattens the image into a single vector of length height * width * 3, and appends it to X; the index of the label in tags is appended to y. Both lists are converted to NumPy arrays (X as float32) and returned together with tags. The result is a feature matrix with one row per image and a matching label vector, which suits models that expect flat feature vectors, such as the Random Forest and KNN models mentioned earlier. Note that scipy.misc.imread was removed in SciPy 1.2, so this code needs an older SciPy release or a replacement reader such as imageio.imread.
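The labeling step can be tried on its own, without reading any images. This is a minimal sketch using only the standard library; group_files_by_label is a hypothetical name. It uses os.path.relpath instead of slicing the path string, and it skips files that sit directly in base_dir, whereas the listing above would use such a file's own name as its label.

```python
import os
from collections import defaultdict


def group_files_by_label(base_dir):
    """Map each first-level subdirectory name to the files found beneath it."""
    groups = defaultdict(list)
    for root, _subdirs, files in os.walk(base_dir):
        for filename in files:
            file_path = os.path.join(root, filename)
            # Path relative to base_dir, e.g. "1/chart_001.png".
            rel_path = os.path.relpath(file_path, base_dir)
            parts = rel_path.split(os.sep)
            if len(parts) < 2:
                # Skip files directly in base_dir: they have no class folder.
                continue
            groups[parts[0]].append(file_path)
    # Sort for a stable label order and a stable file order within each label.
    return {label: sorted(paths) for label, paths in sorted(groups.items())}
```

The position of each key in the returned dictionary corresponds to the class index that the dataset function assigns through enumerate(tags).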

get_data.py

def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-sd', '--start_date', type=str, default='1990-01-01', help='Start date parameter value - format YYYY-MM-DD')
    parser.add_argument('-ed', '--end_date', type=str, default=arrow.now().format('YYYY-MM-DD'), help='End date parameter - format YYYY-MM-DD')
    parser.add_argument('-t', '--ticker', nargs='+', help='<Required> Set flag', required=True)
    parser.add_argument('-s', '--source', help='<Required> set source', required=True)
    parser.add_argument('-a', '--attempt', help='set max attempt to download', type=int, default=10)
    parser.add_argument('-e', '--exist', help='check exist stock history file', action='store_true')
    parser.add_argument('-p', '--prefix', help='add prefix in output name')
    args = parser.parse_args()
    prefix_name = ''
    if not os.path.isdir('../stockdatas'):
        os.mkdir('../stockdatas')
    if args.prefix and len(args.prefix) > 1:
        prefix_name = args.prefix
    if args.source == 'tiingo':
        for ticker in set(args.ticker):
            fetch_tiingo_data(ticker, args.start_date, args.end_date, '../stockdatas/{}_{}.csv'.format(ticker, prefix_name))
    elif args.source == 'yahoo':
        for ticker in set(args.ticker):
            fetch_yahoo_data(ticker, args.start_date, args.end_date, '../stockdatas/{}_{}.csv'.format(ticker, prefix_name), args.attempt, args.exist)

This script downloads historical stock data from either Tiingo or Yahoo. It reads its settings from the command line with argparse: start date, end date, one or more ticker symbols, the data source, the maximum number of download attempts, whether to check for an existing file, and a prefix for the output file name. After parsing, the code creates the directory ../stockdatas if it does not exist. If a prefix longer than one character was given, it is stored in prefix_name; otherwise prefix_name stays empty. The code then loops over the unique ticker symbols and calls the fetch function for the chosen source, saving each result as ../stockdatas/<ticker>_<prefix>.csv. For the yahoo source, the maximum number of attempts and the file-existence flag are passed along as well.
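The argument handling can be tested without touching the network by passing a list to parse_args. The sketch below is a reduced version under a few assumptions: build_parser and output_path are hypothetical helpers, the source is restricted with choices, the prefix defaults to an empty string, and the ticker symbols in the example are only illustrative values.

```python
import argparse


def build_parser():
    """Build a reduced parser with a few of the download options."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-t', '--ticker', nargs='+', required=True,
                        help='one or more ticker symbols')
    parser.add_argument('-s', '--source', required=True,
                        choices=['tiingo', 'yahoo'], help='data source')
    # type=int converts the command-line string before it reaches range().
    parser.add_argument('-a', '--attempt', type=int, default=10,
                        help='maximum number of download attempts')
    parser.add_argument('-p', '--prefix', default='',
                        help='text added to the output file name')
    return parser


def output_path(ticker, prefix, out_dir='../stockdatas'):
    """Return the CSV path in the '<ticker>_<prefix>.csv' form used above."""
    return '{}/{}_{}.csv'.format(out_dir, ticker, prefix)


if __name__ == '__main__':
    args = build_parser().parse_args(
        ['-t', '2330.TW', '2317.TW', '-s', 'yahoo', '-a', '3', '-p', 'daily'])
    for ticker in sorted(set(args.ticker)):
        print(output_path(ticker, args.prefix))
```

Giving the prefix a string default means the later length check never sees None, and declaring the attempt count as an integer keeps it usable as a loop bound.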

def fetch_tiingo_data(ticker, start_date, end_date, fname):
    url = 'https://api.tiingo.com/tiingo/daily/{ticker}/prices?startDate={start_date}&endDate={end_date}&token={token}'
    token = 'ca5a6f47a99ae61051e4de63b26f727b1709a01d'
    data = pd.read_json(url.format(ticker=ticker, start_date=start_date, end_date=end_date, token=token))
    data.to_csv(fname, columns=['date', 'open', 'close', 'high', 'low', 'volume', 'adjClose'], index=False)

This function fetches daily price data for one ticker from the Tiingo API over a given date range. fetch_tiingo_data takes four arguments: ticker is the stock symbol, start_date and end_date bound the period, and fname is the path of the output file. The URL template has placeholders for these values and for an API token, which Tiingo uses for authentication; the token is hard-coded in the function. pd.read_json requests the URL and parses the JSON response into a pandas DataFrame, and to_csv writes the date, open, close, high, low, volume, and adjClose columns to fname without the index.
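One common alternative to a hard-coded token is to read it from the environment. The following is a minimal sketch of the URL construction only, assuming the same endpoint and query parameters as the function above; build_tiingo_url is a hypothetical helper and TIINGO_API_TOKEN is a variable name chosen for this example, not something Tiingo defines. It makes no network request.

```python
import os
from urllib.parse import urlencode

TIINGO_URL = 'https://api.tiingo.com/tiingo/daily/{ticker}/prices'


def build_tiingo_url(ticker, start_date, end_date, token=None):
    """Build the request URL, reading the token from the environment if not given."""
    if token is None:
        # TIINGO_API_TOKEN is a hypothetical variable name chosen for this sketch.
        token = os.environ.get('TIINGO_API_TOKEN')
    if not token:
        raise ValueError('no Tiingo API token provided')
    query = urlencode({'startDate': start_date, 'endDate': end_date, 'token': token})
    return TIINGO_URL.format(ticker=ticker) + '?' + query
```

The returned string could then be passed to pd.read_json in place of the formatted template.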

def fetch_yahoo_data(ticker, start_date, end_date, fname, max_attempt, check_exist):
    if os.path.exists(fname) and check_exist:
        print('file exist')
    else:
        if os.path.exists(fname):
            os.remove(fname)
        for attempt in range(max_attempt):
            time.sleep(2)
            try:
                dat = data.get_data_yahoo(str(ticker), start=start_date, end=end_date)
                dat.to_csv(fname)
            except Exception as e:
                if attempt < max_attempt - 1:
                    print('Attempt {}: {}'.format(attempt + 1, str(e)))
                else:
                    raise
            else:
                break

This function fetches data from Yahoo Finance. Its parameters are the ticker, the start and end dates, the output file name, the maximum number of attempts, and a flag that says whether an existing file should be kept. If the file already exists and check_exist is true, the function prints a message and does nothing else. Otherwise it removes any existing file with that name and enters a loop of at most max_attempt iterations. Each iteration first sleeps for two seconds, which spaces out the requests to Yahoo Finance, and then tries to download the data for the given ticker and date range and write it to the CSV file. If the attempt raises an exception and attempts remain, the attempt number and the error are printed and the loop tries again; if the last attempt fails, the exception is re-raised. As soon as an attempt succeeds, the else branch of the try statement breaks out of the loop and the function returns.
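The retry pattern can be separated from the download itself. Below is a minimal sketch, assuming the work is wrapped in a zero-argument callable; retry, fetch, and the sleep parameter are hypothetical names, and sleep is injectable only so the helper can be exercised without real delays.

```python
import time


def retry(fetch, max_attempt=3, delay=2.0, sleep=time.sleep):
    """Call fetch() until it succeeds or max_attempt calls have failed."""
    for attempt in range(max_attempt):
        sleep(delay)  # pause before every attempt, including the first
        try:
            return fetch()
        except Exception as exc:
            if attempt == max_attempt - 1:
                raise  # out of attempts: let the last error propagate
            print('Attempt {}: {}'.format(attempt + 1, exc))
```

As in the function above, intermediate failures are only printed and the final failure is re-raised to the caller.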

def dataset(base_dir, n):
    d = defaultdict(list)
    for (root, subdirs, files) in os.walk(base_dir):
        for filename in files:
            file_path = os.path.join(root, filename)
            assert file_path.startswith(base_dir)
            suffix = file_path[len(base_dir):]
            suffix = suffix.lstrip('/')
            label = suffix.split('/')[0]
            d[label].append(file_path)
    tags = sorted(d.keys())
    X = []
    y = []
    for (class_index, class_name) in enumerate(tags):
        filenames = d[class_name]
        for filename in filenames:
            img = scipy.misc.imread(filename)
            (height, width, chan) = img.shape
            assert chan == 3
            X.append(img)
            y.append(class_index)
    X = np.array(X).astype(np.float32)
    y = np.array(y)
    return (X, y, tags)

This second dataset function loads the same directory layout but keeps each image as a height by width by 3 array instead of flattening it. It takes the base directory and an integer n, which is not used in the body. A defaultdict with list values collects the file paths per label without having to create each key first. The nested loops walk the directory tree, strip base_dir from each file path, and take the first remaining path component as the label. The labels are then sorted into tags, each image is read and checked to have three channels, and the images and their label indices are returned as NumPy arrays together with tags. Keeping the image shape gives the array form that a convolutional network expects as input.

Stock Market
Deep Learning
Data Science
Finance