1 change: 1 addition & 0 deletions .gitignore
@@ -30,3 +30,4 @@ glacier-s3-select/*result.csv
azure-functions/.python_packages/

*/logs.txt
mwaa-ml-classifier-deployment/.pkgs
37 changes: 37 additions & 0 deletions mwaa-ml-classifier-deployment/Makefile
@@ -0,0 +1,37 @@
export DEBUG=1

SHELL := /bin/bash

usage: ## Show this help
@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'

install: ## Install dependencies
@which localstack || pip install localstack
@which awslocal || pip install awscli-local

run: ## Run MWAA ETL Processor
./run.sh

start: ## Start LocalStack
@echo "Attempting to create localstack network (if it doesn't already exist):"
@docker network create --attachable localstack || true
@echo "Starting LocalStack:"
EXTRA_CORS_ALLOWED_ORIGINS='*' DOCKER_FLAGS='-e LS_LOG=trace --network localstack --name=localhost.localstack.cloud -e GATEWAY_SERVER=hypercorn' localstack start -d

stop: ## Stop LocalStack
@echo
MAIN_CONTAINER_NAME=localhost.localstack.cloud localstack stop
docker rm -f $$(docker network inspect localstack | jq -r '.[0].Containers | keys[0]')

ready: ## Wait for LocalStack to be ready
@echo Waiting on the LocalStack container...
@localstack wait -t 30 && echo Localstack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1)

logs: ## Retrieve logs from LocalStack
@localstack logs > logs.txt

test-ci: ## Run CI test
make start install ready run; return_code=`echo $$?`;\
make logs; make stop; exit $$return_code;

.PHONY: usage install start run stop ready logs test-ci
63 changes: 63 additions & 0 deletions mwaa-ml-classifier-deployment/README.md
@@ -0,0 +1,63 @@
# LocalStack Demo: Training and Deploying an ML Classifier with MWAA

App that creates a DAG inside MWAA which takes a dataset and builds a classifier model from its feature columns and target column. A classifier is trained with each of three algorithms (SVM, Logistic Regression, and Decision Tree), and the one with the best accuracy is selected. Finally, the model is deployed as a Lambda function.

To keep it simple, no external dependencies (custom Docker images) were added, and the training happens locally in Airflow; the resulting model is then deployed as a Lambda function. This is not ideal, since such workloads are usually off-loaded (e.g. to SageMaker, or to EC2 / AWS Batch jobs), but quickly trained models can still be run with the local executor.

The only input to the DAG is an `airflow/variables/dataset_spec` secret in the Secrets Manager service, like the following one:

```json
{
"url": "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv",
"name": "iris.data",
"feature_columns": ["sepal.length", "sepal.width", "petal.length", "petal.width"],
"target_column": "variety"
}
```
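
For manual experimentation, a secret with this payload can be created via `awslocal` roughly as follows (a sketch; the demo's `run.sh` may already seed this secret for you):

```shell
# Sketch: store the dataset spec as the secret backing the Airflow variable
awslocal secretsmanager create-secret \
  --name airflow/variables/dataset_spec \
  --secret-string '{"url": "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv", "name": "iris.data", "feature_columns": ["sepal.length", "sepal.width", "petal.length", "petal.width"], "target_column": "variety"}'
```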

## Prerequisites

* LocalStack
* Docker
* Python 3.8+ / Python Pip
* `make`
* `jq`
* `curl`
* `awslocal`

## Installing

To install the dependencies:

```shell
make install
```

## Starting LocalStack

Make sure that LocalStack is started:

```shell
LOCALSTACK_AUTH_TOKEN=... make start
```

## Running

Run the sample demo script:

```shell
make run
```
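
Once the DAG has finished, the best-performing model is exposed through a Lambda function URL (the function name follows the `ml-model-<algorithm>-<dataset id>` pattern used in the DAG). As a rough sketch, with placeholders for the actual function name and URL, inference can be tested like this:

```shell
# Sketch: look up the function URL of the deployed model.
# Replace <function-name> with the ml-model-... function created by the DAG run.
awslocal lambda get-function-url-config --function-name <function-name> --query FunctionUrl --output text

# Replace <function-url> with the URL returned above and send one iris sample.
curl -s -X POST "<function-url>" \
  -H 'Content-Type: application/json' \
  -d '{"sample": [[5.1, 3.5, 1.4, 0.2]]}'
```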

## Proxying Secrets

Because the Airflow variables are sourced from the AWS Secrets Manager backend, you can use the [proxy.conf](proxy.conf) config file to proxy them to upstream AWS, so that real AWS secrets are used as the Airflow variables. This assumes the `localstack-extension-aws-replicator` extension is installed on the LocalStack instance: https://pypi.org/project/localstack-extension-aws-replicator/.
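
If the extension is not installed yet, it can typically be added through the LocalStack extensions CLI (shown here as an assumed setup step):

```shell
localstack extensions install localstack-extension-aws-replicator
```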

```shell
localstack aws proxy -c proxy.conf --container
```

## License

This code is available under the Apache 2.0 license.

212 changes: 212 additions & 0 deletions mwaa-ml-classifier-deployment/airflow-bucket/dags/train_and_deploy.py
@@ -0,0 +1,212 @@
import pickle
import io
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

from pydantic import BaseModel
from typing import List, Dict

import hashlib
import pandas as pd

class DatasetSpec(BaseModel):
url: str
name: str
feature_columns: List[str]
target_column: str

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

@dag(
default_args=default_args,
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["ml-classifier"]
)
def train_and_deploy_classifier_model():
@task
def retrieve_dataset():
# Retrieve the dataset from the Airflow Variable.
dataset_spec = Variable.get(
"dataset_spec",
deserialize_json=True,
)
if dataset_spec is None:
raise ValueError("Dataset URL is not defined")

try:
DatasetSpec(**dataset_spec)
except Exception as e:
raise ValueError(f"Invalid dataset spec: {e}")

return dataset_spec

@task
def read_dataset(dataset_spec):
dataset = DatasetSpec(**dataset_spec)

# Read the dataset from the specified URL.
df = pd.read_csv(dataset.url)
print(df.head())

# Compute dataset ID.
dataset_id = hashlib.sha256(pd.util.hash_pandas_object(df, index=True).values).hexdigest()
print(f"Dataset ID: {dataset_id}")

# Return the dataset and its ID.
return {
"dataset": df.to_dict(),
"dataset_id": dataset_id,
}

@task
def train_model(dataset_spec: dict, dataset: dict, algorithm: str):
from sklearn import svm
from sklearn import metrics
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

df_json: Dict = dataset["dataset"]
dataset_id: str = dataset["dataset_id"]
dataset_spec: DatasetSpec = DatasetSpec(**dataset_spec)
data: pd.DataFrame = pd.DataFrame().from_dict(df_json)

print(data.head())

# Split the dataset into feature columns and target column.
X_data = data[dataset_spec.feature_columns]
Y_data = data[dataset_spec.target_column]

# Split the dataset into training and testing sets.
X_train, X_test, y_train, y_test = train_test_split(X_data, Y_data, test_size=0.2)

# Encode the target column.
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)
y_train = y_train_encoded
y_test = y_test_encoded

# Print the dataset information.
print(f"Feature columns: {dataset_spec.feature_columns}")
print(f"Target column: {dataset_spec.target_column}")
print(f"Train size: {len(X_train)}")
print(f"Test size: {len(X_test)}")
print(f"Training model using {algorithm} algorithm")

# Train the model using the specified algorithm.
if algorithm == "SVM":
model = svm.SVC()
elif algorithm == "LogisticRegression":
model = LogisticRegression()
elif algorithm == "DecisionTreeClassifier":
model = DecisionTreeClassifier()
else:
raise ValueError(f"Unsupported algorithm: {algorithm}")

# Train the model.
model.fit(X_train, y_train)

# Predict the target values.
y_pred = model.predict(X_test)

# Compute the accuracy of the model.
accuracy = metrics.accuracy_score(y_test, y_pred)
precision = metrics.precision_score(y_test, y_pred, average="weighted")
recall = metrics.recall_score(y_test, y_pred, average="weighted")
f1 = metrics.f1_score(y_test, y_pred, average="weighted")
conf_matrix = metrics.confusion_matrix(y_test, y_pred)

# Save the model and label encoder classes.
model.classes_names = label_encoder.classes_

# Dump the model and label encoder to S3.
s3_hook = S3Hook(aws_conn_id="aws_default")
s3_hook.create_bucket(bucket_name="models")
model_bytes = pickle.dumps(model)
model_buffer = io.BytesIO(model_bytes)
s3_hook.load_bytes(
bytes_data=model_buffer.getvalue(),
key=f"models/{dataset_id}/{algorithm}.pkl",
bucket_name="models",
replace=True,
)

# Print or log the evaluation metrics
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"Confusion Matrix:\n{conf_matrix}")

return accuracy

@task
def deploy_model(ml_algorithms: List[str], accuracies: List[float], dataset: dict):
print(f"Model accuracies: {accuracies}")
print(f"ML algorithms: {ml_algorithms}")

dataset_id = dataset["dataset_id"]
best_model_index = accuracies.index(max(accuracies))
best_ml_algorithm = ml_algorithms[best_model_index]

print(f"Location of best model: s3://models/models/{dataset_id}/{best_ml_algorithm}.pkl")
lambda_hook = LambdaHook(aws_conn_id="aws_default")
lambda_client = lambda_hook.get_client_type()

try:
lambda_hook.create_lambda(
function_name=f"ml-model-{best_ml_algorithm}-{dataset_id}"[:64],
runtime="python3.9",
role="arn:aws:iam::000000000000:role/lambda-role",
handler="main.lambda_handler",
code={
"S3Bucket": "lambda",
"S3Key": "deploy_lambda.zip",
},
environment={
"Variables": {
"MODEL_BUCKET_NAME": "models",
"MODEL_OBJECT_KEY": f"models/{dataset_id}/{best_ml_algorithm}.pkl",
},
},
)
except Exception as e:
print(f"Error creating the function: {e}")

try:
lambda_client.create_function_url_config(
FunctionName=f"ml-model-{best_ml_algorithm}-{dataset_id}"[:64],
AuthType="NONE",
InvokeMode="BUFFERED",
)
except Exception as e:
print(f"Error creating the function URL config: {e}")

dataset_spec: Dict = retrieve_dataset()
dataset = read_dataset(dataset_spec)

ml_algorithms = ["SVM", "LogisticRegression", "DecisionTreeClassifier"]
accuracies = []
for algorithm in ml_algorithms:
accuracies += [train_model(dataset_spec, dataset, algorithm)]

deploy_model(ml_algorithms, accuracies, dataset)


dag = train_and_deploy_classifier_model()
5 changes: 5 additions & 0 deletions mwaa-ml-classifier-deployment/airflow-bucket/requirements.txt
@@ -0,0 +1,5 @@
apache-airflow[amazon]==2.8.1
pandas
pydantic
scikit-learn==1.3.2
scipy
40 changes: 40 additions & 0 deletions mwaa-ml-classifier-deployment/lambda/main.py
@@ -0,0 +1,40 @@
import os
import boto3
import pickle
import json

def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")

# Retrieve JSON from body
payload = json.loads(event["body"])
sample = payload.get("sample")

# Specify the S3 bucket and object key
bucket_name = os.environ["MODEL_BUCKET_NAME"]
object_key = os.environ["MODEL_OBJECT_KEY"]

# Create an S3 client
s3 = boto3.client("s3")

# Download the file from S3
response = s3.get_object(Bucket=bucket_name, Key=object_key)
model_data = response["Body"].read()

# Load the model from the downloaded data
model = pickle.loads(model_data)

# Run inference.
print(f"Running inference on sample: {sample}")
index_prediction = int(model.predict(sample)[0])
print(f"Prediction index: {index_prediction}")
prediction = model.classes_names[index_prediction]
print(f"Prediction: {prediction}")

return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json"
},
"body": json.dumps({"prediction": prediction})
}
10 changes: 10 additions & 0 deletions mwaa-ml-classifier-deployment/proxy.conf
@@ -0,0 +1,10 @@
services:
secretsmanager:
resources:
# list of ARNs of secrets to proxy to real AWS
- 'arn:aws:secretsmanager:.+:secret:airflow/variables/.*'
operations:
# list of operation name regex patterns to include all operations
- '.*'
# optionally, specify if only read requests should be allowed; false allows all operations
read_only: true