
Introducing enhanced support for tagging, cross-account access, and network security in AWS Glue interactive sessions

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/introducing-enhanced-support-for-tagging-cross-account-access-and-network-security-in-aws-glue-interactive-sessions/

AWS Glue interactive sessions allow you to run interactive AWS Glue workloads on demand, which enables rapid development by issuing blocks of code on a cluster and getting prompt results. This technology is enabled by the use of notebook IDEs, such as the AWS Glue Studio notebook, Amazon SageMaker Studio, or your own Jupyter notebooks.

In this post, we discuss the following recently added management features and how they can give you more control over the configuration and security of your AWS Glue interactive sessions:

  • Tags magic – You can use this new cell magic to tag the session for administration or billing purposes. For example, you can tag each session with the name of the billable department and later run a search to find all spending associated with this department on the AWS Billing console.
  • Assume role magic – Now you can create a session in an account different than the one you’re connected with by assuming an AWS Identity and Access Management (IAM) role owned by the other account. You can designate a dedicated role with permissions to create sessions and have other users assume it when they use sessions.
  • IAM VPC rules – You can require your users to use (or restrict them from using) certain VPCs or subnets for the sessions, to comply with your corporate policies and have control over how your data travels in the network. This feature existed for AWS Glue jobs and is now available for interactive sessions.

Solution overview

For our use case, we’re building a highly secured app and want to have users (developers, analysts, data scientists) running AWS Glue interactive sessions on specific VPCs to control how the data travels through the network.

In addition, users are not allowed to log in directly to the production account, which has the data and the connections they need; instead, users run their own notebooks via their individual accounts and get permission to assume a specific role enabled on the production account to run their sessions. Users can run AWS Glue interactive sessions using either AWS Glue Studio notebooks via the AWS Glue console or Jupyter notebooks that run on their local machine.

Lastly, all new resources must be tagged with the name of the department for proper billing allocation and cost control.

The following architecture diagram highlights the different roles and accounts involved:

  • Account A – The individual user account. The user ISBlogUser has permissions to create AWS Glue notebook servers via the AWSGlueServiceRole-notebooks role and assume a role in account B (directly or indirectly).
  • Account B – The production account that owns the GlueSessionCreationRole role, which users assume to create AWS Glue interactive sessions in this account.

architecture

Prerequisites

In this section, we walk through the steps to set up the prerequisite resources and security configurations.

Install AWS CLI and Python library

Install and configure the AWS Command Line Interface (AWS CLI) if you don’t have it already set up. For instructions, refer to Install or update the latest version of the AWS CLI.

Optionally, if you want to run a local notebook from your computer, install Python 3.7 or later and then install Jupyter and the AWS Glue interactive sessions kernels. For instructions, refer to Getting started with AWS Glue interactive sessions. You can then run Jupyter directly from the command line using jupyter notebook, or via an IDE like VSCode or PyCharm.

Get access to two AWS accounts

If you have access to two accounts, you can reproduce the use case described in this post. The instructions refer to account A as the user account that runs the notebook and account B as the account that runs the sessions (the production account in the use case). This post assumes you have enough administration permissions to create the different components and manage the account security roles.

If you have access to only one account, you can still follow this post and perform all the steps on that single account.

Create a VPC and subnet

We want to restrict users to running AWS Glue interactive sessions only in a specific VPC. First, let’s create a new VPC in account B using Amazon Virtual Private Cloud (Amazon VPC). We use this VPC later to enforce the network restrictions.

  1. Sign in to the AWS Management Console with account B.
  2. On the Amazon VPC console, choose Your VPCs in the navigation pane.
  3. Choose Create VPC.
  4. Enter 10.0.0.0/24 as the IP CIDR.
  5. Leave the remaining parameters as default and create your VPC.
  6. Make a note of the VPC ID (starting with vpc-) to use later.

For more information about creating VPCs, refer to Create a VPC.

  1. In the navigation pane, choose Subnets.
  2. Choose Create subnet.
  3. Select the VPC you created, enter the same CIDR (10.0.0.0/24), and create your subnet.
  4. In the navigation pane, choose Endpoints.
  5. Choose Create endpoint.
  6. For Service category, select AWS services.
  7. Search for the option that ends in s3, such as com.amazonaws.{region}.s3.
  8. In the search results, select the Gateway type option.

add gateway endpoint

  1. Choose your VPC on the drop-down menu.
  2. For Route tables, select the subnet you created.
  3. Complete the endpoint creation.
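
The console steps above can also be scripted. The following is a minimal sketch, assuming boto3 is installed and credentials for account B are configured. The function name create_session_network and the choice to attach the endpoint to every route table found in the new VPC are illustrative assumptions, not part of the original walkthrough.

```python
def create_session_network(ec2, region, cidr="10.0.0.0/24"):
    """Create a VPC, one subnet spanning it, and an S3 gateway endpoint.

    `ec2` is expected to be a boto3 EC2 client, e.g. boto3.client("ec2").
    """
    vpc_id = ec2.create_vpc(CidrBlock=cidr)["Vpc"]["VpcId"]

    # The post uses the same CIDR for the subnet, so it covers the whole VPC.
    subnet_id = ec2.create_subnet(VpcId=vpc_id, CidrBlock=cidr)["Subnet"]["SubnetId"]

    # A gateway endpoint is attached to route tables; look up the ones in this VPC.
    tables = ec2.describe_route_tables(
        Filters=[{"Name": "vpc-id", "Values": [vpc_id]}]
    )["RouteTables"]
    route_table_ids = [table["RouteTableId"] for table in tables]

    endpoint = ec2.create_vpc_endpoint(
        VpcEndpointType="Gateway",
        VpcId=vpc_id,
        ServiceName=f"com.amazonaws.{region}.s3",
        RouteTableIds=route_table_ids,
    )
    return {
        "vpc_id": vpc_id,
        "subnet_id": subnet_id,
        "endpoint_id": endpoint["VpcEndpoint"]["VpcEndpointId"],
    }
```

You would call it as create_session_network(boto3.client("ec2"), "us-east-1") (the Region here is only an example) and keep the returned VPC ID for the IAM policy later in this post.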

Create an AWS Glue network connection

You now need to create an AWS Glue connection that uses the VPC, so sessions created with it can meet the VPC requirement.

  1. Sign in to the console with account B.
  2. On the AWS Glue console, choose Data connections in the navigation pane.
  3. Choose Create connection.
  4. For Name, enter session_vpc.
  5. For Connection type, choose Network.
  6. In the Network options section, choose the VPC you created, a subnet, and a security group.
  7. Choose Create connection.

create connection
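
If you prefer to create the connection programmatically, the sketch below builds the ConnectionInput structure that the AWS Glue CreateConnection API expects for a Network connection. The helper name network_connection_input and the sample IDs in the usage note are hypothetical; substitute the subnet, security group, and Availability Zone of the VPC you created.

```python
def network_connection_input(name, subnet_id, security_group_ids, availability_zone):
    """Build the ConnectionInput for an AWS Glue connection of type NETWORK."""
    return {
        "Name": name,
        "ConnectionType": "NETWORK",
        # A Network connection carries no endpoint properties, only network placement.
        "ConnectionProperties": {},
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }
```

With a boto3 Glue client, this could be used as glue.create_connection(ConnectionInput=network_connection_input("session_vpc", "subnet-0abc", ["sg-0abc"], "us-east-1a")), where all three IDs are placeholders.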

Account A security setup

Account A is the development account for your users (developers, analysts, data scientists, and so on). They are provided IAM users to access this account programmatically or via the console.

Create the assume role policy

The assume role policy allows users and roles in account A to assume roles in account B (the role in account B also has to allow it). Complete the following steps to create the policy:

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Switch to the JSON tab in the policy editor and enter the following policy (provide the account B number):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::{account B number}:role/*"
        }
    ]
}
  1. Name the policy AssumeRoleAccountBPolicy and complete the creation.
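
The policy above allows assuming any role in account B. As a hedged variation, the following sketch generates the same document but lets you narrow the resource to a single role name, such as the GlueSessionCreationRole created later in this post. The function name and the 12-digit validation are illustrative additions.

```python
import json


def assume_role_policy_json(account_b_id, role_name="*"):
    """Return the assume-role policy document as a JSON string.

    role_name="*" reproduces the policy from this post; passing a role name
    limits the permission to that one role in account B.
    """
    if not (account_b_id.isdigit() and len(account_b_id) == 12):
        raise ValueError("AWS account IDs are 12-digit numbers")
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": f"arn:aws:iam::{account_b_id}:role/{role_name}",
            }
        ],
    }
    return json.dumps(policy, indent=4)
```

Printing assume_role_policy_json("123456789012", "GlueSessionCreationRole") (with a placeholder account ID) gives a document you can paste into the JSON tab of the policy editor.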

Create an IAM user

Now you create an IAM user for account A that you can use to run AWS Glue interactive sessions locally or on the console.

  1. On the IAM console, choose Users in the navigation pane.
  2. Choose Create user.
  3. Name the user ISBlogUser.
  4. Select Provide user access to the AWS Management Console.
  5. Select I want to create an IAM user and choose a password.
  6. Attach the policies AWSGlueConsoleFullAccess and AssumeRoleAccountBPolicy.
  7. Review the settings and complete the user creation.

Create an AWS Glue Studio notebook role

To start an AWS Glue Studio notebook, a role is required. Usually, the same role is used both to start a notebook and run a session. In this use case, users of account A only need permissions to run a notebook, because they will create sessions via the assumed role in account B.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. Select Glue as the use case.
  4. Attach the policies AWSGlueServiceNotebookRole and AssumeRoleAccountBPolicy.
  5. Name the role AWSGlueServiceRole-notebooks (because the name starts with AWSGlueServiceRole, the user doesn’t need explicit PassRole permission), then complete the creation.

Optionally, you can allow Amazon CodeWhisperer to provide code suggestions on the notebook by adding the permission to the role. To do so, navigate to the role AWSGlueServiceRole-notebooks on the IAM console. On the Add permissions menu, choose Create inline policy. Use the following JSON policy and name it CodeWhispererPolicy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "codewhisperer:GenerateRecommendations",
            "Resource": "*"
        }
    ]
}

Account B security setup

Account B is considered the production account that contains the data and connections, and runs the AWS Glue data integration pipelines (using either AWS Glue sessions or jobs). Users don’t have direct access to it; they use it by assuming the role created for this purpose.

To follow this post, you need two roles: one that the AWS Glue service assumes to run the sessions, and another that creates the sessions and enforces the VPC restriction.

Create an AWS Glue service role

To create an AWS Glue service role, complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. Choose Glue for the use case.
  4. Attach the policy AWSGlueServiceRole.
  5. Name the role AWSGlueServiceRole-blog and complete the creation.

Create an AWS Glue interactive session role

This role will be used to create sessions following the VPC requirements. Complete the following steps to create the role:

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Switch to the JSON tab in the policy editor and enter the following code (provide your VPC ID). You can also replace the * in the policy with the full ARN of the role AWSGlueServiceRole-blog you just created, to force the notebook to only use that role when creating sessions.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "glue:CreateSession"
            ],
            "Resource": [
                "*"
            ],
            "Condition": {
                "ForAnyValue:StringNotEquals": {
                    "glue:VpcIds": [
                        "{enter your vpc id here}"
                    ]
                }
            }
        },
        {
            "Effect": "Deny",
            "Action": [
                "glue:CreateSession"
            ],
            "Resource": [
                "*"
            ],
            "Condition": {
                "Null": {
                    "glue:VpcIds": true
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetTags"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*"
        }        
    ]
}

This policy complements the AWSGlueServiceRole managed policy and restricts the session creation based on the VPC. You could also restrict the subnet and security group in a similar way using the condition keys glue:SubnetIds and glue:SecurityGroupIds, respectively.

In this case, session creation requires a VPC, and that VPC has to be in the list of IDs in the policy. If you only need to require that some valid VPC is used, you can remove the first statement and keep the one that denies the creation when the VPC is null.
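
To make the interplay of the two Deny statements concrete, here is a deliberately simplified model in Python. It is not the IAM policy engine and only mirrors the two conditions discussed above: the Null check rejects requests that carry no VPC, and the ForAnyValue:StringNotEquals check rejects requests where any VPC ID is outside the allowed list. The function name is hypothetical.

```python
def passes_vpc_deny_rules(request_vpc_ids, allowed_vpc_ids, require_listed_vpc=True):
    """Return True if a CreateSession request is NOT hit by the Deny statements.

    Simplified model only: the Allow itself still has to come from another
    policy, such as AWSGlueServiceRole.
    """
    # Null condition: deny when the request has no VPC at all.
    # ForAnyValue does not match an absent key, which is why this separate
    # statement is needed.
    if not request_vpc_ids:
        return False

    # ForAnyValue:StringNotEquals: deny if any requested VPC is not in the list.
    if require_listed_vpc and any(v not in allowed_vpc_ids for v in request_vpc_ids):
        return False

    return True
```

Setting require_listed_vpc=False corresponds to removing the first statement, so any VPC is accepted but a session without a VPC is still denied.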

  1. Name the policy CustomCreateSessionPolicy and complete the creation.
  2. Choose Roles in the navigation pane.
  3. Choose Create role.
  4. Select Custom trust policy.
  5. Replace the trust policy template with the following code (provide your account A number):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                      "arn:aws:iam::{account A}:role/AWSGlueServiceRole-notebooks", 
                      "arn:aws:iam::{account A}:user/ISBlogUser"
                    ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

This allows the role to be assumed directly by the user when using a local notebook and also when using an AWS Glue Studio notebook with a role.

  1. Attach the policies AWSGlueServiceRole and CustomCreateSessionPolicy (you created the latter in the previous steps, so you might need to refresh the list for it to appear).
  2. Name the role GlueSessionCreationRole and complete the role creation.
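
Before moving on to the notebook, you may want to check that the trust relationship works from account A. The following sketch calls the AWS STS AssumeRole API with the credentials of ISBlogUser and returns keyword arguments suitable for creating further boto3 clients. The function name and the session name glue-notebook-check are assumptions for illustration, and this is not a description of how the notebook magic is implemented internally.

```python
def assume_session_role(sts, account_b_id, role_name="GlueSessionCreationRole",
                        session_name="glue-notebook-check"):
    """Assume the session creation role in account B.

    `sts` is expected to be a boto3 STS client, e.g. boto3.client("sts"),
    created with the credentials of a principal listed in the trust policy.
    """
    role_arn = f"arn:aws:iam::{account_b_id}:role/{role_name}"
    response = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    credentials = response["Credentials"]
    # These keys match the credential arguments accepted by boto3.client(...).
    return {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }
```

If the call raises an access denied error, review both the AssumeRoleAccountBPolicy in account A and the trust policy of the role in account B, because both sides have to allow the operation.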

Create the Glue interactive session in the VPC, with assumed role and tags

Now that you have the accounts, roles, VPC, and connection ready, you use them to meet the requirements. You start a new notebook using account A, which assumes the role in account B to create a session in the VPC, and tag it with the department and billing area.

Start a new notebook

Using account A, start a new notebook. You may use either of the following options.

Option 1: Create an AWS Glue Studio notebook

The first option is to create an AWS Glue Studio notebook:

  1. Sign in to the console with account A and the ISBlogUser user.
  2. On the AWS Glue console, choose Notebooks in the navigation pane under ETL jobs.
  3. Select Jupyter Notebook and choose Create.
  4. Enter a name for your notebook.
  5. Specify the role AWSGlueServiceRole-notebooks.
  6. Choose Start notebook.

Option 2: Create a local notebook

Alternatively, you can create a local notebook. Before you start the process that runs Jupyter (or, if you run it indirectly, the IDE that runs it), you need to set the access key ID and secret access key for the user ISBlogUser, either by running aws configure on the command line or by setting the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, respectively. Then create a new Jupyter notebook and select the kernel Glue PySpark.

Start a session from the notebook

After you start the notebook, select the first cell and add four new empty code cells. If you are using an AWS Glue Studio notebook, the notebook already contains some prepopulated cells as examples; we don’t use those sample cells in this post.

  1. In the first cell, enter the following magic configuration with the session creation role ARN, using the ID of account B:
# Configure the role we assume for creating the sessions
# Tip: assume_role is a cell magic (meaning it needs its own cell)
%%assume_role
"arn:aws:iam::{account B}:role/GlueSessionCreationRole"
  1. Run the cell to set up that configuration, either by choosing the button on the toolbar or pressing Shift + Enter.

It should confirm the role was assumed correctly. Now when the session is launched, it will be created by this role. This allows you to use a role from a different account to run a session on that account.

  1. In the second cell, enter sample tags like the following and run the cell in the same way:
# Set a tag to associate the session with billable department
# Tip: tags is a cell magic (meaning it needs its own cell)
%%tags
{'team':'analytics', 'billing':'Data-Platform'}
  1. In the third cell, enter the following sample configuration (provide the role ARN with account B) and run the cell to set up the configuration:
# Set the configuration of your sessions using magics 
# Tip: non-cell magics can share the same cell 
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%iam_role arn:aws:iam::{account B}:role/AWSGlueServiceRole-blog

Now the session is configured but hasn’t started yet because you didn’t run any Python code.

  1. In the fourth empty cell, enter the following code to set up the objects required to work with AWS Glue and run the cell:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

It should fail with a permission error saying that there is an explicit deny policy activated. This is the VPC condition you set before. By default, the session doesn’t use a VPC, so this is why it’s failing.

notebook error

You can solve the error by assigning the connection you created before, so the session runs inside the authorized VPC.

  1. In the third cell, add the %connections magic with the value session_vpc.

The session needs to run in the same Region in which the connection is defined. If that’s not the same as the notebook Region, you can explicitly configure the session Region using the %region magic.

notebook cells
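
For reference, after this change the third cell would look roughly like the following. The magics and values are the ones used in this post; the %region line is left commented out because it is only needed when the connection lives in a different Region than the notebook, and its value is a placeholder.

```
# Set the configuration of your sessions using magics
# Tip: non-cell magics can share the same cell
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%iam_role arn:aws:iam::{account B}:role/AWSGlueServiceRole-blog
# Run the session inside the authorized VPC through the network connection
%connections session_vpc
# Optional: only if the connection is defined in another Region
# %region {region of the connection}
```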

  1. After you have added the new config settings, run the cell again so the magics take effect.
  2. Run the fourth cell again (the one with the code).

This time, it should start the session and after a brief period confirm it has been created correctly.

  1. Add a new cell with the following content and run it: %status

This will display the configuration and other information about the session that the notebook is using, including the tags set before.

status result

You started a notebook in account A and used a role from account B to create a session, which uses the network connection so it runs in the required VPC. You also tagged the session to be able to easily identify it later.

In the next section, we discuss more ways to monitor sessions using tags.

Interactive session tags

Before tags were supported, if you wanted to identify the purpose of sessions running in the account, you had to use the magic %session_id_prefix to name your session with something meaningful.

Now, with the new tags magic, you can use more sophisticated ways to categorize your sessions.

In the previous section, you tagged the session with a team and billing department. Let’s imagine now you are an administrator checking the sessions that different teams run in an account and Region.

Explore tags via the AWS CLI

On the command line where you have the AWS CLI installed, run the following command to list the sessions running in the account and Regions configured (use the Region and max results parameters if needed):

aws glue list-sessions

You also have the option to just list sessions that have a specific tag:

aws glue list-sessions --tags team=analytics

You can also list all the tags associated with a specific session with the following command. Provide the Region, account, and session ID (you can get it from the list-sessions command):

aws glue get-tags --resource-arn arn:aws:glue:{region}:{account}:session/{session Id}
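
The same checks can be done from Python. The sketch below, which assumes a boto3 Glue client is passed in, lists the sessions matching a tag filter and then fetches all tags for each of them using the ARN format shown above. The helper names session_arn and tags_by_session are hypothetical.

```python
def session_arn(region, account_id, session_id):
    """Build the ARN of an interactive session, as used by get-tags above."""
    return f"arn:aws:glue:{region}:{account_id}:session/{session_id}"


def tags_by_session(glue, region, account_id, tag_filter):
    """Map each session ID matching `tag_filter` to its full set of tags.

    `glue` is expected to be a boto3 Glue client, e.g. boto3.client("glue").
    """
    result = {}
    kwargs = {"Tags": tag_filter}
    while True:
        page = glue.list_sessions(**kwargs)
        for session_id in page.get("Ids", []):
            arn = session_arn(region, account_id, session_id)
            result[session_id] = glue.get_tags(ResourceArn=arn).get("Tags", {})
        token = page.get("NextToken")
        if not token:
            return result
        # Keep requesting pages until the service stops returning a token.
        kwargs["NextToken"] = token
```

For example, tags_by_session(glue, region, account_id, {"team": "analytics"}) would return the tags of every session tagged for the analytics team.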

Explore tags via the AWS Billing console

You can also use tags to keep track of cost and do more accurate cost assignment in your company. After you have used a tag in your session, the tag will become available for billing purposes (it can take up to 24 hours to be detected).

  1. On the AWS Billing console, choose Cost allocation tags under Billing in the navigation pane.
  2. Search for and select the tags you used in the session: “team” and “billing”.
  3. Choose Activate.

This activation can take up to 24 additional hours before the tag is applied for billing purposes. You only have to do this one time when you start using a new tag on an account.

cost allocation tags

  1. After the tags have been correctly activated and applied, choose Cost explorer under Cost Management in the navigation pane.
  2. In the Report parameters pane, for Tag, choose one of the tags you activated.

This adds a drop-down menu for this tag, where you can choose some or all of the tag values to use.

  1. Make your selection and choose Apply to use the filter on the report.

bill barchart
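
A similar report can be requested through the Cost Explorer API. The following sketch only builds the request for GetCostAndUsage, filtered and grouped by one of the activated tags; the function name and the choice of daily granularity and the UnblendedCost metric are assumptions made for illustration.

```python
from datetime import date


def cost_by_tag_request(tag_key, tag_values, start, end):
    """Build the arguments for Cost Explorer's get_cost_and_usage call.

    `start` is inclusive and `end` is exclusive, both as datetime.date.
    """
    return {
        "TimePeriod": {"Start": start.isoformat(), "End": end.isoformat()},
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost"],
        # Only include usage carrying one of the selected tag values.
        "Filter": {"Tags": {"Key": tag_key, "Values": list(tag_values)}},
        # Break the result down per tag value.
        "GroupBy": [{"Type": "TAG", "Key": tag_key}],
    }
```

With boto3, the request could be sent as boto3.client("ce").get_cost_and_usage(**cost_by_tag_request("team", ["analytics"], date(2023, 6, 1), date(2023, 7, 1))), where the dates are examples.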

Clean up

Run the %stop_session magic in a cell to stop the session and avoid further charges. If you no longer need the notebook, VPC, or roles you created, you can delete them as well.

Conclusion

In this post, we showed how to use these new features in AWS Glue to have more control over your interactive sessions for management and security. You can enforce network restrictions, allow users from other accounts to use your session, and use tags to help you keep track of the session usage and cost reports. These new features are already available, so you can start using them now.


About the authors

Gonzalo Herreros
Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.
Gal Heyne
Gal Heyne is a Technical Product Manager on the AWS Glue team.

Use AWS Glue DataBrew recipes in your AWS Glue Studio visual ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/use-aws-glue-databrew-recipes-in-your-aws-glue-studio-visual-etl-jobs/

AWS Glue Studio is now integrated with AWS Glue DataBrew. AWS Glue Studio is a graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. DataBrew is a visual data preparation tool that enables you to clean and normalize data without writing any code. The over 200 transformations it provides are now available to be used in an AWS Glue Studio visual job.

In DataBrew, a recipe is a set of data transformation steps that you can author interactively in its intuitive visual interface. In this post, you’ll see how to build a recipe in DataBrew and then apply it as part of an AWS Glue Studio visual ETL job.

Existing DataBrew users will also benefit from this integration—you can now run your recipes as part of a larger visual workflow with all the other components AWS Glue Studio provides, in addition to being able to use advanced job configuration and the latest AWS Glue engine version.

This integration brings distinct benefits to the existing users of both tools:

  • You have a centralized view in AWS Glue Studio of the overall ETL diagram, end to end
  • You can interactively define a recipe, seeing values, statistics, and distribution on the DataBrew console, then reuse that tested and versioned processing logic in AWS Glue Studio visual jobs
  • You can orchestrate multiple DataBrew recipes in an AWS Glue ETL job or even multiple jobs using AWS Glue workflows
  • DataBrew recipes can now use AWS Glue job features such as bookmarks for incremental data processing, automatic retries, auto scale, or grouping small files for greater efficiency

Solution overview

In our fictitious use case, the requirement is to clean up a synthetic medical claims dataset created for this post, which has some data quality issues introduced on purpose to demonstrate the DataBrew capabilities on data preparation. Then the claims data is ingested into the catalog (so it’s visible to analysts), after enriching it with some relevant details about the corresponding medical providers coming from a separate source.

The solution consists of an AWS Glue Studio visual job that reads two CSV files with claims and providers, respectively. The job applies a recipe to the first one to address the quality issues, selects columns from the second one, joins both datasets, and finally stores the result on Amazon Simple Storage Service (Amazon S3), creating a table on the catalog so the output data can be used by other tools like Amazon Athena.

Create a DataBrew recipe

Start by registering the data store for the claims file. This will allow you to build the recipe in its interactive editor using the actual data so you can evaluate the result of the transformations as you define them.

  1. Download the claims CSV file using the following link: alabama_claims_data_Jun2023.csv.
  2. On the DataBrew console, choose Datasets in the navigation pane, then choose Connect new dataset.
  3. Choose the option File upload.
  4. For Dataset name, enter Alabama claims.
  5. For Select a file to upload, choose the file you just downloaded on your computer.
    Add dataset
  6. For Enter S3 destination, enter or browse to a bucket in your account and Region.
  7. Leave the remaining options at their defaults (CSV separated with comma and with header) and complete the dataset creation.
  8. Choose Project in the navigation pane, then choose Create project.
  9. For Project name, name it ClaimsCleanup.
  10. Under Recipe details, for Attached recipe, choose Create new recipe, name it ClaimsCleanup-recipe, and choose the Alabama claims dataset you just created.
    Add project
  11. Select a role suitable for DataBrew or create a new one, and complete the project creation.

This will create a session using a configurable subset of the data. After the session has initialized, you will notice that some of the cells have invalid or missing values.

Loaded project

In addition to the missing values in the columns Diagnosis Code, Claim Amount, and Claim Date, some values in the data have some extra characters: Diagnosis Code values are sometimes prefixed with “code ” (space included), and Procedure Code values are sometimes followed by single quotes.
Claim Amount values will likely be used for calculations, so they should be converted to a number, and Claim Date should be converted to date type.

Now that we identified the data quality issues to address, we need to decide how to deal with each case.
There are multiple ways you can add recipe steps, including using the column context menu, the toolbar on the top, or from the recipe summary. Using the last method, you can search for the indicated step type to replicate the recipe created in this post.

Add step searchbox

Claim Amount is essential for this use case, so the decision is to remove the rows where it is missing.

  1. Add the step Remove missing values.
  2. For Source column, choose Claim Amount.
  3. Leave the default action Delete rows with missing values and choose Apply to save it.
    Preview missing values

The view is now updated to reflect the step application and the rows with missing amounts are no longer there.

Diagnosis Code can be empty, so missing values are accepted there, but in the case of Claim Date, we want a reasonable estimate. The rows in the data are sorted in chronological order, so you can impute missing dates using the previous valid value from the preceding rows. Assuming every day has claims, the largest error would be assigning a claim to the previous day if it were the first claim of its day and was missing the date; for illustration purposes, let’s consider that potential error acceptable.

First, convert the column from string to date type.

  1. Add the step Change type.
  2. Choose Claim Date as the column and date as the type, then choose Apply.
    Change type to date
  3. Now to do the imputation of missing dates, add the step Fill or impute missing values.
  4. Select Fill with last valid value as the action and choose Claim Date as the source.
  5. Choose Preview changes to validate it, then choose Apply to save the step.
    Preview imputation

So far, your recipe should have three steps, as shown in the following screenshot.

Steps so far

  1. Next, add the step Remove quotation marks.
  2. Choose the Procedure Code column and select Leading and trailing quotation marks.
  3. Preview to verify it has the desired effect and apply the new step.
    Preview remove quotes
  4. Add the step Remove special characters.
  5. Choose the Claim Amount column and to be more specific, select Custom special characters and enter $ for Enter custom special characters.
    Preview remove dollar sign
  6. Add a Change type step on the column Claim Amount and choose double as the type.
    Change type to double
  7. As the last step, to remove the superfluous “code ” prefix, add a Replace value or pattern step.
  8. Choose the column Diagnosis Code, and for Enter custom value, enter code (with a space at the end).
    Preview remove code

Now that you have addressed all data quality issues identified on the sample, publish the project as a recipe.

  1. Choose Publish in the Recipe pane, enter an optional description, and complete the publication.
    Recipe steps

Each time you publish, it will create a different version of the recipe. Later, you will be able to choose which version of the recipe to use.
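
To make the logic of the seven recipe steps explicit, here is a plain Python sketch that applies the same transformations, in the same order, to rows represented as dictionaries. This is an illustration only and not how DataBrew runs the recipe; the function name and the assumption that dates arrive in ISO format (YYYY-MM-DD) are hypothetical, because the post doesn’t state the date format of the file.

```python
from datetime import date


def clean_claims(rows):
    """Apply the recipe logic to an iterable of dict rows, in file order."""
    cleaned = []
    last_valid_date = None
    for row in rows:
        # Step 1: remove rows with a missing Claim Amount.
        amount = (row.get("Claim Amount") or "").strip()
        if not amount:
            continue

        # Steps 2-3: convert Claim Date to a date (ISO format assumed) and
        # fill missing dates with the last valid value seen so far.
        raw_date = (row.get("Claim Date") or "").strip()
        claim_date = date.fromisoformat(raw_date) if raw_date else last_valid_date
        if claim_date is not None:
            last_valid_date = claim_date

        # Step 4: remove leading and trailing quotation marks.
        procedure = (row.get("Procedure Code") or "").strip("'\"")

        # Steps 5-6: remove the $ character and convert to a double.
        amount_value = float(amount.replace("$", ""))

        # Step 7: remove the superfluous "code " prefix (space included).
        diagnosis = (row.get("Diagnosis Code") or "").replace("code ", "")

        cleaned.append({**row, "Claim Date": claim_date, "Claim Amount": amount_value,
                        "Procedure Code": procedure, "Diagnosis Code": diagnosis})
    return cleaned
```

Because rows without an amount are dropped before the imputation, they don’t contribute a date to later rows, which matches the order of the steps in the recipe.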

Create a visual ETL job in AWS Glue Studio

Next, you create the job that uses the recipe. Complete the following steps:

  1. On the AWS Glue Studio console, choose Visual ETL in the navigation pane.
  2. Choose Visual with a blank canvas and create the visual job.
  3. At the top of the job, replace “Untitled job” with a name of your choice.
  4. On the Job Details tab, specify a role that the job will use.
    This needs to be an AWS Identity and Access Management (IAM) role suitable for AWS Glue with permissions to Amazon S3 and the AWS Glue Data Catalog. Note that the role used before for DataBrew can’t be used to run AWS Glue jobs, so it won’t be listed on the IAM Role drop-down menu here.
    Job details
    If you used only DataBrew jobs before, notice that in AWS Glue Studio, you can choose performance and cost settings, including worker size, auto scaling, and Flexible Execution, as well as use the latest AWS Glue 4.0 runtime and benefit from the significant performance improvements it brings. For this job, you can use the default settings, but reduce the requested number of workers in the interest of frugality. For this example, two workers will do.
  5. On the Visual tab, add an S3 source and name it Providers.
  6. For S3 URL, enter s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv.
    S3 Source
  7. Select the format as CSV and choose Infer schema.
    Now the schema is listed on the Output schema tab using the file header.
    Input schema

In this use case, the decision is that not all columns in the providers dataset are needed, so we can discard the rest.

  1. With the Providers node selected, add a Drop Fields transform (if you didn’t select the parent node, it won’t have one; in that case, assign the node parent manually).
  2. Select all the fields after Provider Zip Code.
    Drop fields

Later, this data will be joined with the claims for the state of Alabama using the provider; however, that second dataset doesn’t have the state specified. We can use our knowledge of the data to optimize the join by keeping only the providers we really need.

  1. Add a Filter transform as a child of Drop Fields.
  2. Name it Alabama providers and add a condition that the state must match AL.
    Filter providers
  3. Add the second source (a new S3 source) and name it Alabama claims.
  4. To get the S3 URL, open DataBrew on a separate browser tab, choose Datasets in the navigation pane, and copy the location shown in the table for Alabama claims (copy the text starting with s3://, not the associated http link). Then, back on the visual job, paste it as the S3 URL; if it is correct, the data fields are listed on the Output schema tab.
  5. Select CSV format and infer the schema like you did with the other source.
  6. As a child of this source, search in the Add nodes menu for recipe and choose Data Preparation Recipe.
    Add recipe
  7. In this new node’s properties, give it the name Claim cleanup recipe and choose the recipe and version you published before.
  8. You can review the recipe steps here and use the link to DataBrew to make changes if needed.
    Recipe details
  9. Add a Join node and select both Alabama providers and Claim cleanup recipe as the parents.
  10. Add a join condition equaling the provider ID from both sources.
  11. As the last step, add an S3 node as a target (note the first one listed when you search is the source; make sure you select the version that is listed as the target).
  12. In the node configuration, leave the default format JSON and enter an S3 URL on which the job role has permission to write.
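
The filter-then-join logic of the visual job can be sketched in plain Python as follows. This is not the code AWS Glue Studio generates; it assumes an inner join, and the field names provider_key, claim_key, and state_key are hypothetical parameters because the exact column names aren’t listed in this post.

```python
def filter_then_join(providers, claims, provider_key, claim_key, state_key, state="AL"):
    """Keep only providers in `state`, then inner join claims on the provider ID."""
    # Filtering first means the lookup table only holds the rows that can match.
    providers_by_id = {
        provider[provider_key]: provider
        for provider in providers
        if provider.get(state_key) == state
    }
    joined = []
    for claim in claims:
        provider = providers_by_id.get(claim[claim_key])
        if provider is not None:
            # Claim fields take precedence if both sides share a column name.
            joined.append({**provider, **claim})
    return joined
```

On a large dataset, reducing the providers before the join is what keeps the amount of data entering the join small, which is the optimization described before the Filter step.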

In addition, make the data output available as a table in the catalog.

  1. In the Data Catalog update options section, select the second option Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions, then select a database on which you have permission to create tables.
  2. Assign alabama_claims as the name and choose Claim Date as the partition key (this is for illustration purposes; a tiny table like this doesn’t really need partitions if further data won’t be added later).
    Join
  3. Now you can save and run the job.
  4. On the Runs tab, you can keep track of the process and see detailed job metrics using the job ID link.

The job should take a few minutes to complete.

  1. When the job is complete, navigate to the Athena console.
  2. Search for the table alabama_claims in the database you selected and, using the context menu, choose Preview Table, which will run a simple SELECT * SQL statement on the table.

Athena results

You can see in the result of the job that the data was cleaned by the DataBrew recipe and enriched by the AWS Glue Studio join.

Apache Spark is the engine that runs the jobs created on AWS Glue Studio. Using the Spark UI on the event logs it produces, you can view insights about the job plan and run, which can help you understand how your job is performing and potential performance bottlenecks. For instance, for this job on a large dataset, you could use it to compare the impact of filtering explicitly the provider state before doing the join, or identify if you can benefit from adding an Autobalance transform to improve parallelism.

By default, the job will store the Apache Spark event logs under the path s3://aws-glue-assets-<your account id>-<your region name>/sparkHistoryLogs/. To view the jobs, you have to install a History server using one of the methods available.

SparkUI

Clean up

If you no longer need this solution, you can delete the files generated on Amazon S3, the table created by the job, the DataBrew recipe, and the AWS Glue job.

Conclusion

In this post, we showed how you can use AWS Glue DataBrew to build a recipe using the provided interactive editor and then use the published recipe as part of an AWS Glue Studio visual ETL job. We included some examples of common tasks that are required when doing data preparation and ingesting data into AWS Glue Data Catalog tables.

This example used a single recipe in the visual job, but it’s possible to use multiple recipes at different parts of the ETL process, as well as reusing the same recipe on multiple jobs.

These AWS Glue solutions allow you to effectively create advanced ETL pipelines that are straightforward to build and maintain, all without writing any code. You can start creating solutions that combine both tools today.


About the authors

Mikhail Smirnov is a Sr. Software Dev Engineer on the AWS Glue team and part of the AWS Glue DataBrew development team. Outside of work, his interests include learning to play guitar and traveling with his family.

Gonzalo Herreros is a Sr. Big Data Architect on the AWS Glue team. Based in Dublin, Ireland, he helps customers succeed with big data solutions based on AWS Glue. In his spare time, he enjoys board games and cycling.

Dive deep into AWS Glue 4.0 for Apache Spark

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/dive-deep-into-aws-glue-4-0-for-apache-spark/

Deriving insight from data is hard. It’s even harder when your organization is dealing with silos that impede data access across different data stores. Seamless data integration is a key requirement in a modern data architecture to break down data silos. AWS Glue is a serverless data integration service that makes data preparation simpler, faster, and cheaper. You can discover and connect to over 70 diverse data sources, manage your data in a centralized data catalog, and create, run, and monitor data integration pipelines to load data into your data lakes and your data warehouses. AWS Glue for Apache Spark takes advantage of Apache Spark’s powerful engine to process large data integration jobs at scale.

AWS Glue released version 4.0 at AWS re:Invent 2022, which includes many upgrades, such as the new optimized Apache Spark 3.3.0 runtime (3.5 times performance improvement on average over open-source Apache Spark 3.3.0), Python 3.10, and a new enhanced Amazon Redshift connector (10 times performance improvement on average over the previous version).

In this post, we discuss the main benefits that this new AWS Glue version brings and how it can help you build better data integration pipelines.

Spark upgrade highlights

The new version of Spark included in AWS Glue 4.0 brings a number of valuable features, which we highlight in this section. For more details, refer to Spark Release 3.3.0 and Spark Release 3.2.0.

Support for the pandas API

Support for the pandas API allows users familiar with the popular Python library to start writing distributed extract, transform, and load (ETL) jobs without having to learn a new framework API. We discuss this in more detail later in this post.

Python UDF profiling

With Python UDF profiling, you can now profile regular and pandas user-defined functions (UDFs). Calling show_profiles() on the SparkContext to get details about the Python run was added in Spark 1 for RDDs; it now also works for DataFrame Python UDFs and reports how many calls to the UDF are made and how much time is spent in it.

For instance, to illustrate how the profiler measures time, the following example is the profile of a pandas UDF that processes over a million rows (but the pandas UDF only needs 112 calls) and sleeps for 1 second. We can use the command spark.sparkContext.show_profiles(), as shown in the following screenshot.

Python UDF Profiling

Pushdown filters

Pushdown filters are used in more scenarios, such as aggregations or limits. The upgrade also offers support for Bloom filters and skew optimization. These improvements allow for handling larger datasets by reading less data and processing it more efficiently. For more information, refer to Spark Release 3.3.0.

ANSI SQL

Now you can ask Spark SQL to follow the ANSI behavior on those points where it traditionally differed from the standard. This helps users bring their existing SQL skills and start producing value on AWS Glue faster. For more information, refer to ANSI Compliance.

Adaptive query execution by default

Adaptive query execution (AQE) by default helps optimize Spark SQL performance. You can turn AQE on and off by using spark.sql.adaptive.enabled as an umbrella configuration. Since AWS Glue 4.0, it’s enabled by default, so you no longer need to enable this explicitly.

Improved error messages

Improved error messages provide better context and make problems easier to resolve. For instance, if you have a division by zero in a SQL statement in ANSI mode, previously you would get just a Java exception: java.lang.ArithmeticException: divide by zero. Depending on the complexity and number of queries, the cause might not be obvious and might require some reruns with trial and error until it’s identified.

On the new version, you get a much more revealing error:

Caused by: org.apache.spark.SparkArithmeticException: Division by zero. 
Use `try_divide` to tolerate divisor being 0 and return NULL instead. 
If necessary set "spark.sql.ansi.enabled" to "false" (except for ANSI interval type) to bypass this error.
== SQL(line 1, position 8) ==
select sum(cost)/(select count(1) from items where cost > 100) from items
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Not only does it show the query that caused the issue, but it also indicates the specific operation where the error occurred (the division in this case). In addition, it provides some guidance on resolving the issue.

New pandas API on Spark

The Spark upgrade brings a new, exciting feature, which is the chance to use your existing Python pandas framework knowledge in a distributed and scalable runtime. This lowers the barrier of entry for teams without previous Spark experience, so they can start delivering value quickly and make the most of the AWS Glue for Spark runtime.

The new API provides a pandas DataFrame-compatible API, so you can take existing pandas code and migrate it to AWS Glue for Spark by changing the imports, although it’s not 100% compatible.

If you just want to migrate existing pandas code to run on pandas on Spark, you could replace the import and test:

# Replace pure pandas 
import pandas as pd
# With pandas API on Spark
import pyspark.pandas as pd

In some cases, you might want to use multiple implementations in the same script, for instance because a feature is not yet available in the pandas API on Spark, or because the data is so small that some operations are more efficient when run locally rather than distributed. In that situation, to avoid confusion, it’s better to use a different alias for the pandas and the pandas on Spark module imports. It also helps to follow a naming convention for the different types of DataFrames, because the type has implications for performance and features; for instance, prefix pandas DataFrame variables with pdf_, pandas on Spark with psdf_, and standard Spark with sdf_ or just df_.

You can also convert to a standard Spark DataFrame by calling to_spark(). This allows you to use features not available on pandas, such as writing directly to catalog tables or using some Spark connectors.

The following code shows an example of combining the different types of APIs:

# The job has the parameter "--additional-python-modules":"openpyxl", 
#  the Excel library is not provided by default

import pandas as pd
# pdf is a pure pandas DF which resides in the driver memory only
pdf = pd.read_excel('s3://mybucket/mypath/MyExcel.xlsx', index_col=0)

import pyspark.pandas as ps
# psdf behaves like a pandas df but operations on it will be distributed among nodes
psdf = ps.from_pandas(pdf)
means_series = psdf.mean()

# Convert to a DataFrame of column names and means
#  pandas on Spark series don't allow iteration, so we convert to pure pandas;
#  on a big dataset this could overwhelm the driver, but not here
# We reset the index to have the index labels as a column to use them later
pdf_means = pd.DataFrame(means_series.to_pandas()).reset_index(level=0)
# Set meaningful column names
pdf_means.columns = ["excel_column", "excel_average"]

# We want to use this to enrich a Spark DF representing a huge table
#  convert to a standard Spark DF so we can use the standard API
sdf_means = ps.from_pandas(pdf_means).to_spark()

sdf_bigtable = spark.table("somecatalog.sometable")
# Keep the join result so it can be used in later steps
sdf_enriched = sdf_bigtable.join(sdf_means, sdf_bigtable.category == sdf_means.excel_column)

Improved Amazon Redshift connector

A new version of the Amazon Redshift connector brings many improvements:

  • Pushdown optimizations
  • Support for reading SUPER columns
  • Writing allowed based on column names instead of position
  • An optimized serializer to increase performance when reading from Amazon Redshift
  • Other minor improvements like trimming pre- and post-actions and handling numeric time zone formats

This new Amazon Redshift connector is built on top of an existing open-source connector project and offers further enhancements for performance and security, helping you gain up to 10 times faster application performance. It accelerates AWS Glue jobs when reading from Amazon Redshift, and also enables you to run data-intensive workloads more reliably. For more details, see Moving data to and from Amazon Redshift. To learn more about how to use it, refer to New – Amazon Redshift Integration with Apache Spark.

When you use the new Amazon Redshift connector on an AWS Glue DynamicFrame, use the existing methods: GlueContext.create_data_frame and GlueContext.write_data_frame.

When you use the new Amazon Redshift connector on a Spark DataFrame, use the format io.github.spark_redshift_community.spark.redshift, as shown in the following code snippet:

df = spark.read.format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("tempdir", redshiftTmpDir) \
    .option("user", user) \
    .option("password", password) \
    .option("aws_iam_role", aws_iam_role) \
    .load()

Other upgrades and improvements

The following are updates and improvements in the dependent libraries:

  • Spark 3.3.0-amzn-0
  • Hadoop 3.2.1-amzn-8
  • Hive 2.3.9-amzn-2
  • Parquet 1.12
  • Log4j 2
  • Python 3.10
  • Arrow 7.0.0
  • Boto3 1.26
  • EMRFS 2.53.0
  • AWS Glue Data Catalog client 3.6.0
  • New versions of the provided JDBC drivers:
    • MySQL 8.0.23
    • PostgreSQL 42.3.6
    • Microsoft SQL Server 9.4.0
    • Oracle 21.7
    • MongoDB 4.7.2
    • Amazon Redshift redshift-jdbc42-2.1.0.9
  • Integrated and upgraded plugins to popular table formats:
    • Iceberg 1.0.0
    • Hudi 0.12.1
    • Delta Lake 2.1.0

To learn more, refer to the appendices in Migrating AWS Glue jobs to AWS Glue version 4.0.

Improved performance

In addition to all the new features, AWS Glue 4.0 brings performance improvements at lower cost. In summary, AWS Glue 4.0 with Amazon Simple Storage Service (Amazon S3) is 2.7 times faster than AWS Glue 3.0, and AWS Glue 4.0 with Amazon Redshift is 7.1 times faster than AWS Glue 3.0. In the following sections, we provide details about AWS Glue 4.0 performance results with Amazon S3 and Amazon Redshift.

Amazon S3

The following chart shows the total job runtime for all queries (in seconds) in the 3 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in an S3 bucket in Parquet format, and we used 30 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon S3 had a total job runtime on AWS Glue 4.0 that was 2.7 times faster than that on AWS Glue 3.0. Detailed instructions are explained in the appendix of this post.

Metric (seconds)    AWS Glue 3.0    AWS Glue 4.0
Total Query Time    5084.94274      1896.1904
Geometric Mean      14.00217        10.09472

TPC-DS benchmark result with S3

Amazon Redshift

The following chart shows the total job runtime for all queries (in seconds) in the 1 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in a five-node ra3.4xlarge Amazon Redshift cluster, and we used 150 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon Redshift had a total job runtime on AWS Glue 4.0 that was 7.1 times faster than that on AWS Glue 3.0.

Metric (seconds)    AWS Glue 3.0    AWS Glue 4.0
Total Query Time    22020.58        3075.96633
Geometric Mean      142.38525       8.69973

TPC-DS benchmark result with Redshift
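As a quick check on the headline numbers, the speedup is simply the AWS Glue 3.0 total divided by the AWS Glue 4.0 total. The following sketch uses only the total query times reported above; the variable and function names are ours.

```python
# Total query times in seconds, copied from the results reported above.
s3_total = {"glue_3_0": 5084.94274, "glue_4_0": 1896.1904}
redshift_total = {"glue_3_0": 22020.58, "glue_4_0": 3075.96633}


def speedup(times):
    """Return how many times faster AWS Glue 4.0 ran than AWS Glue 3.0."""
    return times["glue_3_0"] / times["glue_4_0"]


s3_speedup = speedup(s3_total)
redshift_speedup = speedup(redshift_total)
print(f"Amazon S3: {s3_speedup:.2f}x, Amazon Redshift: {redshift_speedup:.2f}x")
```

The ratios come out at roughly 2.68 and 7.16, which is consistent with the 2.7 and 7.1 times quoted in the text (the latter if the value is truncated rather than rounded).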

Get started with AWS Glue 4.0

You can start using AWS Glue 4.0 via AWS Glue Studio, the AWS Glue console, the latest AWS SDK, and the AWS Command Line Interface (AWS CLI).

To start using AWS Glue 4.0 in AWS Glue Studio, open the AWS Glue job and on the Job details tab, choose the version Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.

Glue version 4.0 in Glue Studio

To migrate your existing AWS Glue jobs from AWS Glue 0.9, 1.0, 2.0, and 3.0 to AWS Glue 4.0, see Migrating AWS Glue jobs to AWS Glue version 4.0.

The AWS Glue 4.0 Docker images are now available on Docker Hub, so you can use them to develop locally for the new version. Refer to Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container for further details.

Conclusion

In this post, we discussed the main upgrades provided by the new 4.0 version of AWS Glue. You can already start writing new jobs on that version and benefit from all the improvements, as well as migrate your existing jobs.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team. He’s been an Apache Spark enthusiast since version 0.8. In his spare time, he likes playing board games.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytics and processing needs with cloud-based data-intensive technologies.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on solving challenging distributed systems problems for data integration on the AWS Glue platform for customers using Apache Spark.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for building data lakes on AWS and simplifying integration with data warehouses for customers using Apache Spark.


Appendix: TPC-DS benchmark on AWS Glue against a dataset on Amazon S3

To perform a TPC-DS benchmark on AWS Glue against a dataset in an S3 bucket, you need to copy the TPC-DS dataset into your S3 bucket. These instructions are based on emr-spark-benchmark:

  1. Create a new S3 bucket in your test account if needed. In the following code, replace $YOUR_S3_BUCKET with your S3 bucket name. We suggest you export YOUR_S3_BUCKET as an environment variable:
export YOUR_S3_BUCKET=<Your bucket name>
aws s3 mb s3://$YOUR_S3_BUCKET
  2. Copy the TPC-DS source data as input to your S3 bucket. If it’s not exported as an environment variable, replace $YOUR_S3_BUCKET with your S3 bucket name:
aws s3 sync s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/ s3://$YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned/
  3. Build the benchmark application following the instructions in Steps to build spark-benchmark-assembly application.

For your convenience, we have provided the sample application JAR file spark-benchmark-assembly-3.3.0.jar, which we built for AWS Glue 4.0.

  1. Upload the spark-benchmark-assembly JAR file to your S3 bucket.
  2. In AWS Glue Studio, create a new AWS Glue job through the script editor:
    1. Under Job details, for Type, choose Spark.
    2. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
    3. For Language, choose Scala.
    4. For Worker type, choose your preferred worker type.
    5. For Requested number of workers, choose your preferred number.
    6. Under Advanced properties, for Dependent JARs path, enter your S3 path for the spark-benchmark-assembly JAR file.
    7. For Script, enter the following code snippet:
import com.amazonaws.eks.tpcds.BenchmarkSQL

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    BenchmarkSQL.main(
      Array(
        "s3://YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned",
        "s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/", 
        "/opt/tpcds-kit/tools", 
        "parquet", 
        "3000", 
        "3", 
        "false", 
        "q1-v2.4,q10-v2.4,q11-v2.4,q12-v2.4,q13-v2.4,q14a-v2.4,q14b-v2.4,q15-v2.4,q16-v2.4,q17-v2.4,q18-v2.4,q19-v2.4,q2-v2.4,q20-v2.4,q21-v2.4,q22-v2.4,q23a-v2.4,q23b-v2.4,q24a-v2.4,q24b-v2.4,q25-v2.4,q26-v2.4,q27-v2.4,q28-v2.4,q29-v2.4,q3-v2.4,q30-v2.4,q31-v2.4,q32-v2.4,q33-v2.4,q34-v2.4,q35-v2.4,q36-v2.4,q37-v2.4,q38-v2.4,q39a-v2.4,q39b-v2.4,q4-v2.4,q40-v2.4,q41-v2.4,q42-v2.4,q43-v2.4,q44-v2.4,q45-v2.4,q46-v2.4,q47-v2.4,q48-v2.4,q49-v2.4,q5-v2.4,q50-v2.4,q51-v2.4,q52-v2.4,q53-v2.4,q54-v2.4,q55-v2.4,q56-v2.4,q57-v2.4,q58-v2.4,q59-v2.4,q6-v2.4,q60-v2.4,q61-v2.4,q62-v2.4,q63-v2.4,q64-v2.4,q65-v2.4,q66-v2.4,q67-v2.4,q68-v2.4,q69-v2.4,q7-v2.4,q70-v2.4,q71-v2.4,q72-v2.4,q73-v2.4,q74-v2.4,q75-v2.4,q76-v2.4,q77-v2.4,q78-v2.4,q79-v2.4,q8-v2.4,q80-v2.4,q81-v2.4,q82-v2.4,q83-v2.4,q84-v2.4,q85-v2.4,q86-v2.4,q87-v2.4,q88-v2.4,q89-v2.4,q9-v2.4,q90-v2.4,q91-v2.4,q92-v2.4,q93-v2.4,q94-v2.4,q95-v2.4,q96-v2.4,q97-v2.4,q98-v2.4,q99-v2.4,ss_max-v2.4", 
        "true"
      )
    )
  }
}
  1. Save and run the job.

The result file will be stored under s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/.

Ten new visual transforms in AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/ten-new-visual-transforms-in-aws-glue-studio/

AWS Glue Studio is a graphical interface that makes it easy to create, run, and monitor extract, transform, and load (ETL) jobs in AWS Glue. It allows you to visually compose data transformation workflows using nodes that represent different data handling steps, which later are converted automatically into code to run.

AWS Glue Studio recently released 10 more visual transforms to allow creating more advanced jobs in a visual way without coding skills. In this post, we discuss potential use cases that reflect common ETL needs.

The new transforms that will be demonstrated in this post are: Concatenate, Split String, Array To Columns, Add Current Timestamp, Pivot Rows To Columns, Unpivot Columns To Rows, Lookup, Explode Array Or Map Into Rows, Derived Column, and Autobalance Processing.

Solution overview

In this use case, we have some JSON files with stock option operations. We want to make some transformations before storing the data to make it easier to analyze, and we also want to produce a separate dataset summary.

In this dataset, each row represents a trade of option contracts. Options are financial instruments that provide the right, but not the obligation, to buy or sell stock shares at a fixed price (called the strike price) before a defined expiration date.

Input data

The data follows the following schema:

  • order_id – A unique ID
  • symbol – A code generally based on a few letters to identify the corporation that emits the underlying stock shares
  • instrument – The name that identifies the specific option being bought or sold
  • currency – The ISO currency code in which the price is expressed
  • price – The amount that was paid for the purchase of each option contract (on most exchanges, one contract allows you to buy or sell 100 stock shares)
  • exchange – The code of the exchange center or venue where the option was traded
  • sold – A list of the number of contracts that were allocated to fill the sell order when this is a sell trade
  • bought – A list of the number of contracts that were allocated to fill the buy order when this is a buy trade

The following is a sample of the synthetic data generated for this post:

{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", "currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}
{"order_id": 1679931512486, "symbol": "BMW.DE", "instrument": "BMW.DE MAR 24 23 96 PUT", "currency": "eur", "price": 2.98, "exchange": "XETR", "bought": [28]}
{"order_id": 1679931512487, "symbol": "BMW.DE", "instrument": "BMW.DE APR 28 23 101 CALL", "currency": "eur", "price": 14.71, "exchange": "XETR", "sold": [9, 59, 54]}
{"order_id": 1679931512489, "symbol": "JPM", "instrument": "JPM JUN 30 23 140 CALL", "currency": "usd", "price": 11.83, "exchange": "EDGX", "bought": [33, 42, 55, 67]}
{"order_id": 1679931512490, "symbol": "SIE.DE", "instrument": "SIE.DE MAR 24 23 149 CALL", "currency": "eur", "price": 13.68, "exchange": "XETR", "bought": [96, 89, 82]}
{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", "currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}
{"order_id": 1679931512492, "symbol": "AMZN", "instrument": "AMZN MAY 26 23 95 CALL", "currency": "usd", "price": 11.44, "exchange": "EDGX", "sold": [41, 62, 12]}
{"order_id": 1679931512493, "symbol": "JPM", "instrument": "JPM MAR 24 23 121 PUT", "currency": "usd", "price": 1.0, "exchange": "EDGX", "bought": [61, 34]}
{"order_id": 1679931512494, "symbol": "SAP.DE", "instrument": "SAP.DE MAR 24 23 132 CALL", "currency": "eur", "price": 15.9, "exchange": "XETR", "bought": [69, 33]}

ETL requirements

This data has a number of unique characteristics, as often found on older systems, that make the data harder to use.

The following are the ETL requirements:

  • The instrument name has valuable information that is intended for humans to understand; we want to normalize it into separate columns for easier analysis.
  • The attributes bought and sold are mutually exclusive; we can consolidate them into a single column with the contract numbers and have another column indicating whether the contracts were bought or sold in this order.
  • We want to keep the information about the individual contract allocations but as individual rows instead of forcing users to deal with an array of numbers. We could add up the numbers, but we would lose information about how the order was filled (indicating market liquidity). Instead, we choose to denormalize the table so each row has a single number of contracts, splitting orders with multiple numbers into separate rows. In a compressed columnar format, the extra size caused by this repetition is often small, so it’s an acceptable cost for making the dataset easier to query.
  • We want to generate a summary table of volume for each option type (call and put) for each stock. This provides an indication of the market sentiment for each stock and the market in general (greed vs. fear).
  • To enable overall trade summaries, we want to provide for each operation the grand total and standardize the currency to US dollars, using an approximate conversion reference.
  • We want to add the date when these transformations took place. This could be useful, for instance, to have a reference for when the currency conversion was made.
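To make the currency requirement above concrete, here is a minimal plain-Python sketch. The conversion rates are hypothetical placeholders, not real quotes, and the formula for the total (price times number of contracts times rate) is an assumption of this sketch; the names USD_RATES, add_total_usd, and total_usd are ours.

```python
# Hypothetical, approximate reference rates to US dollars (not real quotes).
USD_RATES = {"usd": 1.0, "eur": 1.1}


def add_total_usd(order, contracts):
    """Return a copy of the order with an approximate total in US dollars.

    Assumption: the total is price x number of contracts x conversion rate.
    """
    rate = USD_RATES[order["currency"]]
    total_usd = round(order["price"] * contracts * rate, 2)
    return {**order, "contracts": contracts, "total_usd": total_usd}


order = {"symbol": "BMW.DE", "currency": "eur", "price": 2.98}
enriched = add_total_usd(order, 28)
```

Because the rates are only an approximate reference, it is worth recording when the conversion was applied, which is what the last requirement in the list addresses.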

Based on those requirements, the job will produce two outputs:

  • A CSV file with a summary of the number of contracts for each symbol and type
  • A catalog table to keep a history of the order, after doing the transformations indicated
    Data schema
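The first output, the summary of contracts for each symbol and type, boils down to a grouped sum. The following plain-Python sketch shows the idea on a few hypothetical rows that are assumed to be already normalized to one contract amount per row; the column names type and contracts are assumptions of this sketch.

```python
from collections import Counter

# Hypothetical rows, already normalized to one contract amount per row.
rows = [
    {"symbol": "AMZN", "type": "PUT", "contracts": 18},
    {"symbol": "AMZN", "type": "PUT", "contracts": 38},
    {"symbol": "AMZN", "type": "CALL", "contracts": 41},
    {"symbol": "JPM", "type": "CALL", "contracts": 33},
]

# Add up the contracts for each (symbol, option type) pair.
volume = Counter()
for row in rows:
    volume[(row["symbol"], row["type"])] += row["contracts"]
```

Comparing the CALL and PUT volume for a symbol gives the rough sentiment indication mentioned in the requirements.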

Prerequisites

You will need your own S3 bucket to follow along with this use case. To create a new bucket, refer to Creating a bucket.

Generate synthetic data

To follow along with this post (or experiment with this kind of data on your own), you can generate this dataset synthetically. The following Python script can be run on a Python environment with Boto3 installed and access to Amazon Simple Storage Service (Amazon S3).

To generate the data, complete the following steps:

  1. On AWS Glue Studio, create a new job with the option Python shell script editor.
  2. Give the job a name and on the Job details tab, select a suitable role and a name for the Python script.
  3. In the Job details section, expand Advanced properties and scroll down to Job parameters.
  4. Enter a parameter named --bucket and assign as the value the name of the bucket you want to use to store the sample data.
  5. Enter the following script into the AWS Glue shell editor:
    import argparse
    import boto3
    from datetime import datetime
    import io
    import json
    import random
    import sys
    
    # Configuration
    parser = argparse.ArgumentParser()
    parser.add_argument('--bucket')
    args, ignore = parser.parse_known_args()
    if not args.bucket:
        raise Exception("This script requires an argument --bucket with the value specifying the S3 bucket where to store the files generated")
    
    data_bucket = args.bucket
    data_path = "transformsblog/inputdata"
    samples_per_file = 1000
    
    # Create a single file with synthetic data samples
    s3 = boto3.client('s3')
    buff = io.BytesIO()
    
    sample_stocks = [("AMZN", 95, "usd"), ("NKE", 120, "usd"), ("JPM", 130, "usd"), ("KO", 130, "usd"),
                     ("BMW.DE", 95, "eur"), ("SIE.DE", 140, "eur"), ("SAP.DE", 115, "eur")]
    option_type = ["PUT", "CALL"]
    operations = ["sold", "bought"]
    dates = ["MAR 24 23", "APR 28 23", "MAY 26 23", "JUN 30 23"]
    for i in range(samples_per_file):
        stock = random.choice(sample_stocks)
        symbol = stock[0]
        ref_price = stock[1]
        currency = stock[2]
        strike_price = round(ref_price * 0.9 + ref_price * random.uniform(0.01, 0.3))
        sample = {
            "order_id": int(datetime.now().timestamp() * 1000) + i,
            "symbol": stock[0],
            "instrument":f"{symbol} {random.choice(dates)} {strike_price} {random.choice(option_type)}",
            "currency": currency,
            "price": round(random.uniform(0.5, 20.1), 2),
            "exchange": "EDGX" if currency == "usd" else "XETR"
        }
        sample[random.choice(operations)] = [random.randrange(1, 100) for _ in range(random.randrange(1, 5))]
        buff.write(json.dumps(sample).encode())
        buff.write("\n".encode())
    
    s3.put_object(Body=buff.getvalue(), Bucket=data_bucket, Key=f"{data_path}/{int(datetime.now().timestamp())}.json")

  6. Run the job and wait until it shows as successfully completed on the Runs tab (it should take just a few seconds).

Each run will generate a JSON file with 1,000 rows under the bucket specified and prefix transformsblog/inputdata/. You can run the job multiple times if you want to test with more input files.

Each line in the synthetic data is a data row representing a JSON object like the following:

{
 "order_id":1681986991888,
 "symbol":"AMZN",
 "instrument":"AMZN APR 28 23 100 PUT",
 "currency":"usd",
 "price":2.89,
 "exchange":"EDGX",
 "sold":[88,49]
}
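Because each line is an independent JSON object, a generated file is read line by line rather than parsed as a single JSON document. The following sketch shows that with the standard json module on two rows copied from the sample data in this post; the variable names are ours.

```python
import json

# Two rows copied from the sample data in this post, one JSON object per line.
raw = (
    '{"order_id": 1679931512485, "symbol": "AMZN", "instrument": "AMZN MAR 24 23 102 PUT", '
    '"currency": "usd", "price": 17.18, "exchange": "EDGX", "bought": [18, 38]}\n'
    '{"order_id": 1679931512491, "symbol": "NKE", "instrument": "NKE MAR 24 23 112 CALL", '
    '"currency": "usd", "price": 3.23, "exchange": "EDGX", "sold": [67]}\n'
)

# Parse each non-empty line independently; the file as a whole is not valid JSON.
orders = [json.loads(line) for line in raw.splitlines() if line.strip()]

for order in orders:
    # Each order carries either "bought" or "sold", never both.
    action = "bought" if "bought" in order else "sold"
    print(order["order_id"], action, order[action])
```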

Create the AWS Glue visual job

To create the AWS Glue visual job, complete the following steps:

  1. Go to AWS Glue Studio and create a job using the option Visual with a blank canvas.
  2. Edit Untitled job to give it a name and assign a role suitable for AWS Glue on the Job details tab.
  3. Add an S3 data source (you can name it JSON files source) and enter the S3 URL under which the files are stored (for example, s3://<your bucket name>/transformsblog/inputdata/), then select JSON as the data format.
  4. Select Infer schema so it sets the output schema based on the data.

From this source node, you’ll keep chaining transforms. When adding each transform, make sure the selected node is the last one added so it gets assigned as the parent, unless indicated otherwise in the instructions.

If you didn’t select the right parent, you can always edit the parent by selecting it and choosing another parent in the configuration pane.

Node parent configuration

For each node added, you’ll give it a specific name (so the node purpose shows in the graph) and configuration on the Transform tab.

Every time a transform changes the schema (for instance, by adding a new column), the output schema needs to be updated so it’s visible to the downstream transforms. You can manually edit the output schema, but it’s more practical and safer to do it using the data preview. That way, you can also verify that the transformations are working as expected so far. To do so, open the Data preview tab with the transform selected and start a preview session. After you have verified the transformed data looks as expected, go to the Output schema tab and choose Use data preview schema to update the schema automatically.

As you add new kinds of transforms, the preview might show a message about a missing dependency. When this happens, choose End Session and then start a new one, so the preview picks up the new kind of node.

Extract instrument information

Let’s start by dealing with the information on the instrument name to normalize it into columns that are easier to access in the resulting output table.

  1. Add a Split String node and name it Split instrument, which will tokenize the instrument column using a whitespace regex: \s+ (a single space would do in this case, but this way is more flexible and visually clearer).
  2. We want to keep the original instrument information as is, so enter a new column name for the split array: instrument_arr.
    Split config
  3. Add an Array To Columns node and name it Instrument columns to convert the array column just created into new fields, except for symbol, for which we already have a column.
  4. Select the column instrument_arr, skip the first token and tell it to extract the output columns month, day, year, strike_price, type using indexes 2, 3, 4, 5, 6 (the spaces after the commas are for readability, they don’t impact the configuration).
    Array config
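
To make these two steps concrete, here is a minimal plain-Python sketch of what they do to a single row. It is not the code AWS Glue generates; the sample instrument value and the helper names are hypothetical, and the indexes are treated as 1-based to mirror the node configuration.

```python
import re

def split_instrument(instrument):
    # Tokenize on one or more whitespace characters, like the \s+ regex in Split String
    return re.split(r"\s+", instrument.strip())

def array_to_columns(tokens, names, indexes):
    # Indexes are 1-based here, so 2..6 skips the first token (the symbol)
    return {name: tokens[i - 1] for name, i in zip(names, indexes)}

# Hypothetical sample value; the real dataset may format the instrument differently
instrument_arr = split_instrument("AMZN  12 16 22 95.00 call")
columns = array_to_columns(
    instrument_arr,
    ["month", "day", "year", "strike_price", "type"],
    [2, 3, 4, 5, 6],
)
print(columns)
```

Note that the double space in the sample is absorbed by the regex, which is the flexibility mentioned in step 1.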

The year extracted is expressed with two digits only; let’s put a stopgap to assume it’s in this century if they just use two digits.

  1. Add a Derived Column node and name it Four digits year.
  2. Enter year as the derived column so it overrides it, and enter the following SQL expression:
    CASE WHEN length(year) = 2 THEN ('20' || year) ELSE year END
    Year derived column config

For convenience, we build an expiration_date field that a user can have as reference of the last date the option can be exercised.

  1. Add a Concatenate Columns node and name it Build expiration date.
  2. Name the new column expiration_date, select the columns year, month, and day (in that order), and a hyphen as spacer.
    Concatenated date config
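
The year stopgap and the concatenation can be sketched in plain Python as follows. This only illustrates the logic configured in the two nodes; the function names and the sample values are hypothetical.

```python
def four_digit_year(year):
    # Same rule as the SQL CASE expression: prefix two-digit years with "20"
    return "20" + year if len(year) == 2 else year

def build_expiration_date(year, month, day, spacer="-"):
    # Concatenate in year, month, day order using a hyphen as spacer
    return spacer.join([four_digit_year(year), month, day])

# Hypothetical sample values, kept as strings like the split tokens
expiration_date = build_expiration_date("22", "12", "16")
print(expiration_date)
```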

The diagram so far should look like the following example.

DAG

The data preview of the new columns so far should look like the following screenshot.

Data preview

Normalize the number of contracts

Each of the rows in the data indicates the number of contracts of each option that were bought or sold and the batches on which the orders were filled. Without losing the information about the individual batches, we want to have each amount on an individual row with a single amount value, while the rest of the information is replicated in each row produced.

First, let’s merge the amounts into a single column.

  1. Add an Unpivot Columns Into Rows node and name it Unpivot actions.
  2. Choose the columns bought and sold to unpivot and store the names and values in columns named action and contracts, respectively.
    Unpivot config
    Notice in the preview that the new column contracts is still an array of numbers after this transformation.
  1. Add an Explode Array Or Map into Rows node named Explode contracts.
  2. Choose the contracts column and enter contracts as the new column to override it (we don’t need to keep the original array).

The preview now shows that each row has a single contracts amount, and the rest of the fields are the same.

This also means that order_id is no longer a unique key. For your own use cases, you need to decide how to model your data and if you want to denormalize or not.
Explode config
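
As a rough illustration of the unpivot and explode steps, the following plain-Python sketch applies them to one hypothetical order. The helper names and the sample row are assumptions, and treating an empty value as producing no rows is how this sketch behaves, not a statement about the AWS Glue nodes.

```python
def unpivot(row, columns, name_col, value_col):
    # One output row per unpivoted column; the other fields are copied as-is
    rest = {k: v for k, v in row.items() if k not in columns}
    return [{**rest, name_col: c, value_col: row[c]} for c in columns]

def explode(row, array_col):
    # One output row per array element, overriding the array column
    return [{**row, array_col: item} for item in (row[array_col] or [])]

# Hypothetical order filled in two batches
order = {"order_id": 1, "symbol": "AMZN", "bought": [10, 5], "sold": None}
unpivoted = unpivot(order, ["bought", "sold"], "action", "contracts")
exploded = [out for row in unpivoted for out in explode(row, "contracts")]
for row in exploded:
    print(row)
```

Both output rows share the same order_id, which shows why it stops being a unique key.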

The following screenshot is an example of what the new columns look like after the transformations so far.
Data preview

Create a summary table

Now you create a summary table with the number of contracts traded for each type and each stock symbol.

Let’s assume for illustration purposes that the files processed belong to a single day, so this summary gives the business users information about what the market interest and sentiment are that day.

  1. Add a Select Fields node and select the following columns to keep for the summary: symbol, type, and contracts.
    Selected fields
  2. Add a Pivot Rows Into Columns node and name it Pivot summary.
  3. Aggregate on the contracts column using sum and choose to convert the type column.
    Pivot config
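
The pivot can be pictured with the following plain-Python sketch, which sums the contracts per symbol and turns each type value into a column. The sample rows and the type values (call and put) are hypothetical.

```python
from collections import defaultdict

def pivot_summary(rows):
    # Group by symbol, turn each distinct type into a column, and sum the contracts
    summary = defaultdict(lambda: defaultdict(int))
    for row in rows:
        summary[row["symbol"]][row["type"]] += row["contracts"]
    return {symbol: dict(by_type) for symbol, by_type in summary.items()}

rows = [
    {"symbol": "AMZN", "type": "call", "contracts": 10},
    {"symbol": "AMZN", "type": "call", "contracts": 5},
    {"symbol": "AMZN", "type": "put", "contracts": 3},
    {"symbol": "MSFT", "type": "put", "contracts": 7},
]
summary = pivot_summary(rows)
print(summary)
```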

Normally, you would store it on some external database or file for reference; in this example, we save it as a CSV file on Amazon S3.

  1. Add an Autobalance Processing node and name it Single output file.
  2. Although that transform type is normally used to optimize the parallelism, here we use it to reduce the output to a single file. Therefore, enter 1 in the number of partitions configuration.
    Autobalance config
  3. Add an S3 target and name it CSV Contract summary.
  4. Choose CSV as the data format and enter an S3 path where the job role is allowed to store files.

The last part of the job should now look like the following example.
DAG

  1. Save and run the job. Use the Runs tab to check when it has finished successfully.
    You’ll find a file under that path that is a CSV, despite not having that extension. You’ll probably need to add the extension after downloading it to open it.
    On a tool that can read the CSV, the summary should look something like the following example.
    Spreadsheet

Clean up temporary columns

In preparation for saving the orders into a historical table for future analysis, let’s clean up some temporary columns created along the way.

  1. Add a Drop Fields node with the Explode contracts node selected as its parent (we are branching the data pipeline to generate a separate output).
  2. Select the fields to be dropped: instrument_arr, month, day, and year.
    The rest we want to keep so they are saved in the historical table we’ll create later.
    Drop fields

Currency standardization

This synthetic data contains fictional operations on two currencies, but in a real system you could get currencies from markets all over the world. It’s useful to standardize the currencies handled into a single reference currency so they can easily be compared and aggregated for reporting and analysis.

We use Amazon Athena to simulate a table with approximate currency conversions that gets updated periodically (here we assume we process the orders timely enough that the conversion is a reasonable representative for comparison purposes).

  1. Open the Athena console in the same Region where you’re using AWS Glue.
  2. Run the following query to create the table by setting an S3 location where both your Athena and AWS Glue roles can read and write. Also, you might want to store the table in a different database than default (if you do that, update the table qualified name accordingly in the examples provided).
    CREATE EXTERNAL TABLE default.exchange_rates(currency string, exchange_rate double)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION 's3://<enter some bucket>/exchange_rates/';

  3. Enter a few sample conversions into the table:
    INSERT INTO default.exchange_rates VALUES ('usd', 1.0), ('eur', 1.09), ('gbp', 1.24);
  4. You should now be able to view the table with the following query:
    SELECT * FROM default.exchange_rates
  5. Back on the AWS Glue visual job, add a Lookup node (as a child of Drop Fields) and name it Exchange rate.
  6. Enter the qualified name of the table you just created, use currency as the key, and select the exchange_rate field to use.
    Because the field is named the same in both the data and the lookup table, we can just enter the name currency and don’t need to define a mapping.
    Lookup config
    At the time of this writing, the Lookup transform is not supported in the data preview and it will show an error that the table doesn’t exist. This is only for the data preview and doesn’t prevent the job from running correctly. The few remaining steps of the post don’t require you to update the schema. If you need to run a data preview on other nodes, you can remove the lookup node temporarily and then put it back.
  7. Add a Derived Column node and name it Total in usd.
  8. Name the derived column total_usd and use the following SQL expression:
    round(contracts * price * exchange_rate, 2)
    Currency conversion config
  9. Add an Add Current Timestamp node and name the column ingest_date.
  10. Use the format %Y-%m-%d for your timestamp (for demonstration purposes, we are just using the date; you can make it more precise if you want to).
    Timestamp config
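
The lookup, the derived total, and the ingest date can be sketched in plain Python as follows. The rates are the sample values inserted into the table earlier; the function name and the sample order are hypothetical, and the sketch simply fails on a currency that has no rate.

```python
from datetime import datetime, timezone

# Same sample rates inserted into the exchange_rates table
exchange_rates = {"usd": 1.0, "eur": 1.09, "gbp": 1.24}

def enrich(order, now=None):
    # Lookup: fetch the rate by currency; Derived Column: compute the rounded total
    rate = exchange_rates[order["currency"]]
    now = now or datetime.now(timezone.utc)
    return {
        **order,
        "exchange_rate": rate,
        "total_usd": round(order["contracts"] * order["price"] * rate, 2),
        # Same format string used in the Add Current Timestamp node
        "ingest_date": now.strftime("%Y-%m-%d"),
    }

# Hypothetical order; the date is fixed only to make the example repeatable
order = {"contracts": 10, "price": 2.5, "currency": "eur"}
enriched = enrich(order, now=datetime(2023, 1, 15, tzinfo=timezone.utc))
print(enriched)
```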

Save the historical orders table

To save the historical orders table, complete the following steps:

  1. Add an S3 target node and name it Orders table.
  2. Configure Parquet format with snappy compression, and provide an S3 target path under which to store the results (separate from the summary).
  3. Select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  4. Enter a target database and a name for the new table, for instance: option_orders.
    Table sink config

The last part of the diagram should now look similar to the following, with two branches for the two separate outputs.
DAG

After you run the job successfully, you can use a tool like Athena to review the data the job has produced by querying the new table. You can find the table on the Athena list and choose Preview table or just run a SELECT query (updating the table name to the name and catalog you used):

SELECT * FROM default.option_orders limit 10

Your table content should look similar to the following screenshot.
Table content

Clean up

If you don’t want to keep this example, delete the two jobs you created, the two tables in Athena, and the S3 paths where the input and output files were stored.

Conclusion

In this post, we showed how the new transforms in AWS Glue Studio can help you do more advanced transformations with minimal configuration. This means you can implement more ETL use cases without having to write and maintain any code. The new transforms are already available on AWS Glue Studio, so you can use them today in your visual jobs.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio recently added support for custom transforms, which you can use to build visual jobs in combination with the AWS Glue Studio components provided out of the box. You can now define a custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which define the component and the processing logic, respectively.

Custom visual transforms let you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is what the component definition looks like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which to generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}
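
To get a feel for what the regex validation accepts, the following sketch checks user input against a trimmed copy of the definition. How AWS Glue Studio applies the rule internally isn’t described here, so the validate helper and the "is required" message are assumptions made for illustration.

```python
import json
import re

# Trimmed copy of the component definition, keeping only what the check needs
definition = json.loads(r"""
{
  "parameters": [
    {"name": "numSamples", "type": "int"},
    {"name": "year", "type": "int", "isOptional": true,
     "validationRule": "^\\d{4}$",
     "validationMessage": "Please enter a valid year number"}
  ]
}
""")

def validate(params, definition):
    # Return a list of error messages; an empty list means the input is acceptable
    errors = []
    for spec in definition["parameters"]:
        value = params.get(spec["name"])
        if value is None or value == "":
            if not spec.get("isOptional", False):
                errors.append(f"{spec['name']} is required")
            continue
        rule = spec.get("validationRule")
        if rule and not re.search(rule, str(value)):
            errors.append(spec.get("validationMessage", f"{spec['name']} is invalid"))
    return errors

print(validate({"numSamples": "10000", "year": "21"}, definition))
```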

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
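    # floor(rand * n) picks each index 0..n-1 with equal probability (round would under-sample the first and last)
    dep_randomizer = F.floor(F.rand() * len(departments)).cast("int")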

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice that “year” is an optional parameter, so it is assigned a default value of None on call, which the function detects and replaces with the previous year. The function returns the transformed DynamicFrame. In this case, it’s not derived from the source one, as would usually be the case, but replaced by a new one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.
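
The date arithmetic in the generator can be tried without Spark using the following sketch. It is a simplified stand-in rather than the transform itself: it uses UTC through calendar.timegm instead of the local time zone used by time.mktime, and the function name and seed parameter are hypothetical.

```python
import calendar
import datetime
import random

def random_sale_dates(num_samples, year, seed=None):
    # Epoch seconds at the start of the year and of the following year (UTC)
    year_start_ts = calendar.timegm((year, 1, 1, 0, 0, 0))
    year_end_ts = calendar.timegm((year + 1, 1, 1, 0, 0, 0))
    ts_range = year_end_ts - year_start_ts
    rng = random.Random(seed)
    # rng.random() is in [0, 1), so the offsets spread the timestamps across the year
    return [
        datetime.datetime.fromtimestamp(
            year_start_ts + rng.random() * ts_range, tz=datetime.timezone.utc
        )
        for _ in range(num_samples)
    ]

dates = random_sale_dates(1000, 2021, seed=42)
print(min(dates), max(dates))
```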

Deploy and use the generator transform

Now that we have both files ready, all we have to do is upload them to Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.
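
As a small convenience, the following sketch builds the two destination URIs for a transform from the path pattern above. The helper name is hypothetical, and the account ID and Region are placeholder values; the upload itself can be done with the Amazon S3 console or any S3 client.

```python
def transform_uris(account_id, region, transform_name):
    # Both files share the transform name and live under the transforms/ folder
    prefix = f"s3://aws-glue-assets-{account_id}-{region}/transforms/"
    return [f"{prefix}{transform_name}.json", f"{prefix}{transform_name}.py"]

# Placeholder account ID and Region, for illustration only
uris = transform_uris("123456789012", "us-east-1", "salesdata_generator")
for uri in uris:
    print(uri)
```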

Once you have uploaded both files, the next time you open (or refresh) the page on the AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue console, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo.
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node by selecting it in the Data source properties tab and selecting source type S3 location.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and find it in the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This wouldn’t be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema. That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more information about bucket creation in the Amazon S3 documentation.
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to, but not more than, 1,460 (depending on the year used and assuming you are using 2 G.1X workers and AWS Glue version 3.0).

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data were saved as a table with years of history, so many small files would have a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows for every day of the year. When the sink splits the data into output directories by date, each memory partition has to create one file for each day it contains, so you can end up with as many as 4 * 365 = 1,460 files (in a non-leap year).
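
The arithmetic can be checked with a small simulation in plain Python. It is only a model of the behavior described above, assuming rows are assigned to memory partitions in contiguous ranges and days are drawn uniformly; the function name and the seed are hypothetical.

```python
import random

def count_output_files(num_rows, memory_partitions, days=365, seed=0):
    rng = random.Random(seed)
    files = set()
    for row_id in range(num_rows):
        # Contiguous ranges of row ids go to each memory partition
        partition = row_id * memory_partitions // num_rows
        day = rng.randrange(days)
        # The sink writes one file per (memory partition, day) combination present
        files.add((partition, day))
    return len(files)

files = count_output_files(10_000, 4)
print(files, "files, upper bound", 4 * 365)
```

With 10,000 rows, almost every day shows up in every memory partition, so the count lands at or just below the 1,460 upper bound.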

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that on your team, you have the policy of generating S3 date partitions separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, let’s call it repartition_date.json, where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned with the file partitioning on S3,
    # so each partition has the data for a combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files onto the S3 transforms folder like you did for the previous transform.
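
To see why this reorganization reduces the number of files, here is a plain-Python model of the idea. It is not Spark: Python’s built-in hash stands in for Spark’s partitioning function, round-robin assignment stands in for the unorganized data, and all names and the seed are hypothetical.

```python
import datetime
import random

def split_date(d):
    # Year as a string, month and day zero-padded, like format_string("%02d", ...)
    return str(d.year), f"{d.month:02d}", f"{d.day:02d}"

def count_files(dates, num_partitions, repartition_by_key):
    files = set()
    for i, d in enumerate(dates):
        key = split_date(d)
        if repartition_by_key:
            # Equal partition column values always land in the same memory partition
            partition = hash(key) % num_partitions
        else:
            partition = i % num_partitions
        # One output file per (memory partition, output directory) combination
        files.add((partition, key))
    return len(files)

rng = random.Random(1)
start = datetime.date(2021, 1, 1)
dates = [start + datetime.timedelta(days=rng.randrange(365)) for _ in range(10_000)]
distinct_days = len({split_date(d) for d in dates})
files_before = count_files(dates, 4, repartition_by_key=False)
files_after = count_files(dates, distinct_days, repartition_by_key=True)
print(distinct_days, files_before, files_after)
```

In this model, several days can share a memory partition and some partitions can stay empty, but each day still ends up in exactly one file.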

Deploy and use the repartition transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department” as “Partition columns” in the transform and then add it in the sink partition column list. Or you can enter a “Number of partitions expected” and see how it affects the runtime (it no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving their job using custom visual transforms, AWS Glue Studio will generate the job script and update the Python library path (also referred to as the --extra-py-files job parameter) with the list of transform Python file S3 paths, separated by commas.
  3. Before running your script, AWS Glue will add all file paths stored in the --extra-py-files job parameter to the Python path, allowing your script to run all the custom visual transform functions you defined.
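
The mechanism can be illustrated with a sketch that runs without AWS Glue. FakeDynamicFrame is a stand-in class, keep_first is a hypothetical transform, and the bucket name is a placeholder; the sketch assumes the transform files are imported before use, so the assignment at the end of each file registers the function as a method.

```python
class FakeDynamicFrame:
    # Stand-in for awsglue's DynamicFrame so the sketch runs without AWS Glue
    def __init__(self, rows):
        self.rows = rows

def keep_first(self, numRows):
    # "self" is the frame the method is called on, as in the transforms of this post
    return FakeDynamicFrame(self.rows[: int(numRows)])

# Each transform file ends with an assignment like this one, which registers the method
FakeDynamicFrame.keep_first = keep_first

def extra_py_files(paths):
    # The job parameter value is a comma-separated list of S3 paths
    return ",".join(paths)

result = FakeDynamicFrame([1, 2, 3]).keep_first(2)
param = extra_py_files([
    "s3://example-bucket/transforms/salesdata_generator.py",
    "s3://example-bucket/transforms/repartition_date.py",
])
print(result.rows, param)
```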

Cleanup

In order to avoid running costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Best practices to optimize cost and performance for AWS Glue streaming ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/

AWS Glue streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or any other Apache Kafka cluster. It uses the Spark Structured Streaming framework to perform data processing in near-real time.

This post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.

We discuss the following topics:

  • Development tools that help you code faster using our newly launched AWS Glue Studio notebooks
  • How to monitor and tune your streaming jobs
  • Best practices for sizing and scaling your AWS Glue cluster, using our newly launched features like auto scaling and the small worker type G.025X

Development tools

AWS Glue Studio notebooks can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.

Before you run any code in the notebook (which would start the session), you need to set some important configurations.

The magic %streaming creates the session cluster using the same runtime as AWS Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.

Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.

See the following configuration:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

For additional configuration options such as version or number of workers, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to Launching the Spark History Server for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

The AWS Glue API helps you create the DataFrame by doing schema detection and auto decompression, depending on the format. You can also build it yourself using the Spark API directly:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

After you run any code cell, the session starts up, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. A regular job is listed with the job name given when it was created.

History server home page

From the notebook, you can take a sample of the streaming data. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

Monitor the cluster with Structured Streaming

The best way to monitor and tune your AWS Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the Structured Streaming tab and the details of each individual micro-batch processing job.

Overall view of the streaming job

On the Structured Streaming tab, you can see a summary of the streams running in the cluster, as in the following example.

Normally there is just one streaming query, representing a streaming ETL. If you start multiple queries in parallel, it’s a good idea to give each one a recognizable name by calling queryName() if you use the writeStream API directly on the DataFrame.

After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use the Avg Input/sec column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, Avg Process/sec, is similar but often has a higher number. The difference is that the process rate tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For Input Rate, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, that elapsed time is equal to the configured trigger interval (in the GlueContext.forEachBatch() API, this is set using the option windowSize).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the Batch Duration (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).

Be careful if you set a limit per batch that is constantly hit: you could be silently building a backlog while everything looks good in the job metrics. To monitor this, track a latency metric that measures the difference between the time a message was created and the time it’s processed.
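
As a rough illustration of such a latency metric, the following minimal sketch computes the worst and average latency for one micro-batch. The function name and the idea of collecting the creation timestamps into a list are assumptions for illustration; in a real job you would derive these values from a timestamp column of the batch DataFrame.

```python
from datetime import datetime, timezone


def batch_latency_seconds(created_at, processed_at):
    """Return (max, average) latency in seconds for one micro-batch.

    created_at: list of timezone-aware datetimes, one per message (assumed input).
    processed_at: the time the batch finished processing.
    """
    latencies = [(processed_at - ts).total_seconds() for ts in created_at]
    return max(latencies), sum(latencies) / len(latencies)


# Hypothetical batch: two messages created 30 and 10 seconds before processing
processed = datetime(2023, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
created = [
    datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    datetime(2023, 1, 1, 12, 0, 20, tzinfo=timezone.utc),
]
worst, average = batch_latency_seconds(created, processed)
print(worst, average)  # 30.0 20.0
```

If the worst-case latency keeps growing from batch to batch while the batch metrics look flat, the job is likely accumulating a backlog.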

Process Rate is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages, and the trigger interval is 10 seconds but the batch only needed 5 seconds to process it, the process rate would be 1000/5 = 200 msg/sec, while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.
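
The arithmetic in this example can be reproduced with two small helper functions. This is only a sketch: the function names are hypothetical and not part of Spark or AWS Glue, which compute these values internally for the Spark UI.

```python
def input_rate(rows_in_batch, seconds_since_previous_batch_start):
    """Messages read per second, measured between consecutive batch starts."""
    return rows_in_batch / seconds_since_previous_batch_start


def process_rate(rows_in_batch, batch_duration_seconds):
    """Messages processed per second while the batch was actually running."""
    return rows_in_batch / batch_duration_seconds


# Values from the example: 1,000 messages, 10-second interval, 5 seconds of work
rows = 1000
print(input_rate(rows, 10))   # 100.0
print(process_rate(rows, 5))  # 200.0
```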

This metric is useful to measure how efficient our code processing the batch is, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.

Input Rows shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the GlueContext.forEachBatch() API.

The last two rows tell us how long it takes to process each batch and how that time is spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing batch size (if the limit allows taking the extra messages pending).

In Operation Duration, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest is fixed overhead, so the smaller the batch, the larger the percentage of time the overhead takes. This represents the trade-off between small batches with lower latency and bigger batches that are more computationally efficient.

Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in Operation Duration, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.

Monitor individual batch processing

On the Jobs tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the Jobs tab of the Spark UI of our streaming job.

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by spark.task.maxFailures).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using GlueContext.forEachBatch(), it has a number of retries as per the batchMaxRetries parameter (three by default).

These failures are important because they have two effects:

  • They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
  • They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is Amazon DynamoDB, JDBC, Amazon OpenSearch Service, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).

Choosing the description link takes you to the Stages tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry, or does it take multiple retries?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because it doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.

Sizing and scaling

Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of AWS Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It also allows you to test that the job can handle that volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

  • For Kinesis, specify the limit using kinesis.executor.maxFetchRecordsPerShard, and revise this number if the number of shards changes substantially. You might need to increase kinesis.executor.maxFetchTimeInMs as well, in order to allow more time to read the batch and make sure it’s not truncated.
  • For Kafka, set maxOffsetsPerTrigger, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):

kafka_properties= {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
kafkaDF = spark.readStream.format("kafka").options(**kafka_properties).load()
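
For Kinesis, the limit applies to each shard, so one way to keep it in line with an overall target is to derive it from the number of shards. The following is a minimal sketch; the helper name, the 5,000,000 target, the 32 shards, and the 10,000 ms fetch time are illustrative assumptions, and only the two property names come from this post.

```python
def kinesis_limit_options(max_records_per_batch, shard_count, max_fetch_time_ms):
    """Build the Kinesis limit properties from an overall per-batch target."""
    # The Kinesis limit is per shard, so split the overall target between the shards
    per_shard = max(1, max_records_per_batch // shard_count)
    return {
        "kinesis.executor.maxFetchRecordsPerShard": str(per_shard),
        "kinesis.executor.maxFetchTimeInMs": str(max_fetch_time_ms),
    }


# Assumed values: 5,000,000 records per batch over 32 shards, 10 seconds to fetch
kinesis_limits = kinesis_limit_options(5_000_000, 32, 10_000)
print(kinesis_limits["kinesis.executor.maxFetchRecordsPerShard"])  # 156250
```

These entries would then be merged into the connection options used to create the Kinesis DataFrame, for example {**kinesis_options, **kinesis_limits}, and recalculated if the number of shards changes substantially.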

Initial benchmark

If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single-partition source (one Kafka partition or a Kinesis stream with one shard) with data preloaded into it, and running batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.
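
One way to make the checkpoint directory dynamic is to append a timestamp to a base path, as in the following sketch. The helper name and the bucket path are placeholders chosen for illustration.

```python
from datetime import datetime, timezone


def benchmark_checkpoint_path(base_path, now=None):
    """Build a checkpoint location that is unique per run (second resolution)."""
    now = now or datetime.now(timezone.utc)
    return f"{base_path.rstrip('/')}/{now.strftime('%Y%m%d-%H%M%S')}/"


# "s3://your_bucket/checkpoints/benchmark" is a placeholder location
checkpoint_path = benchmark_checkpoint_path("s3://your_bucket/checkpoints/benchmark")
print(checkpoint_path)
```

The resulting value can be passed as the checkpointLocation option, so each rerun starts without a previous checkpoint and reads the preloaded data again.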

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead of each batch to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

  • If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
  • If throughput is the priority over latency, pick the batch size that provides the highest average process rate and define an interval that allows some buffer over the observed batch duration.
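
The two selection rules can be sketched as follows. The benchmark figures are made up for illustration, and the function names are hypothetical; substitute the limits, batch durations, and process rates you recorded in your own runs.

```python
# Hypothetical benchmark results:
# (limit per batch, batch duration in seconds, process rate in msg/sec)
benchmark_runs = [
    (10_000, 1.5, 6_667),
    (50_000, 4.0, 12_500),
    (100_000, 7.0, 14_286),
    (200_000, 13.0, 15_385),
]


def pick_for_latency(runs, latency_sla_seconds):
    """Largest batch limit whose duration is under half the latency SLA."""
    candidates = [run for run in runs if run[1] < latency_sla_seconds / 2]
    return max(candidates, key=lambda run: run[0]) if candidates else None


def pick_for_throughput(runs):
    """Batch limit with the highest average process rate."""
    return max(runs, key=lambda run: run[2])


print(pick_for_latency(benchmark_runs, 10))  # (50000, 4.0, 12500)
print(pick_for_throughput(benchmark_runs))   # (200000, 13.0, 15385)
```

With a 10-second latency SLA, only the runs that complete in under 5 seconds qualify, so the 50,000 limit is chosen even though larger batches have a higher process rate.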

Now you have an idea of the number of messages per core our ETL can handle and the latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can use the messages per core obtained to divide the total number of messages per second to process and get the minimum number of Spark partitions needed (each core handles one partition in parallel).

With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

AWS Glue Version | Worker Type | vCores | Spark Cores per Worker
2                | G 1X        | 4      | 8
2                | G 2X        | 8      | 16
3                | G 0.25X     | 2      | 2
3                | G 1X        | 4      | 4
3                | G 2X        | 8      | 8

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.

Spark cores are equivalent to threads; therefore, you can have more (or less) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is going to necessarily be faster if they’re not backed by physical cores, it just means you have more parallelism competing for the same CPU.

Sizing the cluster when you control the input message system

This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial AWS Glue cluster size and configure Kafka or Kinesis with the number of partitions or shards estimated, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an AWS Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or fewer nodes depending on the cluster size, whether the destination system can keep up, the complexity of the transformations, or other parameters.
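
The sizing arithmetic from this example can be written as a short calculation. This is a sketch only: the helper names are hypothetical, and the 80,000 and 2,500 msg/sec figures are assumptions chosen to yield the 32 cores used in the example.

```python
import math


def cores_needed(total_msgs_per_sec, msgs_per_sec_per_core):
    """Minimum Spark cores (and partitions) based on the single-core benchmark."""
    return math.ceil(total_msgs_per_sec / msgs_per_sec_per_core)


def nodes_needed(required_spark_cores, spark_cores_per_worker):
    """Return (workers, total nodes); the extra node is the driver."""
    workers = math.ceil(required_spark_cores / spark_cores_per_worker)
    return workers, workers + 1


# Assumed figures: 80,000 msg/sec overall, 2,500 msg/sec per core in the benchmark
print(cores_needed(80_000, 2_500))  # 32
# AWS Glue 3.0 with G 1X workers has 4 Spark cores per worker
print(nodes_needed(32, 4))  # (8, 9)
print(nodes_needed(64, 4))  # (16, 17)
```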

Sizing the cluster when you don’t control the message system configuration

In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside AWS Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an AWS Glue 3.0 cluster with 3 G 1X nodes (a driver and 2 workers with 4 cores each) to have 8 worker cores available. In the code, repartitioning the batch distributes the messages randomly (as evenly as possible) among them:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than there are Spark cores available, but this might cause inefficiencies because the additional partitions will be queued and won’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

When the messages have processing interdependencies

If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (job in Spark UI) to see where the time is spent and if there are imbalances by checking the task duration percentiles on the Stages tab (you can also reach this page by choosing a stage on the Jobs tab).

Auto scaling

Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where AWS Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, using the number you determined is needed for peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

Splitting workloads

You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver and the nodes have to wait for the others to complete the batch before they can take additional work. If it reaches a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual subscribe or subscribePattern where the topics are listed, use the property assign to assign using JSON a subset of the topic partitions that the job will handle (for example, {"topic1": [0,1,2]}). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.
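
Building that list dynamically could look like the following sketch. The helper name and the split of a six-partition topic between two jobs are assumptions for illustration.

```python
import json


def kafka_assign_option(topic, first_partition, last_partition):
    """JSON for the Kafka source option "assign", listing every partition explicitly."""
    return json.dumps({topic: list(range(first_partition, last_partition + 1))})


# Hypothetical split of a 6-partition topic between two jobs
print(kafka_assign_option("topic1", 0, 2))  # {"topic1": [0, 1, 2]}
print(kafka_assign_option("topic1", 3, 5))  # {"topic1": [3, 4, 5]}
```

Each job would then set the assign property to its own value in place of subscribe when creating the stream DataFrame.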

Sizing down

For low volumes of traffic, AWS Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

  • Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns topic and streamName, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.
  • If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
  • The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of each stream’s DataFrame using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handling errors.
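
For the last option, the thread management could follow a pattern like this sketch. It uses stand-in functions instead of real streams so it can run anywhere; in a job, each function would start the processing of one stream’s DataFrame (for example, by calling forEachBatch) and block while it runs. All names here are hypothetical.

```python
import threading


def run_streams_in_threads(stream_runners):
    """Run each callable in its own thread; return {name: exception or None}."""
    results = {}

    def wrapper(name, runner):
        try:
            runner()
            results[name] = None
        except Exception as error:  # record the failure instead of losing it
            results[name] = error

    threads = [
        threading.Thread(target=wrapper, args=(name, runner), name=name)
        for name, runner in stream_runners.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait until every stream has stopped
    return results


# Stand-in functions; real ones would start the processing of each stream
def orders_stream():
    pass


def clicks_stream():
    raise RuntimeError("simulated stream failure")


outcome = run_streams_in_threads({"orders": orders_stream, "clicks": clicks_stream})
print(outcome)
```

Collecting the exceptions per stream lets the job decide whether one failing stream should stop the others or only be reported.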

Settings

In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

Property | Applies to | Remarks
maxOffsetsPerTrigger | Kafka | Limit of messages per batch. Divides the limit evenly among partitions.
kinesis.executor.maxFetchRecordsPerShard | Kinesis | Limit per shard, so it should be revised if the number of shards changes.
kinesis.executor.maxFetchTimeInMs | Kinesis | When increasing the batch size (either by increasing the batch interval or the previous property), the executor might need more time, allotted by this property.
startingOffsets | Kafka | Normally you want to read all the data available and therefore use earliest. However, if there is a big backlog, the system might take a long time to catch up; in that case, use latest to skip the history.
startingPosition | Kinesis | Similar to startingOffsets; in this case the values to use are TRIM_HORIZON to backload and LATEST to start processing from now on.
includeHeaders | Kafka | Enable this flag if you need to merge and split multiple topics in the same job (see the previous section for details).
kinesis.executor.maxconnections | Kinesis | When writing to Kinesis, by default it uses a single connection. Increasing this might improve performance.
kinesis.client.avoidEmptyBatches | Kinesis | It’s best to set it to true to avoid wasting resources (for example, generating empty files) when there is no data (like the Kafka connector does). GlueContext.forEachBatch prevents empty batches by default.

Further optimizations

In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, AWS Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to Adding Streaming ETL Jobs in AWS Glue.

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the GlueContext.forEachBatch function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as Amazon S3 files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set persistDataFrame to false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame":  "false"
    }
)

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is small enough to fit in memory, consider using a broadcast join. However, bear in mind it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.

If you need to shuffle, consider enabling the Kryo serializer (if using custom serializable classes you need to register them first to use it).

As in any AWS Glue job, avoid using custom udf() if you can do the same with the provided API like Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine) and in the case of Python, it forces the movement of data between processes.

Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartitioning based on the partition columns can significantly reduce the number of output files created.

Conclusion

In this post, you saw how to approach sizing and tuning an AWS Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.