Ritam Mukherjee

Summary

The provided content outlines a comprehensive guide to building a customer churn prediction pipeline using Apache Airflow for data orchestration and machine learning techniques.

Abstract

The article presents a step-by-step approach to constructing a data engineering pipeline tailored for predicting customer churn in a SaaS company context. It begins with the extraction of a mock dataset containing customer information, followed by data cleaning and transformation, including feature engineering to capture patterns indicative of churn. The pipeline then proceeds to train a logistic regression model on the processed data, achieving an accuracy of 88.88% and an F1 score of 90.90%. Subsequently, the model is used to make predictions on new customer data, with the results saved to a CSV file. The entire workflow is automated using Apache Airflow, which schedules and executes the data extraction, transformation, training, and prediction tasks on a daily basis. The project structure is modular, facilitating manageability and scalability, and the article emphasizes the importance of combining data engineering with machine learning for actionable business insights.

Opinions

  • The author believes that a strong data pipeline is fundamental to the success of machine learning projects.
  • The use of Apache Airflow for orchestration is highly recommended for automating and managing complex data workflows.
  • The article suggests that capturing a broader pattern, including variables like age and login frequency, is crucial for accurate churn prediction.
  • The author values the modular design of the pipeline, as it allows for independent testing and improvement of each component.
  • There is an emphasis on the scalability of the setup, with the potential to adapt to larger datasets and evolving business needs.
  • The author encourages the audience to engage with the content by clapping and sharing, and to follow their Medium and LinkedIn profiles for further insights.

Data Engineering for ML: Building a Customer Churn Prediction Pipeline with Airflow


Photo by Jez Timms on Unsplash

In the machine learning world, a strong data pipeline is the foundation of any successful project. Here, we'll walk through building a pipeline that predicts customer churn. With Apache Airflow for orchestration and a bit of machine learning, we'll cover every step, from data extraction and cleaning to model training and predictions on new data.

But first, some context. Assume we are running a SaaS company. In this use case, churn prediction can flag customers whose engagement is dropping, so the company can proactively reach out with better offerings to re-engage them. The pipeline considers multiple variables (age, recent activity measured as days since the last login, and login frequency) to capture a broader pattern for this prediction.

Step 1: Data Extraction

The script below creates a mock dataset of 30 customers, with one deliberately missing value each in age, location and num_logins, and saves it as customer_data.csv.

# project_folder/scripts/extract_data.py
import pandas as pd
import numpy as np

def extract_data():
    data = {
        'customer_id': range(1, 31),
        'age': [25, 45, 34, 23, 30, 28, 37, np.nan, 45, 24, 31, 29, 41, 35, 44, 27, 32, 26, 49, 36, 22, 39, 43, 21, 48, 33, 40, 38, 27, 55],
        'location': ['NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 
                     'NY', 'CA', 'TX', 'NY', np.nan, 'TX', 'NY', 'CA', 'TX', 'NY'],
        'last_login': ['2024-10-05', '2024-10-07', '2024-09-30', '2024-10-01', '2024-10-06',
                       '2024-09-15', '2024-09-10', '2024-10-03', '2024-10-02', '2024-10-04',
                       '2024-09-25', '2024-09-20', '2024-10-05', '2024-10-07', '2024-09-30',
                       '2024-10-01', '2024-09-28', '2024-09-22', '2024-09-25', '2024-10-02',
                       '2024-10-05', '2024-10-07', '2024-09-30', '2024-10-01', '2024-10-06',
                       '2024-09-28', '2024-10-04', '2024-10-08', '2024-09-10', '2024-09-18'],
        'num_logins': [10, 20, 15, 8, 5, 3, 25, 40, 18, 22, 9, 7, np.nan, 35, 12, 6, 8, 15, 21, 10, 
                       12, 17, 6, 8, 14, 9, 19, 23, 11, 18]
    }
    df = pd.DataFrame(data)
    df.to_csv('<parent_path>/project_folder/tmp/customer_data.csv', index=False)
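Every script in this article hard-codes the '<parent_path>/project_folder/tmp/...' location. A minimal sketch for building those paths in one place, assuming a hypothetical environment variable CHURN_PROJECT_DIR that points at project_folder (and falling back to the current directory when it is unset):

```python
import os
from pathlib import Path

# Hypothetical variable name; set it to the absolute path of project_folder.
PROJECT_DIR_ENV = "CHURN_PROJECT_DIR"


def tmp_path(filename: str) -> Path:
    """Build <project_dir>/tmp/<filename> from the environment.

    Falls back to the current working directory when the variable is unset.
    """
    project_dir = Path(os.environ.get(PROJECT_DIR_ENV, "."))
    return project_dir / "tmp" / filename
```

A script would then write with df.to_csv(tmp_path('customer_data.csv'), index=False). pandas does not create missing folders, so tmp/ must already exist (or be created first with Path.mkdir).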

Step 2: Data Cleaning & Transformation

This script fills in missing values, adds a days_since_login feature (the number of days since the last login) and a rule-based churn label, then saves the output to transformed_customer_data.csv.

# project_folder/scripts/transform_data.py
import pandas as pd

def transform_data():
    df = pd.read_csv('<parent_path>/project_folder/tmp/customer_data.csv')
    
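    # Data Cleaning: assign the filled columns back. Calling fillna(..., inplace=True)
    # on df['col'] is chained assignment, which recent pandas versions warn about
    # and which stops updating df under Copy-on-Write.
    age_mean = df['age'].mean()
    df['age'] = df['age'].fillna(age_mean)
    df['location'] = df['location'].fillna('Unknown')
    df['num_logins'] = df['num_logins'].fillna(0)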
    
    # Feature Engineering
    df['last_login'] = pd.to_datetime(df['last_login'])
    df['days_since_login'] = (pd.to_datetime('today') - df['last_login']).dt.days
    
    # Define a rule-based churn label: churn if days_since_login > 40 or num_logins < 10.
    # This label is used for training only.
    df['churn'] = ((df['days_since_login'] > 40) | (df['num_logins'] < 10)).astype(int)
    df.to_csv('<parent_path>/project_folder/tmp/transformed_customer_data.csv', index=False)
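The same cleaning and labeling logic can be sketched as a pure function that takes the reference date as a parameter instead of calling pd.to_datetime('today'), which makes its output reproducible. The function name add_churn_features and the reference date used here (the DAG's start_date, 2024-11-09) are assumptions for illustration:

```python
import pandas as pd


def add_churn_features(df: pd.DataFrame, reference_date: str) -> pd.DataFrame:
    """Clean the raw frame and add days_since_login plus the rule-based churn label.

    reference_date is a fixed snapshot date used instead of "today",
    so the result does not drift as time passes.
    """
    out = df.copy()  # leave the caller's frame untouched
    # Same cleaning as transform_data, without inplace/chained assignment
    out["age"] = out["age"].fillna(out["age"].mean())
    out["location"] = out["location"].fillna("Unknown")
    out["num_logins"] = out["num_logins"].fillna(0)

    # Recency relative to the fixed reference date
    out["last_login"] = pd.to_datetime(out["last_login"])
    out["days_since_login"] = (pd.Timestamp(reference_date) - out["last_login"]).dt.days

    # Same labeling rule as transform_data
    out["churn"] = ((out["days_since_login"] > 40) | (out["num_logins"] < 10)).astype(int)
    return out


sample = pd.DataFrame({
    "age": [25.0, None, 45.0],
    "location": ["NY", None, "TX"],
    "last_login": ["2024-10-05", "2024-09-10", "2024-10-07"],
    "num_logins": [10.0, 25.0, None],
})
result = add_churn_features(sample, reference_date="2024-11-09")
print(result[["age", "location", "days_since_login", "num_logins", "churn"]])
```

With that reference date the three sample rows get days_since_login of 35, 60 and 33 and churn labels 0, 1 and 1: the second row by recency, the third because its missing num_logins is filled with 0.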

Step 3: Model Training

This script trains a logistic regression model, one of the most popular algorithms for binary classification, on the transformed data, evaluates it on a held-out 30% test split, and saves it as churn_model.pkl.

# project_folder/scripts/train_model.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import joblib

def train_model():
    df = pd.read_csv('<parent_path>/project_folder/tmp/transformed_customer_data.csv')
    
    # Features and target
    X = df[['age', 'days_since_login', 'num_logins']]
    y = df['churn']
    
    # Split data in training and testing set
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
    
    # Train model
    model = LogisticRegression()
    model.fit(X_train, y_train)
    
    # Save model to pkl file
    joblib.dump(model, '<parent_path>/project_folder/tmp/churn_model.pkl')
    
    # Print model accuracy and f1 score
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)
    
    print(f"Model Accuracy: {accuracy}")
    print(f"F1 Score: {f1}")
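Because the churn label is derived from days_since_login, it can collapse to a single class as time passes, and train_test_split(..., stratify=y) also raises an error if any class has fewer than two rows. A minimal guard, using a hypothetical helper named check_churn_labels, fails early with a readable message before the split and fit:

```python
import pandas as pd


def check_churn_labels(y: pd.Series, min_per_class: int = 2) -> dict:
    """Raise early if y cannot support a stratified split and a binary fit.

    train_test_split(..., stratify=y) needs at least 2 rows per class, and
    LogisticRegression.fit needs at least 2 distinct classes.
    """
    counts = {int(label): int(n) for label, n in y.value_counts().items()}
    if len(counts) < 2:
        raise ValueError(f"only one churn class present: {counts}")
    if min(counts.values()) < min_per_class:
        raise ValueError(f"a churn class has fewer than {min_per_class} rows: {counts}")
    return counts
```

Calling check_churn_labels(y) right after y = df['churn'] in train_model would make the Airflow task fail with this message in its log instead of a less specific scikit-learn error.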

For the provided input, the scores at the time of writing are:

Model Accuracy: 0.8888
F1 Score: 0.9090

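These numbers will drift over time, because days_since_login is measured from today while the mock last_login dates are fixed. From 18 November 2024 onward, every customer in the mock dataset is more than 40 days past their last login, so every row is labeled churn = 1 and LogisticRegression.fit raises a ValueError because the data contains only one class. Measuring days_since_login from a fixed reference date keeps the demo reproducible.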
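The reported scores can be sanity-checked by hand. With 30 rows and test_size=0.3, the test set has 9 rows, so an accuracy of 0.8888 corresponds to 8/9 (one misclassification). F1 is 2TP / (2TP + FP + FN); with a single error, 10/11 (0.9090...) requires TP = 5, leaving 3 true negatives. The labels below are illustrative, one confusion pattern consistent with both numbers, not the author's actual predictions:

```python
from sklearn.metrics import accuracy_score, f1_score

# Illustrative 9-row test set: 5 true positives, 3 true negatives, 1 false positive
y_true = [1, 1, 1, 1, 1, 0, 0, 0, 0]
y_pred = [1, 1, 1, 1, 1, 0, 0, 0, 1]

accuracy = accuracy_score(y_true, y_pred)  # 8 correct out of 9
f1 = f1_score(y_true, y_pred)              # 2*5 / (2*5 + 1 + 0) = 10 / 11
print(f"Model Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
```

Formatted to four decimals these print as 0.8889 and 0.9091; the article's 0.8888 and 0.9090 appear to be truncated forms of 8/9 and 10/11.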

Step 4: Prediction

This script loads the trained model and makes predictions on new customer data.

# project_folder/scripts/predict.py
import pandas as pd
import joblib

def predict():
    # Load the trained model
    model = joblib.load('<parent_path>/project_folder/tmp/churn_model.pkl')
    
    # New data for prediction
    new_data = pd.DataFrame({
        'age': [29, 52, 41, 23, 45, 30, 39, 40, 22, 50],
        'days_since_login': [12, 45, 22, 10, 60, 35, 5, 90, 13, 20],
        'num_logins': [18, 3, 7, 25, 1, 15, 20, 2, 17, 8]
    })
    
    # Make predictions
    predictions = model.predict(new_data)
    
    
    # Display results and save to file
    output = pd.DataFrame({
        'customer_id': range(31, 41),
        'age': new_data['age'],
        'days_since_login': new_data['days_since_login'],
        'num_logins': new_data['num_logins'],
        'predicted_churn': predictions  # Adding predictions as a new column
    })
    
    # Save predictions to a CSV file
    output.to_csv('<parent_path>/project_folder/tmp/predicted_churn.csv', index=False)

    # Optional: print predictions for verification, using the actual customer IDs (31-40)
    for cust_id, pred in zip(output['customer_id'], predictions):
        print(f"Customer {cust_id} Churn Prediction: {'Churn' if pred == 1 else 'Not Churn'}")

The output DataFrame includes predicted_churn, which stores the model’s predictions (0 or 1) for each customer.
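predicted_churn holds hard 0/1 labels. Because LogisticRegression also provides predict_proba, one optional extension (not part of the original pipeline) is to store a churn probability so outreach can start with the riskiest customers; inside predict() that would be output['churn_probability'] = model.predict_proba(new_data)[:, 1]. The toy model below uses a single made-up training set purely to show the ranking step:

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Toy training data (illustrative only, not the article's dataset)
X_train = pd.DataFrame({"days_since_login": [3, 8, 15, 25, 45, 55, 70, 90]})
y_train = [0, 0, 0, 0, 1, 1, 1, 1]
model = LogisticRegression().fit(X_train, y_train)

# Three customers taken from the sample output table
new_data = pd.DataFrame({"customer_id": [31, 32, 38], "days_since_login": [12, 45, 90]})

# predict_proba has one column per entry in model.classes_ ([0, 1]); column 1 is churn
new_data["churn_probability"] = model.predict_proba(new_data[["days_since_login"]])[:, 1]

# Highest churn risk first
ranked = new_data.sort_values("churn_probability", ascending=False)
print(ranked)
```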

Sample Output:

In predicted_churn.csv, customers 32, 35 and 38 are predicted to churn (predicted_churn = 1).

+-------------+-----+------------------+------------+-----------------+
| customer_id | age | days_since_login | num_logins | predicted_churn |
+-------------+-----+------------------+------------+-----------------+
|          31 |  29 |               12 |         18 |               0 |
|          32 |  52 |               45 |          3 |               1 |
|          33 |  41 |               22 |          7 |               0 |
|          34 |  23 |               10 |         25 |               0 |
|          35 |  45 |               60 |          1 |               1 |
|          36 |  30 |               35 |         15 |               0 |
|          37 |  39 |                5 |         20 |               0 |
|          38 |  40 |               90 |          2 |               1 |
|          39 |  22 |               13 |         17 |               0 |
|          40 |  50 |               20 |          8 |               0 |
+-------------+-----+------------------+------------+-----------------+
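Because the training labels come from the rule in transform_data (days_since_login > 40 or num_logins < 10), the sample output can be checked against that rule directly. This sketch copies the values from the table above:

```python
import pandas as pd

# Values copied from the sample predicted_churn.csv table
output = pd.DataFrame({
    "customer_id": list(range(31, 41)),
    "days_since_login": [12, 45, 22, 10, 60, 35, 5, 90, 13, 20],
    "num_logins": [18, 3, 7, 25, 1, 15, 20, 2, 17, 8],
    "predicted_churn": [0, 1, 0, 0, 1, 0, 0, 1, 0, 0],
})

# The labeling rule from transform_data
output["rule_churn"] = (
    (output["days_since_login"] > 40) | (output["num_logins"] < 10)
).astype(int)

# Rows where the model and the rule disagree
disagreements = output[output["rule_churn"] != output["predicted_churn"]]
print(disagreements[["customer_id", "days_since_login", "num_logins",
                     "predicted_churn", "rule_churn"]])
```

It flags customers 33 and 40: both have fewer than 10 logins, so the rule labels them churn, but the model predicts 0. Some mismatch is expected, because the rule's churn region (an OR of two thresholds) is not a half-plane, while logistic regression separates the classes with a single linear boundary.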

If you’re finding this article useful, your claps and share would mean a lot — it inspires me to keep creating valuable content like this! You can also follow me on Medium and LinkedIn to stay connected and catch all my latest insights.

Step 5: Scheduling the Pipeline with Airflow

Now that the Python scripts are in place, we want to automate everything so the pipeline runs daily without manual intervention. This is where Apache Airflow comes in.

Setting Up Airflow to Run the ML Pipeline

In Airflow, a workflow is defined as a Directed Acyclic Graph (DAG). The DAG in churn_prediction_dag.py orchestrates the extraction, transformation, training and prediction tasks, running them in sequence. Here's what the DAG code looks like:

# project_folder/dags/churn_prediction_dag.py
# Targets Airflow 2.4+ (EmptyOperator and the `schedule` argument)
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# Import functions from the scripts (project_folder must be on the import path)
from scripts.extract_data import extract_data
from scripts.transform_data import transform_data
from scripts.train_model import train_model
from scripts.predict import predict

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,  # alerts also need an 'email' list and SMTP settings
    'email_on_retry': True,
    'retries': 1,
}

# Initialize the DAG
with DAG(
    'customer_churn_prediction',
    default_args=default_args,
    schedule='@daily',  # replaces the deprecated schedule_interval argument
    start_date=datetime(2024, 11, 9),
    catchup=False,
) as dag:
    # Start node (EmptyOperator replaces the deprecated DummyOperator)
    start = EmptyOperator(task_id='start')

    # Define tasks
    task_extract_data = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )
    
    task_transform_data = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    task_train_model = PythonOperator(
        task_id='train_model',
        python_callable=train_model
    )
    
    task_predict = PythonOperator(
        task_id='predict',
        python_callable=predict
    )

    # End node
    end = EmptyOperator(task_id='end')

    # Set task dependencies: each task runs only after the previous one succeeds
    start >> task_extract_data >> task_transform_data >> task_train_model >> task_predict >> end

Project Structure

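The project folder looks like this (note that the DAG's "from scripts..." imports resolve only if project_folder is on Python's import path, for example via PYTHONPATH):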

project_folder
├── __init__.py
├── dags
│   └── churn_prediction_dag.py       # The main Airflow DAG
├── scripts
│   ├── __init__.py                   # Empty file; add one in both project_folder and scripts to make them Python packages
│   ├── extract_data.py               # Script for data extraction
│   ├── transform_data.py             # Script for data transformation
│   ├── train_model.py                # Script for model training
│   └── predict.py                    # Script for making predictions on new data
└── tmp
    ├── customer_data.csv             # Intermediate data file after extraction
    ├── transformed_customer_data.csv # Transformed data file
    ├── churn_model.pkl               # Trained model file
    └── predicted_churn.csv           # Predictions written by predict.py
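Airflow adds its DAGs folder (not that folder's parent) to sys.path, so if dags_folder points at project_folder/dags, "from scripts.extract_data import extract_data" fails unless project_folder is importable. Setting PYTHONPATH is one fix; another common workaround, sketched here with hypothetical helper names, is to prepend the project root from inside the DAG file before importing the scripts:

```python
import sys
from pathlib import Path


def project_root_from(dag_file: str) -> Path:
    """Return project_folder, given the path .../project_folder/dags/<dag file>."""
    return Path(dag_file).parent.parent


def ensure_on_sys_path(directory: Path) -> None:
    """Prepend directory to sys.path once, so that `import scripts...` can resolve."""
    entry = str(directory)
    if entry not in sys.path:
        sys.path.insert(0, entry)
```

At the top of churn_prediction_dag.py this would run as ensure_on_sys_path(project_root_from(str(Path(__file__).resolve()))), placed before the "from scripts..." imports.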

Wrapping Up

This pipeline demonstrates how data engineering and machine learning work together to provide actionable insights. By orchestrating the workflow in Airflow and building a churn prediction model, we've created a reusable, end-to-end template for customer churn analysis. Here are some key takeaways:

  1. Modular Design: Breaking the pipeline into separate scripts for each task — extraction, transformation, training, and prediction — makes it easy to manage, test, and improve each part independently.
  2. Automation with Airflow: Airflow's scheduling and task management automate each step and run the whole pipeline daily. Once extract_data pulls from a live source instead of generating mock data, the model retrains on fresh data every day.
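  3. ML Insights Beyond Simple Rules: A model can capture patterns across login frequency, recency and age that hard-coded rules miss. In this demo the training labels are themselves generated by a rule, so the model can only approximate that rule; labels from observed cancellations are what let it learn more complex patterns.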
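  4. Scalability: This setup can be adapted for larger datasets, new features, or additional predictive models. Because tasks hand data to each other through files in tmp/, it assumes all tasks share one filesystem; on multi-worker deployments those files would need to move to shared storage.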

With this structure in place, you can adapt to changing patterns, retrain models easily, and keep churn predictions current. The setup can be further enhanced with model-performance monitoring and retraining criteria, making it a fully operationalized ML solution.

Happy ML & Data Engineering! 🎉


#MachineLearning #DataPipeline #DataEngineering #MLOps #Airflow #Technology #Data
