Data Engineering for ML: Building a Customer Churn Prediction Pipeline with Airflow
In machine learning, a strong data pipeline is the foundation of any successful project. Here, we’ll walk through building a pipeline that predicts customer churn. With Apache Airflow for orchestration and a bit of machine learning, we’ll cover every step, from data extraction and cleaning to making predictions on new data.
But first, some context. Assume we are running a SaaS company. In this use case, churn prediction can highlight customers whose engagement has dropped, so the company can proactively reach out to them with a better offer and re-engage them. The pipeline considers multiple variables (age, recent activity, and login frequency) to capture a broader pattern for this prediction.
Step 1: Data Extraction
The script below creates a mock dataset with customer information and saves it as customer_data.csv.
# project_folder/scripts/extract_data.py
import pandas as pd
import numpy as np

def extract_data():
    data = {
        'customer_id': range(1, 31),
        'age': [25, 45, 34, 23, 30, 28, 37, np.nan, 45, 24, 31, 29, 41, 35, 44, 27, 32, 26, 49, 36, 22, 39, 43, 21, 48, 33, 40, 38, 27, 55],
        'location': ['NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA', 'TX', 'NY', 'CA',
                     'NY', 'CA', 'TX', 'NY', np.nan, 'TX', 'NY', 'CA', 'TX', 'NY'],
        'last_login': ['2024-10-05', '2024-10-07', '2024-09-30', '2024-10-01', '2024-10-06',
                       '2024-09-15', '2024-09-10', '2024-10-03', '2024-10-02', '2024-10-04',
                       '2024-09-25', '2024-09-20', '2024-10-05', '2024-10-07', '2024-09-30',
                       '2024-10-01', '2024-09-28', '2024-09-22', '2024-09-25', '2024-10-02',
                       '2024-10-05', '2024-10-07', '2024-09-30', '2024-10-01', '2024-10-06',
                       '2024-09-28', '2024-10-04', '2024-10-08', '2024-09-10', '2024-09-18'],
        'num_logins': [10, 20, 15, 8, 5, 3, 25, 40, 18, 22, 9, 7, np.nan, 35, 12, 6, 8, 15, 21, 10,
                       12, 17, 6, 8, 14, 9, 19, 23, 11, 18]
    }
    df = pd.DataFrame(data)
    df.to_csv('<parent_path>/project_folder/tmp/customer_data.csv', index=False)

Step 2: Data Cleaning & Transformation
This script cleans and processes the data, adds a feature for the number of days since the last login, then saves the output to transformed_customer_data.csv.
# project_folder/scripts/transform_data.py
import pandas as pd

def transform_data():
    df = pd.read_csv('<parent_path>/project_folder/tmp/customer_data.csv')

    # Data Cleaning
    # Assign the filled column back instead of calling fillna(inplace=True)
    # on a column selection, which newer pandas versions warn about
    age_mean = df['age'].mean()
    df['age'] = df['age'].fillna(age_mean)
    df['location'] = df['location'].fillna('Unknown')
    df['num_logins'] = df['num_logins'].fillna(0)

    # Feature Engineering
    df['last_login'] = pd.to_datetime(df['last_login'])
    df['days_since_login'] = (pd.to_datetime('today') - df['last_login']).dt.days

    # Define churn from a combined pattern, i.e. days_since_login > 40 or num_logins < 10
    # This label is used for training only
    df['churn'] = ((df['days_since_login'] > 40) | (df['num_logins'] < 10)).astype(int)
    df.to_csv('<parent_path>/project_folder/tmp/transformed_customer_data.csv', index=False)

Step 3: Model Training
This script trains a logistic regression model, one of the most popular algorithms for binary classification, on the transformed data and saves it.
# project_folder/scripts/train_model.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
import joblib

def train_model():
    df = pd.read_csv('<parent_path>/project_folder/tmp/transformed_customer_data.csv')

    # Features and target
    X = df[['age', 'days_since_login', 'num_logins']]
    y = df['churn']

    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

    # Train model
    model = LogisticRegression()
    model.fit(X_train, y_train)

    # Save model to pkl file
    joblib.dump(model, '<parent_path>/project_folder/tmp/churn_model.pkl')

    # Print model accuracy and F1 score
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    f1 = f1_score(y_test, predictions)
    print(f"Model Accuracy: {accuracy}")
    print(f"F1 Score: {f1}")

For the provided input, the sample scores at present are:
Model Accuracy: 0.8888
F1 Score: 0.9090
These may change, because days_since_login varies with time.
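Because days_since_login is measured against today’s date, the labels shift from run to run; once every sample login is more than 40 days old, every row is labeled as churn and the classifier has only one class to learn from. One way to keep a run reproducible is to pass the reference date in explicitly. The sketch below assumes a hypothetical helper add_days_since_login with a hypothetical as_of parameter (neither is part of the scripts above), and the date used is only an illustration:

```python
import pandas as pd

def add_days_since_login(df, as_of):
    """Return a copy of df with days_since_login measured against a fixed date.

    Hypothetical helper: the transform script above uses today's date instead.
    """
    out = df.copy()
    out['last_login'] = pd.to_datetime(out['last_login'])
    # Subtracting from a fixed Timestamp gives the same result on every run
    out['days_since_login'] = (pd.Timestamp(as_of) - out['last_login']).dt.days
    return out

sample = pd.DataFrame({'last_login': ['2024-10-05', '2024-09-10']})
result = add_days_since_login(sample, as_of='2024-11-09')
print(result['days_since_login'].tolist())  # [35, 60]
```

In a scheduled run, the reference date could come from the run’s own date rather than a hard-coded string, so a rerun of an old day produces the same features.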
Step 4: Prediction
This script loads the trained model and makes predictions on new customer data.
# project_folder/scripts/predict.py
import pandas as pd
import joblib

def predict():
    # Load the trained model
    model = joblib.load('<parent_path>/project_folder/tmp/churn_model.pkl')

    # New data for prediction
    new_data = pd.DataFrame({
        'age': [29, 52, 41, 23, 45, 30, 39, 40, 22, 50],
        'days_since_login': [12, 45, 22, 10, 60, 35, 5, 90, 13, 20],
        'num_logins': [18, 3, 7, 25, 1, 15, 20, 2, 17, 8]
    })

    # Make predictions
    predictions = model.predict(new_data)

    # Display results and save to file
    output = pd.DataFrame({
        'customer_id': range(31, 41),
        'age': new_data['age'],
        'days_since_login': new_data['days_since_login'],
        'num_logins': new_data['num_logins'],
        'predicted_churn': predictions  # Adding predictions as a new column
    })

    # Save predictions to a CSV file
    output.to_csv('<parent_path>/project_folder/tmp/predicted_churn.csv', index=False)

    # Optional: Print predictions for verification, using the real customer IDs
    for customer_id, pred in zip(output['customer_id'], predictions):
        print(f"Customer {customer_id} Churn Prediction: {'Churn' if pred == 1 else 'Not Churn'}")

The output DataFrame includes predicted_churn, which stores the model’s predictions (0 or 1) for each customer.
Sample output:
From predicted_churn.csv, we observe that the customers with IDs 32, 35, and 38 are predicted to churn (predicted_churn = 1).
+-------------+-----+------------------+------------+-----------------+
| customer_id | age | days_since_login | num_logins | predicted_churn |
+-------------+-----+------------------+------------+-----------------+
| 31          | 29  | 12               | 18         | 0               |
| 32          | 52  | 45               | 3          | 1               |
| 33          | 41  | 22               | 7          | 0               |
| 34          | 23  | 10               | 25         | 0               |
| 35          | 45  | 60               | 1          | 1               |
| 36          | 30  | 35               | 15         | 0               |
| 37          | 39  | 5                | 20         | 0               |
| 38          | 40  | 90               | 2          | 1               |
| 39          | 22  | 13               | 17         | 0               |
| 40          | 50  | 20               | 8          | 0               |
+-------------+-----+------------------+------------+-----------------+
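Once the predictions are in a file, pulling out the flagged customers for outreach is a short filter. Here is a minimal sketch, with the rows from the table above typed in directly so it runs on its own; in the pipeline you would load predicted_churn.csv with pd.read_csv instead. The names predictions and at_risk are illustrative only:

```python
import pandas as pd

# Rows copied from the sample output table above
predictions = pd.DataFrame({
    'customer_id': list(range(31, 41)),
    'days_since_login': [12, 45, 22, 10, 60, 35, 5, 90, 13, 20],
    'num_logins': [18, 3, 7, 25, 1, 15, 20, 2, 17, 8],
    'predicted_churn': [0, 1, 0, 0, 1, 0, 0, 1, 0, 0],
})

# Keep only the flagged customers, longest-inactive first
at_risk = (
    predictions[predictions['predicted_churn'] == 1]
    .sort_values('days_since_login', ascending=False)
)
at_risk_ids = at_risk['customer_id'].tolist()
print(at_risk_ids)  # [38, 35, 32]
```

Sorting by inactivity is just one possible way to prioritize the outreach list.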
Step 5: Scheduling the Pipeline with Airflow
Now that we have our Python jobs in place, we want to automate everything so it runs daily without manual intervention. This is where Apache Airflow is useful.
Setting Up Airflow to Run the ML Pipeline
In Airflow, a workflow is defined as a Directed Acyclic Graph (DAG). Our Airflow DAG (churn_prediction_dag.py) will orchestrate the data extraction, transformation, training, and prediction tasks.
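To make the DAG idea concrete before bringing in Airflow, here is a small sketch using only Python’s standard library. It is not how Airflow works internally; it simply shows that once each task declares what it depends on, a valid run order follows from the graph. The dependencies dictionary is an illustration built from the four steps of this article:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks that must finish before it starts
dependencies = {
    'extract_data': set(),
    'transform_data': {'extract_data'},
    'train_model': {'transform_data'},
    'predict': {'train_model'},
}

# static_order() yields the tasks so that every dependency comes first
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# ['extract_data', 'transform_data', 'train_model', 'predict']
```

Because the graph has no cycles, an order always exists; a cycle (for example, extract_data depending on predict) would make the sorter raise an error instead.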
Here’s what the DAG code looks like:
# project_folder/dags/churn_prediction_dag.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# DummyOperator is used for the start/end nodes and must be imported too.
# These are the legacy module paths; recent Airflow 2.x releases prefer
# airflow.operators.python and airflow.operators.empty (EmptyOperator).
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# Import functions from the scripts
from scripts.extract_data import extract_data
from scripts.transform_data import transform_data
from scripts.train_model import train_model
from scripts.predict import predict

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
}

# Initialize the DAG
with DAG(
    'customer_churn_prediction',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 11, 9),
    catchup=False
) as dag:
    # Start node
    start = DummyOperator(task_id='start')

    # Define tasks
    task_extract_data = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )
    task_transform_data = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    task_train_model = PythonOperator(
        task_id='train_model',
        python_callable=train_model
    )
    task_predict = PythonOperator(
        task_id='predict',
        python_callable=predict
    )

    # End node
    end = DummyOperator(task_id='end')

    # Set task dependencies
    start >> task_extract_data >> task_transform_data >> task_train_model >> task_predict >> end

Project Structure
The project folder will look like this:
project_folder
├── __init__.py
├── dags
│   └── churn_prediction_dag.py         # The main Airflow DAG
├── scripts
│   ├── __init__.py
│   ├── extract_data.py                 # Script for data extraction
│   ├── transform_data.py               # Script for data transformation
│   ├── train_model.py                  # Script for model training
│   └── predict.py                      # Script for making predictions on new data
└── tmp
    ├── customer_data.csv               # Intermediate data file after extraction
    ├── transformed_customer_data.csv   # Transformed data file
    ├── churn_model.pkl                 # Trained model file
    └── predicted_churn.csv             # Predictions written by predict.py
Add an empty __init__.py file in both the project_folder and scripts directories. This will make them Python packages.

Wrapping Up
This pipeline demonstrates the power of data engineering and machine learning working together to provide actionable insights. By orchestrating this workflow in Airflow and building a churn prediction model, we’ve created a robust, reusable solution for customer churn analysis. Here are some key takeaways:
- Modular Design: Breaking the pipeline into separate scripts for each task — extraction, transformation, training, and prediction — makes it easy to manage, test, and improve each part independently.
- Automation with Airflow: Airflow’s scheduling and task management let us automate each step, ensuring that our data is always up to date and our model always retrains as fresh data arrives.
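- ML Insights Beyond Simple Rules: By using a model instead of hard-coded rules, we allow our churn prediction to capture more complex patterns of login frequency, recency, and so on. In this demo the training label itself comes from a simple rule, so the benefit shows up once real churn outcomes replace it.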
- Scalability: This setup can easily be adapted for larger datasets, new features, or additional predictive models, making it flexible for evolving business needs.
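The modular-design point can be pushed one step further by keeping the logic of a step in a function that takes and returns a DataFrame, so it can be checked without touching any files. The sketch below assumes a hypothetical helper clean_customers that applies the same fill rules as the transform step; it is not part of the scripts in this article, which read and write CSV files directly:

```python
import numpy as np
import pandas as pd

def clean_customers(df):
    """Apply the transform step's fill rules to an in-memory frame.

    Hypothetical helper for illustration; transform_data() above works on CSV files.
    """
    out = df.copy()
    out['age'] = out['age'].fillna(out['age'].mean())   # missing age -> column mean
    out['location'] = out['location'].fillna('Unknown')  # missing location -> 'Unknown'
    out['num_logins'] = out['num_logins'].fillna(0)      # missing logins -> 0
    return out

# A tiny frame with one gap per column is enough to exercise every rule
raw = pd.DataFrame({
    'age': [20.0, np.nan, 40.0],
    'location': ['NY', np.nan, 'TX'],
    'num_logins': [5.0, 12.0, np.nan],
})
cleaned = clean_customers(raw)
print(cleaned)
```

A check like this can run in seconds as part of a test suite, long before the DAG is ever scheduled.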
With this structure in place, you can quickly adapt to changing patterns, retrain models with ease, and predict customer churn with confidence. This pipeline setup can be further enhanced by adding monitoring for model performance and re-training criteria, allowing for a fully operationalized ML solution.
Happy ML & Data Engineering! 🎉






