diff --git a/.gitignore b/.gitignore
index 6208c06..07f57e3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,3 +30,4 @@
 glacier-s3-select/*result.csv
 azure-functions/.python_packages/
 */logs.txt
+mwaa-ml-classifier-deployment/.pkgs
diff --git a/mwaa-ml-classifier-deployment/Makefile b/mwaa-ml-classifier-deployment/Makefile
new file mode 100644
index 0000000..21e38e7
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/Makefile
@@ -0,0 +1,37 @@
+export DEBUG=1
+
+SHELL := /bin/bash
+
+usage:      ## Show this help
+	@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'
+
+install:    ## Install dependencies
+	@which localstack || pip install localstack
+	@which awslocal || pip install awscli-local
+
+run:        ## Run the MWAA ML classifier demo
+	./run.sh
+
+start:      ## Start LocalStack
+	@echo "Attempting to create localstack network (if it doesn't already exist):"
+	@docker network create --attachable localstack || true
+	@echo "Starting LocalStack:"
+	EXTRA_CORS_ALLOWED_ORIGINS='*' DOCKER_FLAGS='-e LS_LOG=trace --network localstack --name=localhost.localstack.cloud -e GATEWAY_SERVER=hypercorn' localstack start -d
+
+stop:       ## Stop LocalStack
+	@echo
+	MAIN_CONTAINER_NAME=localhost.localstack.cloud localstack stop
+	docker rm -f $$(docker network inspect localstack | jq -r '.[0].Containers | keys[0]')
+
+ready:      ## Wait for LocalStack to be ready
+	@echo Waiting on the LocalStack container...
+	@localstack wait -t 30 && echo LocalStack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1)
+
+logs:       ## Retrieve logs from LocalStack
+	@localstack logs > logs.txt
+
+test-ci:    ## Run the CI test
+	make start install ready run; return_code=`echo $$?`;\
+	make logs; make stop; exit $$return_code;
+
+.PHONY: usage install start run stop ready logs test-ci
diff --git a/mwaa-ml-classifier-deployment/README.md b/mwaa-ml-classifier-deployment/README.md
new file mode 100644
index 0000000..07c5489
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/README.md
@@ -0,0 +1,63 @@
+# LocalStack Demo: Training and deploying an ML classifier with MWAA
+
+Sample app that creates a DAG in MWAA which takes a dataset and builds a classifier model from the specified feature columns and target column. Three algorithms are trained (SVM, Logistic Regression, and Decision Tree) and the one with the best accuracy is selected. Finally, the model is deployed as a Lambda function.
+
+To keep things simple, no external dependencies (custom Docker images) are added, and the training happens locally in Airflow; the model is then deployed as a Lambda function. This is not ideal, since training workloads are usually off-loaded (e.g. to SageMaker, or to EC2 / AWS Batch jobs), but quickly trained models can still be run on the local executor.
+
+The only input the DAG takes is an `airflow/variables/dataset_spec` secret in the Secrets Manager service, like the following one:
+
+```json
+{
+  "url": "https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv",
+  "name": "iris.data",
+  "feature_columns": ["sepal.length", "sepal.width", "petal.length", "petal.width"],
+  "target_column": "variety"
+}
+```
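+
+For illustration, a secret like this could be created manually with `awslocal` (a sketch, assuming the spec above is saved to a local `dataset_spec.json` file; adjust the payload to your dataset):
+
+```shell
+awslocal secretsmanager create-secret \
+    --name airflow/variables/dataset_spec \
+    --secret-string file://dataset_spec.json
+```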
+
+## Prerequisites
+
+* LocalStack
+* Docker
+* Python 3.8+ / Python Pip
+* `make`
+* `jq`
+* `curl`
+* `awslocal`
+
+## Installing
+
+To install the dependencies:
+
+```shell
+make install
+```
+
+## Starting LocalStack
+
+Make sure that LocalStack is started:
+
+```shell
+LOCALSTACK_AUTH_TOKEN=... make start
+```
+
+## Running
+
+Run the sample demo script:
+
+```shell
+make run
+```
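+
+## Invoking the Deployed Model
+
+Once the DAG has finished, the best model is deployed as a Lambda function named `ml-model-<algorithm>-<dataset-id>` (truncated to 64 characters), with a function URL that requires no auth. As a sketch, assuming `$FUNCTION_URL` holds that URL (e.g. retrieved via `awslocal lambda get-function-url-config --function-name <function-name>`) and the sample matches the dataset's feature columns, an invocation could look like:
+
+```shell
+curl -s -X POST "$FUNCTION_URL" \
+    -H 'Content-Type: application/json' \
+    -d '{"sample": [[5.1, 3.5, 1.4, 0.2]]}'
+```
+
+The handler responds with a JSON body of the form `{"prediction": "<class label>"}`.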
+
+## Proxying Secrets
+
+Since the Airflow variables are sourced from the AWS Secrets Manager backend, you can use the [proxy.conf](proxy.conf) config file to proxy the corresponding secrets from upstream (real) AWS, so that the Airflow variables are read from the upstream secrets. This assumes the `localstack-extension-aws-replicator` extension is installed in the LocalStack instance: https://pypi.org/project/localstack-extension-aws-replicator/.
+
+```shell
+localstack aws proxy -c proxy.conf --container
+```
+
+## License
+
+This code is available under the Apache 2.0 license.
+
diff --git a/mwaa-ml-classifier-deployment/airflow-bucket/dags/train_and_deploy.py b/mwaa-ml-classifier-deployment/airflow-bucket/dags/train_and_deploy.py
new file mode 100644
index 0000000..b6f745b
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/airflow-bucket/dags/train_and_deploy.py
@@ -0,0 +1,212 @@
+import pickle
+import io
+from datetime import datetime, timedelta
+
+from airflow.decorators import dag, task
+from airflow.models import Variable
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
+
+from pydantic import BaseModel
+from typing import List, Dict
+
+import hashlib
+import pandas as pd
+
+class DatasetSpec(BaseModel):
+    url: str
+    name: str
+    feature_columns: List[str]
+    target_column: str
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+@dag(
+    default_args=default_args,
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["ml-classifier"]
+)
+def train_and_deploy_classifier_model():
+    @task
+    def retrieve_dataset():
+        # Retrieve the dataset spec from the Airflow Variable (backed by Secrets Manager).
+        dataset_spec = Variable.get(
+            "dataset_spec",
+            default_var=None,
+            deserialize_json=True,
+        )
+        if dataset_spec is None:
+            raise ValueError("Dataset spec is not defined")
+
+        try:
+            DatasetSpec(**dataset_spec)
+        except Exception as e:
+            raise ValueError(f"Invalid dataset spec: {e}")
+
+        return dataset_spec
+
+    @task
+    def read_dataset(dataset_spec):
+        dataset = DatasetSpec(**dataset_spec)
+
+        # Read the dataset from the specified URL.
+        df = pd.read_csv(dataset.url)
+        print(df.head())
+
+        # Compute dataset ID.
+        dataset_id = hashlib.sha256(pd.util.hash_pandas_object(df, index=True).values).hexdigest()
+        print(f"Dataset ID: {dataset_id}")
+
+        # Return the dataset and its ID.
+        return {
+            "dataset": df.to_dict(),
+            "dataset_id": dataset_id,
+        }
+
+    @task
+    def train_model(dataset_spec: dict, dataset: dict, algorithm: str):
+        from sklearn import svm
+        from sklearn import metrics
+        from sklearn.preprocessing import LabelEncoder
+        from sklearn.linear_model import LogisticRegression
+        from sklearn.model_selection import train_test_split
+        from sklearn.neighbors import KNeighborsClassifier
+        from sklearn.tree import DecisionTreeClassifier
+
+        df_json: Dict = dataset["dataset"]
+        dataset_id: str = dataset["dataset_id"]
+        dataset_spec: DatasetSpec = DatasetSpec(**dataset_spec)
+        data: pd.DataFrame = pd.DataFrame().from_dict(df_json)
+
+        print(data.head())
+
+        # Split the dataset into feature columns and target column.
+        X_data = data[dataset_spec.feature_columns]
+        Y_data = data[dataset_spec.target_column]
+
+        # Split the dataset into training and testing sets.
+        X_train, X_test, y_train, y_test = train_test_split(X_data, Y_data, test_size=0.2)
+
+        # Encode the target column.
+        label_encoder = LabelEncoder()
+        y_train_encoded = label_encoder.fit_transform(y_train)
+        y_test_encoded = label_encoder.transform(y_test)
+        y_train = y_train_encoded
+        y_test = y_test_encoded
+
+        # Print the dataset information.
+        print(f"Feature columns: {dataset_spec.feature_columns}")
+        print(f"Target column: {dataset_spec.target_column}")
+        print(f"Train size: {len(X_train)}")
+        print(f"Test size: {len(X_test)}")
+        print(f"Training model using {algorithm} algorithm")
+
+        # Select the model for the specified algorithm.
+        if algorithm == "SVM":
+            model = svm.SVC()
+        elif algorithm == "LogisticRegression":
+            model = LogisticRegression()
+        elif algorithm == "DecisionTreeClassifier":
+            model = DecisionTreeClassifier()
+        else:
+            raise ValueError(f"Unsupported algorithm: {algorithm}")
+
+        # Train the model.
+        model.fit(X_train, y_train)
+
+        # Predict the target values.
+        y_pred = model.predict(X_test)
+
+        # Compute the evaluation metrics of the model.
+        accuracy = metrics.accuracy_score(y_test, y_pred)
+        precision = metrics.precision_score(y_test, y_pred, average="weighted")
+        recall = metrics.recall_score(y_test, y_pred, average="weighted")
+        f1 = metrics.f1_score(y_test, y_pred, average="weighted")
+        conf_matrix = metrics.confusion_matrix(y_test, y_pred)
+
+        # Attach the label encoder classes to the model, so predictions can be mapped back to labels.
+        model.classes_names = label_encoder.classes_
+
+        # Dump the model (with the encoder classes attached) to S3.
+        s3_hook = S3Hook(aws_conn_id="aws_default")
+        s3_hook.create_bucket(bucket_name="models")
+        model_bytes = pickle.dumps(model)
+        model_buffer = io.BytesIO(model_bytes)
+        s3_hook.load_bytes(
+            bytes_data=model_buffer.getvalue(),
+            key=f"models/{dataset_id}/{algorithm}.pkl",
+            bucket_name="models",
+            replace=True,
+        )
+
+        # Print or log the evaluation metrics.
+        print(f"Accuracy: {accuracy}")
+        print(f"Precision: {precision}")
+        print(f"Recall: {recall}")
+        print(f"F1 Score: {f1}")
+        print(f"Confusion Matrix:\n{conf_matrix}")
+
+        return accuracy
+
+    @task
+    def deploy_model(ml_algorithms: List[str], accuracies: List[float], dataset: dict):
+        print(f"Model accuracies: {accuracies}")
+        print(f"ML algorithms: {ml_algorithms}")
+
+        dataset_id = dataset["dataset_id"]
+        best_model_index = accuracies.index(max(accuracies))
+        best_ml_algorithm = ml_algorithms[best_model_index]
+
+        print(f"Location of best model: s3://models/models/{dataset_id}/{best_ml_algorithm}.pkl")
+        lambda_hook = LambdaHook(aws_conn_id="aws_default")
+        lambda_client = lambda_hook.get_client_type()
+
+        try:
+            lambda_hook.create_lambda(
+                function_name=f"ml-model-{best_ml_algorithm}-{dataset_id}"[:64],
+                runtime="python3.9",
+                role="arn:aws:iam::000000000000:role/lambda-role",
+                handler="main.lambda_handler",
+                code={
+                    "S3Bucket": "lambda",
+                    "S3Key": "deploy_lambda.zip",
+                },
+                environment={
+                    "Variables": {
+                        "MODEL_BUCKET_NAME": "models",
+                        "MODEL_OBJECT_KEY": f"models/{dataset_id}/{best_ml_algorithm}.pkl",
+                    },
+                },
+            )
+        except Exception as e:
+            print(f"Error creating the function: {e}")
+
+        try:
+            lambda_client.create_function_url_config(
+                FunctionName=f"ml-model-{best_ml_algorithm}-{dataset_id}"[:64],
+                AuthType="NONE",
+                InvokeMode="BUFFERED",
+            )
+        except Exception as e:
+            print(f"Error creating the function URL config: {e}")
+
+    dataset_spec: Dict = retrieve_dataset()
+    dataset = read_dataset(dataset_spec)
+
+    ml_algorithms = ["SVM", "LogisticRegression", "DecisionTreeClassifier"]
+    accuracies = []
+    for algorithm in ml_algorithms:
+        accuracies += [train_model(dataset_spec, dataset, algorithm)]
+
+    deploy_model(ml_algorithms, accuracies, dataset)
+
+
+dag = train_and_deploy_classifier_model()
diff --git a/mwaa-ml-classifier-deployment/airflow-bucket/requirements.txt b/mwaa-ml-classifier-deployment/airflow-bucket/requirements.txt
new file mode 100644
index 0000000..193a330
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/airflow-bucket/requirements.txt
@@ -0,0 +1,5 @@
+apache-airflow[amazon]==2.8.1
+pandas
+pydantic
+scikit-learn==1.3.2
+scipy
diff --git a/mwaa-ml-classifier-deployment/lambda/main.py b/mwaa-ml-classifier-deployment/lambda/main.py
new file mode 100644
index 0000000..f9218ec
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/lambda/main.py
@@ -0,0 +1,40 @@
+import os
+import boto3
+import pickle
+import json
+
+def lambda_handler(event, context):
+    print(f"Received event: {json.dumps(event)}")
+
+    # Retrieve the JSON payload from the request body
+    payload = json.loads(event["body"])
+    sample = payload.get("sample")
+
+    # Specify the S3 bucket and object key
+    bucket_name = os.environ["MODEL_BUCKET_NAME"]
+    object_key = os.environ["MODEL_OBJECT_KEY"]
+
+    # Create an S3 client
+    s3 = boto3.client("s3")
+
+    # Download the file from S3
+    response = s3.get_object(Bucket=bucket_name, Key=object_key)
+    model_data = response["Body"].read()
+
+    # Load the model from the downloaded data
+    model = pickle.loads(model_data)
+
+    # Run inference.
+    print(f"Running inference on sample: {sample}")
+    index_prediction = int(model.predict(sample)[0])
+    print(f"Prediction index: {index_prediction}")
+    prediction = model.classes_names[index_prediction]
+    print(f"Prediction: {prediction}")
+
+    return {
+        "statusCode": 200,
+        "headers": {
+            "Content-Type": "application/json"
+        },
+        "body": json.dumps({"prediction": prediction})
+    }
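+
+
+# Optional local smoke test; a sketch only, not part of the Lambda runtime path.
+# Assumptions: MODEL_BUCKET_NAME / MODEL_OBJECT_KEY point at a model that the DAG
+# has already uploaded, and AWS_ENDPOINT_URL points boto3 at LocalStack
+# (e.g. http://localhost:4566). The sample values are illustrative and must match
+# the dataset's feature columns.
+if __name__ == "__main__":
+    test_event = {"body": json.dumps({"sample": [[5.1, 3.5, 1.4, 0.2]]})}
+    print(lambda_handler(test_event, None))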
diff --git a/mwaa-ml-classifier-deployment/proxy.conf b/mwaa-ml-classifier-deployment/proxy.conf
new file mode 100644
index 0000000..a42aecf
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/proxy.conf
@@ -0,0 +1,10 @@
+services:
+  secretsmanager:
+    resources:
+      # list of ARNs of secrets to proxy to real AWS
+      - 'arn:aws:secretsmanager:.+:secret:airflow/variables/.*'
+    operations:
+      # list of operation name regex patterns ('.*' matches all operations)
+      - '.*'
+    # optionally, allow only read requests; false allows all operations
+    read_only: true
diff --git a/mwaa-ml-classifier-deployment/run.sh b/mwaa-ml-classifier-deployment/run.sh
new file mode 100755
index 0000000..ffd044d
--- /dev/null
+++ b/mwaa-ml-classifier-deployment/run.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+set -euxo pipefail
+
+DATASET_CONFIG=$(cat <