Machine Learning on AWS – Accelerating Data Processing and Saving Compute Costs with AWS SageMaker Processing Jobs

In the world of machine learning, data processing is a crucial phase that can often become a bottleneck in the MLOps (Machine Learning Operations) pipeline. This phase involves cleaning, transforming, and preparing data for training machine learning models. AWS SageMaker provides a robust solution for managing these tasks efficiently through its processing jobs. In this blog, we’ll delve into how SageMaker can streamline the data processing phase, why this phase can be time-consuming, and how leveraging multiple instances and tools like Dask can optimize this process.

What is AWS SageMaker?

AWS SageMaker is a fully managed service that enables developers and data scientists to build, train, and deploy machine learning models at scale. SageMaker simplifies the process of developing machine learning models by providing a variety of tools and services, including Jupyter notebooks for exploration and experimentation, built-in algorithms for training, and easy deployment options for creating scalable, production-ready models.

Understanding the MLOps Process

MLOps refers to the set of practices that aims to deploy and maintain machine learning models in production reliably and efficiently. The MLOps process encompasses several phases, including:

  1. Data Collection and Preparation: Gathering and cleaning data to ensure it is suitable for training models.
  2. Model Training: Using prepared data to train machine learning models.
  3. Model Evaluation: Assessing the performance of trained models.
  4. Model Deployment: Deploying models to production environments.
  5. Monitoring and Maintenance: Continuously monitoring model performance and retraining models as necessary.

The Data Processing Phase: A Potential Bottleneck

The data processing phase is often the most time-consuming part of the MLOps process. This phase involves several steps, such as data cleaning, normalization, feature engineering, and data partitioning. These steps are critical because the quality and format of the data directly impact the performance of the machine learning models.
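
As a rough illustration of these steps, here is a minimal sketch using pandas and NumPy on a tiny made-up table. The column names (`age`, `income`), the derived feature, and the helper `prepare` are hypothetical and chosen only for this example; a real pipeline will look different.

```python
import numpy as np
import pandas as pd


def prepare(df: pd.DataFrame, test_ratio: float = 0.3, seed: int = 0):
    """Clean, normalize, add a feature, and split a small DataFrame."""
    # Data cleaning: drop rows that contain missing values.
    clean = df.dropna().reset_index(drop=True)

    # Normalization: rescale each column to zero mean and unit variance.
    normalized = (clean - clean.mean()) / clean.std(ddof=0)

    # Feature engineering: a derived column computed from the raw values.
    normalized["income_per_year_of_age"] = clean["income"] / clean["age"]

    # Data partitioning: shuffle the rows, then cut off the test set.
    shuffled = normalized.sample(frac=1.0, random_state=seed)
    n_test = int(round(len(shuffled) * test_ratio))
    return shuffled.iloc[n_test:], shuffled.iloc[:n_test]


# Made-up sample data with two incomplete rows.
raw = pd.DataFrame(
    {
        "age": [25.0, 32.0, np.nan, 41.0, 58.0, 37.0, 29.0, 50.0, 44.0, 63.0, 35.0],
        "income": [40.0, 52.0, 61.0, 75.0, 90.0, np.nan, 48.0, 88.0, 70.0, 95.0, 58.0],
    }
)
train, test = prepare(raw)
print(len(train), len(test))
```

Each of these steps touches every row, which is why the phase grows expensive as the dataset grows.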

However, data processing can become a bottleneck due to the sheer volume of data and the complexity of the transformations required. This is where AWS SageMaker processing jobs can play a pivotal role.

Optimizing Data Processing with SageMaker and Dask

AWS SageMaker processing jobs allow you to run data processing workloads on fully managed infrastructure. By leveraging multiple instances and parallel processing frameworks like Dask, you can significantly reduce the time required for data processing.

Using Multiple Instances

One of the key advantages of SageMaker processing jobs is the ability to distribute workloads across multiple instances. For example, let’s consider a scenario where we need to process a large dataset. Instead of running the job on a single instance, we can leverage multiple instances to process the data in parallel.
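
When a job runs on several instances, each container needs to know which host it is and who its peers are. The sketch below assumes that SageMaker exposes this as a JSON document with `current_host` and `hosts` keys (inside a processing container this is typically the file `/opt/ml/config/resourceconfig.json`). The helper `assign_role` and the "first host becomes the scheduler" convention are assumptions made for illustration, not something taken from the bootstrap program used later in this post.

```python
import json


def assign_role(resource_config: dict) -> dict:
    """Decide whether this host should act as Dask scheduler or worker."""
    # Sort so that every host derives the same scheduler from the same list.
    hosts = sorted(resource_config["hosts"])
    current = resource_config["current_host"]
    scheduler_host = hosts[0]  # Assumed convention: the first host runs the scheduler.
    return {
        "role": "scheduler" if current == scheduler_host else "worker",
        "scheduler_address": f"tcp://{scheduler_host}:8786",
    }


# Sample content in the shape SageMaker is assumed to provide. Inside a real
# container you would load the JSON file from disk instead.
sample = json.loads('{"current_host": "algo-2", "hosts": ["algo-1", "algo-2", "algo-3"]}')
print(assign_role(sample))
```

With this kind of logic in the container entrypoint, one instance starts the scheduler and the others join it as workers.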

Leveraging Dask for Parallel Processing

Dask is a parallel computing library that scales Python code from a single machine to a cluster of machines. It allows you to perform parallel data processing using partitions, which can be particularly useful for handling large datasets.

Here’s an example of how you can use Dask with SageMaker processing jobs. In this script, I generate a DataFrame with 1 million rows and 100 columns to put a heavy load on the processing task. The script runs inside the SageMaker Docker container. It first creates a Dask cluster client and connects it to the scheduler, then converts the data into a partitioned Dask DataFrame and writes it to S3 as CSV files.

				
from __future__ import print_function, unicode_literals
import argparse
import json
import logging
import os
import sys
import time
import warnings
import boto3
import numpy as np
import pandas as pd
from tornado import gen
import dask.dataframe as dd
import dask.array as da
import joblib
from dask.distributed import Client
from sklearn.compose import make_column_transformer
from sklearn.exceptions import DataConversionWarning
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
    KBinsDiscretizer,
    LabelBinarizer,
    OneHotEncoder,
    PolynomialFeatures,
    StandardScaler,
)

warnings.filterwarnings(action="ignore", category=DataConversionWarning)
attempts_counter = 3
attempts = 0

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    # Get processor script arguments
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    scheduler_ip = sys.argv[-1]

    # S3 client
    s3_region = "il-central-1"
    s3_client = boto3.resource("s3", s3_region)
    print(f"Using the {s3_region} region")

    # Start the Dask cluster client
    try:
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        logging.info("Printing cluster information: {}".format(client))
    except Exception as err:
        # Without a client the rest of the script cannot use the cluster, so stop here.
        logging.exception(err)
        sys.exit(1)

    # Parameters
    rows = 10**6  # Number of rows
    cols = 100    # Number of columns

    # Start timer for DataFrame generation
    start_time = time.time()

    # Generate the DataFrame
    df = pd.DataFrame(np.random.rand(rows, cols), columns=[f"col_{j}" for j in range(cols)])

    # End timer for DataFrame generation
    gen_time = time.time() - start_time
    print(f"Time taken to generate the DataFrame: {gen_time:.2f} seconds")

    # Start timer for Dask conversion and CSV saving
    start_time = time.time()

    # Convert the DataFrame to a Dask DataFrame
    ddf = dd.from_pandas(df, npartitions=6)

    # Save the Dask DataFrame to a CSV file
    ddf.to_csv('s3://sagemaker-il-central-1-133111709665/test-*.csv', index=False, storage_options={"client_kwargs": {
        "region_name": s3_region
    }})

    # End timer for Dask conversion and CSV saving
    save_time = time.time() - start_time
    print(f"Time taken to save the Dask DataFrame to CSV: {save_time:.2f} seconds")

    print("CSV file creation complete.")

    print(client)
    sys.exit(os.EX_OK)

				
			

Now, let’s build our Dockerfile:

				
FROM daskdev/dask

ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8

RUN pip install boto3==1.20.46

RUN conda install --yes -c conda-forge \
    dask-ml \
    s3fs

# Dumb init
RUN wget -O /usr/local/bin/dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.0/dumb-init_1.2.0_amd64
RUN chmod +x /usr/local/bin/dumb-init

RUN mkdir -p /opt/app /etc/dask
COPY dask_config/dask.yaml /etc/dask/

# Set up the bootstrapping program
COPY program /opt/program
RUN chmod +x /opt/program/bootstrap.py


ENTRYPOINT ["/opt/program/bootstrap.py"]
				
			

After building the Docker image and pushing it to ECR, let’s create our SageMaker Processing job. We will use the SageMaker Python SDK:

				
from sagemaker.processing import ProcessingInput, ScriptProcessor

dask_processor = ScriptProcessor(
    base_job_name="dask-preprocessor",
    image_uri="******.dkr.ecr.il-central-1.amazonaws.com/sagemaker-dask-example:latest",
    command=["/opt/program/bootstrap.py"],
    role=role,
    instance_count=6,
    instance_type="ml.m5.large",
    max_runtime_in_seconds=1200,
)

dask_processor.run(
    code="preprocess.py",
    arguments=[
        "s3_input_bucket",
        bucket,
        "s3_input_key_prefix",
        input_prefix,
        "s3_output_bucket",
        bucket,
        "s3_output_key_prefix",
        input_preprocessed_prefix,
        "s3_region",
        region,
    ],
    logs=True,
)
				
			

In this example, the job runs on several ml.m5.large instances in parallel; the code above requests six through instance_count, while the cost comparison below uses runs with four and two instances. The script shown here generates its data in memory, partitions it with Dask, and writes the output to S3. By distributing the workload across multiple instances, we can significantly reduce the processing time.
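
The `arguments` list passed to `run` alternates names and values, and the processing script turns it into a dictionary with `dict(zip(args_iter, args_iter))`. The sketch below shows how that pairing behaves. The bucket, prefixes, and IP address are made-up values, and the assumption that the bootstrap program appends the scheduler IP as the last argument is inferred from the script reading `sys.argv[-1]`, not confirmed by the source.

```python
def parse_script_args(argv):
    """Pair up alternating name/value items, as the processing script does."""
    args_iter = iter(argv)
    # zip() pulls two consecutive items from the same iterator on each step,
    # so an unpaired trailing item is silently dropped.
    return dict(zip(args_iter, args_iter))


# Hypothetical argument list: five name/value pairs plus one trailing item
# standing in for the scheduler IP (assumed to be appended by bootstrap.py).
argv = [
    "s3_input_bucket", "my-bucket",
    "s3_input_key_prefix", "raw/",
    "s3_output_bucket", "my-bucket",
    "s3_output_key_prefix", "processed/",
    "s3_region", "il-central-1",
    "10.0.0.12",
]
script_args = parse_script_args(argv)
scheduler_ip = argv[-1]
print(script_args["s3_region"], scheduler_ip)
```

Because the pairing relies on position, a missing value anywhere in the list would shift every later name and value, so it is worth keeping the list strictly alternating.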

Cost Efficiency Example

To illustrate the cost efficiency, let’s compare two scenarios:

  1. Using Four Instances: Running a processing job on four ml.m5.large instances for 50 seconds.
  2. Using Two Instances: Running the same job on two ml.m5.large instances for 107 seconds.

Assuming the cost of an ml.m5.large instance is $0.10 per hour:

  • Four Instances:
    • Total time: 50 seconds
    • Total cost: 4 instances * $0.10/hour * (50/3600) hours = $0.00556
  • Two Instances:
    • Total time: 107 seconds
    • Total cost: 2 instances * $0.10/hour * (107/3600) hours = $0.00594

While the cost difference is minimal in this simplified example, the four-instance run is slightly cheaper and cuts processing time by about 53% (50 seconds instead of 107 seconds). The key takeaway is that leveraging more instances can dramatically reduce processing time without raising the bill, which is crucial for timely insights and decision-making in production environments.
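
The arithmetic above can be reproduced with a few lines of Python. This is a minimal sketch that uses the assumed $0.10 hourly rate from this example and assumes billing is proportional to instance count and wall-clock seconds; the helper `job_cost` is hypothetical.

```python
def job_cost(instances: int, seconds: float, hourly_rate: float = 0.10) -> float:
    """Cost in USD of a job billed per instance for its wall-clock duration."""
    return instances * hourly_rate * seconds / 3600


four = job_cost(4, 50)
two = job_cost(2, 107)
time_saved = (107 - 50) / 107  # Fraction of wall-clock time saved.

print(f"four instances: ${four:.5f}")
print(f"two instances:  ${two:.5f}")
print(f"time saved:     {time_saved:.0%}")
```

Swapping in the current price for your region and instance type gives a quick way to compare configurations before running them.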

Conclusion

AWS SageMaker processing jobs provide a powerful and scalable solution for handling the data processing phase in MLOps. By leveraging multiple instances and parallel processing frameworks like Dask, you can optimize the processing time and improve overall efficiency. This not only accelerates the machine learning pipeline but also ensures that your models are trained on high-quality, well-prepared data, ultimately leading to better performance and more accurate predictions.

