Rashida Nasrin Sucky


Anomaly Detection in TensorFlow and Keras Using the Autoencoder Method

A cutting-edge unsupervised method for noise removal, dimensionality reduction, anomaly detection, and more

All the TensorFlow and neural network tutorials I have shared so far have been about supervised learning. This one is about the autoencoder, which is an unsupervised learning technique. Put simply, an autoencoder compresses the input data into a smaller encoded representation and then reconstructs the data from that representation. Because the compressed representation cannot hold everything, the model has to keep the main features of the input, which is how autoencoders reduce the dimensionality and the noise of the data.

As the introduction suggests, an autoencoder involves more than one step:

  1. First, an encoder model compresses the input data.
  2. Then, a decoder model reconstructs the compressed data so that the result is as close as possible to the input data.

In this process, the autoencoder can remove noise, reduce dimensionality, and clean up the input data.
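To make the encode-then-decode idea concrete before we get to images, here is a minimal sketch that uses a plain linear projection (PCA computed with NumPy) as a stand-in for the encoder and decoder. This is not the convolutional model built later in this tutorial, and the toy data, `basis`, and `components` are all made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical toy data: 100 samples with 8 features that really live
# on a 2-dimensional subspace, plus a little noise.
basis = rng.normal(size=(2, 8))
clean = rng.normal(size=(100, 2)) @ basis
noisy = clean + 0.05 * rng.normal(size=clean.shape)

# "Encoder": project onto the top-2 principal directions (8 -> 2 values).
mean = noisy.mean(axis=0)
_, _, vt = np.linalg.svd(noisy - mean, full_matrices=False)
components = vt[:2]                       # shape (2, 8)
encoded = (noisy - mean) @ components.T   # shape (100, 2)

# "Decoder": map the 2 values back to the 8 original features.
decoded = encoded @ components + mean     # shape (100, 8)

# Compare how far the noisy and the reconstructed data are from the clean data.
err_noisy = np.mean((noisy - clean) ** 2)
err_decoded = np.mean((decoded - clean) ** 2)
print(encoded.shape, decoded.shape)
print(err_noisy, err_decoded)
```

Because the 2-value code has no room for the noise in the other directions, the reconstruction ends up closer to the clean data than the noisy input was. A neural autoencoder follows the same pattern with learned, nonlinear layers.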

In this tutorial, I will explain in detail how an autoencoder works with a working example.

For this example, I chose to use a public dataset (Apache License 2.0) named deep_weeds.

import tensorflow as tf
import tensorflow_datasets as tfds
ds = tfds.load('deep_weeds', split='train', shuffle_files=True)

Data Preparation

We need to prepare a dataset for this unsupervised anomaly detection example. Only one class will be taken as our main class, which will be considered the valid class. I will then add a few images from another class as anomalies. After that, we will develop the model to see if we can find those few anomalies.

I chose class 5 as the valid class and class 1 as the anomaly. In the code block below, I first collect all the data of classes 5 and 1 and create lists of the images and their corresponding labels.

import numpy as np
images_main = []
images_anomaly = []
labels_main= []
labels_anomaly = []
ds = ds.prefetch(tf.data.AUTOTUNE)
for example in ds:
  #print(np.array(example['label']))
  if np.array(example['label']) == 5:
    images_main.append(example["image"])
    labels_main.append(example["label"])
  if np.array(example['label']) == 1:
    images_anomaly.append(example["image"])
    labels_anomaly.append(example["label"])

Let’s see the shape of the main image (images of class 5) data here:

np.array(images_main).shape

Output:

(1009, 256, 256, 3)

Each image has the shape (256, 256, 3), and we have a total of 1009 images for class 5.

However, we do not need all the data from class 1, because class 1 is the anomaly class. So, only 1% of the class 1 data will be taken.

parc = round(len(labels_anomaly) * 0.01)
images_anomaly = np.array(images_anomaly)[:parc]
# stacking the main images and anomaly images together
total_images = np.vstack([images_main, images_anomaly])

The shape of the total_images:

total_images.shape

Output:

(1020, 256, 256, 3)

We have a total of 1020 images. As we saw earlier, 1009 of them are class 5 images, so 1020 - 1009 = 11 are class 1 images, which are our anomalies.
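The arithmetic above can be checked without loading the real dataset. The sketch below uses tiny 4x4 placeholder arrays instead of the 256x256 images, and the pool size of 1100 class 1 images is an assumption chosen only so that 1% rounds to 11; the real count may differ.

```python
import numpy as np

# Hypothetical stand-ins: tiny 4x4 RGB "images" instead of 256x256 ones.
n_main, n_anomaly_pool = 1009, 1100   # 1100 is an assumed pool size
images_main = np.zeros((n_main, 4, 4, 3), dtype=np.uint8)
images_pool = np.ones((n_anomaly_pool, 4, 4, 3), dtype=np.uint8)

parc = round(len(images_pool) * 0.01)      # keep 1% of the anomaly pool
images_anomaly = images_pool[:parc]

# stack main and anomaly images along the first (sample) axis
total_images = np.vstack([images_main, images_anomaly])

print(parc, total_images.shape)
print(parc / len(total_images))            # anomaly share of the combined data
```

With these numbers the anomalies make up only about 1% of the combined data, which is the kind of imbalance this approach relies on.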

Let’s see if we can develop an autoencoder model in Keras and TensorFlow to detect these anomalies.

Model Development

This is the fun part! But first, we should do the necessary imports:

# import the necessary packages
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np
import random
import cv2

Some of the data should be kept separately for testing purposes. The train_test_split method from the sklearn library can be used for that. Remember, as this is an unsupervised learning method, the labels are not necessary. We will only split the images.

(train_x, test_x) = train_test_split(total_images, test_size=0.2, random_state=0)
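As a quick sanity check on the split, the sizes can be worked out by hand. This sketch assumes that scikit-learn rounds the test share up and gives the rest to the training set, and it uses the batch size of 32 that appears later in this tutorial.

```python
import math

n_total = 1020      # images in total_images
test_size = 0.2
batch_size = 32     # the batch size used for training later on

# assumed behavior: the test share is rounded up, the rest is training data
n_test = math.ceil(n_total * test_size)
n_train = n_total - n_test

# number of batches Keras needs to go through the training data once
steps_per_epoch = math.ceil(n_train / batch_size)
print(n_train, n_test, steps_per_epoch)
```

The number of steps per epoch computed here should agree with the step counter printed in the training log.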

Finally, the autoencoder model. We will build a Convolution_Autoencoder class that defines a convolutional neural network. The class has a build method where we define the autoencoder model.

The ‘build’ method takes width, height, depth, filters, and latentDim as parameters. Here, width, height, and depth are the dimensions of the images, which are (256, 256, 3) for us, as we saw from total_images.shape above.

The ‘filters’ parameter gives the number of filters for each of the convolution layers.

The ‘latentDim’ parameter is the size of the compressed representation that the encoder produces.

In this build method, the first part is an encoder model, which is a simple convolutional neural network.

Once the encoder portion is done, a decoder model is developed using Conv2DTranspose layers to reconstruct the data.

Then, we construct the autoencoder model, which is a combination of the encoder and decoder models.

Finally, we return the encoder, decoder, and autoencoder models.

class Convolution_Autoencoder:
  @staticmethod
  def build(width, height, depth, filters=(16, 32, 64), latentDim=32):
    input_shape = (height, width, depth)
    chanDim = -1

    inputs = Input(shape=input_shape)
    x = inputs

    for f in filters:
      x = Conv2D(f, (3, 3), strides = 2, padding="same")(x)
      x = LeakyReLU(alpha=0.3)(x)
      x = BatchNormalization(axis=chanDim)(x)

    volume = K.int_shape(x)
    x = Flatten()(x)
    latent = Dense(latentDim)(x)
    
    #encoder model
    encoder = Model(inputs, latent, name="encoder")
    
    #compressed representation
    latent_layer_input = Input(shape=(latentDim,))
    x = Dense(np.prod(volume[1:]))(latent_layer_input)

    x = Reshape((volume[1], volume[2], volume[3]))(x)
    
    #Reconstructing the image with a decoder model
    for f in filters[::-1]:
      x = Conv2DTranspose(f, (3, 3), strides=2, padding="same")(x)
      x = LeakyReLU(alpha=0.3)(x)
      x = BatchNormalization(axis=chanDim)(x)

    x = Conv2DTranspose(depth, (3, 3), padding="same")(x)

    outputs = Activation("sigmoid")(x)

    decoder = Model(latent_layer_input, outputs, name="decoder")

    autoencoder = Model(inputs, decoder(encoder(inputs)), name="autoencoder")

    return (encoder, decoder, autoencoder)
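To get a feel for how much the encoder compresses an image, the tensor shapes can be traced by hand. The helper below is a hypothetical sketch, not part of Keras: `trace_shapes` is a name I made up, and it assumes the stride-2, "same"-padded convolutions and the default filters used in the build method above.

```python
import math

def trace_shapes(height, width, depth, filters=(16, 32, 64), latent_dim=32):
    """Return the tensor shapes an encoder like the one above would produce."""
    shapes = [(height, width, depth)]
    h, w = height, width
    for f in filters:
        # stride-2 convolution with padding="same": spatial size is ceil(n / 2)
        h, w = math.ceil(h / 2), math.ceil(w / 2)
        shapes.append((h, w, f))
    # Flatten, then the Dense layer that produces the latent vector
    shapes.append((h * w * filters[-1],))
    shapes.append((latent_dim,))
    return shapes

for shape in trace_shapes(256, 256, 3):
    print(shape)
```

Under these assumptions, a 256 x 256 x 3 image (196,608 values) is squeezed down to a latent vector of only 32 values, and the decoder has to rebuild the whole image from that.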

Model development is done. It’s time to run the model and see if it works. It should run like any other TensorFlow model.

Here we compile the model with the Adam optimizer. I also used a decay in the learning rate and ‘mse’ as the loss.

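epochs = 50  # used here only to set the learning-rate decay
lr_start = 0.001
batchSize = 32

# build the three models for 256x256 RGB images
(encoder, decoder, autoencoder) = Convolution_Autoencoder.build(256, 256, 3)
# the legacy Adam optimizer accepts the `decay` argument
opt = tf.keras.optimizers.legacy.Adam(learning_rate = lr_start, decay = lr_start / epochs)
autoencoder.compile(loss = "mse", optimizer = opt)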
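To see what the decay setting does to the learning rate, here is a small sketch. It assumes, to the best of my understanding, that the legacy Keras optimizers apply inverse-time decay once per batch as lr / (1 + decay * iterations). The function `decayed_lr` is my own helper, and 26 batches per epoch is assumed from 816 training images at a batch size of 32.

```python
def decayed_lr(lr_start, decay, iteration):
    """Inverse-time decay, assumed to match the legacy Keras optimizers."""
    return lr_start / (1.0 + decay * iteration)

lr_start = 0.001
epochs = 50
decay = lr_start / epochs          # 2e-5, as in the compile step
steps_per_epoch = 26               # assumed: 816 training images, batch size 32

for epoch in (0, 10, 30):
    iteration = epoch * steps_per_epoch
    print(epoch, decayed_lr(lr_start, decay, iteration))
```

Under this assumption the decay is very gentle: after 30 epochs the learning rate has dropped by less than 2%.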

Finally, running the model. Remember, this is an unsupervised learning method, so there are no labels in the model training. Instead, the target of the model is the input itself, so we pass train_x twice: once as the input and once as the target. If you look at the build method in the Convolution_Autoencoder class, the autoencoder is defined like this:

autoencoder = Model(inputs, decoder(encoder(inputs)), name="autoencoder")

The model maps the inputs to decoder(encoder(inputs)), which is the reconstruction of those same inputs. During training, the reconstruction of train_x is compared with train_x itself. The same applies to test_x in the validation data.

Before you begin the model training, I should warn you that it is very slow in the default setting of Google Colab. You can make it much faster by running it on a GPU. Please change the runtime settings of your Google Colab notebook before you run this.

history = autoencoder.fit(
 train_x, train_x,
 validation_data=(test_x, test_x),
 epochs=30,
 batch_size=batchSize)

Output:

Epoch 1/30
26/26 [==============================] - 15s 157ms/step - loss: 12963.2842 - val_loss: 13428.3906
Epoch 2/30
26/26 [==============================] - 2s 87ms/step - loss: 12924.1787 - val_loss: 13392.3418
Epoch 3/30
26/26 [==============================] - 2s 88ms/step - loss: 12911.4551 - val_loss: 13401.3350
Epoch 4/30
26/26 [==============================] - 2s 92ms/step - loss: 12905.8975 - val_loss: 13344.5596
...
...
Epoch 27/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.9102 - val_loss: 13322.1299
Epoch 28/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.8701 - val_loss: 13322.0820
Epoch 29/30
26/26 [==============================] - 2s 89ms/step - loss: 12890.8428 - val_loss: 13322.0488

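As you can see, the losses barely change. The loss here is not computed against labels: we pass the training images as both the input and the target, so the loss is the mean squared error between the original images and the images reconstructed by the autoencoder. The large and nearly flat values are most likely a matter of scale: the pixel values range from 0 to 255, while the sigmoid output layer can only produce values between 0 and 1. Scaling the images to the range [0, 1] before training would be expected to give a much smaller and more informative loss.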
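To illustrate how much the pixel range matters for an ‘mse’ loss when the model output is limited to values between 0 and 1, here is a minimal sketch with random placeholder data. The arrays `fake_images` and `fake_recon` are made up; they only stand in for unscaled images and for a sigmoid output.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical stand-in for training images: uint8 pixels in [0, 255].
fake_images = rng.integers(0, 256, size=(8, 16, 16, 3), dtype=np.uint8)
# Stand-in for a sigmoid output layer: values in [0, 1].
fake_recon = rng.random(size=fake_images.shape).astype("float32")

# MSE against the raw pixel values is dominated by the scale mismatch.
mse_raw = np.mean((fake_images.astype("float32") - fake_recon) ** 2)

# After scaling the pixels to [0, 1], both sides share the same range.
scaled = fake_images.astype("float32") / 255.0
mse_scaled = np.mean((scaled - fake_recon) ** 2)

print(mse_raw, mse_scaled)
```

If you want to try this with the real data, the same division by 255.0 could be applied to the image arrays before fitting and before computing the reconstruction errors.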

Model Evaluation

Model evaluation for an autoencoder is different from evaluating a regular supervised learning model, because there are no labels to compare against. Let’s do that step by step.

First, we do the prediction as usual, which gives us the images decoded by the autoencoder model.

Then, we calculate the mean squared error between each original image and its reconstructed image and save it to the ‘errors’ list. Here is the code for that.

decoded = autoencoder.predict(test_x)
errors = []

# compare each test image with its own reconstruction
for (image, recon) in zip(test_x, decoded):
  mse = np.mean((image - recon) ** 2)
  errors.append(mse)
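The same per-image error can also be computed without a Python loop. The sketch below is one possible vectorized version; `per_image_mse` is a helper name I made up, and the small `demo_x` and `demo_recon` arrays are placeholders for the test images and their reconstructions.

```python
import numpy as np

def per_image_mse(originals, reconstructions):
    """Mean squared error per image for batches shaped (n, height, width, channels)."""
    originals = np.asarray(originals, dtype="float32")
    reconstructions = np.asarray(reconstructions, dtype="float32")
    # average over height, width and channels, keeping one value per image
    return np.mean((originals - reconstructions) ** 2, axis=(1, 2, 3))

# Placeholder data: three 2x2 RGB images and their "reconstructions".
demo_x = np.zeros((3, 2, 2, 3), dtype=np.uint8)
demo_recon = np.zeros((3, 2, 2, 3), dtype="float32")
demo_recon[1] += 2.0        # only the second image is reconstructed badly

demo_errors = per_image_mse(demo_x, demo_recon)
print(demo_errors)
```

Casting both arrays to float32 first avoids any surprises from mixing unsigned integer pixels with floating point reconstructions.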

Now that we have the ‘mse’ for all the images in the test set, we choose a threshold. Here I use the 95% quantile from the np.quantile method and get the indices from ‘errors’ where the ‘mse’ is greater than or equal to the threshold. Images whose ‘mse’ reaches the threshold are considered anomalies.

thresh = np.quantile(errors, 0.95)
idxs = np.where(np.array(errors) >= thresh)[0]
idxs

Output:

array([  9,  10,  35,  59,  84, 134, 146, 188, 200, 201, 202])
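It helps to know how many images a quantile threshold will flag before looking at which ones they are. This sketch uses 204 random placeholder values in place of the real reconstruction errors (204 is 20% of our 1020 images) and counts how many reach the 95% quantile.

```python
import numpy as np

rng = np.random.default_rng(0)

# Placeholder errors: one random value per test image (204 = 20% of 1020).
fake_errors = rng.random(204)

thresh = np.quantile(fake_errors, 0.95)
flagged = np.where(fake_errors >= thresh)[0]

print(len(flagged))                      # how many images reach the threshold
print(len(flagged) / len(fake_errors))   # roughly 5% by construction
```

A 95% quantile flags about 5% of the images no matter what the errors look like, so the number of flagged images by itself does not say whether they are real anomalies. That is why the flagged images need to be checked individually.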

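Now, let’s check whether the flagged images are actually anomalies. The indices in ‘idxs’ refer to positions in test_x, because the errors were computed on the test set, so we compare each flagged test image against the images in ‘images_anomaly’. Here np.array_equal compares two whole images exactly:

for i in idxs:
  # True only if the flagged test image is identical to one of the anomaly images
  is_anomaly = any(np.array_equal(test_x[i], a) for a in images_anomaly)
  print(is_anomaly)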

Output:

True
True
True
True
True
True
True
True
True
True
True
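One thing to be careful about when checking whether an image is contained in an array of images: the `in` operator on NumPy arrays compares element by element and reports True as soon as any single value matches. The toy arrays below (`stack` and `probe`) are made up to show the difference from an exact whole-image comparison.

```python
import numpy as np

# Two all-black 2x2 RGB "images".
stack = np.zeros((2, 2, 2, 3), dtype=np.uint8)

# A different image that happens to share a single value with them.
probe = np.full((2, 2, 3), 9, dtype=np.uint8)
probe[0, 0, 0] = 0

# Element-wise membership: True because one value matches somewhere.
loose = probe in stack

# Exact membership: True only if some image is identical to the probe.
exact = any(np.array_equal(probe, img) for img in stack)

print(loose, exact)
```

For a reliable check on real images, the exact comparison with np.array_equal is the safer choice, because natural images almost always share at least one pixel value.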

Yes! Every flagged image printed ‘True’ here, which means they are all anomaly data. If you count the lines above, there are 11 of them. We can check how many anomaly images we originally had in ‘images_anomaly’:

len(images_anomaly)

Output:

11

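So, in this run, the number of flagged images matches the number of anomaly images we mixed into the dataset. Keep in mind that a 95% quantile threshold always flags about 5% of the test images, so it is worth confirming the matches image by image, as we did above.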

Conclusion

I have another anomaly detection tutorial that uses probability to find the anomalies. Please check the ‘More Reading’ section below. Here we used TensorFlow and Keras, which are better suited to images and other complex data. As I mentioned in the introduction, autoencoders can be used for a variety of other tasks as well. I will share more autoencoder use cases in future posts, along with more techniques in TensorFlow and Keras.

Feel free to follow me on Twitter and like my Facebook page.

More Reading:

A Complete Anomaly Detection Algorithm From Scratch in Python: Step by Step Guide | by Rashida Nasrin Sucky | Towards Data Science (medium.com)

Implementation of a Siamese Network in Keras and TensorFlow | by Rashida Nasrin Sucky | Aug, 2023 | Towards Data Science (medium.com)

Complete Implementation of a Mini VGG Network for Image Recognition | by Rashida Nasrin Sucky | Towards Data Science (medium.com)

Using a Keras Tuner for Hyperparameter Tuning of a TensorFlow Model | by Rashida Nasrin Sucky | Towards AI (medium.com)

Map, Filter, and CombinePerKey Transforms in Writing Apache Beam Pipelines with Examples | by Rashida Nasrin Sucky | Towards Data Science (medium.com)
