
Simple Machine Learning Pipeline

Bringing together all essential parts to build a simple but powerful Machine Learning pipeline: Keras/TensorFlow model training, testing, automated re-training, and a REST API.

Photo by Alexei_other on Pixabay

In this article, I’m going to cover multiple topics and explain how to build a Machine Learning pipeline. What is an ML pipeline? It is a solution that re-trains an ML model automatically and makes it available through an API. Re-training intervals can be configured through a scheduler; the model can be updated daily or at any other chosen interval.

A brief overview of the solution architecture:

[Diagram: Simple ML pipeline (author: Andrej Baranovskij)]

I’m using Keras to build a model that calculates the wait time for enterprise report generation. Time is calculated based on Report ID, Report Parameters, and Day Part. There is a common rule, which the model picks up during training: reports run slower with fewer parameters and in the second part of the day.

The Python function that trains the Keras model fetches the data, processes it, and then calls the Keras API to fit the model. Training is run in ten separate calls so that the best result can be selected. The best result is chosen based on model evaluation against test data (more on this later). Each new model is saved under a timestamp, so the current model can still be served while there are active calls in flight (and some users may prefer to keep calling the previous model, which adds extra flexibility).

Model re-training is executed by APScheduler. I’m using the background scheduler option, which lets re-training run within the same process as the REST API (I’m using PM2 to run the Python process). This makes the setup simpler to control, and REST API calls are not blocked while the re-training task runs.

The REST API runs on the Flask library. The model predict function loads the latest available model with the tf.keras.models.load_model API and calls predict on it.

Model

The model logic is implemented in report_exec_time_model.py.

Function fetch_data reads data from CSV. In this example, it always reads the same data (for simplicity), but in a real implementation you would most likely read new data for each re-training. Both the training and test data sets are fetched. Training data contains values 0–10 for the Report Parameters feature; test data contains 11–20. This is done on purpose: we test with data that was not seen during training, which lets us check whether the model picked up the correct trend (execution time should decrease with more report parameters). The Report Parameters feature is normalized by taking the log, which makes the values smaller and helps the model learn it better. The same transform is applied to both the training and test datasets. 20% of the training data is used for validation. Data structure:

[Screenshot: sample data structure (author: Andrej Baranovskij)]

Test data for Report Parameters comes from a different range and differs in scale from the training set, but taking the log normalizes both sets by the same rule:

import numpy as np

# Normalize the training feature report_params; eps avoids log(0)
eps = 0.001
dataframe['report_params'] = np.log(dataframe.pop('report_params') + eps)
normed_df = dataframe
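
The article does not show the split itself; a minimal sketch, assuming scikit-learn’s train_test_split is used for the 80/20 split:

from sklearn.model_selection import train_test_split

# 80/20 split; train_test_split shuffles rows by default,
# which is why df_to_dataset below is called with shuffle=False
train, val = train_test_split(normed_df, test_size=0.2)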

Function build_feature_layer defines the TensorFlow feature layer. This is a metadata layer that handles automatic conversion of the data into a format that can be fed to the training algorithm. The Report ID and Day Part features are categorical; both are encoded as indicator (one-hot) columns using TensorFlow’s categorical feature support. The Report Parameters feature is defined as a numeric column. All three columns are combined into a TensorFlow feature layer, which converts the data without additional custom processing:

from tensorflow import feature_column
import tensorflow as tf

feature_columns = []

# Report ID: categorical, one-hot encoded over the known IDs
report_id = feature_column.categorical_column_with_vocabulary_list('report_id', [1, 2, 3, 4, 5])
report_id_one_hot = feature_column.indicator_column(report_id)
feature_columns.append(report_id_one_hot)

# Report Parameters: numeric (log-normalized earlier)
feature_columns.append(feature_column.numeric_column('report_params'))

# Day Part: categorical, one-hot encoded
day_part = feature_column.categorical_column_with_vocabulary_list('day_part', [1, 2, 3])
day_part_one_hot = feature_column.indicator_column(day_part)
feature_columns.append(day_part_one_hot)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
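
As a quick sanity check (not part of the article’s code), you can call the feature layer on a hand-built batch to see the dense output it produces:

import numpy as np

# One row: report_id=1, log-normalized report_params ~2.3, day_part=3.
# DenseFeatures emits one-hot(5) + numeric(1) + one-hot(3) = 9 values per row.
sample = {'report_id': np.array([1], dtype='int64'),
          'report_params': np.array([2.3], dtype='float32'),
          'day_part': np.array([3], dtype='int64')}
print(feature_layer(sample))  # tf.Tensor of shape (1, 9)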

Function build_dataset converts the Pandas data into a TensorFlow dataset. Such a dataset can be passed directly to Keras model training:

batch_size = 16
train_ds = df_to_dataset(train, shuffle=False, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(normed_df_test, shuffle=False, batch_size=batch_size)
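
The df_to_dataset helper is not shown in the article; it typically follows the standard TensorFlow structured-data pattern. A minimal sketch, with the target column name exec_time as an assumption:

import tensorflow as tf

def df_to_dataset(dataframe, shuffle=True, batch_size=32):
    # Separate the target column and wrap the features as a tf.data.Dataset
    dataframe = dataframe.copy()
    labels = dataframe.pop('exec_time')  # target column name is hypothetical
    ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(dataframe))
    return ds.batch(batch_size)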

I’m using a batch size of 16. Based on my testing, this is an optimal batch size for finding the best fit during model training on this data. We don’t need to shuffle the training data here; it was already shuffled during the split into training and validation sets.

Function build_model constructs a sequential Keras model. The model accepts the feature layer (the list of features and how they are represented) as its first layer, so there is no need to specify the input dimension. There are three layers; the third is the output layer (a single unit/neuron). The first two layers have 16 and 8 units respectively, with relu activation. The number of layers and units was selected by experimenting with different settings.

model = tf.keras.Sequential([
    feature_layer,
    layers.Dense(16, activation='relu'),
    layers.Dense(8, activation='relu'),
    layers.Dense(1)
])
optimizer = tf.keras.optimizers.RMSprop(0.001)

model.compile(loss='mse',
              optimizer=optimizer,
              metrics=['mae', 'mse'])

The model is trained in the train_model function. Training is repeated from scratch ten times, and the best model is saved as the result, in TensorFlow format. After the training loop completes, the best model is copied to the deployment folder accessible by the API endpoint. Each new model is stored with a timestamp, which enables model versioning. Training is configured with an EarlyStopping callback: when there is no improvement for 10 epochs, training stops. Model training with the TensorFlow dataset:

EPOCHS = 1000
# The patience parameter is the number of epochs to wait
# for improvement before stopping
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

history = model.fit(train_ds,
                    validation_data=val_ds,
                    epochs=EPOCHS,
                    verbose=0,
                    callbacks=[early_stop])

Saving the best model. The model is evaluated on the test set. I included data with Report Parameters > 10 in the test set to check how the regression behaves on unseen data. The model should pick up a rule during training: with a larger number of report parameters, execution time should be shorter. I was glad to see that the model actually learned this rule correctly:

loss, mae, mse = model.evaluate(test_ds, verbose=0)
rmse = math.sqrt(mse)
print("Testing set RMSE Error:{:5.2f}".format(rmse))
# best_rmse holds the best result seen across the ten training runs
if rmse < best_rmse:
    print("Saving model with RMSE Error {:5.2f}".format(rmse))
    model.save('./model_exec_time_temp/', save_format='tf')

    best_history = history
    best_rmse = rmse

Copying the best model into a directory accessible by the API, under a timestamp value for versioning:

import calendar
import time

ts = calendar.timegm(time.gmtime())  # Unix timestamp used as the model version
print('Creating new model:', ts)
copyDirectory('./model_exec_time_temp/', './model_exec_time/' + str(ts))
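
Putting the pieces together, train_model might be structured roughly like this (a condensed sketch using the names from the snippets above; the actual function in the repo may differ in details):

import math
import calendar
import time

def train_model():
    best_rmse = float('inf')  # best result across the ten runs
    for _ in range(10):
        model = build_model()  # fresh weights every run
        history = model.fit(train_ds, validation_data=val_ds,
                            epochs=EPOCHS, verbose=0, callbacks=[early_stop])
        loss, mae, mse = model.evaluate(test_ds, verbose=0)
        rmse = math.sqrt(mse)
        if rmse < best_rmse:
            model.save('./model_exec_time_temp/', save_format='tf')
            best_rmse = rmse
    # Publish the best model under a timestamped folder for the API
    ts = calendar.timegm(time.gmtime())
    copyDirectory('./model_exec_time_temp/', './model_exec_time/' + str(ts))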

This is the model training result, with RMSE = 8.02 seconds (an error of about 8 seconds when calculating report execution time on the test set). We can see that it took around 100 epochs before the EarlyStopping callback terminated training:

[Screenshot: training history chart (author: Andrej Baranovskij)]

Function run_predict accepts input data for inference, loads the latest available model, and executes the predict call.
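
The article doesn’t list run_predict; a minimal sketch, assuming models live in timestamped subfolders of ./model_exec_time/ and that the same log transform used in training is applied to report_params:

import os
import numpy as np
import tensorflow as tf

def run_predict(input_data):
    # Timestamp folder names sort chronologically, so the last is the newest
    versions = sorted(os.listdir('./model_exec_time/'))
    model = tf.keras.models.load_model('./model_exec_time/' + versions[-1])
    report_id, report_params, day_part = input_data[0]
    features = {'report_id': np.array([report_id], dtype='int64'),
                'report_params': np.array([np.log(report_params + 0.001)],
                                          dtype='float32'),
                'day_part': np.array([day_part], dtype='int64')}
    return model.predict(features)  # shape (1, 1)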

Endpoint

Endpoint logic is implemented in report_exec_time_endpoint.py.

In this script, I implemented both the re-training scheduler and the REST API. Re-training runs in a background thread, which keeps the REST API running without blocking.

The scheduler is configured to run once per day, but it’s up to you to configure the re-training interval. It makes sense to re-train when new data becomes available:

from apscheduler.schedulers.background import BackgroundScheduler

# create scheduler
scheduler = BackgroundScheduler()
scheduler.start()

# Using UTC time to schedule the job, once per day
scheduler.add_job(
    func=report_model.train_model,
    trigger='cron',
    hour='9',
    minute='45')
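
One detail worth adding (not shown in the article) is shutting the scheduler down cleanly when the process exits; this is a common companion pattern for BackgroundScheduler:

import atexit

# Stop the background scheduler together with the Flask process
atexit.register(lambda: scheduler.shutdown())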

The REST API is implemented with Flask. Request parameters are passed to the model’s predict method for inference (the latest best model produced by the scheduler is picked up automatically, or you can implement other logic based on model versioning), and the result is returned to the client:

from flask import Flask, request
from flask_cors import CORS
import report_exec_time_model as report_model  # module import style assumed

app = Flask(__name__)
CORS(app)

@app.route("/katana-ml/api/v1.0/predict/reporttime", methods=['POST'])
def predict():
    report_id = request.json['report_id']
    report_params = request.json['report_params']
    day_part = request.json['day_part']

    input_data = [[report_id, report_params, day_part]]
    result = report_model.run_predict(input_data)

    return str(result[0][0])

# running REST interface, port=3000
if __name__ == "__main__":
    app.run(debug=False, host='0.0.0.0', port=3000)
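
With the process running, the endpoint can be called, for example, with Python requests (assuming the default host and port from the snippet above):

import requests

payload = {'report_id': 1, 'report_params': 15, 'day_part': 3}
resp = requests.post(
    'http://localhost:3000/katana-ml/api/v1.0/predict/reporttime',
    json=payload)
print(resp.text)  # predicted execution time in seconds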

Let’s verify the model. Take, for example, this row from the model training set (Report ID, Report Parameters, Day Part = Execution Time):

1, 10, 3 = 440

Now run inference with 15 report parameters. As expected, execution time is shorter (429), which means the model was trained correctly:

[Screenshot: prediction request with 15 report parameters (author: Andrej Baranovskij)]

With 11 report parameters, execution time is slightly longer, as it should be:

[Screenshot: prediction request with 11 report parameters (author: Andrej Baranovskij)]

You can play around with different Report ID and Day Part values to see how the calculated execution time changes.

Deployment

The Python endpoint process runs under the PM2 process manager. Start the process with the PM2 command:

pm2 start report_exec_time_endpoint.py
[Screenshot: PM2 process list (author: Andrej Baranovskij)]

Conclusion

Hopefully, the information in this article will help you run a scalable Machine Learning pipeline in production. I believe the core part is the automated re-training option, which helps keep the model up to date as new data becomes available. The ability to run scheduled re-training in a separate thread lets us manage it in the same process as the API endpoint, which makes the pipeline simpler and more maintainable.

Source code: GitHub repo.

Python
Machine Learning
Programming
TensorFlow
Keras