Accelerate your data exploration and experimentation with the AWS Analytics Reference Architecture library

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/accelerate-your-data-exploration-and-experimentation-with-the-aws-analytics-reference-architecture-library/

Organizations use their data to solve complex problems by starting small, running iterative experiments, and refining the solution. Although the power of experiments can’t be ignored, organizations have to be cautious about the cost-effectiveness of such experiments. If time is spent creating the underlying infrastructure for enabling experiments, it further adds to the cost.

Developers need an integrated development environment (IDE) for data exploration and debugging of workflows, and different compute profiles for running these workflows. If you choose Amazon EMR for such use cases, you can use an IDE called Amazon EMR Studio for data exploration, transformation, version control, and debugging, and run Spark jobs to process large volumes of data. Deploying Amazon EMR on Amazon EKS simplifies management, reduces costs, and improves performance. However, a data engineer or IT administrator needs to spend time creating the underlying infrastructure, configuring security, and creating a managed endpoint for users to connect to. This means such projects have to wait until these experts create the infrastructure.

In this post, we show how a data engineer or IT administrator can use the AWS Analytics Reference Architecture (ARA) to accelerate infrastructure deployment, saving your organization both time and money spent on these data analytics experiments. We use the library to deploy an Amazon Elastic Kubernetes Service (Amazon EKS) cluster, configure it to use Amazon EMR on EKS, and deploy a virtual cluster, managed endpoints, and EMR Studio. You can then either run jobs on the virtual cluster or run exploratory data analysis with Jupyter notebooks on Amazon EMR Studio and Amazon EMR on EKS. The following architecture diagram represents the infrastructure you will deploy with the AWS Analytics Reference Architecture.

cdk-emr-eks-studio-architecture

Prerequisites

To follow along, you need to have an AWS account that is bootstrapped with the AWS Cloud Development Kit (AWS CDK). For instructions, refer to Bootstrapping. The following tutorial uses TypeScript, and requires version 2 or later of the AWS CDK. If you don’t have the AWS CDK installed, refer to Install the AWS CDK.

Set up an AWS CDK project

To deploy resources using the ARA, you first need to set up an AWS CDK project and install the ARA library. Complete the following steps:

  1. Create a folder named emr-eks-app:
    mkdir emr-eks-app && cd emr-eks-app

  2. Initialize an AWS CDK project in the empty directory by running the following command:
    cdk init app --language typescript

  3. Install the ARA library:
    npm install aws-analytics-reference-architecture --save

  4. In lib/emr-eks-app.ts, import the ARA library as follows. The first line imports the ARA library; the second imports the AWS Identity and Access Management (IAM) module, which you use to define policies:
    import * as ara from 'aws-analytics-reference-architecture'; 
    import * as iam from 'aws-cdk-lib/aws-iam';

Create and define an EKS cluster and compute capacity

To create an EMR on EKS virtual cluster, you first need to deploy an EKS cluster. The ARA library defines a construct called EmrEksCluster. The construct provisions an EKS cluster, enables IAM roles for service accounts, and deploys a set of supporting controllers, such as the certificate manager controller (needed by the managed endpoint that Amazon EMR Studio uses) and an autoscaler that keeps the cluster elastic and saves cost when no job is submitted to the cluster.

In lib/emr-eks-app.ts, add the following line:

// ROLE_ARN and CLUSTER_NAME are placeholders for your own values
const emrEks = ara.EmrEksCluster.getOrCreate(this, {
   eksAdminRoleArn: ROLE_ARN,
   eksClusterName: CLUSTER_NAME,
   autoscaling: ara.Autoscaler.KARPENTER,
});

To learn more about the properties you can customize, refer to EmrEksClusterProps. The EmrEksCluster construct has two mandatory parameters. The first, eksAdminRoleArn, is the role you use to interact with the Kubernetes control plane. This role must have administrative permissions to create or update the cluster. The second, autoscaling, selects the autoscaling mechanism: either Karpenter or the native Kubernetes Cluster Autoscaler. In this post, we use Karpenter, which we recommend for its faster autoscaling and simpler node management and provisioning. Now you’re ready to define the compute capacity.

One way to define worker nodes in Amazon EKS is to use managed node groups. We use one node group called tooling, which hosts CoreDNS, the ingress controller, the certificate manager, Karpenter, and any other pod that is necessary for running EMR on EKS jobs or a ManagedEndpoint. We also define default Karpenter Provisioners that define the capacity to be used for jobs submitted by EMR on EKS. These Provisioners are optimized for different Spark use cases (critical jobs, non-critical jobs, experimentation, and interactive sessions). The construct also allows you to submit your own Provisioner, defined by a Kubernetes manifest, through a method called addKarpenterProvisioner. Let’s discuss the predefined Provisioners.

Default Provisioners configurations

The default provisioners are set for rapid experimentation and are always created by default. However, if you don’t want to use them, you can set the defaultNodeGroups parameter to false in the EmrEksCluster properties at creation time. The Provisioners are defined as follows and are created in each of the subnets that are used by Amazon EKS:

  • Critical provisioner – This Provisioner is dedicated to time-sensitive jobs with aggressive SLAs. It uses On-Demand Instances, which, unlike Spot Instances, aren’t stopped, so the nodes stay available for the duration of the job. The nodes use instance stores, which are NVMe disks physically attached to the host. They offer high I/O throughput, which improves Spark performance because the disks serve as temporary storage for disk spill and shuffle. The instance types used in the node are of the m6gd family. The instances use the AWS Graviton processor, which offers better price/performance than x86 processors. To use this provisioner in your jobs, you can use the following sample configuration, which is referenced in the configuration override of the EMR on EKS job submission.
  • Non-critical provisioner – This Provisioner uses Spot Instances to save costs for jobs that aren’t time sensitive or that are used for experiments. Because these jobs aren’t critical, they can tolerate interruption when a Spot Instance is reclaimed. The instance types used in the node are of the m6gd family; the driver runs on On-Demand Instances and the executors run on Spot Instances.
  • Notebook provisioner – This Provisioner is for running managed endpoints that are used by Amazon EMR Studio for data exploration using Amazon EMR on EKS. The instances are of the t3 family; the driver runs on On-Demand Instances and the executors run on Spot Instances to keep the cost low. If the executor instances are stopped, new ones are started by Karpenter. If the executor instances are stopped too often, you can define your own Provisioner that uses On-Demand Instances.

The following link provides more details about how each of the Provisioners is defined. One important property of the default Provisioners is that there is one for each Availability Zone (AZ). This allows you to reduce inter-AZ network transfer cost when Spark runs a shuffle.

For this post, we use the default Provisioners, so you don’t need to add any lines of code for this section. If you want to add your own Provisioners, you can use the addKarpenterProvisioner method to apply your own manifests. Helper methods in the Utils class, such as readYamlDocument and loadYaml, read and load YAML files so that you can pass them as arguments to addKarpenterProvisioner.

Deploy the virtual cluster and an execution role

A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with; when you submit a job, the driver and executor pods are running in the associated namespace. The EmrEksCluster construct offers a method called addEmrVirtualCluster, which creates the virtual cluster for you. The method takes EmrVirtualClusterOptions as a parameter, which has the following attributes:

  • name – The name of your virtual cluster.
  • createNamespace – An optional field that creates the EKS namespace. This is of type Boolean and by default it doesn’t create a separate EKS namespace, so your virtual cluster is created in the default namespace.
  • eksNamespace – The name of the EKS namespace to be linked with the virtual EMR cluster. If no namespace is supplied, the construct uses the default namespace.
  1. In lib/emr-eks-app.ts, add the following line to create your virtual cluster:
    const virtualCluster = emrEks.addEmrVirtualCluster(this, {
       name: 'my-emr-eks-cluster',
       eksNamespace: 'batchjob',
       createNamespace: true
    });

    Now we create the execution role, which is an IAM role that is used by the driver and executor to interact with AWS services. Before we can create the execution role for Amazon EMR, we need to first create the ManagedPolicy. Note that in the following code, we create a policy to allow access to the Amazon Simple Storage Service (Amazon S3) bucket and Amazon CloudWatch logs.

  2. In lib/emr-eks-app.ts, add the following line to create the policy:
    const emrEksPolicy = new iam.ManagedPolicy(this,'managed-policy',
    { statements: [ 
       new iam.PolicyStatement({ 
           effect: iam.Effect.ALLOW, 
           actions:['s3:PutObject','s3:GetObject','s3:ListBucket'], 
           resources:['YOUR-DATA-S3-BUCKET']
        }), 
       new iam.PolicyStatement({ 
           effect: iam.Effect.ALLOW, 
           actions:['logs:PutLogEvents','logs:CreateLogStream','logs:DescribeLogGroups','logs:DescribeLogStreams'], 
           resources:['arn:aws:logs:*:*:*'] 
        })
       ] 
    });

    If you want to use the AWS Glue Data Catalog, add its permission in the preceding policy.

    Now we create the execution role for Amazon EMR on EKS using the policy defined in the previous step using the createExecutionRole instance method. The driver and executor pods can then assume this role to access and process data. The role is scoped in such a way that only pods in the virtual cluster namespace can assume it. To learn more about the condition implemented by this method to restrict access to the role to only pods that are created by Amazon EMR on EKS in the namespace of the virtual cluster, refer to Using job execution roles with Amazon EMR on EKS.

  3. In lib/emr-eks-app.ts, add the following line to create the execution role:
    const role = emrEks.createExecutionRole(this, 'emr-eks-execution-role', emrEksPolicy, 'batchjob', 'execRoleJob');

    The preceding code produces an IAM role called execRoleJob with the IAM policy defined in emrEksPolicy and scoped to the namespace batchjob.

  4. Lastly, we output parameters that are important for the job run:
// Virtual cluster Id to reference in jobs
new cdk.CfnOutput(this, 'VirtualClusterId', { value: virtualCluster.attrId });

// Job config for each nodegroup
new cdk.CfnOutput(this, 'CriticalConfig', { value: emrEks.criticalDefaultConfig });

// Execution role arn
new cdk.CfnOutput(this, 'ExecRoleArn', { value: role.roleArn });

Deploy Amazon EMR Studio and provision users

To deploy an EMR Studio for data exploration and job authoring, the ARA library has a construct called NotebookPlatform. This construct allows you to deploy as many EMR Studios as you need (within the account limit) and set them up with the authentication mode that is suitable for you and assign users to them. To learn more about the authentication modes available in Amazon EMR Studio, refer to Choose an authentication mode for Amazon EMR Studio.

The construct creates all the necessary IAM roles and policies needed by Amazon EMR Studio. It also creates an S3 bucket where all the notebooks are stored by Amazon EMR Studio. The bucket is encrypted with a customer managed key (CMK) generated by the AWS CDK stack. The following steps show you how to create your own EMR Studio with the construct.

The notebook platform construct takes NotebookPlatformProps as a property, which allows you to define your EMR Studio, a namespace, the name of the EMR Studio, and its authentication mode.

  1. In lib/emr-eks-app.ts, add the following line:
    const notebookPlatform = new ara.NotebookPlatform(this, 'platform-notebook', {
    emrEks: emrEks,
    eksNamespace: 'dataanalysis',
    studioName: 'platform',
    studioAuthMode: ara.StudioAuthMode.IAM,
    });

    For this post, we use IAM users so that you can easily reproduce it in your own account. However, if you have IAM federation or single sign-on (SSO) already in place, you can use them instead of IAM users. To learn more about the parameters of NotebookPlatformProps, refer to NotebookPlatformProps.

    Next, we need to create and assign users to the Amazon EMR Studio. For this, the construct has a method called addUser that takes a list of users and either assigns them to Amazon EMR Studio in the case of SSO or updates the IAM policy to allow access to Amazon EMR Studio for the provided IAM users. A user can also have multiple managed endpoints, and each user can have their own Amazon EMR version defined. They can use a different set of Amazon Elastic Compute Cloud (Amazon EC2) instances and different permissions using job execution roles.

  2. In lib/emr-eks-app.ts, add the following line:
    notebookPlatform.addUser([{
      identityName: '<NAME-OF-EXISTING-IAM-USER>',
      notebookManagedEndpoints: [{
        emrOnEksVersion: 'emr-6.8.0-latest',
        executionPolicy: emrEksPolicy,
        managedEndpointName: 'myendpoint'
      }],
    }]);

    In the preceding code, for the sake of brevity, we reuse the same IAM policy that we created for the execution role.

    Note that the construct optimizes the number of managed endpoints that are created. If two endpoints have the same name, then only one is created.

  3. Now that we have defined our deployment, we can deploy it:
   npm run build && cdk deploy

You can find a sample project that contains all the steps of the walkthrough in the following GitHub repository.

When the deployment is complete, the output contains the S3 bucket containing the assets for podTemplate, the link for the EMR Studio, and the EMR Studio virtual cluster ID. The following screenshot shows the output of the AWS CDK after the deployment is complete.

CDK output

Submit jobs

Because we’re using the default Provisioners, we will use the podTemplate that is defined by the construct available on the ARA GitHub repository. These are uploaded for you by the construct to an S3 bucket called <clustername>-emr-eks-assets; you only need to refer to them in your Spark job. In this job, you also use the job parameters in the output at the end of the AWS CDK deployment. These parameters allow you to use the AWS Glue Data Catalog and implement Spark on Kubernetes best practices like dynamicAllocation and pod collocation. At the end of cdk deploy ARA will output job sample configurations with the best practices listed before that you can use to submit a job. You can submit a job as follows.

A job run is a unit of work such as a Spark JAR file that is submitted to the EMR on EKS cluster. We start a job using the start-job-run command. Note you can use SparkSubmitParameters to specify the Amazon S3 path to the pod template, as shown in the following command:

aws emr-containers start-job-run \
--virtual-cluster-id <CLUSTER-ID> \
--name <SPARK-JOB-NAME> \
--execution-role-arn <ROLE-ARN> \
--release-label emr-6.8.0-latest \
--job-driver '{
  "sparkSubmitJobDriver": {
    "entryPoint": "<S3URI-SPARK-JOB>"
  }
}' --configuration-overrides '{
  "applicationConfiguration": [
    {
      "classification": "spark-defaults",
      "properties": {
        "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "spark.sql.catalogImplementation": "hive",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "8",
        "spark.dynamicAllocation.maxExecutors": "40",
        "spark.kubernetes.allocation.batch.size": "8",
        "spark.executor.cores": "8",
        "spark.kubernetes.executor.request.cores": "7",
        "spark.executor.memory": "28G",
        "spark.driver.cores": "2",
        "spark.kubernetes.driver.request.cores": "2",
        "spark.driver.memory": "6G",
        "spark.dynamicAllocation.executorAllocationRatio": "1",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.shuffleTracking.timeout": "300s",
        "spark.kubernetes.driver.podTemplateFile": "s3://<EKS-CLUSTER-NAME>-emr-eks-assets-<ACCOUNT-ID>-<REGION>/<EKS-CLUSTER-NAME>/pod-template/critical-driver.yaml",
        "spark.kubernetes.executor.podTemplateFile": "s3://<EKS-CLUSTER-NAME>-emr-eks-assets-<ACCOUNT-ID>-<REGION>/<EKS-CLUSTER-NAME>/pod-template/critical-executor.yaml"
      }
    }
  ],
  "monitoringConfiguration": {
    "cloudWatchMonitoringConfiguration": {
      "logGroupName": "<Log_Group_Name>",
      "logStreamNamePrefix": "<Log_Stream_Prefix>"
    }
  }
}'

The code takes the following values:

  • <CLUSTER-ID> – The EMR virtual cluster ID
  • <SPARK-JOB-NAME> – The name of your Spark job
  • <ROLE-ARN> – The execution role you created
  • <S3URI-SPARK-JOB> – The Amazon S3 URI of your Spark job
  • <EKS-CLUSTER-NAME>, <ACCOUNT-ID>, <REGION> – The EKS cluster name, AWS account ID, and AWS Region that make up the Amazon S3 URIs of the critical driver and executor pod templates, which you get from the AWS CDK output
  • <Log_Group_Name> – Your CloudWatch log group name
  • <Log_Stream_Prefix> – Your CloudWatch log stream prefix
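
Nested JSON inside a shell string is easy to get wrong (an unbalanced brace or a missing quote makes the CLI reject the request). As a minimal sketch, and not part of the ARA library, the following Python snippet builds a shortened version of the configuration overrides as a dictionary and serializes it with json.dumps. The function name, its parameters, and the sample bucket, cluster, and log names are illustrative assumptions; only a subset of the Spark properties is shown.

```python
import json


def build_configuration_overrides(assets_bucket, cluster_name, log_group, log_prefix):
    """Return the --configuration-overrides payload as a Python dict.

    Hypothetical helper for illustration; it is not provided by the ARA library.
    """
    template_root = f"s3://{assets_bucket}/{cluster_name}/pod-template"
    spark_defaults = {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "8",
        "spark.dynamicAllocation.maxExecutors": "40",
        "spark.kubernetes.driver.podTemplateFile": f"{template_root}/critical-driver.yaml",
        "spark.kubernetes.executor.podTemplateFile": f"{template_root}/critical-executor.yaml",
    }
    return {
        "applicationConfiguration": [
            {"classification": "spark-defaults", "properties": spark_defaults}
        ],
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": log_group,
                "logStreamNamePrefix": log_prefix,
            }
        },
    }


# All values below are made-up placeholders; take the real ones from the CDK output.
overrides = build_configuration_overrides(
    assets_bucket="my-cluster-emr-eks-assets-111122223333-eu-west-1",
    cluster_name="my-cluster",
    log_group="/emr-eks/jobs",
    log_prefix="demo",
)

# json.dumps produces balanced braces and consistent quoting.
overrides_json = json.dumps(overrides)
print(overrides_json)
```

You can pass the printed string, wrapped in single quotes, as the value of --configuration-overrides.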

You can go to the Amazon EMR console to check the status of your job and to view logs. You can also check the status by running the describe-job-run command:

aws emr-containers describe-job-run --virtual-cluster-id <CLUSTER-ID> --id <JOB-RUN-ID>
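
If you prefer to wait for the job from a script rather than rerun the command by hand, a polling loop is one option. The following is a minimal sketch under stated assumptions: the helper name wait_for_job_run is hypothetical, the set of terminal states is our assumption, and the API call is replaced by a stand-in so the snippet runs without AWS credentials. The response shape it expects is {"jobRun": {"state": ...}}.

```python
import time

# Assumption: states after which an EMR on EKS job run no longer changes.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_job_run(describe, poll_seconds=30.0, max_polls=120, sleep=time.sleep):
    """Poll describe() until the job run reaches a terminal state.

    describe is any zero-argument callable that returns a response shaped
    like {"jobRun": {"state": "..."}}.
    """
    state = None
    for attempt in range(max_polls):
        state = describe()["jobRun"]["state"]
        if state in TERMINAL_STATES:
            return state
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    raise TimeoutError(f"job run still in state {state} after {max_polls} polls")


# Stand-in for the real API call so that the sketch is self-contained.
_fake_states = iter(["SUBMITTED", "RUNNING", "COMPLETED"])
final_state = wait_for_job_run(
    lambda: {"jobRun": {"state": next(_fake_states)}},
    poll_seconds=0,
)
print(final_state)  # COMPLETED
```

With boto3 installed, the callable could wrap the emr-containers client, for example lambda: client.describe_job_run(virtualClusterId="<CLUSTER-ID>", id="<JOB-RUN-ID>").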

Explore data using Amazon EMR Studio

In this section, we show how you can create a workspace in Amazon EMR Studio and connect to the Amazon EKS managed endpoint from the workspace. From the output, use the link to Amazon EMR Studio to navigate to the EMR Studio deployment. You must sign in with the IAM username you provided in the addUser method.

Create a Workspace

To create a Workspace, complete the following steps:

  1. Log in to the EMR Studio created by the AWS CDK.
  2. Choose Create Workspace.
  3. Enter a workspace name and an optional description.
  4. Select Allow Workspace Collaboration if you want to work with other Studio users in this Workspace in real time.
  5. Choose Create Workspace.

create-emr-studio-workspace

After you create the Workspace, choose it from the list of Workspaces to open the JupyterLab environment.
emr studio workspace running

The following screenshot shows what the terminal looks like. For more information about the user interface, refer to Understand the Workspace user interface.

EMR Studio workspace view

Connect to an EMR on EKS managed endpoint

You can easily connect to the EMR on EKS managed endpoint from the Workspace.

  1. In the navigation pane, on the Clusters menu, select EMR Cluster on EKS for Cluster type.
    The virtual clusters appear on the EMR Cluster on EKS drop-down menu, and the endpoint appears on the Endpoint drop-down menu. If there are multiple endpoints, they appear here, and you can easily switch between endpoints from the Workspace.
  2. Select the appropriate endpoint and choose Attach.
    attach to managedendpoint

Work with a notebook

You can now open a notebook and connect to a preferred kernel to do your tasks. For instance, you can select a PySpark kernel, as shown in the following screenshot.
select-kernel

Explore your data

The first step of our data exploration exercise is to create a Spark session and then load the New York taxi dataset from the S3 bucket into a data frame. Use the following code block to load the data into a data frame. Copy the Amazon S3 URI for the location where the dataset resides in Amazon S3.

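	from pyspark.sql import SparkSession
	from pyspark.sql.functions import *
	from datetime import datetime
	spark = SparkSession.builder.appName("SparkEDAA").getOrCreate()
	# Assumption: the dataset is stored as Parquet; use spark.read.csv for CSV files
	nyTaxi = spark.read.parquet("<YOUR-DATASET-S3-URI>")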

After we load the data into a data frame, we replace the data of the current_date column with the actual current date, count the number of rows, and save the data into a Parquet file:

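# Assumption: nyTaxi is the data frame loaded from the dataset's S3 URI
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.parquet("<YOUR-S3-PATH>")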

The following screenshot shows the result of our notebook running on Amazon EMR Studio and with PySpark running on Amazon EMR on EKS.
notebook execution

Clean up

To clean up after this post, run cdk destroy.

Conclusion

In this post, we showed how you can use the ARA to quickly deploy a data analytics infrastructure and start experimenting with your data. You can find the full example referenced in this post in the GitHub repository. The AWS Analytics Reference Architecture implements common analytics patterns and AWS best practices to offer you ready-to-use constructs for your experiments. One of the patterns is the data mesh; to learn how to use it, refer to this blog post.

You can also explore other constructs offered in this library to experiment with AWS Analytics services before transitioning your workload for production.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Sandipan Bhaumik is a Senior Analytics Specialist Solutions Architect based in London. He has worked with customers in different industries like Banking & Financial Services, Healthcare, Power & Utilities, Manufacturing and Retail helping them solve complex challenges with large-scale data platforms. At AWS he focuses on strategic accounts in the UK and Ireland and helps customers to accelerate their journey to the cloud and innovate using AWS analytics and machine learning services. He loves playing badminton, and reading books.

Interactively develop your AWS Glue streaming ETL jobs using AWS Glue Studio notebooks

Post Syndicated from Arun A K original https://aws.amazon.com/blogs/big-data/interactively-develop-your-aws-glue-streaming-etl-jobs-using-aws-glue-studio-notebooks/

Enterprise customers are modernizing their data warehouses and data lakes to provide real-time insights, because having the right insights at the right time is crucial for good business outcomes. To enable near-real-time decision-making, data pipelines need to process real-time or near-real-time data. This data is sourced from IoT devices, change data capture (CDC) services like AWS Database Migration Service (AWS DMS), and streaming services such as Amazon Kinesis, Apache Kafka, and others. These data pipelines need to be robust, able to scale, and able to process large data volumes in near-real time. AWS Glue streaming extract, transform, and load (ETL) jobs process data from data streams, including Kinesis and Apache Kafka, apply complex transformations in-flight, and load the results into target data stores for analytics and machine learning (ML).

Hundreds of customers are using AWS Glue streaming ETL for their near-real-time data processing requirements. These customers required an interactive capability to process streaming jobs. Previously, when developing and running a streaming job, you had to wait for the results to be available in the job logs or persisted into a target data warehouse or data lake to be able to view the results. With this approach, debugging and adjusting code is difficult, resulting in a longer development timeline.

Today, we are launching a new AWS Glue streaming ETL feature to interactively develop streaming ETL jobs in AWS Glue Studio notebooks and interactive sessions.

In this post, we provide a use case and step-by-step instructions to develop and debug your AWS Glue streaming ETL job using a notebook.

Solution overview

To demonstrate the streaming interactive sessions capability, we develop, test, and deploy an AWS Glue streaming ETL job to process Apache Webserver logs. The following high-level diagram represents the flow of events in our job.

BDB-2464 High Level Application Architecture

Apache Webserver logs are streamed to Amazon Kinesis Data Streams. An AWS Glue streaming ETL job consumes the data in near-real time and runs an aggregation that computes how many times a webpage has been unavailable (status code 500 and above) due to an internal error. The aggregate information is then published to a downstream Amazon DynamoDB table. As part of this post, we develop this job using AWS Glue Studio notebooks.

You can either work with the instructions provided in the notebook, which you download when instructed later in this post, or follow along with this post to author your first streaming interactive session job.

Prerequisites

To get started, choose the Launch Stack button below to run an AWS CloudFormation template in your AWS environment.

BDB-2063-launch-cloudformation-stack

The template provisions a Kinesis data stream, a DynamoDB table, an AWS Glue job to generate simulated log data, and the necessary AWS Identity and Access Management (IAM) role and policies. After you deploy your resources, you can review the Resources tab on the AWS CloudFormation console for detailed information.

Set up the AWS Glue streaming interactive session job

To set up your AWS Glue streaming job, complete the following steps:

  1. Download the notebook file and save it to a local directory on your computer.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose Create job.
  4. Select Jupyter Notebook.
  5. Under Options, select Upload and edit an existing notebook.
  6. Choose Choose file and browse to the notebook file you downloaded.
  7. Choose Create.
BDB-2464 Create Job
  8. For Job name, enter a name for the job.
  9. For IAM Role, use the role glue-iss-role-0v8glq, which is provisioned as part of the CloudFormation template.
  10. Choose Start notebook job.
BDB-2464 Start Notebook

You can see that the notebook is loaded into the UI. There are markdown cells with instructions as well as code blocks that you can run sequentially. You can either run the instructions on the notebook or follow along with this post to continue with the job development.

BDB-2464 Explore Notebook

Run notebook cells

Let’s run the code block that has the magics. The notebook has notes on what each magic does.

  1. Run the first cell.
BDB-2464 Run First Cell

After running the cell, you can see in the output section that the defaults have been reconfigured.

BDB-2464 Configurations Set

In the context of streaming interactive sessions, an important configuration is job type, which is set to streaming. Additionally, to minimize costs, the number of workers is set to 2 (default 5), which is sufficient for our use case that deals with a low-volume simulated dataset.

Our next step is to initialize an AWS Glue streaming session.

  1. Run the next code cell.
BDB-2464 Initiate Session

After we run this cell, we can see that a session has been initialized and a session ID is created.

A Kinesis data stream and AWS Glue data generator job that feeds into this stream have already been provisioned and triggered by the CloudFormation template. With the next cell, we consume this data as an Apache Spark DataFrame.

  1. Run the next cell.
BDB-2464 Fetch From Kinesis

Because there are no print statements, the cells don’t show any output. You can proceed to run the following cells.

Explore the data stream

To help enhance the interactive experience in AWS Glue interactive sessions, GlueContext provides the method getSampleStreamingDynamicFrame. It provides a snapshot of the stream in a static DynamicFrame. It takes three arguments:

  • The Spark streaming DataFrame
  • An options map
  • A writeStreamFunction to apply a function to every sampled record

Available options are as follows:

  • windowSize – Also known as the micro-batch duration, this parameter determines how long a streaming query will wait after the previous batch was triggered.
  • pollingTimeInMs – This is the total length of time the method will run. It starts at least one micro-batch to obtain sample records from the input stream. The time unit is milliseconds, and the value should be greater than the windowSize.
  • recordPollingLimit – This is defaulted to 100, and helps you set an upper bound on the number of records that is retrieved from the stream.
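
To make the relationship between these options concrete, the following minimal sketch builds an options map and checks that pollingTimeInMs is greater than windowSize before you pass the map to getSampleStreamingDynamicFrame. The helper names, the "N seconds" duration format, and passing numeric values as strings are assumptions made for illustration; the call to AWS Glue itself is not part of the sketch.

```python
def window_to_ms(window_size):
    """Convert a duration such as '10 seconds' into milliseconds.

    Assumption: durations are written as '<number> <unit>'.
    """
    units = {"second": 1_000, "seconds": 1_000, "minute": 60_000, "minutes": 60_000}
    amount, unit = window_size.split()
    return int(amount) * units[unit.lower()]


def sampling_options(window_size="10 seconds", polling_time_ms=20_000, record_limit=100):
    """Build the options map, rejecting a polling time that is too short."""
    if polling_time_ms <= window_to_ms(window_size):
        raise ValueError("pollingTimeInMs must be greater than windowSize")
    return {
        "windowSize": window_size,
        "pollingTimeInMs": str(polling_time_ms),
        "recordPollingLimit": str(record_limit),
    }


options = sampling_options()
print(options)
```

With the defaults above, the method would run for 20 seconds, which leaves room for at least one 10-second micro-batch.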

Run the next code cell and explore the output.

BDB-2464 Sample Data

We see that the sample consists of 100 records (the default record limit), and we have successfully displayed the first 10 records from the sample.

Work with the data

Now that we know what our data looks like, we can write the logic to clean and format it for our analytics.

Run the code cell containing the reformat function.

Note that Python UDFs aren’t the recommended way to handle data transformations in a Spark application. We use reformat() to exemplify troubleshooting. When working with a real-world production application, we recommend using native APIs wherever possible.

BDB-2464 Run The UDF

We see that the code cell failed to run. The failure was on purpose. We deliberately created a division by zero exception in our parser.

BDB-2464 Error Running The Code

Failure and recovery

With a regular AWS Glue job, any error causes the whole application to exit, and you have to make code changes and resubmit the application. With interactive sessions, however, the coding context and definitions are fully preserved and the session remains operational. There is no need to bootstrap a new cluster and rerun all the preceding transformations. This allows you to focus on quickly iterating your batch function implementation to obtain the desired outcome. You can fix the defects and run them in a matter of seconds.

To test this out, go back to the code, comment out or delete the erroneous line error_line=1/0, and rerun the cell.
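
The notebook contains the actual reformat function. As a rough illustration of what a corrected parser of this kind might look like, the following sketch parses one line in the Apache common log format with a regular expression. The pattern, the field names, and the sample line are assumptions and are not taken from the notebook.

```python
import re
from datetime import datetime

# Assumption: records follow the Apache common log format.
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<page>\S+) [^"]*" (?P<status>\d{3}) (?P<size>\S+)'
)


def reformat(line):
    """Parse one log line into a dict, or return None if it does not match."""
    # error_line = 1 / 0  # the deliberate defect, now commented out
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    return {
        "host": match["host"],
        "event_time": datetime.strptime(match["time"], "%d/%b/%Y:%H:%M:%S %z"),
        "page": match["page"],
        "status": int(match["status"]),
    }


sample = '203.0.113.7 - - [10/Oct/2022:13:55:36 +0000] "GET /index.html HTTP/1.1" 503 2326'
record = reformat(sample)
print(record["page"], record["status"])
```

Returning None for lines that don't match keeps a single malformed record from failing the whole batch.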

BDB-2464 Error Corrected

Implement business logic

Now that we have successfully tested our parsing logic on the sample stream, let’s implement the actual business logic. The logic is implemented in the processBatch method within the next code cell. In this method, we do the following:

  • Pass the streaming DataFrame in micro-batches
  • Parse the input stream
  • Filter messages with status code >=500
  • Over a 1-minute interval, get the count of failures per webpage
  • Persist the preceding metric to a DynamoDB table (glue-iss-ddbtbl-0v8glq)
  1. Run the next code cell to trigger the stream processing.
BDB-2464 Trigger DDB Write
  2. Wait a few minutes for the cell to complete.
  3. On the DynamoDB console, navigate to the Items page and select the glue-iss-ddbtbl-0v8glq table.
BDB-2464 Explore DDB

The page displays the aggregated results that have been written by our interactive session job.
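
The filter-and-count steps of processBatch can be illustrated without Spark or DynamoDB. The following sketch is not the notebook's implementation: it uses plain Python on a handful of made-up records, keeps only those with a status code of 500 or above, and counts them per webpage within each 1-minute interval. The function name and record layout are assumptions.

```python
from collections import Counter
from datetime import datetime


def count_failures_per_minute(records):
    """Count records with status >= 500 per (minute, page) pair."""
    counts = Counter()
    for record in records:
        if record["status"] >= 500:
            # Truncate the timestamp to the start of its 1-minute interval.
            minute = record["event_time"].replace(second=0, microsecond=0)
            counts[(minute, record["page"])] += 1
    return counts


# Made-up sample records for illustration only.
records = [
    {"event_time": datetime(2022, 10, 10, 13, 55, 5), "page": "/index.html", "status": 503},
    {"event_time": datetime(2022, 10, 10, 13, 55, 40), "page": "/index.html", "status": 500},
    {"event_time": datetime(2022, 10, 10, 13, 55, 50), "page": "/cart", "status": 200},
    {"event_time": datetime(2022, 10, 10, 13, 56, 1), "page": "/index.html", "status": 502},
]

failure_counts = count_failures_per_minute(records)
for (minute, page), failures in sorted(failure_counts.items()):
    print(minute.isoformat(), page, failures)
```

Each (minute, page) pair corresponds to one aggregated item of the kind the job writes to the DynamoDB table.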

Deploy the streaming job

So far, we have been developing and testing our application using the streaming interactive sessions. Now that we’re confident of the job, let’s convert this into an AWS Glue job. We have seen that the majority of code cells are doing exploratory analysis and sampling, and aren’t required to be a part of the main job.

A commented code cell that represents the whole application is provided to you. You can uncomment the cell and delete all other cells. Another option would be to not use the commented cell, but delete just the two cells from the notebook that do the sampling or debugging and print statements.

To delete a cell, choose the cell and then choose the delete icon.

BDB-2464 Delete a Cell

Now that you have the final application code ready, save and deploy the AWS Glue job by choosing Save.

BDB-2464 Save Job

A banner message appears when the job is updated.

BDB-2464 Save Job Banner

Explore the AWS Glue job

After you save the notebook, you should be able to access the job like any regular AWS Glue job on the Jobs page of the AWS Glue console.

BDB-2464 Job Page

Additionally, you can look at the Job details tab to confirm the initial configurations, such as number of workers, have taken effect after deploying the job.

BDB-2464 Job Details Page

Run the AWS Glue job

If needed, you can choose Run to run the job as an AWS Glue streaming job.

BDB-2464 Job Run

To track progress, you can access the run details on the Runs tab.

BDB-2464 Job Run Details

Clean up

To avoid incurring additional charges to your account, stop the streaming job that you started as part of the instructions. Also, on the AWS CloudFormation console, select the stack that you provisioned and delete it.

Conclusion

In this post, we demonstrated how to do the following:

  • Author a job using notebooks
  • Preview incoming data streams
  • Code and fix issues without having to publish AWS Glue jobs
  • Review the end-to-end working code and remove any debugging and print statements or cells from the notebook
  • Publish the code as an AWS Glue job

We did all of this via a notebook interface.

With these improvements in the overall development timelines of AWS Glue jobs, it’s easier to author jobs using the streaming interactive sessions. We encourage you to use the prescribed use case, CloudFormation stack, and notebook to jumpstart your individual use cases to adopt AWS Glue streaming workloads.

The goal of this post was to give you hands-on experience working with AWS Glue streaming and interactive sessions. When onboarding a productionized workload onto your AWS environment, based on the data sensitivity and security requirements, ensure you implement and enforce tighter security controls.


About the authors

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun loves to enjoy quality time with his family.

Linan Zheng is a Software Development Engineer on the AWS Glue Streaming team, helping build the serverless data platform. His work involves a large-scale optimization engine for transactional data formats and streaming interactive sessions.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data platforms.

Provide data reliability in Amazon Redshift at scale using Great Expectations library

Post Syndicated from Faizan Ahmed original https://aws.amazon.com/blogs/big-data/provide-data-reliability-in-amazon-redshift-at-scale-using-great-expectations-library/

Ensuring data reliability is one of the key objectives of maintaining data integrity and is crucial for building data trust across an organization. Data reliability means that the data is complete and accurate. It’s the catalyst for delivering trusted data analytics and insights. Incomplete or inaccurate data leads business leaders and data analysts to make poor decisions, which can lead to negative downstream impacts and subsequently may result in teams spending valuable time and money correcting the data later on. Therefore, it’s always a best practice to run data reliability checks before loading the data into any targets like Amazon Redshift, Amazon DynamoDB, or Amazon Timestream databases.

This post discusses a solution for running data reliability checks before loading the data into a target table in Amazon Redshift using the open-source library Great Expectations. You can automate the process for data checks via the extensive built-in Great Expectations glossary of rules using PySpark, and it’s flexible for adding or creating new customized rules for your use case.

Amazon Redshift is a cloud data warehouse solution and delivers up to three times better price-performance than other cloud data warehouses. With Amazon Redshift, you can query and combine exabytes of structured and semi-structured data across your data warehouse, operational database, and data lake using standard SQL. Amazon Redshift lets you save the results of your queries back to your Amazon Simple Storage Service (Amazon S3) data lake using open formats like Apache Parquet, so that you can perform additional analytics from other analytics services like Amazon EMR, Amazon Athena, and Amazon SageMaker.

Great Expectations (GE) is an open-source library and is available on GitHub for public use. It helps data teams eliminate pipeline debt through data testing, documentation, and profiling. Great Expectations helps build trust, confidence, and integrity of data across data engineering and data science teams in your organization. GE offers a variety of expectations developers can configure. The tool defines expectations as statements describing verifiable properties of a dataset. Not only does it offer a glossary of more than 50 built-in expectations, it also allows data engineers and scientists to write custom expectation functions.

Use case overview

Before performing analytics or building machine learning (ML) models, cleaning data can take up a lot of time in the project cycle. Without automated and systematic data quality checks, we may spend most of our time cleaning data and hand-coding one-off quality checks. As most data engineers and scientists know, this process can be both tedious and error-prone.

Having an automated quality check system is critical to project efficiency and data integrity. Such systems help us understand data quality expectations and the business rules behind them, know what to expect in our data analysis, and make communicating the data’s intricacies much easier. For example, in a raw dataset of customer profiles of a business, if there’s a column for date of birth in format YYYY-mm-dd, values like 1000-09-01 would be correctly parsed as a date type. However, logically this value would be incorrect in 2021, because the age of the person would be 1021 years, which is impossible.
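The date-of-birth example can be sketched in a few lines of plain Python: a value may parse correctly as a date and still fail a plausibility rule. The maximum age of 120 years is an assumed business rule chosen for illustration, and the function name is hypothetical.

```python
from datetime import date, datetime

MAX_PLAUSIBLE_AGE = 120  # assumed business rule, not taken from the post


def is_plausible_birth_date(value, today, max_age=MAX_PLAUSIBLE_AGE):
    """Return True if value parses as YYYY-mm-dd and implies a plausible age."""
    try:
        born = datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        return False  # not a valid date at all
    if born > today:
        return False  # birth dates in the future are rejected
    # Subtract one year if the birthday has not occurred yet this year.
    age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))
    return age <= max_age


reference_day = date(2021, 12, 31)
print(is_plausible_birth_date("1000-09-01", reference_day))  # parses, but age is 1021
print(is_plausible_birth_date("1985-09-01", reference_day))
```

The first value is syntactically valid, so a type check alone would accept it; only the logical rule catches it.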

Another use case could be to use GE for streaming analytics, where you can use AWS Database Migration Service (AWS DMS) to migrate a relational database management system. AWS DMS can export change data capture (CDC) files in Parquet format to Amazon S3, where these files can then be cleansed by an AWS Glue job using GE and written to either a destination bucket for Athena consumption or the rows can be streamed in AVRO format to Amazon Kinesis or Kafka.

Additionally, automated data quality checks can be versioned and also bring benefit in the form of optimal data monitoring and reduced human intervention. Data lineage in an automated data quality system can also indicate at which stage in the data pipeline the errors were introduced, which can help inform improvements in upstream systems.

Solution architecture

This post comes with a ready-to-use blueprint that automatically provisions the necessary infrastructure and spins up a SageMaker notebook that walks you step by step through the solution. Additionally, it enforces the best practices in data DevOps and infrastructure as code. The following diagram illustrates the solution architecture.

The architecture contains the following components:

  1. Data lake – When we run the AWS CloudFormation stack, an open-source sample dataset in CSV format is copied to an S3 bucket in your account. As an output of the solution, the data destination is an S3 bucket. This destination consists of two separate prefixes, each of which contains files in Parquet format, to distinguish between accepted and rejected data.
  2. DynamoDB – The CloudFormation stack persists data quality expectations in a DynamoDB table. Four predefined column expectations are populated by the stack in a table called redshift-ge-dq-dynamo-blog-rules. Apart from the pre-populated rules, you can add any rule from the Great Expectations glossary according to the data model showcased later in the post.
  3. Data quality processing – The solution utilizes a SageMaker notebook instance powered by Amazon EMR to process the sample dataset using PySpark (v3.1.1) and Great Expectations (v0.13.4). The notebook is automatically populated with the S3 bucket location and Amazon Redshift cluster identifier via the SageMaker lifecycle config provisioned by AWS CloudFormation.
  4. Amazon Redshift – We create internal and external tables in Amazon Redshift for the accepted and rejected datasets produced from processing the sample dataset. The external dq_rejected.monster_com_rejected table, for rejected data, uses Amazon Redshift Spectrum and creates an external database in the AWS Glue Data Catalog to reference the table. The dq_accepted.monster_com table is created as a regular Amazon Redshift table by using the COPY command.

Sample dataset

As part of this post, we have performed tests on the Monster.com job applicants sample dataset to demonstrate the data reliability checks using the Great Expectations library and loading data into an Amazon Redshift table.

The dataset contains nearly 22,000 different sample records with the following columns:

  • country
  • country_code
  • date_added
  • has_expired
  • job_board
  • job_description
  • job_title
  • job_type
  • location
  • organization
  • page_url
  • salary
  • sector
  • uniq_id

For this post, we have selected four columns with inconsistent or dirty data, namely organization, job_type, uniq_id, and location, whose inconsistencies are flagged according to the rules we define from the GE glossary as described later in the post.

Prerequisites

For this solution, you should have the following prerequisites:

  • An AWS account if you don’t have one already. For instructions, see Sign Up for AWS.
  • For this post, you can launch the CloudFormation stack in the following Regions:
    • us-east-1
    • us-east-2
    • us-west-1
    • us-west-2
  • An AWS Identity and Access Management (IAM) user. For instructions, see Create an IAM User.
  • The user should have create, write, and read access for the following AWS services:
  • Familiarity with Great Expectations and PySpark.

Set up the environment

Choose Launch Stack to start creating the required AWS resources for the notebook walkthrough:

For more information about Amazon Redshift cluster node types, see Overview of Amazon Redshift clusters. For the type of workflow described in this post, we recommend using the RA3 Instance Type family.

Run the notebooks

When the CloudFormation stack is complete, complete the following steps to run the notebooks:

  1. On the SageMaker console, choose Notebook instances in the navigation pane.

This opens the notebook instances in your Region. You should see a notebook titled redshift-ge-dq-EMR-blog-notebook.

  2. Choose Open Jupyter next to this notebook to open the Jupyter notebook interface.

You should see the Jupyter notebook file titled ge-redshift.ipynb.

  3. Choose the file to open the notebook and follow the steps to run the solution.

Run configurations to create a PySpark context

When the notebook is open, make sure the kernel is set to Sparkmagic (PySpark). Run the following block to set up Spark configs for a Spark context.

Create a Great Expectations context

In Great Expectations, your data context manages your project configuration. We create a data context for our solution by passing our S3 bucket location. The S3 bucket’s name, created by the stack, should already be populated within the cell block. Run the following block to create a context:

from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, S3StoreBackendDefaults
from great_expectations.data_context import BaseDataContext

bucket_prefix = "ge-redshift-data-quality-blog"
bucket_name = "ge-redshift-data-quality-blog-region-account_id"
# Derive the Region from the bucket name, which ends with -<region>-<account_id>
region_name = '-'.join(bucket_name.replace(bucket_prefix,'').split('-')[1:4])
dataset_path = f"s3://{bucket_name}/monster_com-job_sample.csv"
project_config = DataContextConfig(
    config_version=2,
    plugins_directory=None,
    config_variables_file_path=None,
    datasources={
        "my_spark_datasource": {
            "data_asset_type": {
                "class_name": "SparkDFDataset",  # Setting dataset type to Spark
                "module_name": "great_expectations.dataset",
            },
            # Passing Spark session configs (spark is the session provided by the notebook kernel)
            "spark_config": dict(spark.sparkContext.getConf().getAll()),
            "class_name": "SparkDFDatasource",
            "module_name": "great_expectations.datasource"
        }
    },
    # Store GE artifacts in the S3 bucket created by the stack
    store_backend_defaults=S3StoreBackendDefaults(default_bucket_name=bucket_name)
)
context = BaseDataContext(project_config=project_config)

For more details on creating a GE context, see Getting started with Great Expectations.

Get GE validation rules from DynamoDB

Our CloudFormation stack created a DynamoDB table with prepopulated rows of expectations. The data model in DynamoDB describes the properties related to each dataset and its columns and the number of expectations you want to configure for each column. The following code describes an example of the data model for the column organization:

{
 "id": "job_reqs-organization",
 "dataset_name": "job_reqs",
 "rules": [ //list of expectations to apply to this column
  {
   "kwargs": {
    "result_format": "SUMMARY|COMPLETE|BASIC|BOOLEAN_ONLY" //The level of detail of the result
   },
   "name": "expect_column_values_to_not_be_null", //name of GE expectation
   "reject_msg": "REJECT:null_values_found_in_organization"
  }
 ],
 "column_name": "organization"
}

The code contains the following parameters:

  • id – Unique ID of the document
  • dataset_name – Name of the dataset, for example monster_com
  • rules – List of GE expectations to apply:
    • kwargs – Parameters to pass to an individual expectation
    • name – Name of the expectation from the GE glossary
    • reject_msg – String to flag for any row that doesn’t pass this expectation
  • column_name – Name of dataset column to run the expectations on

Each column can have one or more expectations associated that it needs to pass. You can also add expectations for more columns or to existing columns by following the data model shown earlier. With this technique, you can automate verification of any number of data quality rules for your datasets without performing any code change. Apart from its flexibility, what makes GE powerful is the ability to create custom expectations if the GE glossary doesn’t cover your use case. For more details on creating custom expectations, see How to create custom Expectations.
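One way to see why no code change is needed when rules are added is to look up each expectation by its name at run time. The following sketch assumes that the rule name matches a method of the same name on the dataset object; FakeDataset is a hypothetical stand-in that only mimics the call shape of a GE dataset, and apply_rules is an illustrative helper, not the notebook's code.

```python
class FakeDataset:
    """Hypothetical stand-in for a GE dataset; it only mimics the call shape."""

    def __init__(self, rows):
        self.rows = rows

    def expect_column_values_to_not_be_null(self, column, result_format="BASIC"):
        unexpected = sum(1 for row in self.rows if row.get(column) is None)
        return {
            "success": unexpected == 0,
            "result": {"element_count": len(self.rows), "unexpected_count": unexpected},
        }


def apply_rules(dataset, rule_doc):
    """Run every rule of one DynamoDB-style document against its column."""
    outcomes = []
    for rule in rule_doc["rules"]:
        expectation = getattr(dataset, rule["name"])  # resolve the method by name
        result = expectation(rule_doc["column_name"], **rule.get("kwargs", {}))
        outcomes.append((rule["reject_msg"], result))
    return outcomes


# Document shaped like the data model shown earlier.
rule_doc = {
    "id": "job_reqs-organization",
    "dataset_name": "job_reqs",
    "column_name": "organization",
    "rules": [
        {
            "name": "expect_column_values_to_not_be_null",
            "kwargs": {"result_format": "SUMMARY"},
            "reject_msg": "REJECT:null_values_found_in_organization",
        }
    ],
}
dataset = FakeDataset([{"organization": "Acme"}, {"organization": None}])
outcomes = apply_rules(dataset, rule_doc)
```

Because the rule name and its keyword arguments come entirely from the stored document, adding a rule to the table is enough for it to be picked up on the next run.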

Now run the cell block to fetch the GE rules from the DynamoDB client:

  1. Read the monster.com sample dataset and pass through validation rules.

After we have the expectations fetched from DynamoDB, we can read the raw CSV dataset. This dataset should already be copied to your S3 bucket location by the CloudFormation stack. You should see the following output after reading the CSV as a Spark DataFrame.

To evaluate whether a row passes each column’s expectations, we need to pass the necessary columns to a Spark user-defined function. This UDF evaluates each row in the DataFrame and appends the results of each expectation to a comments column.

Rows that pass all column expectations have a null value in the comments column.

A row that fails at least one column expectation is flagged with the string format REJECT:reject_msg_from_dynamo. For example, if a row has a null value in the organization column, then according to the rules defined in DynamoDB, the comments column is populated by the UDF as REJECT:null_values_found_in_organization.

The UDF recognizes a potentially erroneous column by evaluating the result dictionary generated by the Great Expectations library. The generation and structure of this dictionary depend on the result_format keyword argument. In short, if the count of unexpected column values of any column is greater than zero, we flag that as a rejected row.
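The flagging decision can be sketched as a small plain-Python function. It assumes a result dictionary with a success flag and a nested result entry that may contain unexpected_count; the fallback to the success flag, the semicolon separator, and the function names are choices of this sketch rather than details confirmed by the notebook.

```python
def row_failed(result):
    """Decide from one expectation result whether the value was unexpected."""
    counts = result.get("result") or {}
    if "unexpected_count" in counts:
        return counts["unexpected_count"] > 0
    # Assumed fallback for result formats that carry no counts.
    return not result.get("success", True)


def build_comment(outcomes, separator=";"):
    """Join the reject messages of all failed expectations, or return None."""
    messages = [reject_msg for reject_msg, result in outcomes if row_failed(result)]
    return separator.join(messages) if messages else None


# Hypothetical results for one row evaluated against two rules.
failing = [
    ("REJECT:null_values_found_in_organization",
     {"success": False, "result": {"element_count": 1, "unexpected_count": 1}}),
    ("REJECT:invalid_job_type",
     {"success": True, "result": {"element_count": 1, "unexpected_count": 0}}),
]
passing = [
    ("REJECT:null_values_found_in_organization",
     {"success": True, "result": {"element_count": 1, "unexpected_count": 0}}),
]
print(build_comment(failing))
print(build_comment(passing))
```

A row whose expectations all pass ends up with None, which mirrors the null value in the comments column described earlier.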

  2. Split the resulting dataset into accepted and rejected DataFrames.

Now that we have all the rejected rows flagged in the source DataFrame within the comments column, we can use this property to split the original dataset into accepted and rejected DataFrames. In the previous step, we mentioned that we append an action message in the comments column for each failed expectation in a row. With this fact, we can select rejected rows that start with the string REJECT (alternatively, you can also filter by non-null values in the comments column to get the rejected rows). When we have the set of rejected rows, we can get the accepted rows as a separate DataFrame by subtracting the rejected rows from the original DataFrame, for example with the PySpark subtract or exceptAll function.
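The split itself is simple enough to sketch with plain Python lists standing in for DataFrames. The row contents below are made up for illustration; in PySpark, the same partition would be expressed with a filter on the comments column followed by a set difference.

```python
def split_rows(rows):
    """Partition rows into (accepted, rejected) based on the comments field."""
    rejected = [r for r in rows if (r.get("comments") or "").startswith("REJECT")]
    # Everything that was not rejected is accepted (the set difference).
    accepted = [r for r in rows if not (r.get("comments") or "").startswith("REJECT")]
    return accepted, rejected


# Hypothetical rows after the validation step.
rows = [
    {"uniq_id": "a1", "organization": "Acme", "comments": None},
    {"uniq_id": "b2", "organization": None,
     "comments": "REJECT:null_values_found_in_organization"},
]
accepted, rejected = split_rows(rows)
```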

Write the DataFrames to Amazon S3.

Now that we have the original DataFrame divided, we can write them both to Amazon S3 in Parquet format. We need to write the accepted DataFrame without the comments column because it’s only added to flag rejected rows. Run the cell blocks to write the Parquet files under appropriate prefixes as shown in the following screenshot.

Copy the accepted dataset to an Amazon Redshift table

Now that we have written the accepted dataset, we can use the Amazon Redshift COPY command to load this dataset into an Amazon Redshift table. The notebook outlines the steps required to create a table for the accepted dataset in Amazon Redshift using the Amazon Redshift Data API. After the table is created successfully, we can run the COPY command.
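As a rough sketch of this step, the following builds a COPY statement for Parquet files and submits it through the Amazon Redshift Data API with boto3. The bucket, role ARN, cluster identifier, database, and user shown here are placeholders, and the helper names are hypothetical; the notebook's own cells may differ.

```python
def build_copy_sql(table, s3_prefix, iam_role_arn):
    """Build a COPY statement that loads Parquet files from Amazon S3."""
    return (
        f"COPY {table} "
        f"FROM '{s3_prefix}' "
        f"IAM_ROLE '{iam_role_arn}' "
        "FORMAT AS PARQUET;"
    )


def run_statement(sql, cluster_identifier, database, db_user):
    """Submit a SQL statement through the Redshift Data API and return its ID."""
    import boto3  # imported lazily so the SQL builder works without AWS access

    client = boto3.client("redshift-data")
    response = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql,
    )
    return response["Id"]


# Placeholder values for illustration only.
copy_sql = build_copy_sql(
    "dq_accepted.monster_com",
    "s3://example-bucket/accepted/",
    "arn:aws:iam::123456789012:role/example-role",
)
print(copy_sql)
```

The Data API runs statements asynchronously, so the returned ID would typically be passed to describe_statement to check whether the COPY has finished.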

Another advantage of the data quality approach described in this post is that the Amazon Redshift COPY command doesn’t fail due to schema or data type errors for columns that have clear expectations defined that match the schema. Similarly, you can define expectations for every column in the table so that any row satisfying the schema constraints can be considered a dq_accepted.monster_com row.

Create an external table in Amazon Redshift for rejected data

We need to have the rejected rows available to us in Amazon Redshift for comparative analysis. These comparative analyses can help inform upstream systems regarding the quality of data being collected and how they can be corrected to improve the overall quality of data. However, it isn’t wise to store the rejected data on the Amazon Redshift cluster, particularly for large tables, because it occupies extra disk space and increases cost. Instead, we use Redshift Spectrum to register an external table in an external schema in Amazon Redshift. The external schema lives in an external database in the AWS Glue Data Catalog and is referenced by Amazon Redshift. The following screenshot outlines the steps to create an external table.
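The two statements involved can be sketched as SQL strings built in Python. The AWS Glue database name, role ARN, bucket, and the choice of VARCHAR(65535) for every column are assumptions made for illustration; the actual column types depend on the Parquet files written in the previous step.

```python
def build_external_schema_sql(schema, glue_database, iam_role_arn):
    """Build DDL that maps a Redshift external schema to a Glue Data Catalog database."""
    return (
        f"CREATE EXTERNAL SCHEMA IF NOT EXISTS {schema} "
        f"FROM DATA CATALOG DATABASE '{glue_database}' "
        f"IAM_ROLE '{iam_role_arn}' "
        "CREATE EXTERNAL DATABASE IF NOT EXISTS;"
    )


def build_external_table_sql(schema, table, columns, s3_prefix):
    """Build DDL for a Parquet-backed external table (all columns assumed to be strings)."""
    column_defs = ", ".join(f"{name} VARCHAR(65535)" for name in columns)
    return (
        f"CREATE EXTERNAL TABLE {schema}.{table} ({column_defs}) "
        f"STORED AS PARQUET LOCATION '{s3_prefix}';"
    )


# Placeholder values for illustration only.
schema_sql = build_external_schema_sql(
    "dq_rejected", "example_glue_db", "arn:aws:iam::123456789012:role/example-role"
)
table_sql = build_external_table_sql(
    "dq_rejected",
    "monster_com_rejected",
    ["uniq_id", "organization", "comments"],
    "s3://example-bucket/rejected/",
)
```

Because the table is external, the rejected rows stay in Amazon S3 and only the metadata is registered, which is what keeps them off the cluster's disks.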

Verify and compare the datasets in Amazon Redshift.

12,160 records got processed successfully out of a total of 22,000 from the input dataset, and were loaded to the monster_com table under the dq_accepted schema. These records successfully passed all the validation rules configured in DynamoDB.

A total of 9,840 records were rejected for breaking one or more rules configured in DynamoDB and were loaded to the monster_com_rejected table in the dq_rejected schema. In this section, we describe the behavior of each expectation on the dataset.

  • Expect column values to not be null in organization – This rule is configured to reject a row if the organization is null. The following query returns the sample of rows, from the dq_rejected.monster_com_rejected table, that are null in the organization column, with their reject message.
  • Expect column values to match the regex list in job_type – This rule expects the column entries to be strings that can be matched to either any of or all of a list of regular expressions. In our use case, we have only allowed values that match a pattern within [".*Full.*Time", ".*Part.*Time", ".*Contract.*"]. The following query shows rows that are rejected due to an invalid job type.

Most of the records were rejected with multiple reasons, and all those mismatches are captured under the comments column.

  • Expect column values to not match regex for uniq_id – Similar to the previous rule, this rule aims to reject any row whose value matches a certain pattern. In our case, that pattern is having an empty space (\s++) in the primary column uniq_id. This means we consider a value to be invalid if it has empty spaces in the string. The following query returned an invalid format for uniq_id.
  • Expect column entries to be strings with a length between a minimum value and a maximum value (inclusive) – A length check rule is defined in the DynamoDB table for the location column. This rule rejects values or rows if the length of the value violates the specified constraints. The following query returns the records that are rejected due to a rule violation in the location column.
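To illustrate what the regex and length rules check, here is a plain-Python approximation using the standard re module. It is a stand-in for the GE expectations, not their implementation: the pattern \s+ is used in place of \s++, the length bounds in the usage lines are made up, and the way None is handled is an arbitrary choice of this sketch.

```python
import re

# Patterns allowed for job_type, as listed in the rule above.
JOB_TYPE_PATTERNS = [".*Full.*Time", ".*Part.*Time", ".*Contract.*"]


def matches_any(value, patterns=JOB_TYPE_PATTERNS):
    """True if the value matches at least one of the allowed patterns."""
    return value is not None and any(re.search(p, value) for p in patterns)


def has_whitespace(value):
    """True if the value contains whitespace, which makes a uniq_id invalid."""
    return value is not None and re.search(r"\s+", value) is not None


def length_between(value, min_len, max_len):
    """True if the string length lies within the inclusive bounds."""
    return value is not None and min_len <= len(value) <= max_len


print(matches_any("Full Time Employee"))
print(matches_any("Per Diem"))
print(has_whitespace("abc 123"))
print(length_between("Austin, TX", 2, 20))  # bounds are hypothetical
```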

You can continue to analyze the other columns’ predefined rules from DynamoDB or pick any rule from the GE glossary and add it to an existing column. Rerun the notebook to see the result of your data quality rules in Amazon Redshift. As mentioned earlier, you can also try creating custom expectations for other columns.

Benefits and limitations

The efficiency and efficacy of this approach stem from the fact that GE enables automation and configurability to an extensive degree when compared with other approaches. A brute force alternative could be writing stored procedures in Amazon Redshift that perform data quality checks on staging tables before data is loaded into main tables. However, this approach might not be scalable because you can’t persist repeatable rules for different columns, as persisted here in DynamoDB, in stored procedures (or call DynamoDB APIs), and would have to write and store a rule for each column of every table. Furthermore, accepting or rejecting a row based on a single rule requires complex SQL statements that may result in longer durations for data quality checks or even more compute power, which can also incur extra costs. With GE, a data quality rule is generic, repeatable, and scalable across different datasets.

Another benefit of this approach, related to using GE, is that it supports multiple Python-based backends, including Spark, Pandas, and Dask. This provides flexibility across an organization where teams might have skills in different frameworks. If a data scientist prefers using Pandas to write their ML pipeline feature quality test, then a data engineer using PySpark can use the same code base to extend those tests due to the consistency of GE across backends.

Furthermore, GE is written natively in Python, which means it’s a good option for engineers and scientists who are more used to running their extract, transform, and load (ETL) workloads in PySpark in comparison to frameworks like Deequ, which is natively written in Scala over Apache Spark and fits better for Scala use cases (the Python interface, PyDeequ, is also available). Another benefit of using GE is the ability to run multi-column unit tests on data, whereas Deequ doesn’t support that (as of this writing).

However, the approach described in this post might not be the most performant in some cases for full table load batch reads for very large tables. This is due to the serde (serialization/deserialization) cost of using UDFs. Because the GE functions are embedded in PySpark UDFs, the performance of these functions is slower than native Spark functions. Therefore, this approach gives the best performance when integrated with incremental data processing workflows, for example using AWS DMS to write CDC files from a source database to Amazon S3.

Clean up

Some of the resources deployed in this post, including those deployed using the provided CloudFormation template, incur costs as long as they’re in use. Be sure to remove the resources and clean up your work when you’re finished in order to avoid unnecessary cost.

On the AWS CloudFormation console, select the stack and delete it to remove all resources.

The resources in the CloudFormation template are not production ready. If you would like to use this solution in production, enable logging for all S3 buckets and ensure the solution adheres to your organization’s encryption policies through EMR Security Best Practices.

Conclusion

In this post, we demonstrated how you can automate data reliability checks using the Great Expectations library before loading data into an Amazon Redshift table. We also showed how you can use Redshift Spectrum to create external tables. If dirty data were to make its way into the accepted table, all downstream consumers such as business intelligence reporting, advanced analytics, and ML pipelines can get affected and produce inaccurate reports and results. The trends of such data can generate wrong leads for business leaders while making business decisions. Furthermore, flagging dirty data as rejected before loading into Amazon Redshift also helps reduce the time and effort a data engineer might have to spend in order to investigate and correct the data.

We are interested to hear how you would like to apply this solution for your use case. Please share your thoughts and questions in the comments section.


About the Authors

Faizan Ahmed is a Data Architect at AWS Professional Services. He loves to build data lakes and self-service analytics platforms for his customers. He also enjoys learning new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS. In his free time, Faizan enjoys traveling, sports, and reading.

Bharath Kumar Boggarapu is a Data Architect at AWS Professional Services with expertise in big data technologies. He is passionate about helping customers build performant and robust data-driven solutions and realize their data and analytics potential. His areas of interests are open-source frameworks, automation, and data architecting. In his free time, he loves to spend time with family, play tennis, and travel.