All posts by Abdel Jaidi

Advanced patterns with AWS SDK for pandas on AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/advanced-patterns-with-aws-sdk-for-pandas-on-aws-glue-for-ray/

AWS SDK for pandas is a popular Python library among data scientists, data engineers, and developers. It simplifies interaction between AWS data and analytics services and pandas DataFrames. It allows easy integration and data movement between 22 types of data stores, including Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon Redshift, and Amazon OpenSearch Service.

In the previous post, we discussed how you can use AWS SDK for pandas to scale your workloads on AWS Glue for Ray. We explained how using both Ray and Modin within the library enabled us to distribute workloads across a compute cluster. To illustrate these capabilities, we explored examples of writing Parquet files to Amazon S3 at scale and querying data in parallel with Athena.

In this post, we show some more advanced ways to use this library on AWS Glue for Ray. We cover features and APIs from AWS services such as S3 Select, Amazon DynamoDB, and Amazon Timestream.

Solution overview

The Ray and Modin frameworks allow scaling of pandas workloads easily. You can write code on your laptop that uses the SDK for pandas to get data from an AWS data or analytics service to a pandas DataFrame, transform it using pandas, and then write it back to the AWS service. By using the distributed version of the SDK for pandas and replacing pandas with Modin, exactly the same code will scale on a Ray runtime—all logic about task coordination and distribution is hidden. Taking advantage of these abstractions, the AWS SDK for pandas team has made considerable use of Ray primitives to distribute some of the existing APIs (for the full list, see Supported APIs).

In this post, we show how to use some of these APIs in an AWS Glue for Ray job, namely querying with S3 Select, writing to and reading from a DynamoDB table, and writing to a Timestream table. Because AWS Glue for Ray is a fully managed environment, it’s by far the easiest way to run jobs because you don’t need to worry about cluster management. If you want to create your own cluster on Amazon Elastic Compute Cloud (Amazon EC2), refer to Distributing Calls on Ray Remote Cluster.

Configure solution resources

We use an AWS CloudFormation stack to provision the solution resources. Complete the following steps:

  1. Choose Launch stack to provision the stack in your AWS account:

Launch CloudFormation Stack

This takes about 2 minutes to complete. On successful deployment, the CloudFormation stack shows the status as CREATE_COMPLETE.

CloudFormation CREATE_COMPLETE

  1. Navigate to AWS Glue Studio to find an AWS Glue job named AdvancedGlueRayJob.

Glue for Ray Job Script

  1. On the Job details tab, scroll down and choose Advanced Properties.

Under Job Parameters, AWS SDK for pandas is specified as an additional Python module to install, along with Modin as an extra dependency.

Glue for Ray Job Details

  1. To run the job, choose Run and navigate to the Runs tab to monitor the job’s progress.

Glue for Ray Job Runs

Import the library

To import the library, use the following code:

import awswrangler as wr

AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a cluster with the default parameters. Advanced users can override this process by starting the Ray runtime before the import command.

Scale S3 Select workflows

S3 Select allows you to use SQL statements to query and filter S3 objects, including compressed files. This can be particularly useful if you have large files of several TBs and want to extract some information. Because the workload is delegated to Amazon S3, you don’t have to download and filter objects on the client side, leading to lower latency, lower cost, and higher performance.

With AWS SDK for pandas, these calls to S3 Select can be distributed across Ray workers in the cluster. In the following example, we query Amazon reviews data in Parquet format, filtering for reviews with 5-star ratings in the Mobile_Electronics partition. star_rating is a column in the Parquet data itself, while the partition is a directory.

# Filter for 5-star reviews with S3 Select within a partition
df_select = wr.s3.select_query(
    sql="SELECT * FROM s3object s where s.\"star_rating\" >= 5",
    path="s3://amazon-reviews-pds/parquet/product_category=Mobile_Electronics/",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=1024*1024*16,
)

scan_range_chunk_size is an important parameter to calibrate when using S3 Select. It specifies the range of bytes to query the S3 object, thereby determining the amount of work delegated to each worker. For this example, it’s set to 16 MB, meaning the work of scanning the object is parallelized into separate S3 Select requests each 16 MB in size. A higher value equates to larger chunks per worker but fewer workers, and vice versa.

The results are returned in a Modin DataFrame, which is a drop-in replacement for pandas. It exposes the same APIs but enables you to use all the workers in the cluster. The data in the Modin DataFrame is distributed along with all the operations among the workers.

Scale DynamoDB workflows

DynamoDB is a scalable NoSQL database service that provides high-performance, low-latency, and managed storage.

AWS SDK for pandas uses Ray to scale DynamoDB workflows, allowing parallel data retrieval and insertion operations. The wr.dynamodb.read_items function retrieves data from DynamoDB in parallel across multiple workers, and the results are returned as a Modin DataFrame. Similarly, data insertion into DynamoDB can be parallelized using the wr.dynamodb.put_df function.

For example, the following code inserts the Amazon Reviews DataFrame obtained from S3 Select into a DynamoDB table and then reads it back:

# Write Modin DataFrame to DynamoDB
wr.dynamodb.put_df(
    df=df_select,
    table_name=dynamodb_table_name,
    use_threads=4,
)
# Read data back from DynamoDB to Modin
    df_dynamodb = wr.dynamodb.read_items(
    table_name=dynamodb_table_name,
    allow_full_scan=True,
)

DynamoDB calls are subject to AWS service quotas. The concurrency can be limited using the use_threads parameter.

Scale Timestream workflows

Timestream is a fast, scalable, fully managed, purpose-built time series database that makes it easy to store and analyze trillions of time series data points per day. With AWS SDK for pandas, you can distribute Timestream write operations across multiple workers in your cluster.

Data can be written to Timestream using the wr.timestream.write function, which parallelizes the data insertion process for improved performance.

In this example, we use sample data from Amazon S3 loaded into a Modin DataFrame. Familiar pandas commands such as selecting columns or resetting the index are applied at scale with Modin:

# Select columns
df_timestream = df_timestream.loc[:, ["region", "az", "hostname", "measure_kind", "measure", "time"]]
# Overwrite the time column
df_timestream["time"] = datetime.now()
# Reset the index
df_timestream.reset_index(inplace=True, drop=False)
# Filter a measure
df_timestream = df_timestream[df_timestream.measure_kind == "cpu_utilization"]

The Timestream write operation is parallelized across blocks in your dataset. If the blocks are too big, you can use Ray to repartition the dataset and increase the throughput, because each block will be handled by a separate thread:

# Repartition the data into 100 blocks
df_timestream = ray.data.from_modin(df_timestream).repartition(100).to_modin()

We are now ready to insert the data into Timestream, and a final query confirms the number of rows in the table:

# Write data to Timestream
rejected_records = wr.timestream.write(
    df=df_timestream,
    database=timestream_database_name,
    table=timestream_table_name,
    time_col="time",
    measure_col="measure",
    dimensions_cols=["index", "region", "az", "hostname"],
)

# Query
df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{timestream_database_name}"."{timestream_table_name}"')

Clean up

To prevent unwanted charges to your AWS account, we recommend deleting the AWS resources that you used in this post:

  1. On the Amazon S3 console, empty the data from the S3 bucket with prefix glue-ray-blog-script.

S3 Bucket

  1. On the AWS CloudFormation console, delete the AdvancedSDKPandasOnGlueRay stack.

All resources will be automatically deleted with it.

Conclusion

In this post, we showcased some more advanced patterns to run your workloads using AWS SDK for pandas. In particular, these examples demonstrated how Ray is used within the library to distribute operations for several other AWS services, not just Amazon S3. When used in combination with AWS Glue for Ray, this gives you access to a fully managed environment to run at scale. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.


About the Authors

Abdel JaidiAbdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton KukushkinAnton Kukushkin is a Data Engineer for AWS Professional Services based in London, UK. In his spare time, he enjoys playing musical instruments.

Leon LuttenbergerLeon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale. In his spare time, he enjoys reading and traveling.

Scale AWS SDK for pandas workloads with AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/scale-aws-sdk-for-pandas-workloads-with-aws-glue-for-ray/

AWS SDK for pandas is an open-source library that extends the popular Python pandas library, enabling you to connect to AWS data and analytics services using pandas data frames. We’ve seen customers use the library in combination with pandas for both data engineering and AI workloads. Although pandas data frames are simple to use, they have a limitation on the size of data that can be processed. Because pandas is single-threaded, jobs are bounded by the available resources. If the data you need to process is small, this won’t be a problem, and pandas makes analysis and manipulation simple, as well as interactions with many other tools that support machine learning (ML) and visualization. However, as your data size scales, you may run into problems. This can be especially frustrating if you’ve created a promising prototype that can’t be moved to production. In our work with customers, we’ve seen many projects, both in data science and data engineering, that are stuck while they wait for someone to rewrite using a big data framework such as Apache Spark.

We are excited to announce that AWS SDK for pandas now supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. The simplest way to do this is to use AWS Glue with Ray, the new serverless option to run distributed Python code announced at AWS re:Invent 2022. AWS SDK for pandas also supports self-managed Ray on Amazon Elastic Compute Cloud (Amazon EC2).

In this post, we show you how you can use pandas to connect to AWS data and analytics services and manipulate data at scale by running on an AWS Glue with Ray job.

Overview of solution

Ray is a unified framework that enables you to scale AI and Python applications. The goal of the project is to take any Python code that’s written on a laptop and scale the workload on a cluster. This innovative framework opens the door to big data processing to a new audience. Previously, the only way to process large datasets on a cluster was to use tools such as Apache Hadoop, Apache Spark, or Apache Flink. These frameworks require additional skills because they provide their own programming model and often require languages such as Scala or Java to fully take advantage of the advanced capabilities. With Ray, you can just use Python to parallelize your code with few modifications.

Although Ray opens the door to big data processing, it’s not enough on its own to distribute pandas-specific methods. That task falls to Modin, a drop-in replacement of pandas, optimized to run in a distributed environment, such as Ray. Modin has the same API as pandas, so you can keep your code the same, but it parallelizes workloads to improve performance.

With today’s announcement, AWS SDK for pandas customers can use both Ray and Modin for their workloads. You have the option of loading data into Modin data frames, instead of regular pandas data frames. By configuring the library to use Ray and Modin, your existing data processing scripts can distribute operations end-to-end, with no code changes. AWS SDK for pandas takes care of parallelizing the read and write operations for your files across the compute cluster.

To use this feature, you can install the release candidate version of awswrangler with the ray and modin extras:

pip install "awswrangler[modin,ray]==3.0.0rc2"

Once installed, you can use the library in your code by importing it with the following statement:

import awswrangler as wr

When you run this code, the SDK for pandas looks for an environmental variable called WR_ADDRESS. If it finds it, it uses this value to send the commands to a remote cluster. If it doesn’t find it, it starts a local Ray runtime on your machine.

The following diagram shows what is happening when you run code that uses AWS SDK for pandas to read data from Amazon Simple Storage Service (Amazon S3) into a Modin data frame, perform a filtering operation, and write the data back to Amazon S3, using a multi-node cluster.

In the first phase, each node reads one or more input files and stores them in memory as blocks. During this phase, the head node builds a mapping reference that tracks the location of each block on the worker nodes. In the second phase, a filter operation is submitted to each node, creating a subset of the data. Finally, each worker node writes its blocks to Amazon S3.

It’s important to note that certain data frame operations (for example groupby or join) may result in the data being shuffled across nodes. Shuffling will also happen if you do partitioned or bucketed writes. This tends to slow down the job because data needs to move between nodes.

If you want to create your own Ray cluster on Amazon EC2, refer to the tutorial Distributing Calls on Ray Remote Cluster. The rest of this post shows you how to run AWS SDK for pandas and Modin on an AWS Glue with Ray job.

Use AWS Glue with Ray

Because AWS Glue with Ray is a fully managed environment, it’s a simple way to run jobs. Both AWS SDK for pandas and Modin are pre-loaded, you don’t need to worry about cluster management or installing the right set of dependencies, and the job auto scales with your workload. To get started, complete the following steps:

  1. Choose Launch Stack to provision an AWS CloudFormation stack in your AWS account:
    launch cloudformation stack
    Note that while in preview, AWS Glue with Ray is available in a limited set of AWS Regions.The stack takes about 3 minutes to complete. You can verify that everything was successfully deployed by checking that the CloudFormation stack shows the status CREATE_COMPLETE.
  2. Navigate to AWS Glue Studio to find an AWS Glue job named GlueRayJob with the following script.
  3. Choose Run to start the job and navigate to the Runs tab to monitor progress.

Here, we break down the script and show you what happens at each stage when we run this code on AWS Glue with Ray. First, we import the library:

import awswrangler as wr

At import, AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a Ray cluster with the default parameters. In this case, because we’re running on AWS Glue with Ray, AWS SDK for pandas automatically uses the Ray cluster with no extra configuration needed. Advanced users can override this process, however, by starting the Ray runtime before the import command.

Next, we read Amazon product data in Parquet format from Amazon S3 and load it into a distributed Modin data frame:

# Read Parquet data (1.2 Gb Parquet compressed)
df = wr.s3.read_parquet(
    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
)

Simple data transformations on the data frame are applied next. Modin data frames implement the same interface as pandas data frames, allowing you to perform familiar pandas operations at scale. First, we drop the customer_id column, then we filter for a subset of the reviews that received five-star ratings:

# Drop the customer_id column
df.drop("customer_id", axis=1, inplace=True)

# Filter reviews with 5-star rating
df5 = df[df["star_rating"] == 5]

The data is written back to Amazon S3 in Parquet format, partitioned by year and marketplace. The dataset=True argument ensures that an associated Hive table is also created in the AWS Glue metadata catalog:

# Write partitioned five-star reviews to S3 in Parquet format
wr.s3.to_parquet(
    df5,
    path=f"s3://{bucket_name}/{category}/",
    partition_cols=["year", "marketplace"],
    dataset=True, 
    database=glue_database,
    table=glue_table, 
)

Finally, a query is run in Amazon Athena, and the S3 objects resulting from this operation are read in parallel into a Modin data frame:

# Read the data back to a Modin df via Athena
df5_athena = wr.athena.read_sql_query(
    f"SELECT * FROM {glue_table}",
    database=glue_database,
    ctas_approach=False, 
    unload_approach=True, 
    workgroup=workgroup_name,
    s3_output=f"s3://{bucket_name}/unload/{category}/",
)

The Amazon CloudWatch logs of the job provide insights into the performance achieved from reading blocks in parallel in a multi-node Ray cluster.

For simplicity, this example showcased Amazon S3 and Athena APIs only, but AWS SDK for pandas supports other services, including Amazon Timestream and Amazon Redshift. For a full list of the APIs that support distribution, refer to Supported APIs.

Clean up AWS resources

To prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this example:

  1. On the Amazon S3 console, empty data from both buckets with prefix glue-ray-.
  2. On the AWS CloudFormation console, delete the SDKPandasOnGlueRay stack.

The resources created as part of the stack are automatically deleted with it.

Conclusion

In this post, we demonstrated how you can run your workloads at scale using AWS SDK for pandas. When used in combination with AWS Glue with Ray, this gives you access to a fully managed environment to distribute your Python scripts. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.

For more examples, check out the tutorials in the AWS SDK for pandas documentation.


About the Authors

Abdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton Kukushkin is a Data Engineer for AWS Professional Services based in London, United Kingdom. He works with AWS customers, helping them build and scale their data and analytics.

Leon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale.

Lucas Hanson is Senior Cloud Engineer for AWS Professional Services. He focuses on helping customers with infrastructure management and DevOps processes for data management solutions. Outside of work, he enjoys music production and practicing yoga.