Tag Archives: AWS Glue

Interactively develop your AWS Glue streaming ETL jobs using AWS Glue Studio notebooks

Post Syndicated from Arun A K original https://aws.amazon.com/blogs/big-data/interactively-develop-your-aws-glue-streaming-etl-jobs-using-aws-glue-studio-notebooks/

Enterprise customers are modernizing their data warehouses and data lakes to provide real-time insights, because having the right insights at the right time is crucial for good business outcomes. To enable near-real-time decision-making, data pipelines need to process real-time or near-real-time data. This data is sourced from IoT devices, change data capture (CDC) services like AWS Database Migration Service (AWS DMS), and streaming services such as Amazon Kinesis, Apache Kafka, and others. These data pipelines need to be robust, able to scale, and able to process large data volumes in near-real time. AWS Glue streaming extract, transform, and load (ETL) jobs process data from data streams, including Kinesis and Apache Kafka, apply complex transformations in-flight, and load it into target data stores for analytics and machine learning (ML).

Hundreds of customers are using AWS Glue streaming ETL for their near-real-time data processing requirements. These customers required an interactive capability to process streaming jobs. Previously, when developing and running a streaming job, you had to wait for the results to be available in the job logs or persisted into a target data warehouse or data lake to be able to view the results. With this approach, debugging and adjusting code is difficult, resulting in a longer development timeline.

Today, we are launching a new AWS Glue streaming ETL feature to interactively develop streaming ETL jobs in AWS Glue Studio notebooks and interactive sessions.

In this post, we provide a use case and step-by-step instructions to develop and debug your AWS Glue streaming ETL job using a notebook.

Solution overview

To demonstrate the streaming interactive sessions capability, we develop, test, and deploy an AWS Glue streaming ETL job to process Apache Webserver logs. The following high-level diagram represents the flow of events in our job.
BDB-2464 High Level Application Architecture
Apache Webserver logs are streamed to Amazon Kinesis Data Streams. An AWS Glue streaming ETL job consumes the data in near-real time and runs an aggregation that computes how many times a webpage has been unavailable (status code 500 and above) due to an internal error. The aggregate information is then published to a downstream Amazon DynamoDB table. As part of this post, we develop this job using AWS Glue Studio notebooks.

You can either work with the instructions provided in the notebook, which you download when instructed later in this post, or follow along with this post to author your first streaming interactive session job.

Prerequisites

To get started, choose the Launch Stack button below to run an AWS CloudFormation template in your AWS environment.

BDB-2063-launch-cloudformation-stack

The template provisions a Kinesis data stream, a DynamoDB table, an AWS Glue job to generate simulated log data, and the necessary AWS Identity and Access Management (IAM) role and policies. After you deploy your resources, you can review the Resources tab on the AWS CloudFormation console for detailed information.

Set up the AWS Glue streaming interactive session job

To set up your AWS Glue streaming job, complete the following steps:

  1. Download the notebook file and save it to a local directory on your computer.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose Create job.
  4. Select Jupyter Notebook.
  5. Under Options, select Upload and edit an existing notebook.
  6. Choose Choose file and browse to the notebook file you downloaded.
  7. Choose Create.
BDB-2464 Create Job
  1. For Job name, enter a name for the job.
  2. For IAM Role, use the role glue-iss-role-0v8glq, which is provisioned as part of the CloudFormation template.
  3. Choose Start notebook job.
BDB-2464 Start Notebook

You can see that the notebook is loaded into the UI. There are markdown cells with instructions as well as code blocks that you can run sequentially. You can either run the instructions on the notebook or follow along with this post to continue with the job development.

BDB-2464 Explore Notebook

Run notebook cells

Let’s run the code block that has the magics. The notebook has notes on what each magic does.

  1. Run the first cell.
BDB-2464 Run First Cell

After running the cell, you can see in the output section that the defaults have been reconfigured.

BDB-2464 Configurations Set

In the context of streaming interactive sessions, an important configuration is job type, which is set to streaming. Additionally, to minimize costs, the number of workers is set to 2 (default 5), which is sufficient for our use case that deals with a low-volume simulated dataset.

Our next step is to initialize an AWS Glue streaming session.

  1. Run the next code cell.
BDB-2464 Initiate Session

After we run this cell, we can see that a session has been initialized and a session ID is created.

A Kinesis data stream and AWS Glue data generator job that feeds into this stream have already been provisioned and triggered by the CloudFormation template. With the next cell, we consume this data as an Apache Spark DataFrame.

  1. Run the next cell.
BDB-2464 Fetch From Kinesis

Because there are no print statements, the cells don’t show any output. You can proceed to run the following cells.
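The cell that reads from Kinesis can be sketched roughly as follows. This is a minimal illustration rather than the notebook's exact code: the helper names, the transformation_ctx value, and the "classification": "json" setting are assumptions, and the stream ARN is a placeholder you would take from the CloudFormation stack.

```python
def kinesis_connection_options(stream_arn, starting_position="TRIM_HORIZON"):
    """Build the options map for reading a Kinesis stream as a streaming DataFrame."""
    return {
        "typeOfData": "kinesis",
        "streamARN": stream_arn,
        "classification": "json",  # assumption: the generator emits JSON records
        "startingPosition": starting_position,
        "inferSchema": "true",
    }


def read_kinesis_stream(glue_context, stream_arn):
    """Return a Spark streaming DataFrame backed by the Kinesis data stream.

    glue_context is the GlueContext created when the session was initialized.
    """
    return glue_context.create_data_frame.from_options(
        connection_type="kinesis",
        connection_options=kinesis_connection_options(stream_arn),
        transformation_ctx="kinesis_source",  # hypothetical name
    )
```

Inside the session you would call read_kinesis_stream(glueContext, "<your stream ARN>"). Nothing is printed because the DataFrame is only defined at this point and no streaming query has been started yet.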

Explore the data stream

To help enhance the interactive experience in AWS Glue interactive sessions, GlueContext provides the method getSampleStreamingDynamicFrame. It provides a snapshot of the stream in a static DynamicFrame. It takes three arguments:

  • The Spark streaming DataFrame
  • An options map
  • A writeStreamFunction to apply a function to every sampled record

Available options are as follows:

  • windowSize – Also known as the micro-batch duration, this parameter determines how long a streaming query will wait after the previous batch was triggered.
  • pollingTimeInMs – This is the total length of time the method will run. It starts at least one micro-batch to obtain sample records from the input stream. The time unit is milliseconds, and the value should be greater than the windowSize.
  • recordPollingLimit – This sets an upper bound on the number of records retrieved from the stream. The default is 100.
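To make the relationship between these options concrete, here is a small sketch that builds the options map and checks that the polling time exceeds the window size. The helper names are hypothetical, and passing the values as strings (including recordPollingLimit) is an assumption about the expected format.

```python
def sampling_options(window_size_seconds, polling_time_ms, record_limit=100):
    """Build an options map for getSampleStreamingDynamicFrame."""
    if polling_time_ms <= window_size_seconds * 1000:
        # The method needs time to run at least one micro-batch.
        raise ValueError("pollingTimeInMs should be greater than windowSize")
    return {
        "windowSize": f"{window_size_seconds} seconds",
        "pollingTimeInMs": str(polling_time_ms),
        "recordPollingLimit": str(record_limit),  # assumption: string value
    }


def sample_stream(glue_context, streaming_df, options):
    """Take a static snapshot of the stream as a DynamicFrame."""
    # The third argument is the writeStreamFunction; None applies no function.
    return glue_context.getSampleStreamingDynamicFrame(streaming_df, options, None)
```

In a session, something like sample_stream(glueContext, kinesis_data, sampling_options(10, 60000)).toDF().show(10) would display the first 10 sampled records, where kinesis_data stands for the streaming DataFrame read earlier.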

Run the next code cell and explore the output.

BDB-2464 Sample Data

We see that the sample consists of 100 records (the default record limit), and we have successfully displayed the first 10 records from the sample.

Work with the data

Now that we know what our data looks like, we can write the logic to clean and format it for our analytics.

Run the code cell containing the reformat function.

Note that Python UDFs aren’t the recommended way to handle data transformations in a Spark application. We use reformat() to exemplify troubleshooting. When working with a real-world production application, we recommend using native APIs wherever possible.

BDB-2464 Run The UDF

We see that the code cell failed to run. The failure was on purpose. We deliberately created a division by zero exception in our parser.

BDB-2464 Error Running The Code

Failure and recovery

In a regular AWS Glue job, any error causes the whole application to exit, and you have to make code changes and resubmit the application. In interactive sessions, however, the coding context and definitions are fully preserved and the session remains operational. There is no need to bootstrap a new cluster and rerun all the preceding transformations. This allows you to focus on quickly iterating your batch function implementation to obtain the desired outcome. You can fix the defects and rerun the code in a matter of seconds.

To test this out, go back to the code and comment or delete the erroneous line error_line=1/0 and rerun the cell.
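For orientation, a parsing function of this kind might look like the following sketch. It is not the notebook's reformat() implementation: the log layout (Apache common log format), the field names, and the regular expression are assumptions made for illustration. The commented-out line shows where the deliberate failure sits.

```python
import re

# Assumed layout: Apache common log format.
LOG_PATTERN = re.compile(
    r'(?P<client_ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<webpage>\S+) (?P<protocol>[^"]+)" '
    r'(?P<status>\d{3}) (?P<size>\d+|-)'
)


def reformat(line):
    """Parse one log line into a dict, or return None if it doesn't match."""
    # error_line = 1 / 0  # uncommenting this reproduces the deliberate failure
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    record = match.groupdict()
    record["status"] = int(record["status"])  # numeric status for later filtering
    return record
```

Because the session keeps its state, editing and rerunning only the cell that defines the function is enough to pick up the fix.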

BDB-2464 Error Corrected

Implement business logic

Now that we have successfully tested our parsing logic on the sample stream, let’s implement the actual business logic. The logic is implemented in the processBatch method within the next code cell. In this method, we do the following:

  • Pass the streaming DataFrame in micro-batches
  • Parse the input stream
  • Filter messages with status code >=500
  • Over a 1-minute interval, get the count of failures per webpage
  • Persist the preceding metric to a DynamoDB table (glue-iss-ddbtbl-0v8glq)
  1. Run the next code cell to trigger the stream processing.
BDB-2464 Trigger DDB Write
  1. Wait a few minutes for the cell to complete.
  2. On the DynamoDB console, navigate to the Items page and select the glue-iss-ddbtbl-0v8glq table.
BDB-2464 Explore DDB

The page displays the aggregated results that have been written by our interactive session job.
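The aggregation itself is simple to reason about outside Spark. The following plain-Python sketch mirrors the filter and the 1-minute count per webpage on already-parsed records. It is an illustration of the logic only, not the notebook's processBatch code, and the record field names (status, webpage, timestamp) are assumptions.

```python
from collections import Counter
from datetime import datetime


def count_failures_per_minute(records):
    """Count records with status >= 500 per (1-minute window, webpage)."""
    counts = Counter()
    for record in records:
        if record["status"] >= 500:
            # Truncate the timestamp to the start of its 1-minute window.
            window_start = record["timestamp"].replace(second=0, microsecond=0)
            counts[(window_start, record["webpage"])] += 1
    return counts


sample = [
    {"status": 500, "webpage": "/index.html", "timestamp": datetime(2022, 10, 10, 13, 55, 5)},
    {"status": 503, "webpage": "/index.html", "timestamp": datetime(2022, 10, 10, 13, 55, 40)},
    {"status": 200, "webpage": "/index.html", "timestamp": datetime(2022, 10, 10, 13, 55, 41)},
    {"status": 500, "webpage": "/cart.html", "timestamp": datetime(2022, 10, 10, 13, 56, 2)},
]
failures = count_failures_per_minute(sample)
```

Each key of the result corresponds to one item you would expect to see in the DynamoDB table: a webpage, a window start, and the number of failures in that window.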

Deploy the streaming job

So far, we have been developing and testing our application using the streaming interactive sessions. Now that we’re confident of the job, let’s convert this into an AWS Glue job. We have seen that the majority of code cells are doing exploratory analysis and sampling, and aren’t required to be a part of the main job.

A commented code cell that represents the whole application is provided to you. You can uncomment the cell and delete all other cells. Another option would be to not use the commented cell, but delete just the two cells from the notebook that do the sampling or debugging and print statements.

To delete a cell, choose the cell and then choose the delete icon.

BDB-2464 Delete a Cell

Now that you have the final application code ready, save and deploy the AWS Glue job by choosing Save.

BDB-2464 Save Job

A banner message appears when the job is updated.

BDB-2464 Save Job Banner

Explore the AWS Glue job

After you save the notebook, you should be able to access the job like any regular AWS Glue job on the Jobs page of the AWS Glue console.

BDB-2464 Job Page

Additionally, you can look at the Job details tab to confirm the initial configurations, such as number of workers, have taken effect after deploying the job.

BDB-2464 Job Details Page

Run the AWS Glue job

If needed, you can choose Run to run the job as an AWS Glue streaming job.

BDB-2464 Job Run

To track progress, you can access the run details on the Runs tab.

BDB-2464 Job Run Details

Clean up

To avoid incurring additional charges to your account, stop the streaming job that you started as part of the instructions. Also, on the AWS CloudFormation console, select the stack that you provisioned and delete it.

Conclusion

In this post, we demonstrated how to do the following:

  • Author a job using notebooks
  • Preview incoming data streams
  • Code and fix issues without having to publish AWS Glue jobs
  • Review the end-to-end working code, and remove any debugging and print statements or cells from the notebook
  • Publish the code as an AWS Glue job

We did all of this via a notebook interface.

With these improvements in the overall development timelines of AWS Glue jobs, it’s easier to author jobs using the streaming interactive sessions. We encourage you to use the prescribed use case, CloudFormation stack, and notebook to jumpstart your individual use cases to adopt AWS Glue streaming workloads.

The goal of this post was to give you hands-on experience working with AWS Glue streaming and interactive sessions. When onboarding a productionized workload onto your AWS environment, based on the data sensitivity and security requirements, ensure you implement and enforce tighter security controls.


About the authors

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun loves to enjoy quality time with his family.

Linan Zheng is a Software Development Engineer on the AWS Glue Streaming team, helping build the serverless data platform. His work involves a large-scale optimization engine for transactional data formats and streaming interactive sessions.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data platforms.

Set up and monitor AWS Glue crawlers using the enhanced AWS Glue UI and crawler history

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/set-up-and-monitor-aws-glue-crawlers-using-the-enhanced-aws-glue-ui-and-crawler-history/

A data lake is a centralized, curated, and secured repository that stores all your data, both in its original form and prepared for analysis. Setting up and managing data lakes today involves a lot of manual, complicated, and time-consuming tasks. AWS Glue and AWS Lake Formation make it easy to build, secure, and manage data lakes. As data from existing data stores is moved into the data lake, there is a need to catalog the data to prepare it for analytics from services such as Amazon Athena.

AWS Glue crawlers are a popular way to populate the AWS Glue Data Catalog. Crawlers are a key component that allows you to connect to data sources or targets, use different classifiers to determine the logical schema for the data, and create metadata in the Data Catalog. You can run crawlers on a schedule, on demand, or triggered based on an Amazon Simple Storage Service (Amazon S3) event to ensure that the Data Catalog is up to date. Using S3 event notifications can reduce the cost and time a crawler needs to update large and frequently changing tables.

The AWS Glue crawlers UI has been redesigned to offer a better user experience, and new functionalities have been added. This new UI provides easier setup of crawlers across multiple sources, including Amazon S3, Amazon DynamoDB, Amazon Redshift, Amazon Aurora, Amazon DocumentDB (with MongoDB compatibility), Delta Lake, MariaDB, Microsoft SQL Server, MySQL, Oracle, PostgreSQL, and MongoDB. A new AWS Glue crawler history feature has also been launched, which provides a convenient way to view crawler runs, their schedules, data sources, and tags. For each crawl, the crawler history offers a summary of data modifications such as changes in the database schema or Amazon S3 partition changes. Crawler history also provides DPU hours that can reduce the time to analyze and debug crawler operations and costs.

This post shows how to create an AWS Glue crawler that supports S3 event notification using the new UI. We also show how to navigate through the new crawler history section and get valuable insights.

Overview of solution

To demonstrate how to create an AWS Glue crawler using the new UI, we use the Toronto parking tickets dataset, specifically the data about parking tickets issued in the city of Toronto in 2017 and 2018. The goal is to create a crawler based on S3 events, run it, and explore the information shown in the UI about the run of this crawler.

As mentioned before, instead of crawling all the subfolders on Amazon S3, we use an S3 event-based approach. This helps improve the crawl time by using S3 events to identify the changes between two crawls by listing all the files from the subfolder that triggered the event instead of listing the full Amazon S3 target. For this post, we create an S3 event, Amazon Simple Notification Service (Amazon SNS) topic, and Amazon Simple Queue Service (Amazon SQS) queue.
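To see why an event identifies the changed subfolder, it helps to look at what ends up in the queue. The sketch below unwraps an SQS message body that contains an SNS notification, which in turn carries the S3 event, and extracts the folder that changed. It only illustrates the message shape and is not how the crawler is implemented. It assumes raw message delivery is not enabled on the SNS subscription, and the function name is hypothetical.

```python
import json
from urllib.parse import unquote_plus


def changed_prefixes(sqs_message_body):
    """Return the S3 folders touched by the S3 event wrapped in an SNS notification."""
    envelope = json.loads(sqs_message_body)    # SNS notification envelope
    s3_event = json.loads(envelope["Message"])  # S3 event, stored as a JSON string
    prefixes = set()
    for record in s3_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        folder = key.rsplit("/", 1)[0] if "/" in key else ""
        prefixes.add(f"s3://{bucket}/{folder}/" if folder else f"s3://{bucket}/")
    return prefixes
```

For an object uploaded under torontotickets/year=2017/, the function returns only that folder, which is the part of the target an event-based crawl needs to list.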

The following diagram illustrates our solution architecture.

Prerequisites

For this walkthrough, you should have the following prerequisites:

If the AWS account you use to follow this post uses Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1:
    BDB-2063-launch-cloudformation-stack
  2. Under Parameters, enter a name for your S3 bucket (include your account number).
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.
  5. Wait until the creation of the stack is complete, as shown on the AWS CloudFormation console.
  6. On the stack’s Outputs tab, take note of the SQS queue ARN—we use it during the crawler creation process.

Launching this stack creates AWS resources. You need the following resources from the Outputs tab for the next steps:

  • GlueCrawlerRole – The IAM role to run AWS Glue jobs
  • BucketName – The name of the S3 bucket to store solution-related files
  • GlueSNSTopic – The SNS topic, which we use as the target for the S3 event
  • SQSArn – The SQS queue ARN; this queue is going to be consumed by the AWS Glue crawler

Create an AWS Glue crawler

Let’s first create the dataset that is going to be used as the source of the AWS Glue crawler:

  1. Open AWS CloudShell.
  2. Run the following command:
    aws s3 cp s3://aws-bigdata-blog/artifacts/gluenewcrawlerui/sourcedata/year=2017/Parking_Tags_Data_2017_2.csv s3://glue-crawler-blog-<YOUR ACCOUNT NUMBER>/torontotickets/year=2017/Parking_Tags_Data_2017_2.csv


    This action triggers an S3 event that sends a message to the SNS topic that you created using the CloudFormation template. This message is consumed by an SQS queue that will be input for the AWS Glue crawler.

    Now, let’s create the AWS Glue crawler.

  3. On the AWS Glue console, choose Crawlers in the navigation pane.
  4. Choose Create crawler.
  5. For Name, enter a name (for example, BlogPostCrawler).
  6. Choose Next.
  7. For Is your data already mapped to Glue tables, select Not yet.
  8. In the Data sources section, choose Add data source.

    For this post, you use an S3 dataset as a source.
  9. For Data source, choose S3.
  10. For Location of S3 data, select In this account.
  11. For S3 path, enter the path to the S3 bucket you created with the CloudFormation template (s3://glue-crawler-blog-<YOUR ACCOUNT NUMBER>/torontotickets/).
  12. For Subsequent crawler runs, select Crawl based on events.
  13. Enter the SQS queue ARN you created earlier.
  14. Choose Add a S3 data source.
  15. Choose Next.
  16. For Existing IAM role, choose the role you created (GlueCrawlerBlogRole).
  17. Choose Next.

    Now let’s create an AWS Glue database.
  18. Under Target database, choose Add database.
  19. For Name, enter blogdb.
  20. For Location, choose the S3 bucket created by the CloudFormation template.
  21. Choose Create database.
  22. On the Set output and scheduling page, for Target database, choose the database you just created (blogdb).
  23. For Table name prefix, enter blog.
  24. For Maximum table threshold, you can optionally set a limit for the number of tables that this crawler can scan. For this post, we leave this option blank.
  25. For Frequency, choose On demand.
  26. Choose Next.
  27. Review the configuration and choose Create crawler.
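If you prefer to script the same configuration, the console choices above map roughly onto the parameters of the AWS Glue CreateCrawler API. The following is a sketch using boto3, not a tested replacement for the console steps: the helper names are hypothetical, the ARN and bucket values are placeholders, and leaving out a schedule is assumed to correspond to the On demand frequency.

```python
def build_crawler_request(name, role, database, s3_path, queue_arn, table_prefix=""):
    """Assemble keyword arguments for the Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "TablePrefix": table_prefix,
        "Targets": {
            "S3Targets": [{"Path": s3_path, "EventQueueArn": queue_arn}]
        },
        # Crawl based on events instead of listing the whole target.
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVENT_MODE"},
    }


def create_crawler(request):
    """Create the crawler; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the request can be built without it

    boto3.client("glue").create_crawler(**request)


request = build_crawler_request(
    name="BlogPostCrawler",
    role="GlueCrawlerBlogRole",
    database="blogdb",
    s3_path="s3://glue-crawler-blog-123456789012/torontotickets/",  # placeholder account
    queue_arn="arn:aws:sqs:us-east-1:123456789012:example-queue",   # placeholder ARN
    table_prefix="blog",
)
```

Calling create_crawler(request) would then submit the request. The database must already exist in that case, because the API call does not create it.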

Run the AWS Glue crawler

To run the crawler, navigate to the crawler on the AWS Glue console.

Choose Run crawler.

On the Crawler runs tab, you can see the current run of the crawler.

Explore the crawler run history data

When the crawler is complete, you can see the following details:

  • Duration – The exact duration time of the crawler run
  • DPU hours – The number of DPU hours spent during the crawler run; this is very useful to calculate costs
  • Table changes – The changes applied to the table, like new columns or partitions

Choose Table changes to see the crawler run summary.

You can see the table blogtorontotickets was created, and also a 2017 partition.

Let’s add more data to the S3 bucket to see how the crawler processes this change.

  1. Open CloudShell.
  2. Run the following command:
    aws s3 cp s3://aws-bigdata-blog/artifacts/gluenewcrawlerui/sourcedata/year=2018/Parking_Tags_Data_2018_1.csv s3://glue-crawler-blog-<YOUR ACCOUNT NUMBER>/torontotickets/year=2018/Parking_Tags_Data_2018_1.csv

  3. Choose Run crawler to run the crawler one more time.

You can see the second run of the crawler listed.

Note that the DPU hours were reduced by more than half; this is because only one partition was scanned and added. Having an event-based crawler helps reduce runtime and cost.

You can choose the Table changes information of the second run to see more details.

Note under Partitions added, the 2018 partition was created.

Additional notes

Keep in mind the following considerations:

  • Crawler history is supported for crawls that have occurred since the launch date of the crawler history feature, and only retains up to 12 months of crawls. Older crawls will not be returned.
  • To set up a crawler using AWS CloudFormation, you can use the following template.
  • You can get all the crawls of a specified crawler by using the list-crawls API.
  • You can update existing crawlers with a single Amazon S3 target to use this new feature. You can do this either via the AWS Glue console or by calling the update_crawler API.
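As an example of using the crawl history programmatically, the sketch below totals the DPU hours of completed crawls returned by the ListCrawls API. The helper names are hypothetical, the sample values are made up for illustration, and fetch_crawls reads only the first page of results.

```python
def summarize_crawls(crawls):
    """Total the DPU hours of completed crawls from a ListCrawls response."""
    completed = [c for c in crawls if c.get("State") == "COMPLETED"]
    return {
        "runs": len(completed),
        "total_dpu_hours": sum(c.get("DPUHour", 0.0) for c in completed),
    }


def fetch_crawls(crawler_name):
    """Return the first page of crawl history; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the summary can be tested offline

    response = boto3.client("glue").list_crawls(CrawlerName=crawler_name)
    return response["Crawls"]


# Made-up history entries with the fields the summary relies on.
history = [
    {"CrawlId": "a", "State": "COMPLETED", "DPUHour": 0.5},
    {"CrawlId": "b", "State": "COMPLETED", "DPUHour": 0.25},
    {"CrawlId": "c", "State": "FAILED", "DPUHour": 0.125},
]
summary = summarize_crawls(history)
```

With a live crawler, summarize_crawls(fetch_crawls("BlogPostCrawler")) would give the same kind of summary, and multiplying total_dpu_hours by the DPU-hour price for your Region gives an estimate of the crawl cost.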

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, AWS Glue crawler, AWS Glue database, and AWS Glue table.

Conclusion

You can use AWS Glue crawlers to discover datasets, extract schema information, and populate the AWS Glue Data Catalog. AWS Glue crawlers now provide an easier-to-use UI workflow to set up crawlers and also provide metrics associated with past crawler runs to simplify monitoring and auditing. In this post, we provided a CloudFormation template to set up AWS Glue crawlers to use S3 event notifications, which reduces the time and cost needed to incrementally process table data updates in the AWS Glue Data Catalog. We also showed you how to monitor and understand the cost of crawlers.

Special thanks to everyone who contributed to the crawler history launch: Theo Xu, Jessica Cheng and Joseph Barlan.

Happy crawling!


About the authors

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Set up federated access to Amazon Athena for Microsoft AD FS users using AWS Lake Formation and a JDBC client

Post Syndicated from Mostafa Safipour original https://aws.amazon.com/blogs/big-data/set-up-federated-access-to-amazon-athena-for-microsoft-ad-fs-users-using-aws-lake-formation-and-a-jdbc-client/

Tens of thousands of AWS customers choose Amazon Simple Storage Service (Amazon S3) as their data lake to run big data analytics, interactive queries, high-performance computing, and artificial intelligence (AI) and machine learning (ML) applications to gain business insights from their data. On top of these data lakes, you can use AWS Lake Formation to ingest, clean, catalog, transform, and help secure your data and make it available for analysis and ML. After you have set up your data lake, you can use Amazon Athena, which is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.

With Lake Formation, you can configure and manage fine-grained access control to new or existing databases, tables, and columns defined in the AWS Glue Data Catalog for data stored in Amazon S3. After you set access permissions using Lake Formation, you can use analytics services such as Amazon Athena, Amazon Redshift, and Amazon EMR without needing to configure policies for each service.

Many of our customers use Microsoft Active Directory Federation Services (AD FS) as their identity provider (IdP) while using cloud-based services. In this post, we provide a step-by-step walkthrough of configuring AD FS as the IdP for SAML-based authentication with Athena to query data stored in Amazon S3, with access permissions defined using Lake Formation. This enables end-users to log in to their SQL client using Active Directory credentials and access data with fine-grained access permissions.

Solution overview

To build the solution, we start by establishing trust between AD FS and your AWS account. With this trust in place, AD users can federate into AWS using their AD credentials and assume permissions of an AWS Identity and Access Management (IAM) role to access AWS resources such as the Athena API.

To create this trust, you add AD FS as a SAML provider into your AWS account and create an IAM role that federated users can assume. On the AD FS side, you add AWS as a relying party and write SAML claim rules to send the right user attributes to AWS (specifically Lake Formation) for authorization purposes.

The steps in this post are structured into the following sections:

  1. Set up an IAM SAML provider and role.
  2. Configure AD FS.
  3. Create Active Directory users and groups.
  4. Create a database and tables in the data lake.
  5. Set up the Lake Formation permission model.
  6. Set up a SQL client with JDBC connection.
  7. Verify access permissions.

The following diagram provides an overview of the solution architecture.

The flow for the federated authentication process is as follows:

  1. The SQL client, which has been configured with Active Directory credentials, sends an authentication request to AD FS.
  2. AD FS authenticates the user using Active Directory credentials, and returns a SAML assertion.
  3. The client makes a call to Lake Formation, which initiates an internal call with AWS Security Token Service (AWS STS) to assume a role with SAML for the client.
  4. Lake Formation returns temporary AWS credentials with permissions of the defined IAM role to the client.
  5. The client uses the temporary AWS credentials to call the Athena API StartQueryExecution.
  6. Athena retrieves the table and associated metadata from the AWS Glue Data Catalog.
  7. On behalf of the user, Athena requests access to the data from Lake Formation (GetDataAccess). Lake Formation assumes the IAM role associated with the data lake location and returns temporary credentials.
  8. Athena uses the temporary credentials to retrieve data objects from Amazon S3.
  9. Athena returns the results to the client based on the defined access permissions.

For our use case, we use two sample tables:

  • LINEORDER – A fact table containing orders
  • CUSTOMER – A dimension table containing customer information including Personally Identifiable Information (PII) columns (c_name, c_phone, c_address)

We also have data consumer users who are members of the following teams:

  • CustomerOps – Can see both orders and customer information, including PII attributes of the customer
  • Finance – Can see orders for analytics and aggregation purposes but only non-PII attributes of the customer

To demonstrate this use case, we create two users called CustomerOpsUser and FinanceUser and three AD groups for different access patterns: data-customer (customer information access excluding PII attributes), data-customer-pii (full customer information access including PII attributes), and data-order (order information access). By adding the users to these three groups, we can grant the right level of access to different tables and columns.
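The intended access pattern can be written down as a small lookup. The sketch below is only a model of the outcome we expect from the permissions, not Lake Formation configuration. The group memberships are inferred from the team descriptions above, and the non-PII column names (c_custkey, c_city) are hypothetical.

```python
PII_COLUMNS = {"c_name", "c_phone", "c_address"}
# Hypothetical non-PII columns added for illustration.
CUSTOMER_COLUMNS = PII_COLUMNS | {"c_custkey", "c_city"}

# Assumed memberships, inferred from the CustomerOps and Finance descriptions.
USER_GROUPS = {
    "CustomerOpsUser": {"data-customer-pii", "data-order"},
    "FinanceUser": {"data-customer", "data-order"},
}


def visible_customer_columns(groups):
    """Return the CUSTOMER columns a member of the given AD groups can read."""
    if "data-customer-pii" in groups:
        return set(CUSTOMER_COLUMNS)
    if "data-customer" in groups:
        return CUSTOMER_COLUMNS - PII_COLUMNS
    return set()


def can_query_orders(groups):
    """Return True if the groups grant access to the LINEORDER table."""
    return "data-order" in groups
```

Under these assumptions, both users can query LINEORDER, but only CustomerOpsUser sees c_name, c_phone, and c_address.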

Prerequisites

To follow along with this walkthrough, you must meet the following prerequisites:

Set up an IAM SAML provider and role

To set up your SAML provider, complete the following steps:

  1. In the IAM console, choose Identity providers in the navigation pane.
  2. Choose Add provider.
  3. For Provider Type, choose SAML.
  4. For Provider Name, enter adfs-saml-provider.
  5. For Metadata Document, download your AD FS server’s federation XML file by entering the following address in a browser with access to the AD FS server:
    https://<adfs-server-name>/FederationMetadata/2007-06/FederationMetadata.xml

  6. Upload the file to AWS by choosing Choose file.
  7. Choose Add provider to finish.

Now you’re ready to create a new IAM role.

  1. In the navigation pane, choose Roles.
  2. Choose Create role.
  3. For the type of trusted entity, choose SAML 2.0 federation.
  4. For SAML provider, choose the provider you created (adfs-saml-provider).
  5. Choose Allow programmatic and AWS Management Console access.
  6. The Attribute and Value fields should automatically populate with SAML:aud and https://signin.aws.amazon.com/saml.
  7. Choose Next:Permissions.
  8. Add the necessary IAM permissions to this role. For this post, attach AthenaFullAccess.

If the Amazon S3 location for your Athena query results doesn’t start with aws-athena-query-results, add another policy to allow users to write query results into your Amazon S3 location. For more information, see Specifying a Query Result Location Using the Athena Console and Writing IAM Policies: How to Grant Access to an Amazon S3 Bucket.

  1. Leave the defaults in the next steps and for Role name, enter adfs-data-access.
  2. Choose Create role.
  3. Take note of the SAML provider and IAM role names to use in later steps when creating the trust between the AWS account and AD FS.
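The role created by these console steps carries a trust policy that lets principals federated through the SAML provider assume it. A sketch of that policy, built as a Python dictionary, looks like the following. The function name is hypothetical and the account number is a placeholder. The policy the console generates for your role may differ in detail.

```python
import json


def saml_trust_policy(account_id, provider_name):
    """Build a trust policy allowing SAML-federated users to assume the role."""
    provider_arn = f"arn:aws:iam::{account_id}:saml-provider/{provider_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Federated": provider_arn},
                "Action": "sts:AssumeRoleWithSAML",
                "Condition": {
                    "StringEquals": {"SAML:aud": "https://signin.aws.amazon.com/saml"}
                },
            }
        ],
    }


# 123456789012 is a placeholder account number.
policy_document = json.dumps(saml_trust_policy("123456789012", "adfs-saml-provider"), indent=2)
```

You can compare policy_document with the Trust relationships tab of the adfs-data-access role to confirm that the provider ARN and the SAML:aud condition match what you configured.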

Configure AD FS

SAML-based federation has two participant parties: the IdP (Active Directory) and the relying party (AWS), which is the service or application that wants to use authentication from the IdP.

To configure AD FS, you first add a relying party trust, then you configure SAML claim rules for the relying party. Claim rules are the way that AD FS forms a SAML assertion sent to a relying party. The SAML assertion states that the information about the AD user is true, and that it has authenticated the user.

Add a relying party trust

To create your relying party in AD FS, complete the following steps:

  1. Log in to the AD FS server.
  2. On the Start menu, open Server Manager.
  3. On the Tools menu, choose the AD FS Management console.
  4. Under Trust Relationships in the navigation pane, choose Relying Party Trusts.
  5. Choose Add Relying Party Trust.
  6. Choose Start.
  7. Select Import data about the relying party published online or on a local network and enter the URL https://signin.aws.amazon.com/static/saml-metadata.xml.

The metadata XML file is a standard SAML metadata document that describes AWS as a relying party.

  1. Choose Next.
  2. For Display name, enter a name for your relying party.
  3. Choose Next.
  4. Select I do not want to configure multi-factor authentication.

For increased security, we recommend that you configure multi-factor authentication to help protect your AWS resources. We don’t enable multi-factor authentication for this post because we’re using a sample dataset.

  1. Choose Next.
  2. Select Permit all users to access this relying party and choose Next.

This allows all users in Active Directory to use AD FS with AWS as a relying party. You should consider your security requirements and adjust this configuration accordingly.

  1. Finish creating your relying party.

Configure SAML claim rules for the relying party

You create two sets of claim rules in this post. The first set (rules 1–4) contains AD FS claim rules that are required to assume an IAM role based on AD group membership. These are the rules that you also create if you want to establish federated access to the AWS Management Console. The second set (rules 5–6) are claim rules that are required for Lake Formation fine-grained access control.

To create AD FS claim rules, complete the following steps:

  1. On the AD FS Management console, find the relying party you created in the previous step.
  2. Right-click the relying party and choose Edit Claim Rules.
  3. Choose Add Rule and create your six new rules.
  4. Create claim rule 1, called NameID:
    1. For Rule template, use Transform an Incoming Claim.
    2. For Incoming claim type, choose Windows account name.
    3. For Outgoing claim type, choose Name ID.
    4. For Outgoing name ID format, choose Persistent Identifier.
    5. Select Pass through all claim values.
  5. Create claim rule 2, called RoleSessionName:
    1. For Rule template, use Send LDAP Attributes as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute E-Mail-Addresses and outgoing claim type https://aws.amazon.com/SAML/Attributes/RoleSessionName.
  6. Create claim rule 3, called Get AD Groups:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname", Issuer == "AD AUTHORITY"]
      => add(store = "Active Directory", types = ("http://temp/variable"), query = ";tokenGroups;{0}", param = c.Value);

  7. Create claim rule 4, called Roles:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code (enter your account number and name of the SAML provider you created earlier):
      c:[Type == "http://temp/variable", Value =~ "(?i)^aws-"]
      => issue(Type = "https://aws.amazon.com/SAML/Attributes/Role", Value = RegExReplace(c.Value, "aws-", "arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/<adfs-saml-provider>,arn:aws:iam::<AWS ACCOUNT NUMBER>:role/"));

Claim rules 5 and 6 allow Lake Formation to make authorization decisions based on user name or the AD group membership of the user.

  8. Create claim rule 5, called LF-UserName, which passes the user name and SAML assertion to Lake Formation:
    1. For Rule template, use Send LDAP Attributes as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute User-Principal-Name and outgoing claim type https://lakeformation.amazon.com/SAML/Attributes/Username.
  9. Create claim rule 6, called LF-Groups, which passes data and analytics-related AD groups that the user is a member of, along with the SAML assertion to Lake Formation:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://temp/variable", Value =~ "(?i)^data-"]
      => issue(Type = "https://lakeformation.amazon.com/SAML/Attributes/Groups", Value = c.Value);

The preceding rule snippet filters AD group names starting with data-. This is an arbitrary naming convention; you can adopt your preferred naming convention for AD groups that are related to data lake access.
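To make the effect of claim rules 4 and 6 more concrete, the following Python sketch mimics their filtering and rewriting on a list of group names. It only illustrates the regular expressions; it is not how AD FS evaluates claim rules. The account number and the helper names role_claims and group_claims are hypothetical.

```python
import re

# Hypothetical placeholder values; substitute your own account number
# and the name of the SAML provider you created earlier.
ACCOUNT_ID = "123456789012"
SAML_PROVIDER = "adfs-saml-provider"


def role_claims(groups):
    """Mimic claim rule 4: map aws-* groups to 'provider ARN,role ARN' pairs."""
    prefix = (
        f"arn:aws:iam::{ACCOUNT_ID}:saml-provider/{SAML_PROVIDER},"
        f"arn:aws:iam::{ACCOUNT_ID}:role/"
    )
    return [
        re.sub("aws-", prefix, group)
        for group in groups
        if re.search(r"(?i)^aws-", group)
    ]


def group_claims(groups):
    """Mimic claim rule 6: pass through only groups whose names start with data-."""
    return [group for group in groups if re.search(r"(?i)^data-", group)]


# Group names from this post, plus one unrelated group for contrast.
ad_groups = ["aws-adfs-data-access", "data-customer", "data-order", "Domain Users"]
print(role_claims(ad_groups))
print(group_claims(ad_groups))
```

In this sketch, the aws-adfs-data-access group becomes a role claim for the adfs-data-access IAM role, and only the data- groups are passed on as Lake Formation group claims.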

Create Active Directory users and groups

In this section, we create two AD users and required AD groups to demonstrate varying levels of access to the data.

Create users

You create two AD users: FinanceUser and CustomerOpsUser. Each user corresponds to an individual who is a member of the Finance or Customer business units. The following table summarizes the details of each user.

 

Field | FinanceUser | CustomerOpsUser
First Name | FinanceUser | CustomerOpsUser
User logon name | [email protected] | [email protected]
Email | [email protected] | [email protected]

To create your users, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. In the navigation pane, choose Users.
  3. On the tool bar, choose the Create user icon.
  4. For First name, enter FinanceUser.
  5. For Full name, enter FinanceUser.
  6. For User logon name, enter [email protected].
  7. Choose Next.
  8. Enter a password and deselect User must change password at next logon.

We choose this option for simplicity, but in real-world scenarios, newly created users must change their password for security reasons.

  1. Choose Next.
  2. In Active Directory Users and Computers, choose the user name.
  3. For Email, enter [email protected].

Adding an email is mandatory because it’s used as the RoleSessionName value in the SAML assertion.

  1. Choose OK.
  2. Repeat these steps to create CustomerOpsUser.

Create AD groups to represent data access patterns

Create the following AD groups to represent three different access patterns and also the ability to assume an IAM role:

  • data-customer – Members have access to non-PII columns of the customer table
  • data-customer-pii – Members have access to all columns of the customer table, including PII columns
  • data-order – Members have access to the lineorder table
  • aws-adfs-data-access – Members assume the adfs-data-access IAM role when logging in to AWS

To create the groups, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. On the tool bar, choose the Create new group icon.
  3. For Group name, enter data-customer.
  4. For Group scope, select Global.
  5. For Group type, select Security.
  6. Choose OK.
  7. Repeat these steps to create the remaining groups.

Add users to appropriate groups

Now you add your newly created users to their appropriate groups, as detailed in the following table.

User | Group Membership | Description
CustomerOpsUser | data-customer-pii, data-order, aws-adfs-data-access | Sees all customer information including PII and their orders
FinanceUser | data-customer, data-order, aws-adfs-data-access | Sees only non-PII customer data and orders

Complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. Choose the user FinanceUser.
  3. On the Member Of tab, choose Add.
  4. Add the appropriate groups.
  5. Repeat these steps for CustomerOpsUser.

Create a database and tables in the data lake

In this step, you copy data files to an S3 bucket in your AWS account by running the following AWS Command Line Interface (AWS CLI) commands. For more information on how to set up the AWS CLI, refer to Configuration Basics.

These commands copy the files that contain data for customer and lineorder tables. Replace <BUCKET NAME> with the name of an S3 bucket in your AWS account.

aws s3 sync s3://awssampledb/load/ s3://<BUCKET NAME>/customer/ \
--exclude "*" --include "customer-fw.tbl-00*" --exclude "*.bak"

aws s3api copy-object --copy-source awssampledb/load/lo/lineorder-single.tbl000.gz \
--key lineorder/lineorder-single.tbl000.gz --bucket <BUCKET NAME> \
--tagging-directive REPLACE
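The first command combines several --exclude and --include filters. As a rough illustration of how such ordered filters combine (later filters take precedence over earlier ones), the following Python sketch applies the same three patterns to a few object names. The object names and the is_selected helper are hypothetical, and the matching only approximates what the AWS CLI does.

```python
from fnmatch import fnmatchcase


def is_selected(key, filters):
    """Apply ordered (action, pattern) filters; the last matching filter wins."""
    selected = True  # with no filters, every object is included
    for action, pattern in filters:
        if fnmatchcase(key, pattern):
            selected = action == "include"
    return selected


# The filters from the sync command, in the order they appear.
SYNC_FILTERS = [
    ("exclude", "*"),
    ("include", "customer-fw.tbl-00*"),
    ("exclude", "*.bak"),
]

# Hypothetical object names relative to s3://awssampledb/load/.
for key in ["customer-fw.tbl-000", "customer-fw.tbl-000.bak", "part-csv.tbl-000"]:
    print(key, is_selected(key, SYNC_FILTERS))
```

Under these assumptions, only objects whose names start with customer-fw.tbl-00 and don't end in .bak are copied.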

For this post, we use the default settings for storing data and logging access requests within Amazon S3. You can enhance the security of your sensitive data with the following methods:

  • Implement encryption at rest using AWS Key Management Service (AWS KMS) and customer managed encryption keys
  • Use AWS CloudTrail and audit logging
  • Restrict access to AWS resources based on the least privilege principle

Additionally, Lake Formation is integrated with CloudTrail, a service that provides a record of actions taken by a user, role, or AWS service in Lake Formation. CloudTrail captures all Lake Formation API calls as events and is enabled by default when you create a new AWS account. When activity occurs in Lake Formation, that activity is recorded as a CloudTrail event along with other AWS service events in event history. For audit and access monitoring purposes, all federated user logins are logged via CloudTrail under the AssumeRoleWithSAML event name. You can also view specific user activity based on their user name in CloudTrail.

To create a database and tables in the Data Catalog, open the query editor on the Athena console and enter the following DDL statements. Replace <BUCKET NAME> with the name of the S3 bucket in your account.

CREATE DATABASE salesdata;
CREATE EXTERNAL TABLE salesdata.customer
(
    c_custkey VARCHAR(10),
    c_name VARCHAR(25),
    c_address VARCHAR(25),
    c_city VARCHAR(10),
    c_nation VARCHAR(15),
    c_region VARCHAR(12),
    c_phone VARCHAR(15),
    c_mktsegment VARCHAR(10)
)
-- The data files contain fixed width columns hence using RegExSerDe
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "(.{10})(.{25})(.{25})(.{10})(.{15})(.{12})(.{15})(.{10})"
)
LOCATION 's3://<BUCKET NAME>/customer/';

CREATE EXTERNAL TABLE salesdata.lineorder(
  `lo_orderkey` int, 
  `lo_linenumber` int, 
  `lo_custkey` int, 
  `lo_partkey` int, 
  `lo_suppkey` int, 
  `lo_orderdate` int, 
  `lo_orderpriority` varchar(15), 
  `lo_shippriority` varchar(1), 
  `lo_quantity` int, 
  `lo_extendedprice` int, 
  `lo_ordertotalprice` int, 
  `lo_discount` int, 
  `lo_revenue` int, 
  `lo_supplycost` int, 
  `lo_tax` int, 
  `lo_commitdate` int, 
  `lo_shipmode` varchar(10))
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
LOCATION 's3://<BUCKET NAME>/lineorder/';
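The input.regex property of the customer table splits each fixed-width line into eight capture groups, one per column. The following Python sketch applies the same pattern to a made-up record to show the idea. The sample values are hypothetical, and the sketch strips padding for display, which the table definition itself does not do.

```python
import re

# Same pattern as the input.regex SerDe property: eight fixed-width groups.
INPUT_REGEX = r"(.{10})(.{25})(.{25})(.{10})(.{15})(.{12})(.{15})(.{10})"
COLUMNS = [
    "c_custkey", "c_name", "c_address", "c_city",
    "c_nation", "c_region", "c_phone", "c_mktsegment",
]
WIDTHS = [10, 25, 25, 10, 15, 12, 15, 10]

# Hypothetical record, padded to the column widths for illustration only.
values = [
    "1", "Customer#000000001", "1 Example Street", "SYDNEY",
    "AUSTRALIA", "ASIA", "00-000-000-0000", "BUILDING",
]
line = "".join(value.ljust(width) for value, width in zip(values, WIDTHS))

# The whole line must match, one capture group per column.
match = re.fullmatch(INPUT_REGEX, line)
row = {column: field.strip() for column, field in zip(COLUMNS, match.groups())}
print(row)
```

Each capture group width corresponds to the VARCHAR length declared for that column, so a line of any other length would not match the pattern.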

Verify that tables are created and you can see the data:

SELECT * FROM "salesdata"."customer" limit 10;
SELECT * FROM "salesdata"."lineorder" limit 10;

Set up the Lake Formation permission model

Lake Formation uses a combination of Lake Formation permissions and IAM permissions to achieve fine-grained access control. The recommended approach includes the following:

  • Coarse-grained IAM permissions – These apply to the IAM role that users assume when running queries in Athena. IAM permissions control access to Lake Formation, AWS Glue, and Athena APIs.
  • Fine-grained Lake Formation grants – These control access to Data Catalog resources, Amazon S3 locations, and the underlying data at those locations. With these grants, you can give access to specific tables or only columns that contain specific data values.

Configure IAM role permissions

Earlier in the walkthrough, you created the IAM role adfs-data-access and attached the AWS managed IAM policy AmazonAthenaFullAccess to it. This policy has all the permissions required for the purposes of this post.

For more information, see the Data Analyst Permissions section in Lake Formation Personas and IAM Permissions Reference.

Register an S3 bucket as a data lake location

The mechanism to govern access to an Amazon S3 location using Lake Formation is to register a data lake location. Complete the following steps:

  1. On the Lake Formation console, choose Data lake locations.
  2. Choose Register location.
  3. For Amazon S3 path, choose Browse and locate your bucket.
  4. For IAM role, choose AWSServiceRoleForLakeFormationDataAccess.

In this step, you specify an IAM service-linked role, which Lake Formation assumes when it grants temporary credentials to integrated AWS services that access the data in this location. This role and its permissions are managed by Lake Formation and can’t be changed by IAM principals.

  1. Choose Register location.

Configure data permissions

Now that you have registered the Amazon S3 path, you can give AD groups appropriate permissions to access tables and columns in the salesdata database. The following table summarizes the new permissions.

Database and Table | AD Group Name | Table Permissions | Data Permissions
salesdata.customer | data-customer | Select | c_city, c_custkey, c_mktsegment, c_nation, and c_region
salesdata.customer | data-customer-pii | Select | All data access
salesdata.lineorder | data-order | Select | All data access

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Filter tables by the salesdata database.
  3. Select the customer table and on the Actions menu, choose View permissions.

You should see the following existing permissions. These entries allow the current data lake administrator to access the table and all its columns.

  1. To add new permissions, select the table and on the Actions menu, choose Grant.
  2. Select SAML user and groups.
  3. For SAML and Amazon QuickSight users and groups, enter arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/adfs-saml-provider:group/data-customer.

To get this value, get the ARN of the SAML provider from the IAM console and append :group/data-customer to the end of it.

  1. Select Named data catalog resources.
  2. For Databases, choose the salesdata database.
  3. For Tables, choose the customer table.
  4. For Table permissions, select Select.
  5. For Data permissions, select Column-based access.
  6. For Select columns, add the columns c_city, c_custkey, c_mktsegment, c_nation, and c_region.
  7. Choose Grant.

You have now allowed members of the AD group data-customer to have access to columns of the customer table that don’t include PII.

  1. Repeat these steps for the customer table and data-customer-pii group with all data access.
  2. Repeat these steps for the lineorder table and data-order group with all data access.
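To summarize the access that results from these grants, the following Python sketch models the group memberships and permissions from the tables in this post and computes which columns each user can select. It is a plain data model for illustration, not Lake Formation's implementation; the helper names and the sample account number are hypothetical.

```python
# Column list and grants transcribed from the tables in this post.
CUSTOMER_COLUMNS = [
    "c_custkey", "c_name", "c_address", "c_city",
    "c_nation", "c_region", "c_phone", "c_mktsegment",
]
NON_PII_COLUMNS = ["c_city", "c_custkey", "c_mktsegment", "c_nation", "c_region"]

ALL = "*"  # marker for "All data access"
GRANTS = {
    "data-customer": {"customer": NON_PII_COLUMNS},
    "data-customer-pii": {"customer": ALL},
    "data-order": {"lineorder": ALL},
}
MEMBERSHIPS = {
    "FinanceUser": ["data-customer", "data-order", "aws-adfs-data-access"],
    "CustomerOpsUser": ["data-customer-pii", "data-order", "aws-adfs-data-access"],
}


def saml_group_principal(account_id, group, provider="adfs-saml-provider"):
    """Build the principal string entered on the Grant page for an AD group."""
    return f"arn:aws:iam::{account_id}:saml-provider/{provider}:group/{group}"


def visible_columns(user, table, all_columns):
    """Return the columns of a table that a user can select through group grants."""
    visible = set()
    for group in MEMBERSHIPS[user]:
        granted = GRANTS.get(group, {}).get(table)
        if granted == ALL:
            return list(all_columns)
        if granted:
            visible.update(granted)
    return [column for column in all_columns if column in visible]


print(saml_group_principal("123456789012", "data-customer"))
print(visible_columns("FinanceUser", "customer", CUSTOMER_COLUMNS))
print(visible_columns("CustomerOpsUser", "customer", CUSTOMER_COLUMNS))
```

The aws-adfs-data-access group has no entry in GRANTS because it only controls which IAM role the user assumes, not which data the user can read.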

Set up a SQL client with JDBC connection and verify permissions

In this post, we use SQL Workbench to access Athena through AD authentication and verify the Lake Formation permissions you created in the previous section.

Prepare the SQL client

To set up the SQL client, complete the following steps:

  1. Download and extract the Lake Formation-compatible Athena JDBC driver with AWS SDK (2.0.14 or later version) from Using Athena with the JDBC Driver.
  2. Go to the SQL Workbench/J website and download the latest stable package.
  3. Install SQL Workbench/J on your client computer.
  4. In SQL Workbench, on the File menu, choose Manage Drivers.
  5. Choose the New driver icon.
  6. For Name, enter Athena JDBC Driver.
  7. For Library, browse to and choose the Simba Athena JDBC .jar file that you just downloaded.
  8. Choose OK.

You’re now ready to create connections in SQL Workbench for your users.

Create connections in SQL Workbench

To create your connections, complete the following steps:

  1. On the File menu, choose Connect.
  2. Enter the name Athena-FinanceUser.
  3. For Driver, choose the Simba Athena JDBC driver.
  4. For URL, enter the following code (replace the placeholders with actual values from your setup and remove the line breaks to make a single line connection string):
jdbc:awsathena://AwsRegion=<AWS Region Name e.g. ap-southeast-2>;
S3OutputLocation=s3://<Athena Query Result Bucket Name>/jdbc;
plugin_name=com.simba.athena.iamsupport.plugin.AdfsCredentialsProvider;
idp_host=<adfs-server-name e.g. adfs.company.com>;
idp_port=443;
preferred_role=<ARN of the role created in step1 e.g. arn>;
user=financeuser@<Domain Name e.g. company.com>;
password=<password>;
SSL_Insecure=true;
LakeFormationEnabled=true;

For this post, we used a self-signed certificate with AD FS. Because the client doesn’t trust this certificate, authentication would otherwise fail, so we set the SSL_Insecure attribute to true to allow authentication despite the self-signed certificate. In real-world setups, use valid trusted certificates and remove the SSL_Insecure attribute.

  1. Create a new SQL workbench profile named Athena-CustomerOpsUser and repeat the earlier steps with CustomerOpsUser in the connection URL string.
  2. To test the connections, choose Test for each user, and confirm that the connection succeeds.
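Because the connection string must be entered as a single line, it can help to assemble it programmatically. The following Python sketch joins the same parameters shown in step 4 into one string. The function name and all sample values are hypothetical, and the sketch doesn't handle values that themselves contain semicolons.

```python
def athena_jdbc_url(region, output_location, idp_host, preferred_role,
                    user, password, ssl_insecure=False):
    """Join the connection parameters into a single-line JDBC URL."""
    params = [
        ("AwsRegion", region),
        ("S3OutputLocation", output_location),
        ("plugin_name", "com.simba.athena.iamsupport.plugin.AdfsCredentialsProvider"),
        ("idp_host", idp_host),
        ("idp_port", "443"),
        ("preferred_role", preferred_role),
        ("user", user),
        ("password", password),
    ]
    if ssl_insecure:
        # Only for test setups that use a self-signed AD FS certificate.
        params.append(("SSL_Insecure", "true"))
    params.append(("LakeFormationEnabled", "true"))
    return "jdbc:awsathena://" + "".join(f"{key}={value};" for key, value in params)


# Hypothetical values for illustration; replace them with your own.
url = athena_jdbc_url(
    region="ap-southeast-2",
    output_location="s3://example-athena-results/jdbc",
    idp_host="adfs.company.com",
    preferred_role="arn:aws:iam::123456789012:role/adfs-data-access",
    user="financeuser@company.com",
    password="example-password",
    ssl_insecure=True,
)
print(url)
```

Calling the function again with the CustomerOpsUser credentials gives the URL for the second connection profile.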

Verify access permissions

Now we can verify permissions for FinanceUser. In the SQL Workbench Statement window, run the following SQL SELECT statements:

SELECT * FROM "salesdata"."lineorder" limit 10;
SELECT * FROM "salesdata"."customer" limit 10;

Verify that only non-PII columns are returned from the customer table.

As the query results show, FinanceUser has access only to the non-PII columns of the customer table and has full access to all columns of the lineorder table. This allows FinanceUser, for example, to run aggregate and summary queries based on market segment or location of customers without having access to their personal information.

Run a similar query for CustomerOpsUser. You should be able to see all columns, including columns containing PII, in the customer table.

Conclusion

This post demonstrated how to configure your data lake permissions using Lake Formation for AD users and groups. We configured AD FS 3.0 on your Active Directory and used it as an IdP to federate into AWS using SAML. This post also showed how you can integrate your Athena JDBC driver to AD FS and use your AD credentials directly to connect to Athena.

Integrating your Active Directory with the Athena JDBC driver gives you the flexibility to access Athena from business intelligence tools you’re already familiar with to analyze the data in your Amazon S3 data lake. This enables you to have a consistent central permission model that is managed through AD users and their group memberships.


About the Authors

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

Introducing AWS Glue interactive sessions for Jupyter

Post Syndicated from Zach Mitchell original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-interactive-sessions-for-jupyter/

Interactive Sessions for Jupyter is a new notebook interface in the AWS Glue serverless Spark environment. Starting in seconds and automatically stopping compute when idle, interactive sessions provide an on-demand, highly scalable, serverless Spark backend to Jupyter notebooks and Jupyter-based IDEs such as Jupyter Lab, Microsoft Visual Studio Code, JetBrains PyCharm, and more. Interactive sessions replace AWS Glue development endpoints for interactive job development with AWS Glue and offer the following benefits:

  • No clusters to provision or manage
  • No idle clusters to pay for
  • No up-front configuration required
  • No resource contention for the same development environment
  • Easy installation and usage
  • The exact same serverless Spark runtime and platform as AWS Glue extract, transform, and load (ETL) jobs

Getting started with interactive sessions for Jupyter

Installing interactive sessions is simple and only takes a few terminal commands. After you install it, you can run interactive sessions anytime within seconds of deciding to run. In the following sections, we walk you through installation on macOS and getting started in Jupyter.

To get started with interactive sessions for Jupyter on Windows, follow the instructions in Getting started with AWS Glue interactive sessions.

Prerequisites

These instructions assume you’re running Python 3.6 or later and have the AWS Command Line Interface (AWS CLI) properly running and configured. You use the AWS CLI to make API calls to AWS Glue. For more information on installing the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

Install AWS Glue interactive sessions on macOS and Linux

To install AWS Glue interactive sessions, complete the following steps:

  1. Open a terminal and run the following to install and upgrade Jupyter, Boto3, and AWS Glue interactive sessions from PyPI. If desired, you can install Jupyter Lab instead of Jupyter.
    pip3 install --user --upgrade jupyter boto3 aws-glue-sessions

  2. Run the following commands to identify the package install location and install the AWS Glue PySpark and AWS Glue Spark Jupyter kernels with Jupyter:
    SITE_PACKAGES=$(pip3 show aws-glue-sessions | grep Location | awk '{print $2}')
    jupyter kernelspec install $SITE_PACKAGES/aws_glue_interactive_sessions_kernel/glue_pyspark
    jupyter kernelspec install $SITE_PACKAGES/aws_glue_interactive_sessions_kernel/glue_spark

  3. To validate your install, run the following command:
    jupyter kernelspec list

In the output, you should see both the AWS Glue PySpark and the AWS Glue Spark kernels listed alongside the default Python3 kernel. It should look something like the following:

Available kernels:
  Python3		~/.venv/share/jupyter/kernels/python3
  glue_pyspark    /usr/local/share/jupyter/kernels/glue_pyspark
  glue_spark      /usr/local/share/jupyter/kernels/glue_spark

Choose and prepare IAM principals

Interactive sessions use two AWS Identity and Access Management (IAM) principals (user or role) to function. The first is used to call the interactive sessions APIs and is likely the same user or role that you use with the AWS CLI. The second is GlueServiceRole, the role that AWS Glue assumes to run your session. This is the same kind of role that AWS Glue jobs use; if you’re developing a job with your notebook, you should use the same role for both interactive sessions and the job you create.

Prepare the client user or role

In the case of local development, the first role is already configured if you can run the AWS CLI. If you can’t run the AWS CLI, follow these steps for setting up. If you often use the AWS CLI or Boto3 to interact with AWS Glue and have full AWS Glue permissions, you can likely skip this step.

  1. To validate this first user or role is set up, open a new terminal window and run the following code:
    aws sts get-caller-identity

    You should see a response like the following. If not, you may not have permissions to call AWS Security Token Service (AWS STS), or you don’t have the AWS CLI set up properly. If you simply get access denied calling AWS STS, you may continue if you know your user or role and its needed permissions.

    {
        "UserId": "ABCDEFGHIJKLMNOPQR",
        "Account": "123456789123",
        "Arn": "arn:aws:iam::123456789123:user/MyIAMUser"
    }
    
    {
        "UserId": "ABCDEFGHIJKLMNOPQR",
        "Account": "123456789123",
        "Arn": "arn:aws:iam::123456789123:role/myIAMRole"
    }

  2. Ensure your IAM user or role can call the AWS Glue interactive sessions APIs by attaching the AWSGlueConsoleFullAccess managed IAM policy to your user or role.

If your caller identity returned a user, run the following:

aws iam attach-user-policy --user-name <myIAMUser> --policy-arn arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess

If your caller identity returned a role, run the following:

aws iam attach-role-policy --role-name <myIAMRole> --policy-arn arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess
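Which of the two commands applies depends on the Arn field returned by aws sts get-caller-identity. The following Python sketch reads that output and builds the matching command string. The function name is hypothetical. Role credentials are often reported by AWS STS as an assumed-role ARN, so the sketch accepts that form as well as the role form shown earlier.

```python
import json

POLICY_ARN = "arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess"


def attach_policy_command(identity_json):
    """Build the matching attach-policy command from get-caller-identity output."""
    arn = json.loads(identity_json)["Arn"]
    resource = arn.split(":", 5)[5]  # for example "user/MyIAMUser"
    kind, _, rest = resource.partition("/")
    if kind == "user":
        name = rest.split("/")[-1]  # drop any IAM path prefix
        return f"aws iam attach-user-policy --user-name {name} --policy-arn {POLICY_ARN}"
    if kind == "role":
        name = rest.split("/")[-1]  # drop any IAM path prefix
        return f"aws iam attach-role-policy --role-name {name} --policy-arn {POLICY_ARN}"
    if kind == "assumed-role":
        name = rest.split("/")[0]  # assumed-role/<role name>/<session name>
        return f"aws iam attach-role-policy --role-name {name} --policy-arn {POLICY_ARN}"
    raise ValueError(f"Unsupported principal: {arn}")


# Sample output copied from the user example in this post.
sample = """
{
    "UserId": "ABCDEFGHIJKLMNOPQR",
    "Account": "123456789123",
    "Arn": "arn:aws:iam::123456789123:user/MyIAMUser"
}
"""
print(attach_policy_command(sample))
```

The sketch only builds the command text; review it and run it yourself with credentials that are allowed to attach IAM policies.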

Prepare the AWS Glue service role for interactive sessions

You can specify the second principal, GlueServiceRole, either in the notebook itself by using the %iam_role magic or stored alongside the AWS CLI config. If you have a role that you typically use with AWS Glue jobs, this will be that role. If you don’t have a role you use for AWS Glue jobs, refer to Setting up IAM permissions for AWS Glue to set one up.

To set this role as the default role for interactive sessions, edit the AWS CLI credentials file and add glue_role_arn to the profile you intend to use.

  1. With a text editor, open ~/.aws/credentials.
    On Windows, use C:\Users\username\.aws\credentials.
  2. Look for the profile you use for AWS Glue; if you don’t use a profile, you’re looking for [default].
  3. Add a line in the profile for the role you intend to use, such as glue_role_arn=<AWSGlueServiceRole>.
  4. I recommend adding a default Region to your profile if one is not specified already. You can do so by adding the line region=us-east-1, replacing us-east-1 with your desired Region.
    If you don’t add a Region to your profile, you’re required to specify the Region at the top of each notebook with the %region magic.

    When finished, your config should look something like the following:

    [default]
    aws_access_key_id=ABCDEFGHIJKLMNOPQRST
    aws_secret_access_key=1234567890ABCDEFGHIJKLMNOPQRSTUVWZYX1234
    glue_role_arn=arn:aws:iam::123456789123:role/AWSGlueServiceRoleForSessions
    region=us-west-2

  5. Save the config.
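As a quick sanity check of the edited file, the following Python sketch parses text in the same format and reports which of the two settings discussed here are missing from a profile. The helper name is hypothetical, and the sketch only inspects the file contents; it makes no claim about how the interactive sessions kernel itself reads them.

```python
import configparser

# Sample profile in the same format as the example in this post.
SAMPLE = """\
[default]
aws_access_key_id=ABCDEFGHIJKLMNOPQRST
aws_secret_access_key=1234567890ABCDEFGHIJKLMNOPQRSTUVWZYX1234
glue_role_arn=arn:aws:iam::123456789123:role/AWSGlueServiceRoleForSessions
region=us-west-2
"""

SESSION_SETTINGS = ["glue_role_arn", "region"]


def missing_session_settings(text, profile="default"):
    """Return the settings from SESSION_SETTINGS that the profile lacks."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return [key for key in SESSION_SETTINGS if not parser.has_option(profile, key)]


print(missing_session_settings(SAMPLE))
```

To check your own file, read its text with pathlib.Path("~/.aws/credentials").expanduser().read_text() and pass the result to the function. A misspelled profile header is reported as both settings missing.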

Start Jupyter and an AWS Glue PySpark notebook

To start Jupyter and your notebook, complete the following steps:

  1. Run the following command in your terminal to open the Jupyter notebook in your browser:
    jupyter notebook

    Your browser should open and you’re presented with a page that looks like the following screenshot.

  2. On the New menu, choose Glue PySpark.

A new tab opens with a blank Jupyter notebook using the AWS Glue PySpark kernel.

Configure your notebook with magics

AWS Glue interactive sessions are configured with Jupyter magics. Magics are small commands prefixed with % at the start of Jupyter cells that provide shortcuts to control the environment. In AWS Glue interactive sessions, magics are used for all configuration needs, including:

  • %region – Region
  • %profile – AWS CLI profile
  • %iam_role – IAM role for the AWS Glue service role
  • %worker_type – Worker type
  • %number_of_workers – Number of workers
  • %idle_timeout – How long to allow a session to idle before stopping it
  • %additional_python_modules – Python libraries to install from pip

Magics are placed at the beginning of your first cell, before your code, to configure AWS Glue. To discover all the magics of interactive sessions, run %help in a cell and a full list is printed. With the exception of %%sql, running a cell of only magics doesn’t start a session, but sets the configuration for the session that starts next when you run your first cell of code. For this post, we use four magics to configure AWS Glue with version 2.0, two G.2X workers, and an idle timeout. Let’s enter the following magics into our first cell and run it:

%glue_version 2.0
%number_of_workers 2
%worker_type G.2X
%idle_timeout 60


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Setting Glue version to: 2.0
Previous number of workers: 5
Setting new number of workers to: 2
Previous worker type: G.1X
Setting new worker type to: G.2X

When you run magics, the output lets us know the values we’re changing along with their previous settings. Explicitly setting all your configuration in magics helps ensure consistent runs of your notebook every time and is recommended for production workloads.

Run your first code cell and author your AWS Glue notebook

Next, we run our first code cell. This is when a session is provisioned for use with this notebook. When interactive sessions are properly configured within an account, the session is completely isolated to this notebook. If you open another notebook in a new tab, it gets its own session on its own isolated compute. Run your code cell as follows:

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Authenticating with profile=default
glue_role_arn defined by user: arn:aws:iam::123456789123:role/AWSGlueServiceRoleForSessions
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.2X
Number of Workers: 2
Session ID: 12345678-12fa-5315-a234-567890abcdef
Applying the following default arguments:
--glue_kernel_version 0.31
--enable-glue-datacatalog true
Waiting for session 12345678-12fa-5315-a234-567890abcdef to get into ready status...
Session 12345678-12fa-5315-a234-567890abcdef has been created

When you ran the first cell containing code, Jupyter invoked interactive sessions, provisioned an AWS Glue cluster, and sent the code to AWS Glue Spark. The notebook was given a session ID, as shown in the preceding code. We can also see the properties used to provision AWS Glue, including the IAM role that AWS Glue used to create the session, the number of workers and their type, and any other options that were passed as part of the creation.

Interactive sessions automatically initialize a Spark session as spark and SparkContext as sc; having Spark ready to go saves a lot of boilerplate code. However, if you want to convert your notebook to a job, spark and sc must be initialized and declared explicitly.

Work in the notebook

Now that we have a session up, let’s do some work. In this exercise, we look at population estimates from the AWS COVID-19 dataset, clean them up, and write the results to a table.

This walkthrough uses data from the COVID-19 data lake.

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create an AWS CloudFormation stack using the following template.

If you’re signed in to your AWS account, deploy the CloudFormation stack by clicking the following Launch stack button:

BDB-2063-launch-cloudformation-stack

It fills out most of the stack creation form for you. All you need to do is choose Create stack. For instructions on creating a CloudFormation stack, see Get started.

When I’m working on a new data integration process, the first thing I often do is identify and preview the datasets I’m going to work on. If I don’t recall the exact location or table name, I typically open the AWS Glue console and search or browse for the table, then return to my notebook to preview it. With interactive sessions, there is a quicker way to browse the Data Catalog. We can use the %%sql magic to show databases and tables without leaving the notebook. For this example, the population table I want is in the COVID-19 dataset but I don’t recall its exact name, so I use the %%sql magic to look it up:

%%sql
show tables in `covid-19`  -- Remember, dashes in names must be escaped with backticks.

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|covid-19|alleninstitute_co...|      false|
|covid-19|alleninstitute_me...|      false|
|covid-19|aspirevc_crowd_tr...|      false|
|covid-19|aspirevc_crowd_tr...|      false|
|covid-19|cdc_moderna_vacci...|      false|
|covid-19|cdc_pfizer_vaccin...|      false|
|covid-19|       country_codes|      false|
|covid-19|  county_populations|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_knowledge_g...|      false|
|covid-19|covid_testing_sta...|      false|
|covid-19|covid_testing_us_...|      false|
|covid-19|covid_testing_us_...|      false|
|covid-19|      covidcast_data|      false|
|covid-19|  covidcast_metadata|      false|
|covid-19|enigma_aggregatio...|      false|
+--------+--------------------+-----------+
only showing top 20 rows

Looking through the returned list, we see a table named county_populations. Let’s select from this table, sorting for the largest counties by population:

%%sql
select * from `covid-19`.county_populations sort by `population estimate 2018` desc limit 10

+--------------+-----+---------------+-----------+------------------------+
|            id|  id2|         county|      state|population estimate 2018|
+--------------+-----+---------------+-----------+------------------------+
|            Id|  Id2|         County|      State|    Population Estima...|
|0500000US01085| 1085|        Lowndes|    Alabama|                    9974|
|0500000US06057| 6057|         Nevada| California|                   99696|
|0500000US29189|29189|      St. Louis|   Missouri|                  996945|
|0500000US22021|22021|Caldwell Parish|  Louisiana|                    9960|
|0500000US06019| 6019|         Fresno| California|                  994400|
|0500000US28143|28143|         Tunica|Mississippi|                    9944|
|0500000US05051| 5051|        Garland|   Arkansas|                   99154|
|0500000US29079|29079|         Grundy|   Missouri|                    9914|
|0500000US27063|27063|        Jackson|  Minnesota|                    9911|
+--------------+-----+---------------+-----------+------------------------+

Our query returned data, but in an unexpected order. It looks like population estimate 2018 was sorted lexicographically, as if the values were strings. Let’s use an AWS Glue DynamicFrame to get the schema of the table and verify the issue:

# Create a DynamicFrame of county_populations and print its schema
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="covid-19", table_name="county_populations"
)
dyf.printSchema()

root
|-- id: string
|-- id2: string
|-- county: string
|-- state: string
|-- population estimate 2018: string
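
The ordering seen in the query output can be reproduced in plain Python. The following is a minimal sketch that uses five of the population values shown above, stored as strings in the same way as the catalog column. Sorting the strings directly compares them character by character, while sorting with an integer key gives the numeric order we actually want.

```python
# Population values copied from the query output above, kept as strings
values = ["9974", "99696", "996945", "9960", "994400"]

# Descending sort on strings compares character by character
as_strings = sorted(values, reverse=True)

# Descending sort on the numeric value of each string
as_numbers = sorted(values, key=int, reverse=True)

print(as_strings)
print(as_numbers)
```

The string ordering matches the first rows returned by the query, which supports the suspicion that the column is typed as a string.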

The schema shows population estimate 2018 to be a string, which is why our column isn’t sorting properly. We can use the apply_mapping transform in our next cell to correct the column type. In the same transform, we also clean up the column names and other column types: clarifying the distinction between id and id2, removing spaces from population estimate 2018 (conforming to Hive’s standards), and casting id2 as an integer for proper sorting. After validating the schema, we show the data with the new schema:

# Rename id2 to simple_id and convert to Int
# Remove spaces and rename population est. and convert to Long
mapped = dyf.apply_mapping(
    mappings=[
        ("id", "string", "id", "string"),
        ("id2", "string", "simple_id", "int"),
        ("county", "string", "county", "string"),
        ("state", "string", "state", "string"),
        ("population estimate 2018", "string", "population_est_2018", "long"),
    ]
)
mapped.printSchema()
 
root
|-- id: string
|-- simple_id: int
|-- county: string
|-- state: string
|-- population_est_2018: long


mapped_df = mapped.toDF()
mapped_df.show()

+--------------+---------+---------+-------+-------------------+
|            id|simple_id|   county|  state|population_est_2018|
+--------------+---------+---------+-------+-------------------+
|0500000US01001|     1001|  Autauga|Alabama|              55601|
|0500000US01003|     1003|  Baldwin|Alabama|             218022|
|0500000US01005|     1005|  Barbour|Alabama|              24881|
|0500000US01007|     1007|     Bibb|Alabama|              22400|
|0500000US01009|     1009|   Blount|Alabama|              57840|
|0500000US01011|     1011|  Bullock|Alabama|              10138|
|0500000US01013|     1013|   Butler|Alabama|              19680|
|0500000US01015|     1015|  Calhoun|Alabama|             114277|
|0500000US01017|     1017| Chambers|Alabama|              33615|
|0500000US01019|     1019| Cherokee|Alabama|              26032|
|0500000US01021|     1021|  Chilton|Alabama|              44153|
|0500000US01023|     1023|  Choctaw|Alabama|              12841|
|0500000US01025|     1025|   Clarke|Alabama|              23920|
|0500000US01027|     1027|     Clay|Alabama|              13275|
|0500000US01029|     1029| Cleburne|Alabama|              14987|
|0500000US01031|     1031|   Coffee|Alabama|              51909|
|0500000US01033|     1033|  Colbert|Alabama|              54762|
|0500000US01035|     1035|  Conecuh|Alabama|              12277|
|0500000US01037|     1037|    Coosa|Alabama|              10715|
|0500000US01039|     1039|Covington|Alabama|              36986|
+--------------+---------+---------+-------+-------------------+
only showing top 20 rows

With the data sorting correctly, we can write it to Amazon Simple Storage Service (Amazon S3) as a new table in the AWS Glue Data Catalog. We use the mapped DynamicFrame for this write because we didn’t modify any data past that transform:

# Create "demo" Database if none exists
spark.sql("create database if not exists demo")


# Set glueContext sink for writing new table
S3_BUCKET = "<S3_BUCKET>"
s3output = glueContext.getSink(
    path=f"s3://{S3_BUCKET}/interactive-sessions-blog/populations/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="demo", catalogTableName="populations")
s3output.setFormat("glueparquet")
s3output.writeFrame(mapped)



Finally, we run a query against our new table to show our table created successfully and validate our work:

%%sql
select * from demo.populations

Convert notebooks to AWS Glue jobs with nbconvert

Jupyter notebooks are saved as .ipynb files. AWS Glue doesn’t currently run .ipynb files directly, so they need to be converted to Python scripts before they can be uploaded to Amazon S3 as jobs. Use the jupyter nbconvert command from a terminal to convert the notebook.

  1. Open a new terminal or PowerShell tab or window.
  2. cd to the working directory where your notebook is.
    This is likely the same directory where you ran jupyter notebook at the beginning of this post.
  3. Run the following bash command to convert the notebook, providing the correct file name for your notebook:
    jupyter nbconvert --to script <Untitled-1>.ipynb

  4. Run cat <Untitled-1>.py to view your new file.
  5. Upload the .py file to Amazon S3 using the following command, replacing the bucket, path, and file name as needed:
    aws s3 cp <Untitled-1>.py s3://<bucket>/<path>/<Untitled-1>.py

  6. Create your AWS Glue job with the following command.

Note that the magics aren’t automatically converted to job parameters when converting notebooks locally. You need to put in your job arguments correctly, or import your notebook to AWS Glue Studio and complete the following steps to keep your magic settings.

aws glue create-job \
    --name is_blog_demo \
    --role "<GlueServiceRole>" \
    --command '{"Name": "glueetl", "PythonVersion": "3", "ScriptLocation": "s3://<bucket>/<path>/<Untitled-1>.py"}' \
    --default-arguments '{"--enable-glue-datacatalog": "true"}' \
    --number-of-workers 2 \
    --worker-type G.2X
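
Because magics aren’t converted into job parameters, it can help to scan the converted script for leftover IPython calls before uploading it. The following is a minimal sketch, assuming that nbconvert has rewritten magics into get_ipython() calls in the generated .py file. The helper name find_magic_lines and the sample script text are hypothetical.

```python
def find_magic_lines(script_text):
    """Return (line_number, line) pairs that still reference IPython magics."""
    hits = []
    for number, line in enumerate(script_text.splitlines(), start=1):
        stripped = line.strip()
        # Converted magics usually appear as get_ipython() calls;
        # raw magics start with a percent sign.
        if "get_ipython()" in stripped or stripped.startswith("%"):
            hits.append((number, stripped))
    return hits


# Hypothetical excerpt of a converted notebook
sample = "\n".join([
    "get_ipython().run_line_magic('idle_timeout', '60')",
    "import sys",
    "get_ipython().run_cell_magic('sql', '', 'show tables')",
])

magic_lines = find_magic_lines(sample)
for number, line in magic_lines:
    print(f"line {number}: {line}")
```

Each reported line needs to be removed or replaced with an equivalent job argument or Spark call before the script can run as a job.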

Run the job

After you have authored the notebook, converted it to a Python file, uploaded it to Amazon S3, and finally made it into an AWS Glue job, the only thing left to do is run it. Do so with the following terminal command:

aws glue start-job-run --job-name is_blog_demo --region us-east-1
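
If you prefer to start the job from Python and wait for it to finish, the same call is available in boto3. The following is a minimal sketch under a few assumptions: the helper name run_and_wait and the 30-second polling interval are arbitrary choices, and the set of terminal states covers the outcomes we care about here. The client is passed in as an argument so the function can be exercised without AWS credentials.

```python
import time

# Job run states after which polling can stop
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def run_and_wait(glue_client, job_name, poll_seconds=30):
    """Start a Glue job run and poll until it reaches a terminal state."""
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


# Usage (requires AWS credentials and the job created earlier):
#   import boto3
#   glue = boto3.client("glue", region_name="us-east-1")
#   print(run_and_wait(glue, "is_blog_demo"))
```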

Conclusion

AWS Glue interactive sessions offer a new way to interact with the AWS Glue serverless Spark environment. Set it up in minutes, start sessions in seconds, and only pay for what you use. You can use interactive sessions for AWS Glue job development, ad hoc data integration and exploration, or for large queries and audits. AWS Glue interactive sessions are generally available in all Regions that support AWS Glue.

To learn more and get started using AWS Glue interactive sessions, visit our developer guide and begin coding in seconds.


About the author

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop data lakes and other data solutions on AWS analytics services.

AWS Glue Python shell now supports Python 3.9 with a flexible pre-loaded environment and support to install additional libraries

Post Syndicated from Alunnata Mulyadi original https://aws.amazon.com/blogs/big-data/aws-glue-python-shell-now-supports-python-3-9-with-a-flexible-pre-loaded-environment-and-support-to-install-additional-libraries/

AWS Glue is the central service of an AWS modern data architecture. It is a serverless data integration service that allows you to discover, prepare, and combine data for analytics and machine learning. AWS Glue offers you a comprehensive range of tools to perform ETL (extract, transform, and load) at the right scale. AWS Glue Python shell jobs are designed for running small-to-medium size ETL, and triggering SQLs (including long-running queries) on Amazon Redshift, Amazon Athena, Amazon EMR, and more.

Today, we are excited to announce a new release of AWS Glue Python shell that supports Python 3.9 with more pre-loaded libraries. Additionally, it allows you to customize your Python shell environment with pre-loaded libraries and offers you PIP support to install other native or custom Python libraries.

The new release of AWS Glue Python shell includes the necessary Python libraries to connect your script to SQL engines and data warehouses like SQLAlchemy, PyMySQL, pyodbc, psycopg2, redshift, and more. It also supports communications with other AWS services such as Amazon OpenSearch Service (opensearch-py, elasticsearch), Amazon Neptune (gremlinpython), or Athena (PyAthena). It integrates Amazon SageMaker Data Wrangler for ETL tasks like loading and unloading data from data lakes, data warehouses, and databases. It also includes library support for data serialization in industry formats such as avro and et-xmlfile.

In this post, we walk you through how to use AWS Glue Python shell to create an ETL job that imports an Excel file and writes it to a relational database and a data warehouse. The job reads the Excel file as a Pandas DataFrame, creates a data profiling report, and exports it into your Amazon Simple Storage Service (Amazon S3) bucket. This routine cleans inaccurate information and imputes missing values based on predefined business rules. It writes the data into a target MySQL database for low-latency data access. Additionally, in parallel, the script exports the DataFrame to the data lake in columnar format to be copied into Amazon Redshift for reporting and visualization.

AWS Glue Python shell new features

The new release of AWS Glue Python shell allows you to use new features of Python 3.9 and add custom libraries to your script using job parameter configurations. This gives you more flexibility to write your Python code and reduces the need to manually maintain and update Python libraries needed for your code.

Customized pre-loaded library environments

AWS Glue Python shell for Python 3.9 comes with two library environment options:

  • analytics (default) – You can run your script in a fully pre-loaded environment for complex analytics workloads. This option loads the full package of libraries.
  • none – You can choose an empty environment for simple and fast ETL jobs. This option only loads awscli and botocore as basic libraries.

You can set this option by using the library-set parameter in the job creation, for example:

"library-set":"analytics"

For your reference, the following table lists the libraries included in each option.

Python version: Python 3.9

Library              analytics (default)   none
avro                 1.11.0                .
awscli               1.23.5                1.23.5
awswrangler          2.15.1                .
botocore             1.23.5                1.23.5
boto3                1.22.5                .
elasticsearch        8.2.0                 .
numpy                1.22.3                .
pandas               1.4.2                 .
psycopg2             2.9.3                 .
pyathena             2.5.3                 .
PyMySQL              1.0.2                 .
pyodbc               4.0.32                .
pyorc                0.6.0                 .
redshift-connector   2.0.907               .
requests             2.27.1                .
scikit-learn         1.0.2                 .
scipy                1.8.0                 .
SQLAlchemy           1.4.36                .
s3fs                 2022.3.0              .

Added support for library compilers

In this release, you can import and install libraries as part of the script, including your own C-based libraries. You have PIP support to install native or customer-provided Python libraries with the support of the following compilers:

  • gcc
  • gcc-c++
  • gmake
  • cmake
  • cython
  • boost-devel
  • conda
  • python-dev

If you want to include a new package during your job creation, you can add the job parameter --additional-python-modules followed by the name of the library and the version. For example:

"--additional-python-modules":"boto3=1.22.13"

How to use the new features with the AWS Glue Python shell script

Now that we have introduced the new features, let’s create a Python 3.9 job with additional libraries with AWS Glue Python shell. You have two options to create and submit a job: you can use the interface of AWS Glue Studio, or the AWS Command Line Interface (AWS CLI) for a programmatic approach.

AWS Glue Studio

To use AWS Glue Studio, complete the following steps:

  1. On the AWS Glue Studio console, create a new job and select Python Shell script editor.
  2. Enter a job name and enter your Python script.
  3. On the Job details tab, enter an optional description.
  4. For IAM role, choose your job role.
  5. For Python version, choose Python 3.9.
  6. Select Load common Python libraries.
  7. Choose the script and the temporary files locations.
  8. Include the additional libraries as job parameters (--additional-python-modules).

AWS CLI

With the new release, you can now use the AWS CLI with the new parameters. The following is an example of an AWS CLI statement to create the AWS Glue Python shell script job with Python 3.9:

$ aws glue create-job \
    --name <job_name> \
    --role <glue_role> \
    --command Name=pythonshell,PythonVersion=3.9,ScriptLocation=s3://<path_to_your_python_script>.py \
    --default-arguments '{
        "--TempDir":"s3://<path_to_your_temp_dir>",
        "--job-language":"python",
        "library-set":"<analytics/default/none>",
        "--additional-python-modules":"<python package>=<version>, <>=<>"
    }' \
    --connections <your_glue_connection> \
    --timeout 30 \
    --max-capacity 0.0625

Let’s explore the main differences from the previous AWS Glue Python shell versions:

  • Set the option PythonVersion within the --command parameter to 3.9.
  • To add new libraries, use --additional-python-modules as a new parameter and then list the library and the required version as follows: boto3=1.22.13.
  • Include library-set within --default-arguments and choose one of the values, such as default/analytics/none.
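
The same settings can be assembled in Python and passed to the boto3 Glue client. The following is a minimal sketch rather than a complete deployment script: the helper name build_python_shell_job, the job name, the bucket path, and the role are placeholders chosen for illustration, and the module pin follows the notation used in this post.

```python
def build_python_shell_job(name, role, script_location,
                           library_set="analytics", extra_modules=None):
    """Build the keyword arguments for a Python 3.9 Python shell job."""
    default_arguments = {"library-set": library_set}
    if extra_modules:
        # Several modules are passed as one comma-separated string
        default_arguments["--additional-python-modules"] = ",".join(extra_modules)
    return {
        "Name": name,
        "Role": role,
        "Command": {
            "Name": "pythonshell",
            "PythonVersion": "3.9",
            "ScriptLocation": script_location,
        },
        "DefaultArguments": default_arguments,
        "MaxCapacity": 0.0625,
        "Timeout": 30,
    }


# Placeholder values for illustration only
job = build_python_shell_job(
    name="demo-python-shell-job",
    role="glue-blog-role",
    script_location="s3://my-bucket/scripts/etl.py",
    extra_modules=["boto3=1.22.13"],
)
print(job["DefaultArguments"])

# To submit the job (requires AWS credentials):
#   import boto3
#   boto3.client("glue").create_job(**job)
```

Keeping the argument construction separate from the API call makes it easy to review the three settings listed above before anything is created in your account.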

Solution overview

This tutorial demonstrates the new features using a common use case where data flows into your system as spreadsheet reports. In this case, you want to quickly orchestrate a way to serve this data to the right tools. This script imports the data from Amazon S3 into a Pandas DataFrame. It creates a profiling report that is exported into your S3 bucket as an HTML file. The routine cleans inaccurate information and imputes missing values based on predefined business rules. It writes the data directly from Python shell to an Amazon Relational Database Service (Amazon RDS) for MySQL server for low-latency app response. Additionally, it exports the data into a Parquet file and copies it into Amazon Redshift for visualization and reporting.

In our case, we treat each scenario as an independent task with no dependency on the others. You only need to create the infrastructure for the use cases that you want to test. Each section provides guidance and links to the documentation to set up the necessary infrastructure.

Prerequisites

There are a few requirements that are common to all scenarios:

  1. Create an S3 bucket to store the input and output files, script, and temporary files.

    Then, we create the AWS Identity and Access Management (IAM) user and role necessary to create and run the job.
  2. Create an IAM AWS Glue service role called glue-blog-role and attach the AWS managed policy AWSGlueServiceRole for general AWS Glue permissions. If you’re also testing an Amazon Redshift or Amazon RDS use case, you need to grant the necessary permissions to this role. For more information, refer to Using identity-based policies (IAM policies) for Amazon Redshift and Identity-based policy examples for Amazon RDS.
  3. Create an IAM user with security credentials and configure your AWS CLI in your local terminal.
    This allows you to create and launch your scripts from your local terminal. It is recommended to create a profile associated with this configuration.

    $ aws configure --profile glue-python-shell
    AWS Access Key ID
    AWS Secret Access Key
    Default region name
    Default output format

    The dataset used in this example is an Excel file containing Amazon Video Review data with the following structure. In a later step, we place the Excel file in our S3 bucket to be processed by our ETL script.

  4. Finally, to work with sample data, we need four Python modules that are available in AWS Glue Python shell when the parameter library-set is set to analytics:
    1. boto3
    2. awswrangler
    3. PyMySQL
    4. Pandas

Note that Amazon customer reviews are not licensed for commercial use. You should replace this data with your own authorized data source when implementing your application.

Load the data

In this section, you start writing the script by loading the data used in all the scenarios.

  1. Import the libraries that we need:
    import sys
    import io
    import os
    import boto3
    import pandas as pd
    import awswrangler as wr
    import pymysql
    import datetime
    from io import BytesIO

  2. Read the Excel spreadsheet into a DataFrame:
    AWS_S3_BUCKET = '<your_s3_bucket_uri>'  # bucket name, without the s3:// prefix
    s3 = boto3.resource(
        service_name='s3',
        region_name='<your_s3_region>'
    )
    obj = s3.Bucket(AWS_S3_BUCKET).Object('amazon_reviews_us_Video.xlsx').get()
    df = pd.read_excel(io.BytesIO(obj['Body'].read()))

Scenario 1: Data profiling and dataset cleaning

To assist with basic data profiling, we use the pandas-profiling module and generate a profile report from our Pandas DataFrame. Pandas profiling supports output files in JSON and HTML format. In this post, we generate an HTML output file and place it in an S3 bucket for quick data analysis.

To use this new library during the job, add the --additional-python-modules parameter from the job details page in AWS Glue Studio or during job creation from the AWS CLI. Remember to include this package in the imports of your script:

from pandas_profiling import ProfileReport
…
profile = ProfileReport(df)
s3.Object(AWS_S3_BUCKET,'output-profile/profile.html').put(Body=profile.to_html())

A common problem when dealing with column data types is that a column containing a mix of data types is identified as an object in a Pandas DataFrame. Mixed data type columns are flagged by pandas-profiling as Unsupported type and stored in the profile report description. We can access the information and standardize it to our desired data types.

The following lines of code loop over every column in the DataFrame and check whether any of the columns are flagged as Unsupported by pandas-profiling. We then cast those columns to string:

for col in df.columns:
    if (profile.description_set['variables'][col]['type']) == 'Unsupported':
        df[col] = df[col].astype(str)
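
To see what a mixed-type column looks like without running a full profile, a similar check can be approximated with pandas alone. The following is a minimal sketch, not the pandas-profiling implementation: the helper name mixed_type_columns and the sample values are made up for illustration, and the check simply counts the distinct Python types in each column.

```python
import pandas as pd

# Made-up sample: star_rating mixes int, str, and float values
df = pd.DataFrame({
    "star_rating": [5, "4", 3.0],
    "vine": ["Y", "N", "N"],
})


def mixed_type_columns(frame):
    """Return names of columns whose values have more than one Python type."""
    return [col for col in frame.columns if frame[col].map(type).nunique() > 1]


mixed = mixed_type_columns(df)

# Standardize the flagged columns to string, as in the loop above
for col in mixed:
    df[col] = df[col].astype(str)

print(mixed)
print(df["star_rating"].tolist())
```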

To further clean or process your data, you can access variables provided by pandas-profiling. The following example prints out all columns with missing values:

for col in df.columns:
    if profile.description_set['variables'][col]['n_missing'] > 0:
        print (col, " is missing ", profile.description_set['variables'][col]['n_missing'], " data type ", profile.description_set['variables'][col]['type'])
        #missing data handling
        #....

Scenario 2: Export data in columnar format and copy it to Amazon Redshift

In this scenario, we export our DataFrame into Parquet columnar format, store it in Amazon S3, and copy it to Amazon Redshift. We use Data Wrangler to connect our script to Amazon Redshift. This Python module is already included in the analytics environment. Complete the following steps to set up the necessary infrastructure:

Now we can write raw data to Amazon S3 in Parquet format and to Amazon Redshift.

A common partition strategy is to divide rows by year, month, and day from your date column and apply multi-level partitioning. This approach allows fast and cost-effective retrieval for all rows assigned to a particular year, month, or date. Another strategy to partition your data is by using a specific column directly. For example, using review_date as a partition gives you a single level of directory for every unique date and stores the corresponding data in it.

In this post, we prepare our data for the multi-level date partitioning strategy. We start by extracting year, month, and day from our date column:

df['day']= pd.DatetimeIndex(df['review_date']).day.astype(str)
df['month']= pd.DatetimeIndex(df['review_date']).month.astype(str)
df['year']= pd.DatetimeIndex(df['review_date']).year.astype(str)

With our partition columns ready, we can use the awswrangler module to write to Amazon S3 in Parquet format:

wr.s3.to_parquet(
    df=df,
    path="s3://<your_output_s3_bucket>", #change this value with path to your bucket
    dataset=True,
    mode="overwrite",
    partition_cols=['year','month','day']
)
To query your partitioned data in Amazon S3, you can use Athena, our serverless interactive query service. For more information, refer to Partitioning data with Athena.
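
To make the multi-level layout concrete, the following minimal sketch builds the prefix under which the rows for one review date would be stored. It assumes a Hive-style key=value directory layout for the partition columns, and it uses a made-up bucket path and the hypothetical helper name partition_prefix. Month and day are not zero-padded because the partition columns were derived from integers.

```python
from datetime import date


def partition_prefix(base_path, review_date):
    """Build the year/month/day prefix for one review date."""
    return (
        f"{base_path.rstrip('/')}/year={review_date.year}"
        f"/month={review_date.month}/day={review_date.day}/"
    )


# Made-up bucket path for illustration
prefix = partition_prefix("s3://my-output-bucket/reviews", date(2015, 8, 31))
print(prefix)
```

A query that filters on year, month, and day only needs to read the files under the matching prefix, which is what keeps retrieval fast and inexpensive.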

Next, we write our DataFrame directly to Amazon Redshift internal storage by using Data Wrangler. Writing to Amazon Redshift internal storage is advised when you’re going to use this data frequently for complex analytics, large SQL operations, or business intelligence (BI) reporting. In Amazon Redshift, it’s advised to define the distribution style and sort key on the table to improve cluster performance. If you’re not sure about the right value for those parameters, you can use the Amazon Redshift auto distribution style and sort key and follow Amazon Redshift advisor recommendations. For more information on Amazon Redshift data distribution, refer to Working with data distribution styles.

#drop review columns and preserve other columns for analysis
df = df.drop(['review_body','review_headline'], axis=1)

#generate dictionary with length to be used by awswrangler to create varchar columns
max_length_object_cols = {col: df.loc[:, col].astype(str).apply(len).max() for col in df.select_dtypes([object]).columns}

#connect to Redshift via Glue connection
con = wr.redshift.connect("<your_glue_connection>")

#copy DataFrame into Redshift table 
wr.redshift.copy(
    df=df,
    path="<temporary path for staging files>",
    con=con,
    table="<your_redshift_table_name>", #awswrangler will create table if it does not exist
    schema="<your_redshift_schema>",
    mode="overwrite",
    iam_role="<your_iam_role_arn_with_permission_to_redshift>",
    varchar_lengths=max_length_object_cols,
    diststyle="AUTO",
)

#close connection    
con.close()

Scenario 3: Data ingestion into Amazon RDS

In this scenario, we open a connection between AWS Glue Python shell and Amazon RDS for MySQL and ingest the data directly into the database. The infrastructure you require for this scenario is an RDS for MySQL database in the same Region as the AWS Glue Python shell job. For more information, refer to Creating a MySQL DB instance and connecting to a database on a MySQL DB instance.

With the PyMySQL and boto3 modules, we can now connect to our RDS for MySQL database and write our DataFrame into a table.

Prepare the variables for connection and generate a database authentication token for database login:

#RDS connection details
MYSQL_ENDPOINT = "<mysql_endpoint>"
PORT= "3306"
USER= "<mysql_username>"
REGION = "<region_for_rds_mysql>"
DBNAME = "<database_name>"
session = boto3.Session(profile_name='<your_aws_profile>')
client = session.client('rds')

#generate db authentication token 
token = client.generate_db_auth_token(DBHostname=MYSQL_ENDPOINT, Port=PORT, DBUsername=USER, Region=REGION)

#connect to database
connection = pymysql.connect(host=MYSQL_ENDPOINT,
    user=USER,
    password=token,
    db=DBNAME,
    ssl_ca='global-bundle.pem')
    
#arrange columns and values for SQL insert statement    
columns = ','.join(df.columns)
values = ','.join(['%s'] * len(df.columns))  #one %s placeholder per column

#SQL statement to insert into RDS
load_sql = f"INSERT INTO demo_blog.amazon_video_review({columns}) VALUES ({values})"

For more information about using an SSL connection with your RDS instance, refer to Using SSL/TLS to encrypt a connection to a DB instance.

Connect to your RDS for MySQL database and write a Pandas DataFrame into the table with the following code:

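try:
    #the with block closes the cursor automatically
    with connection.cursor() as cur:
        cur.executemany(load_sql, df.values.tolist())
    connection.commit()
except Exception:
    #undo the partial insert and surface the error
    connection.rollback()
    raise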
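
The bulk insert pattern can be tried locally before pointing it at Amazon RDS. The following is a minimal sketch that uses the sqlite3 module from the Python standard library as a stand-in for MySQL, so it is not the same driver as PyMySQL: sqlite3 uses ? placeholders where PyMySQL uses %s. The three columns and two rows are made up for illustration.

```python
import sqlite3

# Made-up subset of the review columns and rows
columns = ["product_title", "star_rating", "verified_purchase"]
rows = [("Movie A", 5, "Y"), ("Movie B", 3, "N")]

# One placeholder per column; sqlite3 uses ? instead of %s
placeholders = ",".join(["?"] * len(columns))
load_sql = (
    f"INSERT INTO amazon_video_review ({','.join(columns)}) "
    f"VALUES ({placeholders})"
)

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE amazon_video_review "
    "(product_title TEXT, star_rating INTEGER, verified_purchase TEXT)"
)
try:
    # Insert all rows in one call, then commit
    connection.executemany(load_sql, rows)
    connection.commit()
except sqlite3.Error:
    connection.rollback()
    raise

inserted = connection.execute(
    "SELECT COUNT(*) FROM amazon_video_review"
).fetchone()[0]
connection.close()
print(inserted)
```

Passing the values separately from the statement text lets the driver handle quoting, which matters for free-text fields such as review headlines.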

You need to create a table in Amazon RDS for MySQL prior to running the insert statement. Use the following DDL to create the demo_blog.amazon_video_review table:

CREATE TABLE `amazon_video_review` (
  `marketplace` varchar(100) NOT NULL,
  `customer_id` bigint NOT NULL,
  `review_id` varchar(100) DEFAULT NULL,
  `product_id` varchar(100) DEFAULT NULL,
  `product_parent` bigint NOT NULL,
  `product_title` varchar(100) DEFAULT NULL,
  `product_category` varchar(100) DEFAULT NULL,
  `star_rating` bigint NOT NULL,
  `helpful_votes` bigint NOT NULL,
  `total_votes` bigint NOT NULL,
  `vine` varchar(100) DEFAULT NULL,
  `verified_purchase` varchar(100) DEFAULT NULL,
  `review_headline` varchar(100) DEFAULT NULL,
  `review_body` varchar(5000) DEFAULT NULL,
  `review_date` date NOT NULL,
  `year` varchar(100) DEFAULT NULL,
  `month` varchar(100) DEFAULT NULL,
  `date` varchar(100) DEFAULT NULL,
  `day` varchar(100) DEFAULT NULL
)

When the data is available in the database, you can perform a simple aggregation as follows:

agg_sql="insert into demo_blog.video_review_recap select product_title , year as review_year, count(*) as total_review, sum(case when verified_purchase=\"Y\" then 1 else 0 end) as total_verified_purchase,sum(case when verified_purchase=\"N\" then 1 else 0 end) as total_unverified_purchase from demo_blog.amazon_video_review avr group by 1, 2 order by 2 DESC"
cursor = connection.cursor()
cursor.execute(agg_sql)
connection.commit()
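
To check what the aggregation produces, the select part of the statement can be run against a few rows locally. The following is a minimal sketch that again uses sqlite3 as a stand-in for MySQL, with made-up rows and only the columns the query needs. It groups by both product title and year so that every selected column is either grouped or aggregated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE amazon_video_review "
    "(product_title TEXT, year TEXT, verified_purchase TEXT)"
)
# Made-up rows: three reviews for Movie A, one for Movie B
conn.executemany(
    "INSERT INTO amazon_video_review VALUES (?, ?, ?)",
    [
        ("Movie A", "2015", "Y"),
        ("Movie A", "2015", "N"),
        ("Movie A", "2015", "Y"),
        ("Movie B", "2014", "N"),
    ],
)

recap = conn.execute(
    """
    SELECT product_title,
           year AS review_year,
           COUNT(*) AS total_review,
           SUM(CASE WHEN verified_purchase = 'Y' THEN 1 ELSE 0 END) AS total_verified_purchase,
           SUM(CASE WHEN verified_purchase = 'N' THEN 1 ELSE 0 END) AS total_unverified_purchase
    FROM amazon_video_review
    GROUP BY product_title, year
    ORDER BY review_year DESC
    """
).fetchall()
conn.close()

for row in recap:
    print(row)
```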

Create and run your job

After you finalize your code, you can run it from AWS Glue Studio or save it in a script .py file and submit a job with the AWS CLI. Remember to add the necessary parameters in your job creation depending on the scenario you’re testing. The following job parameters cover all the scenarios:

--command PythonVersion=3.9 …
--default-arguments '{"library-set":"analytics" , "--additional-python-modules":"pandas-profiling", …}'

Review the results

In this section, we review the expected results for each scenario.

In Scenario 1, pandas-profiling generates a data report in HTML format. In this report, you can visualize missing values, duplicated values, size estimations, or correlations between columns, as shown in the following screenshots.

For Scenario 2, you can first review the Parquet files written to Amazon S3, partitioned by year/month/day.

Then you can use the Amazon Redshift query editor to query and visualize the data.

For Scenario 3, you can use a JDBC connection or database IDE to connect to your RDS database and query the data that you just ingested.

Clean up

AWS Glue Python shell is a serverless routine that won’t incur any extra charges when it isn’t running. However, this demo used several services that will incur extra costs. Clean up after completing this walkthrough with the following steps:

  1. Remove the contents of your S3 bucket and delete it. If you encounter any errors, refer to Why can’t I delete my S3 bucket using the Amazon S3 console or AWS CLI, even with full or root permissions.
  2. Stop and delete the RDS DB instance. For instructions, see Deleting a DB instance.
  3. Stop and delete the Amazon Redshift cluster. For instructions, refer to Deleting a cluster.

Conclusion

In this post, we introduced AWS Glue Python shell with Python 3.9 support and more pre-loaded libraries. We presented the customizable Python shell environment with pre-loaded libraries and PIP support to install other native or custom Python libraries. We covered the new features and how to get started through AWS Glue Studio and the AWS CLI. We also demonstrated a step-by-step tutorial of how you can easily use these new capabilities to accomplish common ETL use cases.

To learn more about AWS Glue Python shell and this new feature, refer to Python shell jobs in AWS Glue.


About the authors

Alunnata Mulyadi is an Analytics Specialist Solutions Architect at AWS. Alun has over a decade of experience in data engineering, helping customers address their business and technical needs. Outside of work, he enjoys photography, cycling, and basketball.

Quim Bellmunt is an Analytics Specialist Solutions Architect at Amazon Web Services. Quim has a PhD in Computer Science and Knowledge Graph focusing on data modeling and transformation. With over 6 years of hands-on experience in the analytics and AI/ML space, he enjoys helping customers create systems that scale with their business needs and generate value from their data. Outside of work, he enjoys walking with his dog and cycling.

Kush Rustagi is a Software Development Engineer on the AWS Glue team with over 4 years of experience in the industry having worked on large-scale financial systems in Python and C++, and is now using his scalable system design experience towards cloud development. Before working on Glue Python Shell, Kush worked on anomaly detection challenges in the fin-tech space. Aside from exploring new technologies, he enjoys EDM, traveling, and learning non-programming languages.

How NerdWallet uses AWS and Apache Hudi to build a serverless, real-time analytics platform

Post Syndicated from Kevin Chun original https://aws.amazon.com/blogs/big-data/how-nerdwallet-uses-aws-and-apache-hudi-to-build-a-serverless-real-time-analytics-platform/

This is a guest post by Kevin Chun, Staff Software Engineer in Core Engineering at NerdWallet.

NerdWallet’s mission is to provide clarity for all of life’s financial decisions. This covers a diverse set of topics: from choosing the right credit card, to managing your spending, to finding the best personal loan, to refinancing your mortgage. As a result, NerdWallet offers powerful capabilities that span across numerous domains, such as credit monitoring and alerting, dashboards for tracking net worth and cash flow, machine learning (ML)-driven recommendations, and many more for millions of users.

To build a cohesive and performant experience for our users, we need to be able to use large volumes of varying user data sourced by multiple independent teams. This requires a strong data culture along with a set of data infrastructure and self-serve tooling that enables creativity and collaboration.

In this post, we outline a use case that demonstrates how NerdWallet is scaling its data ecosystem by building a serverless pipeline that enables streaming data from across the company. We iterated on two different architectures. We explain the challenges we ran into with the initial design and the benefits we achieved by using Apache Hudi and additional AWS services in the second design.

Problem statement

NerdWallet captures a sizable amount of spending data. This data is used to build helpful dashboards and actionable insights for users. The data is stored in an Amazon Aurora cluster. Even though the Aurora cluster works well as an Online Transaction Processing (OLTP) engine, it’s not suitable for large, complex Online Analytical Processing (OLAP) queries. As a result, we can’t expose direct database access to analysts and data engineers. The data owners have to solve requests with new data derivations on read replicas. As the data volume and the diversity of data consumers and requests grow, this process gets more difficult to maintain. In addition, data scientists mostly require data files access from an object store like Amazon Simple Storage Service (Amazon S3).

We decided to explore alternatives where all consumers can independently fulfill their own data requests safely and scalably using open-standard tooling and protocols. Drawing inspiration from the data mesh paradigm, we designed a data lake based on Amazon S3 that decouples data producers from consumers while providing a self-serve, security-compliant, and scalable set of tooling that is easy to provision.

Initial design

The following diagram illustrates the architecture of the initial design.

The design included the following key components:

  1. We chose AWS Database Migration Service (AWS DMS) because it’s a managed service that facilitates the movement of data from various data stores such as relational and NoSQL databases into Amazon S3. AWS DMS allows one-time migration and ongoing replication with change data capture (CDC) to keep the source and target data stores in sync.
  2. We chose Amazon S3 as the foundation for our data lake because of its scalability, durability, and flexibility. You can seamlessly increase storage from gigabytes to petabytes, paying only for what you use. It’s designed to provide 11 9s of durability. It supports structured, semi-structured, and unstructured data, and has native integration with a broad portfolio of AWS services.
  3. AWS Glue is a fully managed data integration service. AWS Glue makes it easier to categorize, clean, transform, and reliably transfer data between different data stores.
  4. Amazon Athena is a serverless interactive query engine that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets, high concurrency, and complex queries.

This architecture works fine with small testing datasets. However, the team quickly ran into complications with the production datasets at scale.

Challenges

The team encountered the following challenges:

  • Long batch processing time and complex transformation logic – A single run of the Spark batch job took 2–3 hours to complete, and we ended up getting a fairly large AWS bill when testing against billions of records. The core problem was that we had to reconstruct the latest state and rewrite the entire set of records per partition for every job run, even if the incremental change was a single record in the partition. When we scaled that to thousands of unique transactions per second, we quickly saw the degradation in transformation performance.
  • Increased complexity with a large number of clients – This workload contained millions of clients, and one common query pattern was to filter by single client ID. There were numerous optimizations that we were forced to tack on, such as predicate pushdowns, tuning the Parquet file size, using a bucketed partition scheme, and more. As more data owners adopted this architecture, we would have to customize each of these optimizations for their data models and consumer query patterns.
  • Limited extensibility for real-time use cases – This batch extract, transform, and load (ETL) architecture wasn’t going to scale to handle hourly updates with thousands of record upserts per second. In addition, it would be challenging for the data platform team to keep up with the diverse real-time analytical needs. Incremental queries, time-travel queries, improved latency, and so on would require heavy investment over a long period of time. Improving on this issue would open up possibilities like near-real-time ML inference and event-based alerting.

With all these limitations of the initial design, we decided to go all-in on a real incremental processing framework.

Solution

The following diagram illustrates our updated design. To support real-time use cases, we added Amazon Kinesis Data Streams, AWS Lambda, Amazon Kinesis Data Firehose and Amazon Simple Notification Service (Amazon SNS) into the architecture.

The updated components are as follows:

  1. Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams. We set up a Kinesis data stream as a target for AWS DMS. The data stream collects the CDC logs.
  2. We use a Lambda function to transform the CDC records. We apply schema validation and data enrichment at the record level in the Lambda function. The transformed results are published to a second Kinesis data stream for the data lake consumption and an Amazon SNS topic so that changes can be fanned out to various downstream systems.
  3. Downstream systems can subscribe to the Amazon SNS topic and take real-time actions (within seconds) based on the CDC logs. This can support use cases like anomaly detection and event-based alerting.
  4. To solve the problem of long batch processing time, we use the Apache Hudi file format to store the data and perform streaming ETL using AWS Glue streaming jobs. Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi integrates well with various AWS analytics services such as AWS Glue, Amazon EMR, and Athena, which makes it a straightforward extension of our previous architecture. While Apache Hudi solves the record-level update and delete challenges, AWS Glue streaming jobs convert the long-running batch transformations into low-latency micro-batch transformations. We use the AWS Glue Connector for Apache Hudi to import the Apache Hudi dependencies in the AWS Glue streaming job and write transformed data to Amazon S3 continuously. Hudi does all the heavy lifting of record-level upserts, while we simply configure the writer and transform the data into a Hudi Copy-on-Write table. With Hudi on AWS Glue streaming jobs, we reduce the data freshness latency for our core datasets from hours to under 15 minutes.
  5. To solve the partition challenges for high cardinality UUIDs, we use the bucketing technique. Bucketing groups data based on specific columns together within a single partition. These columns are known as bucket keys. When you group related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thereby improving query performance and reducing cost. Our existing queries are filtered on the user ID already, so we significantly improve the performance of our Athena usage without having to rewrite queries by using bucketed user IDs as the partition scheme. For example, the following code shows total spending per user in specific categories:
    SELECT ID, SUM(AMOUNT) SPENDING
    FROM "{{DATABASE}}"."{{TABLE}}"
    WHERE CATEGORY IN (
    'ENTERTAINMENT',
    'SOME_OTHER_CATEGORY')
    AND ID_BUCKET ='{{ID_BUCKET}}'
    GROUP BY ID;

  6. Our data scientist team can access the dataset and perform ML model training using Amazon SageMaker.
  7. We maintain a copy of the raw CDC logs in Amazon S3 via Amazon Kinesis Data Firehose.
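
As a rough illustration of the Hudi writer configuration and the bucketing technique described in the list above, the following Python sketch derives a bucket value from an ID and assembles Hudi writer options for a Copy-on-Write upsert. The bucket count, the hash choice, and the column names (transaction_id, updated_at, id_bucket) are assumptions made for illustration, not NerdWallet's actual configuration; the hoodie.* keys are standard Hudi datasource write options.

```python
import hashlib

NUM_BUCKETS = 256  # assumed bucket count; tune it for your data volume


def id_bucket(user_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a high-cardinality ID to a stable bucket number."""
    # A stable hash (unlike Python's built-in hash()) returns the same
    # bucket for the same ID across processes and job runs.
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets


def hudi_upsert_options(table_name: str) -> dict:
    """Hudi writer options for record-level upserts into a Copy-on-Write table.

    The field names below are hypothetical placeholders.
    """
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "transaction_id",
        "hoodie.datasource.write.precombine.field": "updated_at",
        "hoodie.datasource.write.partitionpath.field": "id_bucket",
    }
```

A consumer that filters on a user ID can compute the same bucket value and add it to the WHERE clause, which is what the ID_BUCKET predicate in the Athena query does.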

Conclusion

In the end, we landed on a serverless stream processing architecture that can scale to thousands of writes per second within minutes of freshness on our data lakes. We’ve rolled out to our first high-volume team! At our current scale, the Hudi job is processing roughly 1.75 MiB per second per AWS Glue worker, which can automatically scale up and down (thanks to AWS Glue auto scaling). We’ve also observed an outstanding improvement of end-to-end freshness at less than 5 minutes due to Hudi’s incremental upserts vs. our first attempt.

With Hudi on Amazon S3, we’ve built a high-leverage foundation to personalize our users’ experiences. Teams that own data can now share their data across the organization with reliability and performance characteristics built into a cookie-cutter solution. This enables our data consumers to build more sophisticated signals to provide clarity for all of life’s financial decisions.

We hope that this post will inspire your organization to build a real-time analytics platform using serverless technologies to accelerate your business goals.


About the authors

Kevin Chun is a Staff Software Engineer in Core Engineering at NerdWallet. He builds data infrastructure and tooling to help NerdWallet provide clarity for all of life’s financial decisions.

Dylan Qu is a Specialist Solutions Architect focused on big data and analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Introducing AWS Glue Flex jobs: Cost savings on ETL workloads

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-flex-jobs-cost-savings-on-etl-workloads/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores. Typically, these data integration jobs can have varying degrees of priority and time sensitivity. For example, non-urgent workloads such as pre-production, testing, and one-time data loads often don’t require fast job startup times or consistent runtimes via dedicated resources.

Today, we are pleased to announce the general availability of a new AWS Glue job run class called Flex. Flex allows you to optimize your costs on your non-urgent or non-time sensitive data integration workloads such as pre-production jobs, testing, and one-time data loads. With Flex, AWS Glue jobs run on spare compute capacity instead of dedicated hardware. The start and runtimes of jobs using Flex can vary because spare compute resources aren’t readily available and can be reclaimed during the run of a job.

Regardless of the run option used, AWS Glue jobs have the same capabilities, including access to custom connectors, visual authoring interface, job scheduling, and Glue Auto Scaling. With the Flex execution option, customers can optimize the costs of their data integration workloads by configuring the execution option based on the workloads’ requirements, using standard execution option for time-sensitive workloads, and Flex for non-urgent workloads. The Flex execution class is available for AWS Glue 3.0 Spark jobs.

In this post, we provide more details about AWS Glue Flex jobs and how to enable Flex capacity.

How do you use Flexible capacity?

The AWS Glue jobs API now supports an additional parameter called execution-class, which lets you choose STANDARD or FLEX when running the job. To use Flex, you simply set the parameter to FLEX.

To enable Flex via the AWS Glue Studio console, complete the following steps:

  1. On the AWS Glue Studio console, while authoring a job, navigate to the Job details tab.
  2. Select Flex Execution.
  3. Set an appropriate value for the Job Timeout parameter (defaults to 120 minutes for Flex jobs).
  4. Save the job.
  5. After finalizing all other details, choose Run to run the job with Flex capacity.

On the Runs tab, you should be able to see FLEX listed under Execution class.

You can also enable Flex via the AWS Command Line Interface (AWS CLI).

You can set the --execution-class setting in the start-job-run API, which lets you run a particular AWS Glue job’s run with Flex capacity:

aws glue start-job-run --job-name my-job \
    --execution-class FLEX \
    --timeout 300

You can also set the --execution-class during the create-job API. This sets the default run class of all the runs of this job to FLEX:

aws glue create-job \
    --name flexCLI \
    --role AWSGlueServiceRoleDefault \
    --command "Name=glueetl,ScriptLocation=s3://mybucket/myfolder/" \
    --region us-east-2 \
    --execution-class FLEX \
    --worker-type G.1X \
    --number-of-workers 10 \
    --glue-version 3.0

The following are additional details about the relevant parameters:

  • --execution-class – The enum string that specifies if a job should be run as FLEX or STANDARD capacity. The default is STANDARD.
  • --timeout – Specifies the time (in minutes) the job will run before it’s moved into a TIMEOUT state.
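
If you start runs from Python instead of the AWS CLI, the same two settings map to the ExecutionClass and Timeout arguments of the StartJobRun API. The following is a minimal sketch using the AWS SDK for Python (boto3); the helper names are hypothetical, and it assumes boto3 is installed and AWS credentials are configured.

```python
def flex_run_arguments(job_name: str, timeout_minutes: int) -> dict:
    """Build the StartJobRun arguments for a Flex run."""
    return {
        "JobName": job_name,
        "ExecutionClass": "FLEX",    # STANDARD is the default
        "Timeout": timeout_minutes,  # minutes before the run moves to TIMEOUT
    }


def start_flex_run(job_name: str, timeout_minutes: int) -> str:
    """Start a Flex run and return its run ID."""
    # Imported lazily so the argument builder above works without boto3.
    import boto3

    glue = boto3.client("glue")
    response = glue.start_job_run(**flex_run_arguments(job_name, timeout_minutes))
    return response["JobRunId"]
```

For example, start_flex_run("my-job", 300) would mirror the start-job-run command shown earlier.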

When should you use Flexible capacity?

The Flex execution class is ideal for reducing the costs of time-insensitive workloads. For example:

  • Nightly ETL jobs, or jobs that run over weekends for processing workloads
  • One-time bulk data ingestion jobs
  • Jobs running in test environments or pre-production workloads
  • Time-insensitive workloads where it’s acceptable to have variable start and end times

In comparison, the standard execution class is ideal for time-sensitive workloads that require fast job startup and dedicated resources. In addition, jobs that have downstream dependencies are better served by the standard execution class.

What is the typical life-cycle of a Flexible capacity Job?

When a start-job-run API call is issued, with the execution-class set to FLEX, AWS Glue will begin to request compute resources. If no resources are available immediately upon issuing the API call, the job will move into a WAITING state. No billing occurs at this point.

As soon as the job is able to acquire compute resources, the job moves to a RUNNING state. At this point, even if all the compute resources requested aren’t available, the job begins running on whatever hardware is present. As more Flex capacity becomes available, AWS Glue adds it to the job, up to a maximum value specified by Number of workers.

At this point, billing begins. You’re charged only for the compute resources that are running at any given time, and only for the duration that they ran for.

While the job is running, if Flex capacity is reclaimed, AWS Glue continues running the job on the existing compute resources while it tries to meet the shortfall by requesting more resources. If capacity is reclaimed, billing for that capacity is halted as well. Billing for new capacity will start when it is provisioned again. If the job completes successfully, the job’s state moves to SUCCEEDED. If the job fails due to various user or system errors, the job’s state transitions to FAILED. If the job is unable to complete before the time specified by the --timeout parameter, whether due to a lack of compute capacity or due to issues with the AWS Glue job script, the job goes into a TIMEOUT state.
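
Because a Flex run can sit in WAITING for a while before it moves to RUNNING, a caller that needs the outcome usually polls the run state. The following sketch shows one way to do that; the function name and polling interval are hypothetical, and the state source is passed in as a callable so the loop can be tried without AWS access. STOPPED and ERROR are Glue job run states that this post doesn't discuss; they are included only so the loop always terminates.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "TIMEOUT", "STOPPED", "ERROR"}


def wait_for_job_run(get_state, poll_seconds=30, sleep=time.sleep):
    """Poll until the run reaches a terminal state.

    get_state: callable returning the current job run state as a string.
    Returns the list of distinct states observed, in order.
    """
    seen = []
    while True:
        state = get_state()
        if not seen or seen[-1] != state:
            seen.append(state)  # record each state transition once
        if state in TERMINAL_STATES:
            return seen
        sleep(poll_seconds)
```

With boto3, get_state could wrap a call such as glue.get_job_run(JobName=..., RunId=...)["JobRun"]["JobRunState"].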

Flexible job runs rely on the availability of non-dedicated compute capacity in AWS, which in turn depends on several factors, such as the Region and Availability Zone, time of day, day of the week, and the number of DPUs required by a job.

A parameter of particular importance for Flex Jobs is the --timeout value. It’s possible for Flex jobs to take longer to run than standard jobs, especially if capacity is reclaimed while the job is running. As a result, selecting the right timeout value that’s appropriate for your workload is critical. Choose a timeout value such that the total cost of the Flex job run doesn’t exceed a standard job run. If the value is set too high, the job can wait for too long, trying to acquire capacity that isn’t available. If the value is set too low, the job times out, even if capacity is available and the job execution is proceeding correctly.
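
One way to reason about an upper bound for the timeout is a break-even calculation: how long can the Flex run take before it costs as much as the standard run? The sketch below is a simplification that assumes the same number of workers is billed for the whole run in both cases; the function name is hypothetical, and the rates are inputs you would take from the AWS Glue pricing page.

```python
import math


def break_even_timeout_minutes(standard_runtime_minutes: float,
                               standard_rate: float,
                               flex_rate: float) -> int:
    """Longest Flex runtime (in minutes) that costs no more than the standard run.

    Rates are per DPU-hour, in the same currency.
    """
    if standard_rate <= 0 or flex_rate <= 0:
        raise ValueError("rates must be positive")
    return math.floor(standard_runtime_minutes * standard_rate / flex_rate)
```

A timeout above this value means a slow Flex run could cost more than running the job on standard capacity; a sensible timeout is usually somewhere between the typical standard runtime and this bound.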

How are Flex capacity jobs billed?

Flex jobs are billed per worker at the Flex DPU-hour rates. This means that you’re billed only for the capacity that actually ran during the execution of the job, for the duration that it ran.

For example, if you ran an AWS Glue Flex job for 10 workers, and AWS Glue was only able to acquire 5 workers, you’re only billed for five workers, and only for the duration that those workers ran. If, during the job run, two out of those five workers are reclaimed, then billing for those two workers is stopped, while billing for the remaining three workers continues. If provisioning for the two reclaimed workers is successful during the job run, billing for those two will start again.
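
The billing example above can be expressed as a small calculation over the periods during which a given number of workers was running. This is an illustrative sketch, not an official pricing formula: the function name and the durations are hypothetical, and it assumes one DPU per worker (as with the G.1X worker type).

```python
def billed_dpu_hours(segments, dpus_per_worker=1.0):
    """Sum the DPU-hours over periods with a fixed number of running workers.

    segments: list of (workers_running, minutes) tuples.
    """
    worker_minutes = sum(workers * minutes for workers, minutes in segments)
    return worker_minutes * dpus_per_worker / 60


# Hypothetical timeline: 5 workers for 30 minutes, then 2 are reclaimed
# and the remaining 3 workers run for another 30 minutes.
example_hours = billed_dpu_hours([(5, 30), (3, 30)])
```

Multiplying the result by the Flex DPU-hour rate gives the estimated charge for the run.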

For more information on Flex pricing, refer to AWS Glue pricing.

Conclusion

This post discusses the new AWS Glue Flex job execution class, which allows you to optimize costs for non-time-sensitive ETL workloads and test environments.

You can start using Flex capacity for your existing and new workloads today. However, note that the Flex class is not supported for Python Shell jobs, AWS Glue streaming jobs, or AWS Glue ML jobs.

For more information on AWS Glue Flex jobs, refer to their latest documentation.

Special thanks to everyone who contributed to the launch: Parag Shah, Sampath Shreekantha, Yinzhi Xi, and Jessica Cheng.


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Vaibhav Porwal is a Senior Software Development Engineer on the AWS Glue team.

Sriram Ramarathnam is a Software Development Manager on the AWS Glue team.

Best practices to optimize cost and performance for AWS Glue streaming ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/

AWS Glue streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or any other Apache Kafka cluster. It uses the Spark Structured Streaming framework to perform data processing in near-real time.

This post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.

We discuss the following topics:

  • Development tools that help you code faster using our newly launched AWS Glue Studio notebooks
  • How to monitor and tune your streaming jobs
  • Best practices for sizing and scaling your AWS Glue cluster, using our newly launched features like auto scaling and the small worker type G.025X

Development tools

AWS Glue Studio notebooks can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.

Before you run any code in the notebook (which would start the session), you need to set some important configurations.

The magic %streaming creates the session cluster using the same runtime as AWS Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.

Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.

See the following configuration:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

For additional configuration options such as version or number of workers, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to Launching the Spark History Server for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

The AWS Glue API helps you create the DataFrame by doing schema detection and auto decompression, depending on the format. You can also build it yourself using the Spark API directly:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

After you run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.

History server home page

From the notebook, you can take a sample of the streaming data. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

Monitor the cluster with Structured Streaming

The best way to monitor and tune your AWS Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the Structured Streaming tab and the details of each individual micro-batch processing job.

Overall view of the streaming job

On the Structured Streaming tab, you can see a summary of the streams running in the cluster, as in the following example.

Normally there is just one streaming query, representing a streaming ETL. If you start multiple queries in parallel, give each one a recognizable name by calling queryName() if you use the writeStream API directly on the DataFrame.

After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use the Avg Input/sec column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, Avg Process/sec, is similar but often has a higher number. The difference is that the process rate tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For Input Rate, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, that elapsed time is equal to the configured trigger interval (in the GlueContext.forEachBatch() API, this is set using the option windowSize).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the Batch Duration (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).

Be careful if you set a limit per batch that is constantly hit: you could be silently building a backlog while everything looks good in the job metrics. To monitor this, track a latency metric that measures the difference between the time a message is created and the time it’s processed.

Process Rate is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages, and the trigger interval is 10 seconds but the batch only needed 5 seconds to process it, the process rate would be 1000/5 = 200 msg/sec, while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.
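
The two rates can be reproduced with a couple of one-line functions, which makes the difference between them easier to see. This is a minimal sketch of the arithmetic described above, not Spark's implementation; the function names and the 0.9 threshold used to flag a job that is close to its capacity are arbitrary assumptions.

```python
def input_rate(rows: int, seconds_since_previous_batch_start: float) -> float:
    """Messages read per second, measured between consecutive batch starts."""
    return rows / seconds_since_previous_batch_start


def process_rate(rows: int, batch_duration_seconds: float) -> float:
    """Messages processed per second of actual batch processing time."""
    return rows / batch_duration_seconds


def is_near_capacity(in_rate: float, proc_rate: float, threshold: float = 0.9) -> bool:
    """True when the input rate is close to the process rate (little headroom left)."""
    return in_rate >= threshold * proc_rate


# The example from the text: 1,000 messages, 10-second interval, 5 seconds of work.
example_input = input_rate(1000, 10)     # 100 msg/sec
example_process = process_rate(1000, 5)  # 200 msg/sec
```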

This metric is useful to measure how efficient our code processing the batch is, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.

Input Rows shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the GlueContext.forEachBatch() API.

The last two rows tell us how long it takes to process each batch and how that time is spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing batch size (if the limit allows taking the extra messages pending).

In Operation Duration, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest is fixed overhead, so the smaller the batch, the larger the percentage of time that overhead takes. This represents the trade-off between small batches with lower latency and bigger batches that are more computationally efficient.

Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in Operation Duration, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.
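
When the job uses the GlueContext.forEachBatch() API, the trigger interval is the windowSize option mentioned earlier, so this change is a one-line configuration update. The following sketch shows the options dictionary and how it would be wired in; the helper names and the checkpoint location are placeholders, and the stream DataFrame and batch function are assumed to exist already in your job.

```python
def for_each_batch_options(window_seconds: int, checkpoint_location: str) -> dict:
    """Options for GlueContext.forEachBatch with an explicit trigger interval."""
    return {
        "windowSize": f"{window_seconds} seconds",
        "checkpointLocation": checkpoint_location,
    }


def run_stream(glue_context, stream_df, process_batch, checkpoint_location):
    """Start the streaming query with a 5-second interval.

    process_batch is a function(data_frame, batch_id) defined by your job.
    """
    glue_context.forEachBatch(
        frame=stream_df,
        batch_function=process_batch,
        options=for_each_batch_options(5, checkpoint_location),
    )
```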

Monitor individual batch processing

On the Jobs tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the Jobs tab of the Spark UI of our streaming job.

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by spark.task.maxFailures).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using GlueContext.forEachBatch(), it has a number of retries as per the batchMaxRetries parameter (three by default).

These failures are important because they have two effects:

  • They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
  • They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is Amazon DynamoDB, JDBC, Amazon OpenSearch Service, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).
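
To make the idea of an idempotent destination concrete, the following toy sketch models a sink that stores each record under a unique key, so a batch that is partially or fully re-sent after a task retry overwrites the earlier copy instead of duplicating it. The class and the id field are hypothetical; a real destination would achieve the same effect with a primary key, a document ID, or a conditional write.

```python
class IdempotentSink:
    """In-memory stand-in for a destination that upserts by record key."""

    def __init__(self):
        self._rows = {}

    def write_batch(self, records):
        for record in records:
            # Writing the same key again replaces the row, so replays are harmless.
            self._rows[record["id"]] = record

    def __len__(self):
        return len(self._rows)


sink = IdempotentSink()
batch = [{"id": "a", "value": 1}, {"id": "b", "value": 2}]
sink.write_batch(batch)
sink.write_batch(batch)  # simulated retry that re-sends the same records
```

After the simulated retry, the sink still holds two rows rather than four.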

Choosing the description link takes you to the Stages tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or does it take multiple retries?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because it doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.

Sizing and scaling

Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of AWS Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle that volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

  • For Kinesis, specify the limit using kinesis.executor.maxFetchRecordsPerShard, and revise this number if the number of shards changes substantially. You might need to increase kinesis.executor.maxFetchTimeInMs as well, in order to allow more time to read the batch and make sure it’s not truncated.
  • For Kafka, set maxOffsetsPerTrigger, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):

kafka_properties = {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
spark.readStream.format("kafka").options(**kafka_properties).load()
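For Kinesis, the limit is per shard, so one way to keep a total per-batch allowance is to derive the per-shard value from the shard count. The following is a minimal sketch: the helper name, the stream ARN, and the default fetch time are hypothetical, and the values are passed as strings to mirror the Kafka example.

```python
import math

def kinesis_source_options(stream_arn, batch_record_limit, num_shards, max_fetch_time_ms=10000):
    """Build Kinesis connection options that cap the records read per batch."""
    # The Kinesis limit applies to each shard, so divide the total allowance
    per_shard = math.ceil(batch_record_limit / num_shards)
    return {
        "streamARN": stream_arn,
        "startingPosition": "LATEST",
        "kinesis.executor.maxFetchRecordsPerShard": str(per_shard),
        "kinesis.executor.maxFetchTimeInMs": str(max_fetch_time_ms),
    }

kinesis_options = kinesis_source_options(
    "arn:aws:kinesis:us-east-1:123456789012:stream/mystream",  # hypothetical ARN
    batch_record_limit=5000000,
    num_shards=32,
)
# In a Glue job, these options would then be passed when creating the DataFrame, for example:
# glueContext.create_data_frame.from_options(
#     connection_type="kinesis", connection_options=kinesis_options)
```

If the stream is resharded, rerun the helper with the new shard count so the total allowance per batch stays the same.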

Initial benchmark

If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single partition source (one Kafka partition or a Kinesis stream with one shard) with data preloaded into it, and running batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.
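One way to make the checkpoint location dynamic is to append a timestamp to a base path, so every benchmark run starts from a clean checkpoint. This is a minimal sketch; the function name and the bucket path are hypothetical.

```python
from datetime import datetime, timezone

def benchmark_checkpoint_path(base_path, now=None):
    """Return a checkpoint path that is unique per run, based on the current UTC time."""
    now = now or datetime.now(timezone.utc)
    return f"{base_path.rstrip('/')}/{now.strftime('%Y%m%dT%H%M%S')}/"

# A new directory on each run, so the same preloaded data can be read again
checkpoint_path = benchmark_checkpoint_path("s3://my-bucket/benchmark-checkpoints/")
```

The resulting path can be passed as the checkpointLocation option of the job.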

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead that each batch incurs to checkpoint, plan, and coordinate the nodes is more significant when the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

  • If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
  • In the case where the throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.
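The two selection rules above can be sketched as a small helper that works on the measurements recorded during the stress test. The function name and the benchmark numbers are made up for illustration; they are not results from a real job.

```python
def pick_batch_limit(benchmarks, latency_sla_s=None):
    """Pick a batch limit from (batch_limit, batch_duration_seconds) measurements.

    With an SLA: the largest limit whose duration is under half the SLA.
    Without an SLA: the limit with the highest average process rate.
    """
    if latency_sla_s is not None:
        candidates = [limit for limit, duration in benchmarks if duration < latency_sla_s / 2]
        if not candidates:
            raise ValueError("no benchmarked batch size meets the latency SLA")
        return max(candidates)
    # Throughput priority: maximize messages processed per second
    return max(benchmarks, key=lambda b: b[0] / b[1])[0]

# Hypothetical stress test results: (limit, observed batch duration in seconds)
benchmarks = [(10000, 2.0), (50000, 8.0), (100000, 14.0), (200000, 30.0)]

limit_for_sla = pick_batch_limit(benchmarks, latency_sla_s=20)  # durations must be under 10 s
limit_for_throughput = pick_batch_limit(benchmarks)
```

In the throughput case, the batch interval would then be set somewhat above the duration observed for the chosen limit, to leave some buffer.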

Now you have an idea of the number of messages per core our ETL can handle and the latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can use the messages per core obtained to divide the total number of messages per second to process and get the minimum number of Spark partitions needed (each core handles one partition in parallel).

With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

AWS Glue Version | Worker Type | vCores | Spark Cores per Worker
2 | G 1X | 4 | 8
2 | G 2X | 8 | 16
3 | G 0.25X | 2 | 2
3 | G 1X | 4 | 4
3 | G 2X | 8 | 8

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.
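The sizing arithmetic can be sketched as follows. This is a rough estimate only, assuming perfectly linear scaling and one extra node for the driver; the function name and the message rates are hypothetical, and the dictionary keys use the worker type names G.1X, G.2X, and G.025X for the rows of the table above.

```python
import math

# Spark cores per worker, by (AWS Glue version, worker type), from the table above
SPARK_CORES_PER_WORKER = {
    (2, "G.1X"): 8,
    (2, "G.2X"): 16,
    (3, "G.025X"): 2,
    (3, "G.1X"): 4,
    (3, "G.2X"): 8,
}

def estimate_cluster(total_msgs_per_s, msgs_per_core_per_s, glue_version, worker_type):
    """Return (minimum Spark partitions, total nodes including the driver)."""
    # Each core handles one partition in parallel
    partitions = math.ceil(total_msgs_per_s / msgs_per_core_per_s)
    cores_per_worker = SPARK_CORES_PER_WORKER[(glue_version, worker_type)]
    workers = math.ceil(partitions / cores_per_worker)
    return partitions, workers + 1  # one additional node acts as the driver

# Hypothetical numbers: 160,000 messages/s in total, 5,000 messages/s per core in the benchmark
partitions, nodes = estimate_cluster(160000, 5000, glue_version=3, worker_type="G.1X")
```

With these assumed rates, the estimate is 32 partitions and 9 nodes, which should be treated as a starting point to validate with tests rather than a final size.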

Spark cores are equivalent to threads; therefore, you can have more (or fewer) than the actual cores available in the instance. Having more Spark cores doesn’t necessarily make the job faster if they’re not backed by physical cores; it just means you have more parallelism competing for the same CPU.

Sizing the cluster when you control the input message system

This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial AWS Glue cluster size and configure Kafka or Kinesis with the estimated number of partitions or shards, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an AWS Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or fewer nodes depending on the cluster size, whether the destination system can keep up, the complexity of the transformations, or other parameters.

Sizing the cluster when you don’t control the message system configuration

In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside AWS Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an AWS Glue 3.0 cluster with 3 G.1X nodes (a driver and 2 workers with 4 cores each) to have 8 worker cores available. In the code, repartition the batch so the messages are distributed randomly (as evenly as possible) among them:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than there are Spark cores available, but this might cause inefficiencies because the additional partitions are queued and don’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

When the messages have processing interdependencies

If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (job in Spark UI) to see where the time is spent and if there are imbalances by checking the task duration percentiles on the Stages tab (you can also reach this page by choosing a stage on the Jobs tab).

Auto scaling

Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where AWS Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, using the number you have determined needed for the peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

Splitting workloads

You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver, and the nodes have to wait for the others to complete the batch before they can take additional work. If you reach a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual subscribe or subscribePattern where the topics are listed, use the property assign to assign using JSON a subset of the topic partitions that the job will handle (for example, {"topic1": [0,1,2]}). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.
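Building that partition list dynamically could look like the following minimal sketch. The helper name is hypothetical; the topic and server names reuse the placeholders from the earlier examples.

```python
import json

def kafka_assign_option(topic, first_partition, last_partition):
    """Build the JSON value for the assign property from an inclusive partition range."""
    partitions = list(range(first_partition, last_partition + 1))
    return json.dumps({topic: partitions})

# This job handles partitions 0 to 2; another job could take the remaining ones
kafka_properties = {
    "kafka.bootstrap.servers": "bootstrapserver1:9092",
    "assign": kafka_assign_option("topic1", 0, 2),  # replaces subscribe/subscribePattern
    "startingOffsets": "latest",
}
```

Each job gets its own range, so together the jobs cover all the partitions of the topic without overlapping.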

Sizing down

For low volumes of traffic, AWS Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

  • Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns topic and streamName, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.
  • If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
  • The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of each stream’s DataFrame using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handling errors.
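The thread-based option could be structured along these lines. This is a minimal sketch using only the Python standard library: the function name is hypothetical, and it assumes each callable starts one stream and blocks until that stream stops (for example, a function wrapping a glueContext.forEachBatch call).

```python
import threading

def run_streams_in_threads(stream_starters):
    """Run each stream in its own thread and collect any errors by stream name.

    stream_starters: dict mapping a stream name to a zero-argument callable
    that starts the stream and blocks until it stops.
    """
    errors = {}

    def runner(name, start):
        try:
            start()
        except Exception as exc:  # record the failure instead of losing it in the thread
            errors[name] = exc

    threads = [
        threading.Thread(target=runner, args=(name, start), name=name)
        for name, start in stream_starters.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # wait until every stream has stopped
    return errors
```

What to do when one stream fails (stop the others, restart it, or fail the job) is a design decision this sketch leaves open; it only makes sure that the failure is visible to the main thread.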

Settings

In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

Property | Applies to | Remarks
maxOffsetsPerTrigger | Kafka | Limit of messages per batch. Divides the limit evenly among partitions.
kinesis.executor.maxFetchRecordsPerShard | Kinesis | Limit per shard; therefore, it should be revised if the number of shards changes.
kinesis.executor.maxFetchTimeInMs | Kinesis | When increasing the batch size (either by increasing the batch interval or the previous property), the executor might need more time, allotted by this property.
startingOffsets | Kafka | Normally you want to read all the data available and therefore use earliest. However, if there is a big backlog, the system might take a long time to catch up; in that case, use latest to skip the history.
startingPosition | Kinesis | Similar to startingOffsets; in this case, the values to use are TRIM_HORIZON to backload and LATEST to start processing from now on.
includeHeaders | Kafka | Enable this flag if you need to merge and split multiple topics in the same job (see the previous section for details).
kinesis.executor.maxconnections | Kinesis | When writing to Kinesis, by default it uses a single connection. Increasing this might improve performance.
kinesis.client.avoidEmptyBatches | Kinesis | It’s best to set it to true to avoid wasting resources (for example, generating empty files) when there is no data (like the Kafka connector does). GlueContext.forEachBatch prevents empty batches by default.

Further optimizations

In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, AWS Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to Adding Streaming ETL Jobs in AWS Glue.

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the GlueContext.forEachBatch function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as Amazon S3 files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set persistDataFrame as false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame": "false"
    }
)

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is small enough to fit in memory, consider using a broadcast join. However, bear in mind that it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.

If you need to shuffle, consider enabling the Kryo serializer (if using custom serializable classes you need to register them first to use it).

As in any AWS Glue job, avoid using a custom udf() if you can do the same with the provided API, such as Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine), and in the case of Python, they force the movement of data between processes.

Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartitioning based on the partition columns can significantly reduce the number of output files created.

Conclusion

In this post, you saw how to approach sizing and tuning an AWS Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

How Epos Now modernized their data platform by building an end-to-end data lake with the AWS Data Lab

Post Syndicated from Debadatta Mohapatra original https://aws.amazon.com/blogs/big-data/how-epos-now-modernized-their-data-platform-by-building-an-end-to-end-data-lake-with-the-aws-data-lab/

Epos Now provides point of sale and payment solutions to over 40,000 hospitality and retail businesses across 71 countries. Their mission is to help businesses of all sizes reach their full potential through the power of cloud technology, with solutions that are affordable, efficient, and accessible. Their solutions allow businesses to leverage actionable insights, manage their business from anywhere, and reach customers both in-store and online.

Epos Now currently provides real-time and near-real-time reports and dashboards to their merchants on top of their operational database (Microsoft SQL Server). With a growing customer base and new data needs, the team started to see some issues in the current platform.

First, they observed performance degradation for serving the reporting requirements from the same OLTP database with the current data model. A few metrics that needed to be delivered in real time (seconds after a transaction was complete) and a few metrics that needed to be reflected in the dashboard in near-real-time (minutes) took several attempts to load in the dashboard.

This started to cause operational issues for their merchants. The end consumers of reports couldn’t access the dashboard in a timely manner.

Cost and scalability also became a major problem because one single database instance was trying to serve many different use cases.

Epos Now needed a strategic solution to address these issues. Additionally, they didn’t have a dedicated data platform for doing machine learning and advanced analytics use cases, so they decided on two parallel strategies to resolve their data problems and better serve merchants:

  • The first was to rearchitect the near-real-time reporting feature by moving it to a dedicated Amazon Aurora PostgreSQL-Compatible Edition database, with a specific reporting data model to serve to end consumers. This will improve performance, uptime, and cost.
  • The second was to build out a new data platform for reporting, dashboards, and advanced analytics. This will enable use cases for internal data analysts and data scientists to experiment and create multiple data products, ultimately exposing these insights to end customers.

In this post, we discuss how Epos Now designed the overall solution with support from the AWS Data Lab. Having developed a strong strategic relationship with AWS over the last 3 years, Epos Now opted to take advantage of the AWS Data Lab program to speed up the process of building a reliable, performant, and cost-effective data platform. The AWS Data Lab program offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives.

Working with an AWS Data Lab Architect, Epos Now commenced weekly cadence calls to come up with a high-level architecture. After the objective, success criteria, and stretch goals were clearly defined, the final step was to draft a detailed task list for the upcoming 3-day build phase.

Overview of solution

As part of the 3-day build exercise, Epos Now built the following solution with the ongoing support of their AWS Data Lab Architect.

[Image: Epos Now architecture diagram]

The platform consists of an end-to-end data pipeline with three main components:

  • Data lake – As a central source of truth
  • Data warehouse – For analytics and reporting needs
  • Fast access layer – To serve near-real-time reports to merchants

We chose three different storage solutions:

  • Amazon Simple Storage Service (Amazon S3) for raw data landing and a curated data layer to build the foundation of the data lake
  • Amazon Redshift to create a federated data warehouse with conformed dimensions and star schemas for consumption by Microsoft Power BI, running on AWS
  • Aurora PostgreSQL to store all the data for near-real-time reporting as a fast access layer

In the following sections, we go into each component and supporting services in more detail.

Data lake

The first component of the data pipeline involved ingesting the data from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic using Amazon MSK Connect to land the data into an S3 bucket (landing zone). The Epos Now team used the Confluent Amazon S3 sink connector to sink the data to Amazon S3. To make the sink process more resilient, Epos Now added the required configuration for dead-letter queues to redirect the bad messages to another topic. The following code is a sample configuration for a dead-letter queue in Amazon MSK Connect:
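As a minimal sketch of such a configuration, the following uses the standard Kafka Connect error-handling properties for sink connectors, expressed as a Python dictionary of string settings. The topic name and replication factor are hypothetical values for illustration, not the ones Epos Now used.

```python
# Error-handling settings to merge into the sink connector configuration
dead_letter_queue_config = {
    # Keep the connector running when a record fails conversion or transformation
    "errors.tolerance": "all",
    # Topic that receives the records that could not be processed (hypothetical name)
    "errors.deadletterqueue.topic.name": "s3-sink-dlq",
    # Replication factor used if the dead-letter topic needs to be created
    "errors.deadletterqueue.topic.replication.factor": "3",
    # Add headers describing the failure to each dead-letter record
    "errors.deadletterqueue.context.headers.enable": "true",
}
```

With errors.tolerance set to all, bad messages are routed to the dead-letter topic instead of stopping the connector, so that topic should be monitored.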

Because Epos Now was ingesting from multiple data sources, they used Airbyte to transfer the data to a landing zone in batches. A subsequent AWS Glue job reads the data from the landing bucket, performs data transformation, and moves the data to a curated zone of Amazon S3 in optimal format and layout. This curated layer then became the source of truth for all other use cases. Then Epos Now used an AWS Glue crawler to update the AWS Glue Data Catalog. This was augmented by the use of Amazon Athena for doing data analysis. To optimize for cost, Epos Now defined an optimal data retention policy on different layers of the data lake to save money as well as keep the dataset relevant.

Data warehouse

After the data lake foundation was established, Epos Now used a subsequent AWS Glue job to load the data from the S3 curated layer to Amazon Redshift. We used Amazon Redshift to make the data queryable in both Amazon Redshift (internal tables) and Amazon Redshift Spectrum. The team then used dbt as an extract, load, and transform (ELT) engine to create the target data model and store it in target tables and views for internal business intelligence reporting. The Epos Now team wanted to use their SQL knowledge to do all ELT operations in Amazon Redshift, so they chose dbt to perform all the joins, aggregations, and other transformations after the data was loaded into the staging tables in Amazon Redshift. Epos Now is currently using Power BI for reporting, which was migrated to the AWS Cloud and connected to Amazon Redshift clusters running inside Epos Now’s VPC.

Fast access layer

To build the fast access layer to deliver the metrics to Epos Now’s retail and hospitality merchants in near-real time, we decided to create a separate pipeline. This required developing a microservice running a Kafka consumer job to subscribe to the same Kafka topic in an Amazon Elastic Kubernetes Service (Amazon EKS) cluster. The microservice received the messages, conducted the transformations, and wrote the data to a target data model hosted on Aurora PostgreSQL. This data was delivered to the UI layer through an API also hosted on Amazon EKS, exposed through Amazon API Gateway.

Outcome

The Epos Now team is currently building both the fast access layer and a centralized lakehouse architecture-based data platform on Amazon S3 and Amazon Redshift for advanced analytics use cases. The new data platform is best positioned to address scalability issues and support new use cases. The Epos Now team has also started offloading some of the real-time reporting requirements to the new target data model hosted in Aurora. The team has a clear strategy around the choice of different storage solutions for the right access patterns: Amazon S3 stores all the raw data, and Aurora hosts all the metrics to serve real-time and near-real-time reporting requirements. The Epos Now team will also enhance the overall solution by applying data retention policies in different layers of the data platform. This will address the platform cost without losing any historical datasets. The data model and structure (data partitioning, columnar file format) we designed greatly improved query performance and overall platform stability.

Conclusion

Epos Now revolutionized their data analytics capabilities, taking advantage of the breadth and depth of the AWS Cloud. They’re now able to serve insights to internal business users, and scale their data platform in a reliable, performant, and cost-effective manner.

The AWS Data Lab engagement enabled Epos Now to move from idea to proof of concept in 3 days using several previously unfamiliar AWS analytics services, including AWS Glue, Amazon MSK, Amazon Redshift, and Amazon API Gateway.

Epos Now is currently in the process of implementing the full data lake architecture, with a rollout to customers planned for late 2022. Once live, they will deliver on their strategic goal to provide real-time transactional data and put insights directly in the hands of their merchants.


About the Authors

Jason Downing is VP of Data and Insights at Epos Now. He is responsible for the Epos Now data platform and product direction. He specializes in product management across a range of industries, including POS systems, mobile money, payments, and eWallets.

Debadatta Mohapatra is an AWS Data Lab Architect. He has extensive experience across big data, data science, and IoT, across consulting and industrials. He is an advocate of cloud-native data platforms and the value they can drive for customers across industries.

Use SQL queries to define Amazon Redshift datasets in AWS Glue DataBrew

Post Syndicated from Suraj Shivananda original https://aws.amazon.com/blogs/big-data/use-sql-queries-to-define-amazon-redshift-datasets-in-aws-glue-databrew/

In the post Data preparation using Amazon Redshift with AWS Glue DataBrew, we saw how to create an AWS Glue DataBrew job using a JDBC connection for Amazon Redshift. In this post, we show you how to create a DataBrew profile job and a recipe job using an Amazon Redshift connection with custom SQL.

DataBrew is a visual data preparation tool that can help you simplify your extract, transform, and load (ETL) process. You can now define a dataset from Amazon Redshift by applying custom SQL statements. Applying a custom SQL statement to a large source table allows you to select, join, and filter the data before cleaning, normalizing, and transforming it in a DataBrew project. Filtering and joining the data from your data source and only bringing in the data you want to transform simplifies the ETL process.

In this post, we demonstrate how to use custom SQL queries to define your Amazon Redshift datasets in DataBrew.

Solution overview

To implement this solution, you complete the following high-level steps:

  1. Create an Amazon Redshift connection.
  2. Create your dataset and use SQL queries to define your Amazon Redshift source datasets.
  3. Create a DataBrew profile job to profile the source data.
  4. Create a DataBrew project and recipe job to transform the data and load it to Amazon Simple Storage Service (Amazon S3).

The following diagram illustrates the architecture for our solution.

Prerequisites

To use this solution, complete the following prerequisite steps:

  1. Have an AWS account.
  2. Create an Amazon Redshift cluster in a private subnet within a VPC as a security best practice.
  3. Because DataBrew commands require that the cluster has access to Amazon S3, make sure you create a gateway VPC endpoint to Amazon S3. The gateway endpoint provides reliable connectivity to Amazon S3 without requiring an internet gateway or NAT device from your VPC.
  4. Enable the enhanced VPC routing in the Amazon Redshift cluster. Enhanced VPC routing forces all Amazon Redshift commands to use the connectivity to the gateway VPC endpoint to Amazon S3 in the same AWS Region as your cluster.
  5. Create a database and tables, and load the sample data in the Amazon Redshift cluster.
  6. Prepare a SQL query to extract the source dataset. You use this SQL query later in this post to create an Amazon Redshift source dataset in DataBrew.
  7. Create an S3 bucket to store data from the profile and recipe jobs. The DataBrew connection temporarily stores intermediate data in Amazon S3.
  8. For our use case, we use a mock dataset. You can download the DDL and data files from GitHub.

Security best practices

Consider the following best practices in order to mitigate security threats:

  • Review the shared responsibility model when using DataBrew.
  • Restrict network access for inbound and outbound traffic to least privilege. Take advantage of the routing traffic within the VPC by using an Amazon S3 gateway endpoint and enhanced VPC routing in Amazon Redshift.
  • Enable the lifecycle policy in Amazon S3 to retain only necessary data, and delete unnecessary data.
  • Enable Amazon S3 versioning and cross-Region replication for critical datasets to protect against accidental deletes.
  • Enable server-side encryption using AWS KMS (SSE-KMS) or Amazon S3 (SSE-S3).
  • DataBrew uses Amazon CloudWatch for logging, so you should update your log retention period to retain logs for the appropriate length of time.

Create an Amazon Redshift connection

In this section, you create a connection in DataBrew to connect to your Amazon Redshift cluster.

  1. On the DataBrew console, choose Datasets in the navigation pane.
  2. On the Connections tab, choose Create connection.
  3. For Connection name, enter a name, such as order-db-connection.
  4. For Connection type, select Amazon Redshift.
  5. Under Connection access, provide the Amazon Redshift cluster name, database name, database user, and database password.
  6. Choose Create connection.

Create your dataset by applying a custom SQL statement to filter the source data

In this section, you create a dataset using your Amazon Redshift connection, add your custom SQL statement, and validate it. You can also validate your SQL statement directly in your Amazon Redshift cluster by using the Amazon Redshift query editor v2. The purpose of validating the SQL statement is to help you avoid failure in loading your dataset into a project or job. Also, check that the query runs in under 3 minutes to avoid timeouts during project loading. To analyze and improve query performance in Amazon Redshift, see Tuning query performance.

  1. On the DataBrew console, choose Datasets in the navigation pane.
  2. On the Datasets tab, choose Connect new dataset.
  3. For Dataset name, enter a name, such as order-data.
  4. In the left pane, choose Amazon Redshift under Database connections.
  5. Add your Amazon Redshift connection and select Enter custom SQL.
  6. Enter the SQL query and choose Validate SQL.
  7. Under Additional configurations, for Enter S3 destination, provide an S3 destination to temporarily store the intermediate results.
  8. Choose Create dataset.

Create a DataBrew profile job

In this section, you use the newly created Amazon Redshift dataset to create a profile job. Data profiling helps you understand your dataset and plan the data preparation steps needed in running your recipe jobs.

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. On the Profile jobs tab, choose Create job.
  3. For Job name, enter a name, such as order-data-profile-job.
  4. For Job type, select Create a profile job.
  5. Under Job input, choose Browse datasets and choose the dataset you created earlier (order-data).
  6. For Data sample, select Full dataset.
  7. Under Job output settings, for S3 location, enter the S3 bucket for the job output files.
  8. For Role name, choose an AWS Identity and Access Management (IAM) role with permission for DataBrew to connect to the data on your behalf. For more information, refer to Adding an IAM role with data resource permissions.
  9. Choose Create and run job.

Check the status of your profile job. A profile output file is created and stored in Amazon S3 upon completion. You can choose View data profile to see more information.

In addition to an output file, DataBrew also provides visualizations. On the Dataset profile overview tab, you can see data visualizations that can help you understand your data better. Next, you can see detailed statistics about your data on the Column statistics tab, illustrated with graphics and charts. You can define data quality rules on the Data quality rules tab, and then see the results from the data quality ruleset that applies to this dataset.

For example, in the following screenshot, the amount column has 2% missing values, as shown on the Column statistics tab. You can provide rules that avoid triggering a recipe job in case of an anomaly. You can also notify the source teams to handle or acknowledge the missing values. DataBrew users can also add steps in the recipe job to handle the anomalies and missing values.
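
The idea of holding back a recipe job when a column has too many missing values can be sketched in plain Python. This is only an illustration of the check, not DataBrew's data quality rule engine; the function names and the 5% threshold are assumptions made for this example.

```python
from typing import Iterable, Optional


def missing_ratio(values: Iterable[Optional[float]]) -> float:
    """Return the fraction of entries that are missing (None)."""
    values = list(values)
    if not values:
        return 0.0
    return sum(v is None for v in values) / len(values)


def should_run_recipe(values, max_missing: float = 0.05) -> bool:
    """Hypothetical gate: allow the recipe job only while missing values stay within the limit."""
    return missing_ratio(values) <= max_missing


# 2 missing entries out of 100, mirroring the 2% in the example above.
amounts = [None, None] + [10.0] * 98
print(f"missing: {missing_ratio(amounts):.0%}")
print(f"run recipe job: {should_run_recipe(amounts)}")
```

With a stricter limit, such as max_missing=0.01, the same column would block the job, which is the point at which you would notify the source team.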

Create a DataBrew project and recipe job

In this section, you start analyzing and transforming your Amazon Redshift dataset in a DataBrew project. The custom SQL statement runs in Amazon Redshift when the project is loaded. DataBrew performs read-only access to your source data.

Create a project

To create your project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter a name, such as order-data-proj.
  4. Under Recipe details, choose Create new recipe and enter a recipe name, such as order-data-proj-recipe.
  5. For Select a dataset, select My datasets.
  6. Select the dataset you created earlier (order-data).
  7. Under Permissions, for Role name, choose your DataBrew role.
  8. Choose Create project.

DataBrew starts a session, constructs a DataFrame, extracts sample data, infers basic statistics, and displays the sample data in a grid view. You can add steps to build a transformation recipe. As of this writing, DataBrew offers over 350 transformations, with more on the way.

For our example use case, Company ABC has set a target to ship all orders within 7 days after the order date (internal SLA). They want a list of orders that didn’t meet the 7-day SLA for additional investigation. The following sample recipe contains steps to handle the missing values, filter the values by amount, change the date format, calculate the date difference, and filter the values by shipping days. The detailed steps are as follows:

  1. Fill missing values with 0 for the amount column.
  2. Filter values by amount greater than 0.
  3. Change the format of order_timestamp to align with ship_date.
  4. Create a new column called days_for_shipping using the dateTime function DATEDIFF to show the difference between order_timestamp and ship_date in days.
  5. Filter the values by days_for_shipping greater than 7.
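
To make the logic of the five recipe steps concrete, here is a minimal plain-Python sketch of the same transformations. It is not how DataBrew runs the recipe; the sample rows, the order_id column, and the timestamp formats are hypothetical, and only amount, order_timestamp, ship_date, and days_for_shipping come from the steps above.

```python
from datetime import date, datetime

# Hypothetical sample rows; values and formats are made up for illustration.
orders = [
    {"order_id": 1, "amount": 120.0, "order_timestamp": "2022-03-01 09:15:00", "ship_date": "2022-03-05"},
    {"order_id": 2, "amount": None, "order_timestamp": "2022-03-01 10:00:00", "ship_date": "2022-03-20"},
    {"order_id": 3, "amount": 75.5, "order_timestamp": "2022-03-02 18:30:00", "ship_date": "2022-03-12"},
]


def find_sla_misses(rows, sla_days=7):
    """Return the rows that took longer than sla_days to ship."""
    misses = []
    for row in rows:
        # Step 1: fill missing amounts with 0.
        amount = row["amount"] if row["amount"] is not None else 0
        # Step 2: keep only rows with an amount greater than 0.
        if amount <= 0:
            continue
        # Step 3: bring order_timestamp to the same date-only format as ship_date.
        order_date = datetime.strptime(row["order_timestamp"], "%Y-%m-%d %H:%M:%S").date()
        ship_date = date.fromisoformat(row["ship_date"])
        # Step 4: compute the difference between the two dates in days.
        days_for_shipping = (ship_date - order_date).days
        # Step 5: keep only rows that exceeded the SLA.
        if days_for_shipping > sla_days:
            misses.append({**row, "amount": amount, "days_for_shipping": days_for_shipping})
    return misses


for miss in find_sla_misses(orders):
    print(miss["order_id"], miss["days_for_shipping"])
```

In this sample, order 1 ships within the SLA, order 2 is dropped because its missing amount is filled with 0, and order 3 is reported as an SLA miss.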

Create a recipe job

To create your DataBrew recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. For Job name, enter a name, such as SHIPPING-SLA-MISS.
  4. Under Job output settings, configure your Amazon S3 output settings.
  5. For S3 location, enter the location of your output bucket.
  6. For Role name, choose the IAM role that contains permissions for DataBrew to connect on your behalf.
  7. Choose Create and run job.

You can check the status of your job on the Jobs page.

The output file is in Amazon S3 as specified, and your data transformation is now complete.

Clean up

To avoid incurring future charges, we recommend deleting the resources you created during this walkthrough.

Conclusion

In this post, we walked through applying custom SQL statements to an Amazon Redshift data source in your dataset, which you can use in profiling and transformation jobs. You can now focus on building your data transformation steps knowing that you’re working on only the needed data.

To learn more about the various supported data sources for DataBrew, see Connecting to data with AWS Glue DataBrew.


About the authors

Suraj Shivananda is a Solutions Architect at AWS. He has over a decade of experience in software engineering, data and analytics, and DevOps, specifically for data solutions and for automating and optimizing cloud-based solutions. He’s a trusted technical advisor and helps customers build Well-Architected solutions on the AWS platform.

Marie Yap is a Principal Solutions Architect for Amazon Web Services based in Hawaii. In this role, she helps various organizations begin their journey to the cloud. She also specializes in analytics and modern data architectures.

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale, part 2: Using AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-integrate-apache-hudi-delta-lake-apache-iceberg-dataset-at-scale-using-aws-glue-studio-visual-editor/

Transactional data lake technologies such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables are evolving rapidly and gaining great popularity. These technologies simplify the data processing pipeline significantly, and they provide further useful capabilities like upserts, rollbacks, and time travel queries.

In the first post of this series, we went through how to process Apache Hudi, Delta Lake, and Apache Iceberg datasets using AWS Glue connectors. AWS Glue simplifies reading and writing your data in those data lake formats, and building the data lakes on top of those technologies. Running the sample notebooks on AWS Glue Studio notebook, you could interactively develop and run your code, then immediately see the results. The notebooks let you explore how those technologies work when you have coding experience.

This second post focuses on other use cases for customers who prefer visual job authoring without writing custom code. Even without coding experience, you can easily build your transactional data lakes on AWS Glue Studio visual editor, and take advantage of those transactional data lake technologies. In addition, you can also use Amazon Athena to query the data stored using Hudi and Iceberg. This tutorial demonstrates how to read and write each format on AWS Glue Studio visual editor, and then how to query from Athena.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale

Prerequisites

The following are the instructions to read/write tables using each data lake format on AWS Glue Studio Visual Editor. You can use either the marketplace connector or the custom connector, based on your requirements.

To continue this tutorial, you must create the following AWS resources in advance:

Reads/writes using the connector on AWS Glue Studio Visual Editor

In this tutorial, you read and write data in each of the transactional data lake formats on the AWS Glue Studio Visual Editor. There are three main configurations that you must set per data lake format: connection, connection options, and job parameters. Note that no code is included in this tutorial. Let’s see how it works.

Apache Hudi writes

Complete the following steps to write into an Apache Hudi table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose hudi-0101-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  12. For Connection options, enter the following pairs of Key and Value (choose Add new option to enter a new pair).
    1. Key: path, Value: <Your S3 path for Hudi table location>
    2. Key: hoodie.table.name, Value: test
    3. Key: hoodie.datasource.write.storage.type, Value: COPY_ON_WRITE
    4. Key: hoodie.datasource.write.operation, Value: upsert
    5. Key: hoodie.datasource.write.recordkey.field, Value: location
    6. Key: hoodie.datasource.write.precombine.field, Value: date
    7. Key: hoodie.datasource.write.partitionpath.field, Value: iso_code
    8. Key: hoodie.datasource.write.hive_style_partitioning, Value: true
    9. Key: hoodie.datasource.hive_sync.enable, Value: true
    10. Key: hoodie.datasource.hive_sync.database, Value: hudi
    11. Key: hoodie.datasource.hive_sync.table, Value: test
    12. Key: hoodie.datasource.hive_sync.partition_fields, Value: iso_code
    13. Key: hoodie.datasource.hive_sync.partition_extractor_class, Value: org.apache.hudi.hive.MultiPartKeysValueExtractor
    14. Key: hoodie.datasource.hive_sync.use_jdbc, Value: false
    15. Key: hoodie.datasource.hive_sync.mode, Value: hms
  13. Under Job details, for IAM Role, choose your IAM role.
  14. Under Advanced properties, for Job parameters, choose Add new parameter.
  15. For Key, enter --conf.
  16. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer.
  17. Choose Save.
  18. Choose Run.
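
The visual editor needs no code, but it can help to keep the fifteen connection options in one place so that related values stay in sync (for example, the table name and the Hive sync table, or the partition path and the Hive sync partition field). The following is a minimal sketch in plain Python; the function name and the S3 path are placeholders invented for this example, while the keys and values are the ones listed in the steps above.

```python
def hudi_write_options(table_path, table_name, database, record_key,
                       precombine_field, partition_field):
    """Return the Hudi connection options from the steps above as a dict."""
    return {
        "path": table_path,
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": database,
        # Reuse the same names so the catalog entry matches the table on S3.
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_fields": partition_field,
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }


options = hudi_write_options(
    table_path="s3://your_s3_bucket/hudi/test/",  # placeholder location
    table_name="test",
    database="hudi",
    record_key="location",
    precombine_field="date",
    partition_field="iso_code",
)

# Print the pairs in the form used by the Connection options dialog.
for key, value in options.items():
    print(f"Key: {key}, Value: {value}")
```

If you later move from the visual editor to a Spark script, a dictionary like this is the kind of object that is typically handed to a DataFrame writer through options(**options).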

Apache Hudi reads

Complete the following steps to read from the Apache Hudi table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose hudi-0101-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for your Hudi table that you created in the previous section.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type, choose S3 location.
  15. For S3 Target Location, enter your S3 path for the output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Choose Save.
  18. Choose Run.

Delta Lake writes

Complete the following steps to write into the Delta Lake table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose delta-100-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose your delta-100-byoc-connection.
  12. For Connection options, choose Add new option.
  13. For Key, enter path. For Value, enter your S3 path for Delta table location. Choose Add new option.
  14. For Key, enter partitionKeys. For Value, enter iso_code.
  15. Under Job details, for IAM Role, choose your IAM role.
  16. Under Advanced properties, for Job parameters, choose Add new parameter.
  17. For Key, enter --conf.
  18. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  19. Choose Save.
  20. Choose Run.
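
Steps 17 and 18 rely on a convention that is easy to miss: the job parameter key is a single --conf, and every additional Spark setting is chained inside the value with its own " --conf " prefix. A minimal sketch of how such a value can be assembled follows; the helper name build_conf_value is made up for this example.

```python
def build_conf_value(settings):
    """Join Spark settings into the single value expected by the --conf job parameter.

    The first pair is written as key=value; each following pair is
    prefixed with " --conf " so that all settings fit in one parameter.
    """
    pairs = [f"{key}={value}" for key, value in settings.items()]
    return " --conf ".join(pairs)


delta_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

print("Key:   --conf")
print("Value:", build_conf_value(delta_settings))
```

The printed value matches the string entered in step 18.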

Delta Lake reads

Complete the following steps to read from the Delta Lake table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose delta-100-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose delta-100-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for Delta table that you created in the previous section. Choose Add new option.
  11. For Key, enter partitionKeys. For Value, enter iso_code.
  12. Choose Transform – ApplyMapping, and choose Remove.
  13. Choose Data target – S3 bucket.
  14. Under Data target properties, for Format, choose JSON.
  15. For S3 Target type, choose S3 location.
  16. For S3 Target Location, enter your S3 path for the output location.
  17. Under Job details, for IAM Role, choose your IAM role.
  18. Under Advanced properties, for Job parameters, choose Add new parameter.
  19. For Key, enter --conf.
  20. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  21. Choose Save.
  22. Choose Run.

Apache Iceberg writes

Complete the following steps to write into an Apache Iceberg table using the connector:

  1. Open AWS Glue console.
  2. Choose Databases.
  3. Choose Add database.
  4. For database name, enter iceberg, and choose Create.
  5. Open AWS Glue Studio.
  6. Choose Jobs.
  7. Choose Visual with a source and target.
  8. For Source, choose Amazon S3.
  9. For Target, choose iceberg-0131-byoc-connector.
  10. Choose Create.
  11. Under Visual, choose Data source – S3 bucket.
  12. Under Node properties, for S3 source type, choose S3 location.
  13. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  14. Choose Data target – Connector.
  15. Under Node properties, for Connection, choose iceberg-0131-byoc-connection.
  16. For Connection options, choose Add new option.
  17. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  18. Choose SQL under Transform to create a new AWS Glue Studio node.
  19. Under Node properties, for Node parents, choose ApplyMapping.
  20. Under Transform, for SQL alias, verify that myDataSource is entered.
  21. For SQL query, enter CREATE TABLE glue_catalog.iceberg.test AS SELECT * FROM myDataSource WHERE 1=2. This is to create a table definition with no records because the Iceberg target requires table definition before data ingestion.
  22. Under Job details, for IAM Role, choose your IAM role.
  23. Under Advanced properties, for Job parameters, choose Add new parameter.
  24. For Key, enter --conf.
  25. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  26. Choose Save.
  27. Choose Run.
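
Long chained values like the one in step 25 are easy to mistype, and a single typo can make the job fail at startup. The following sketch checks a value before you paste it into the job parameter. The helper name and both rules (every entry has the form key=value, and every key starts with spark.) are assumptions chosen for this example, not validation that AWS Glue performs.

```python
def check_conf_value(value):
    """Return a list of problems found in a chained --conf value."""
    problems = []
    for entry in value.split(" --conf "):
        entry = entry.strip()
        key, sep, val = entry.partition("=")
        # Rule 1 (assumed): each entry must look like key=value.
        if not sep or not val:
            problems.append(f"missing value: {entry}")
        # Rule 2 (assumed): all settings in this tutorial are Spark settings.
        if not key.startswith("spark."):
            problems.append(f"unexpected key prefix: {key}")
    return problems


good = (
    "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog"
    " --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO"
)
# Deliberately broken input: a misspelled prefix and an entry without a value.
bad = "spark.a=b --conf park.c=d --conf spark.e"

print(check_conf_value(good))
print(check_conf_value(bad))
```

An empty list means that no problem was detected under these two rules; it does not prove that the class names themselves are correct.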

Apache Iceberg reads

Complete the following steps to read from the Apache Iceberg table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Apache Iceberg Connector for AWS Glue 3.0.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose your Iceberg connection name.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type, choose S3 location.
  15. For S3 Target Location, enter your S3 path for the output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Under Advanced properties, for Job parameters, choose Add new parameter.
  18. For Key, enter --conf.
  19. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  20. Choose Save.
  21. Choose Run.

Query from Athena

The Hudi table and the Iceberg table created with the above instructions are also queryable from Athena.

  1. Open the Athena console.
  2. Run the following SQL to query the Hudi table:
    SELECT * FROM "hudi"."test" LIMIT 10

  3. Run the following SQL to query the Iceberg table:
    SELECT * FROM "iceberg"."test" LIMIT 10

If you want to query the Delta table from Athena, follow Presto, Trino, and Athena to Delta Lake integration using manifests.
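
The Hudi and Iceberg queries in the steps above can also be submitted programmatically. The following sketch only builds the request; the helper name and the results location are placeholders invented for this example. The dictionary keys follow the parameters of the Athena StartQueryExecution API.

```python
def athena_query_request(database, table, output_location, limit=10):
    """Build keyword arguments for an Athena StartQueryExecution call."""
    return {
        "QueryString": f'SELECT * FROM "{database}"."{table}" LIMIT {limit}',
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


# Placeholder location for query results; replace it with your own bucket.
results = "s3://your_s3_bucket/athena-results/"

for database in ("hudi", "iceberg"):
    request = athena_query_request(database, "test", results)
    print(request["QueryString"])
```

With boto3 installed and credentials configured, a request built this way can be passed to boto3.client("athena").start_query_execution(**request).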

Conclusion

This post summarized how to use Apache Hudi, Delta Lake, and Apache Iceberg on the AWS Glue platform, and demonstrated how each format works with the AWS Glue Studio Visual Editor. You can start using those data lake formats easily with AWS Glue DynamicFrames, Spark DataFrames, and Spark SQL in AWS Glue jobs, AWS Glue Studio notebooks, and the AWS Glue Studio visual editor.


About the Author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-integrate-apache-hudi-delta-lake-apache-iceberg-datasets-at-scale-aws-glue-studio-notebook/

Cloud data lakes provide a scalable and low-cost data repository that enables customers to easily store data from a variety of data sources. Data scientists, business analysts, and line of business users use data lakes to explore, refine, and analyze petabytes of data. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. Customers use AWS Glue to discover and extract data from a variety of data sources, and to enrich and cleanse the data before storing it in data lakes and data warehouses.

Over the years, many table formats have emerged to support ACID transactions, governance, and catalog use cases. For example, formats such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables enable customers to run ACID transactions on Amazon Simple Storage Service (Amazon S3). AWS Glue supports these table formats for batch and streaming workloads. This post focuses on Apache Hudi, Delta Lake, and Apache Iceberg, and summarizes how to use them in AWS Glue 3.0 jobs. If you’re interested in AWS Lake Formation governed tables, see the Effective data lakes using AWS Lake Formation series.

Bring libraries for the data lake formats

Today, there are three available options for bringing libraries for the data lake formats on the AWS Glue job platform: Marketplace connectors, custom connectors (BYOC), and extra library dependencies.

Marketplace connectors

AWS Glue Connector Marketplace is the centralized repository for cataloging the available Glue connectors provided by multiple vendors. As of this writing, you can subscribe to more than 60 connectors offered in AWS Glue Connector Marketplace. There are marketplace connectors available for Apache Hudi, Delta Lake, and Apache Iceberg. The marketplace connectors are hosted in an Amazon Elastic Container Registry (Amazon ECR) repository and downloaded to the Glue job system at runtime. When you prefer the simple user experience of subscribing to connectors and using them in your Glue ETL jobs, the marketplace connector is a good option.

Custom connectors as bring-your-own-connector (BYOC)

An AWS Glue custom connector enables you to upload and register your own libraries located in Amazon S3 as Glue connectors. You have more control over the library versions, patches, and dependencies. Because it uses your S3 bucket, you can configure the S3 bucket policy to share the libraries only with specific users, and you can configure private network access to download the libraries using VPC endpoints. When you prefer having more control over those configurations, the custom connector as BYOC is a good option.

Extra library dependencies

Another option is to download the data lake format libraries, upload them to your S3 bucket, and add them to the job as extra library dependencies. With this option, you can add libraries directly to the job without a connector. In a Glue job, you configure them in Dependent JARs path. In the API, it’s the --extra-jars parameter. In a Glue Studio notebook, you configure them with the %extra_jars magic. To download the relevant JAR files, see the library locations in the section Create a Custom connection (BYOC).
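
Both the --extra-jars parameter and the %extra_jars magic take the JAR locations as one comma-separated list of S3 paths. The following minimal sketch shows the two forms side by side; the bucket name and the jars/ prefix are placeholders, and the helper name is made up for this example.

```python
def extra_jars_value(s3_paths):
    """Join S3 paths into the comma-separated form used for extra JARs."""
    return ",".join(s3_paths)


# Placeholder locations of JAR files that you uploaded to your own bucket.
jars = [
    "s3://your_s3_bucket/jars/hudi-spark3-bundle_2.12-0.9.0.jar",
    "s3://your_s3_bucket/jars/spark-avro_2.12-3.1.1.jar",
]

# Form used in job arguments when the job is defined through the API.
job_default_arguments = {"--extra-jars": extra_jars_value(jars)}

# Form used as a magic in an AWS Glue Studio notebook cell.
notebook_magic = f"%extra_jars {extra_jars_value(jars)}"

print(job_default_arguments)
print(notebook_magic)
```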

Create a Marketplace connection

To create a new marketplace connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.10.1

Complete the following steps to create a marketplace connection for Apache Hudi 0.10.1:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Hudi Connector for AWS Glue, and choose Apache Hudi Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.10.1.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Delta Lake 1.0.0

Complete the following steps to create a marketplace connection for Delta Lake 1.0.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Delta Lake Connector for AWS Glue, and choose Delta Lake Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 1.0.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a marketplace connection for Apache Iceberg 0.12.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Iceberg Connector for AWS Glue, and choose Apache Iceberg Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.12.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter iceberg-0120-mp-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Create a Custom connection (BYOC)

You can create your own custom connectors from JAR files. In this section, you can see the exact JAR files that are used in the marketplace connectors. You can just use the files for your custom connectors for Apache Hudi, Delta Lake, and Apache Iceberg.

To create a new custom connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.9.0

Complete the following steps to create a custom connection for Apache Hudi 0.9.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3-bundle_2.12/0.9.0/hudi-spark3-bundle_2.12-0.9.0.jar
    2. https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.9.0/hudi-utilities-bundle_2.12-0.9.0.jar
    3. https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.10.1/parquet-avro-1.10.1.jar
    4. https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar
    5. https://repo1.maven.org/maven2/org/apache/calcite/calcite-core/1.10.0/calcite-core-1.10.0.jar
    6. https://repo1.maven.org/maven2/org/datanucleus/datanucleus-core/4.1.17/datanucleus-core-4.1.17.jar
    7. https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-090-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-090-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-090-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Hudi 0.10.1

Complete the following steps to create a custom connection for Apache Hudi 0.10.1:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. hudi-utilities-bundle_2.12-0.10.1.jar
    2. hudi-spark3.1.1-bundle_2.12-0.10.1.jar
    3. spark-avro_2.12-3.1.1.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-0101-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-0101-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-0101-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Note that the above Hudi 0.10.1 installation on Glue 3.0 does not fully support Merge On Read (MoR) tables.

Delta Lake 1.0.0

Complete the following steps to create a custom connector for Delta Lake 1.0.0:

  1. Download the following JAR file, and upload it to your S3 bucket.
    1. https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter the Amazon S3 path for the above JAR file.
  6. For Name, enter delta-100-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.spark.sql.delta.sources.DeltaDataSource.
  9. Choose Create connector.
  10. Choose delta-100-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter delta-100-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a custom connection for Apache Iceberg 0.12.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark3-runtime/0.12.0/iceberg-spark3-runtime-0.12.0.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.15.40/bundle-2.15.40.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.15.40/url-connection-client-2.15.40.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0120-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0120-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0120-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.13.1

Complete the following steps to create a custom connection for Apache Iceberg 0.13.1:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. iceberg-spark-runtime-3.1_2.12-0.13.1.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.161/bundle-2.17.161.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.161/url-connection-client-2.17.161.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0131-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0131-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0131-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Prerequisites

To continue this tutorial, you must create the following AWS resources in advance:

  • AWS Identity and Access Management (IAM) role for your ETL job or notebook as instructed in Set up IAM permissions for AWS Glue Studio. Note that AmazonEC2ContainerRegistryReadOnly or equivalent permissions are needed when you use the marketplace connectors.
  • Amazon S3 bucket for storing data.
  • Glue connection (either the marketplace connector or the custom connector corresponding to the data lake format).

Reads/writes using the connector on AWS Glue Studio Notebook

The following are the instructions to read/write tables using each data lake format on AWS Glue Studio Notebook. As a prerequisite, make sure that you have created a connector and a connection for the connector using the information above.
The example notebooks are hosted on the AWS Glue Samples GitHub repository, where seven notebooks are available. In the following instructions, we use one notebook per data lake format.

Apache Hudi

To read/write Apache Hudi tables in the AWS Glue Studio notebook, complete the following steps:

  1. Download hudi_dataframe.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Hudi connection name, and run the cell:
    %connections hudi-0101-byoc-connection (Alternatively, you can use the name of the connection that you created from the marketplace connector.)
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Hudi table with sample data using catalog sync to create a new Hudi table with sample data.
  12. Run the cells in the section Read from Hudi table to verify the new Hudi table. There are five records in this table.
  13. Run the cells in the section Upsert records into Hudi table to see how upsert works on Hudi. This code inserts one new record and updates one existing record. You can verify that there is a new record product_id=00006, and that the price of the existing record product_id=00001 has been updated from 250 to 400.
  14. Run the cells in the section Delete a Record. You can verify that the existing record product_id=00001 has been deleted.
  15. Run the cells in the section Point in time query. You can verify that you’re seeing the previous version of the table where the upsert and delete operations haven’t been applied yet.
  16. Run the cells in the section Incremental Query. You can verify that you’re seeing only the recent commit about product_id=00006.

In this notebook, you completed the basic Spark DataFrame operations on Hudi tables.
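The upsert and incremental-query steps above come down to a handful of Hudi DataSource options. The following is a minimal sketch of those option sets as plain Python dictionaries. It is not the notebook's code, and the table name, key field, and precombine field in the usage note are hypothetical placeholders.

```python
from typing import Dict


def hudi_upsert_options(table: str, record_key: str, precombine: str) -> Dict[str, str]:
    """Options for writing a DataFrame to a Hudi table as an upsert."""
    return {
        "hoodie.table.name": table,
        # Field that uniquely identifies a record (used to match existing rows).
        "hoodie.datasource.write.recordkey.field": record_key,
        # Field used to pick the latest version when two rows share a key.
        "hoodie.datasource.write.precombine.field": precombine,
        "hoodie.datasource.write.operation": "upsert",
    }


def hudi_incremental_read_options(begin_instant: str) -> Dict[str, str]:
    """Options for reading only the commits made after begin_instant."""
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }


# Hypothetical values, for illustration only.
write_opts = hudi_upsert_options("product_hudi", "product_id", "updated_at")
read_opts = hudi_incremental_read_options("20220101000000")
```

In a notebook session these dictionaries would typically be passed as df.write.format("hudi").options(**write_opts).mode("append").save(path) and spark.read.format("hudi").options(**read_opts).load(path), where df, spark, and path come from the session.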

Delta Lake

To read/write Delta Lake tables in the AWS Glue Studio notebook, complete the following:

  1. Download delta_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook, and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Delta connection name, and run the cell:
    %connections delta-100-byoc-connection
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Delta table with sample data to create a new Delta table with sample data.
  12. Run the cells in the section Create a Delta Lake table.
  13. Run the cells in the section Read from Delta Lake table to verify the new Delta table. There are five records in this table.
  14. Run the cells in the section Insert records. The query inserts two new records: record_id=00006, and record_id=00007.
  15. Run the cells in the section Update records. The query updates the price of the existing records record_id=00006 and record_id=00007 from 500 to 300.
  16. Run the cells in the section Upsert records to see how upsert works on Delta. This code inserts one new record and updates one existing record. You can verify that there is a new record product_id=00008, and that the price of the existing record product_id=00001 has been updated from 250 to 400.
  17. Run the cells in the section Alter DeltaLake table. The queries add one new column, and update the values in the column.
  18. Run the cells in the section Delete records. You can verify that the record product_id=00006 has been deleted because its product_name is Pen.
  19. Run the cells in the section View History to describe the history of operations that were triggered against the target Delta table.

In this notebook, you completed the basic Spark SQL operations on Delta tables.
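The upsert step above is typically expressed in Spark SQL as a MERGE INTO statement. The following minimal sketch builds such a statement in Python. It is not the notebook's code, and the table names in the usage line are hypothetical.

```python
def build_merge_sql(target: str, source: str, key: str) -> str:
    """Render a Delta Lake MERGE that updates matching rows and inserts new ones."""
    return (
        f"MERGE INTO {target} AS t "
        f"USING {source} AS s "
        f"ON t.{key} = s.{key} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical table names, for illustration only.
merge_sql = build_merge_sql("products", "products_updates", "product_id")
print(merge_sql)
```

In a notebook session the statement would be run with spark.sql(merge_sql). Rows whose product_id already exists in the target are updated, and all other rows are inserted.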

Apache Iceberg

To read/write Apache Iceberg tables in the AWS Glue Studio notebook, complete the following:

  1. Download iceberg_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Iceberg connection name, and run the cell:
    %connections iceberg-0131-byoc-connection (Alternatively you can use your connection name created from the marketplace connector).
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Iceberg table with sample data to create a new Iceberg table with sample data.
  12. Run the cells in the section Read from Iceberg table.
  13. Run the cells in the section Upsert records into Iceberg table.
  14. Run the cells in the section Delete records.
  15. Run the cells in the section View History and Snapshots.

In this notebook, you completed the basic Spark SQL operations on Iceberg tables.
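Iceberg exposes table history and snapshots as metadata tables that can be queried like regular tables. The following minimal sketch builds the two queries in Python. The fully qualified table name is a hypothetical placeholder, and the notebook's own cells may differ.

```python
from typing import Dict


def iceberg_metadata_queries(table: str) -> Dict[str, str]:
    """Return Spark SQL queries against Iceberg's history and snapshots metadata tables."""
    return {
        "history": f"SELECT * FROM {table}.history",
        "snapshots": f"SELECT * FROM {table}.snapshots",
    }


# Hypothetical catalog, database, and table names.
queries = iceberg_metadata_queries("glue_catalog.iceberg_db.products")
for name, sql in queries.items():
    print(name, "->", sql)
```

Each query string would be passed to spark.sql in the notebook session.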

Conclusion

This post summarized how to use Apache Hudi, Delta Lake, and Apache Iceberg on the AWS Glue platform, and demonstrated how each format works with an AWS Glue Studio notebook. You can easily start using these data lake formats with Spark DataFrames and Spark SQL in AWS Glue jobs or AWS Glue Studio notebooks.

This post focused on interactive coding and querying on notebooks. The upcoming part 2 will focus on the experience using AWS Glue Studio Visual Editor and Glue DynamicFrames for customers who prefer visual authoring without the need to write code.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with AWS. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Monjumi Sarma is a Data Lab Solutions Architect at AWS. She helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives.

Data warehouse and business intelligence technology consolidation using AWS

Post Syndicated from Bappaditya Datta original https://aws.amazon.com/blogs/architecture/data-warehouse-and-business-intelligence-technology-consolidation-using-aws/

Organizations have been using data warehouse and business intelligence (DWBI) workloads to support business decision making for many years. These workloads are brought to the Amazon Web Services (AWS) platform to take advantage of the AWS Cloud. However, these workloads are built using multiple vendor tools and technologies, and the customer faces the burden of administrative overhead.

This post provides architectural guidance to consolidate multiple DWBI technologies into AWS managed services to help reduce administrative overhead, simplify operations, and improve business efficiency. Two scenarios are explored:

  1. Upstream transactional databases are already on AWS
  2. Upstream transactional databases are in an on-premises data center

Challenges faced by an organization

Organizations are engaged in managing multiple DWBI technologies due to acquisitions, mergers, and the lift-and-shift of workloads. These workloads use extract, transform, and load (ETL) tools to read relational data from upstream transactional databases, process it, and store it in a data warehouse. Thereafter, these workloads use business intelligence tools to generate valuable insight and present it to users in the form of reports and dashboards.

These DWBI technologies are generally installed and maintained on their own servers. As Figure 1 shows, this not only increases the administrative overhead for the organization but also creates challenges in maintaining the team’s overall knowledge.

Figure 1. DWBI workload with multiple tools

Therefore, organizations are looking to consolidate technology usage and continue supporting important business functions.

Scenario 1

The three major functions of a DWBI workstream are:

  • ETL data using a tool
  • Store/manage the data in a data warehouse
  • Generate information from the data using business intelligence

Each of these functions can be performed efficiently using an AWS service. For example, AWS Glue can be used for ETL, Amazon Redshift for data warehouse, and Amazon QuickSight for business intelligence.

By using these AWS services, organizations can consolidate their DWBI technology usage. Organizations can also adapt quickly to these services, because their engineering teams can readily apply their existing DWBI knowledge. For example, they can use their SQL knowledge in AWS Glue jobs with Spark SQL, in Amazon Redshift queries, and in Amazon QuickSight dashboards.

Figure 2 shows the architecture of Figure 1 redesigned using AWS services. In this architecture, ETL functions are consolidated in AWS Glue. An AWS Glue crawler is used to auto-catalogue the source and target table metadata; then, AWS Glue ETL jobs use these catalogues to read data from the source and write to the target (data warehouse). AWS Glue jobs also apply necessary transformations (such as join, filter, and aggregate) to the data before writing. Additionally, an AWS Glue trigger is used to schedule the job executions. Alternatively, Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can be used to schedule jobs.

Figure 2. Consolidated workload with source on AWS

Similarly, the data warehousing function is consolidated with Amazon Redshift. Amazon Redshift is used to store and organize enriched data and also to enforce appropriate data access control for both workloads and users.

Lastly, business intelligence functions are consolidated using Amazon QuickSight. It is used to create the necessary dashboards that source data from Amazon Redshift and apply complex business logic to produce the charts and graphs needed for business insights. It is also used to implement the necessary access restrictions to dashboards and data.

Scenario 2

In a situation where the source databases are in an on-premises data center, the overall solution is similar to Scenario 1, with an additional step to move the data continually from the on-premises database to an Amazon Simple Storage Service (Amazon S3) bucket. The data movement can be efficiently handled by AWS Database Migration Service (AWS DMS).

To make the source database accessible to AWS DMS, a connection needs to be established between the AWS Cloud and the on-premises network. Based on performance and throughput needs, the organization can choose either the AWS Direct Connect service or the AWS Site-to-Site VPN service to securely move the data. For the purpose of this discussion, we are considering AWS Direct Connect.

In Figure 3, an AWS DMS task is used to perform a full load followed by change data capture to continuously move the data to an S3 bucket. In this scenario, AWS Glue is used to catalogue and read the data from the S3 bucket. The remaining portion of the dataflow is the same as the one described in Scenario 1.

Figure 3. Consolidated workload with source at datacenter

Scaling

Both of the updated architectures provide necessary scaling:

  • The auto scaling feature can be used to scale AWS Glue ETL job resources up or down
  • The concurrency scaling feature can be used to support virtually unlimited concurrent users and queries in Amazon Redshift
  • Amazon QuickSight resources (web server, Amazon QuickSight engine, and SPICE) are auto scaled by design

Security, monitoring, and auditing

Also, the updated architectures provide necessary security by using access control, data encryption at-rest and in transit, monitoring, and auditing.

Additionally, both Amazon Redshift and Amazon QuickSight provide their own authentication and access controls. Therefore, a user can be a local user or a federated one. With the help of these authentication mechanisms, an organization can control access to data in Amazon Redshift and access to dashboards in Amazon QuickSight.

Conclusion

In this blog post, we discussed how AWS Glue, Amazon Redshift, and Amazon QuickSight can be used to consolidate DWBI technologies. We also discussed how this architecture can help an organization build a scalable, secure workload with auto scaling, access control, log monitoring, and activity auditing.

Ready to get started?

How William Hill migrated NoSQL workloads at scale to Amazon Keyspaces

Post Syndicated from Kunal Gautam original https://aws.amazon.com/blogs/big-data/how-william-hill-migrated-nosql-workloads-at-scale-to-amazon-keyspaces/

Social gaming and online sports betting are competitive environments. The game must be able to handle large volumes of unpredictable traffic while simultaneously promising zero downtime. In this domain, user retention is no longer just desirable, it’s critical. William Hill is a global online gambling company based in London, England, and it is the founding member of the UK Betting and Gaming Council. They share the mission to champion the betting and gaming industry and set world-class standards to ensure an enjoyable, fair, and safe betting and gambling experience for all of their customers. In sports betting, William Hill is an industry-leading brand, awarded prestigious industry titles like the IGA Awards Sports Betting Operator of the Year in 2019, 2020, and 2022, and the SBC Awards Racing Sportsbook of the Year in 2019. William Hill was acquired in April 2021 by Caesars Entertainment, Inc. (NASDAQ: CZR), the largest casino-entertainment company in the US and one of the world’s most diversified casino-entertainment providers. At the heart of the William Hill gaming platform is a NoSQL database that maintains 100% uptime, scales in real time to handle millions of users or more, and provides users with a responsive and personalized experience across all of their devices.

In this post, we’ll discuss how William Hill moved their workload from Apache Cassandra to Amazon Keyspaces (for Apache Cassandra) with zero downtime using AWS Glue ETL.

William Hill was facing challenges regarding scalability, cluster instability, high operational costs, and manual patching and server maintenance. They were looking for a NoSQL solution that was scalable, highly available, and completely managed, so that they could focus on providing a better user experience rather than maintaining infrastructure. William Hill Limited decided to move forward with Amazon Keyspaces, because it can run Apache Cassandra workloads on AWS using the same Cassandra application code and developer tools used today, without the need to provision, patch, or manage servers, or to install, maintain, or operate software.

Solution overview

William Hill Limited wanted to migrate their existing Apache Cassandra workloads to Amazon Keyspaces with a replication lag of minutes, with minimum migration costs and development efforts. Therefore, AWS Glue ETL was leveraged to deliver the desired outcome.

AWS Glue is a serverless data integration service that provides multiple benefits for migration:

  • No infrastructure to maintain; allocates the necessary computing power and runs multiple migration jobs simultaneously.
  • All-in-one pricing model that includes infrastructure and is 55% cheaper than other cloud data integration options.
  • No lock in with the service; possible to develop data migration pipelines in open-source Apache Spark (Spark SQL, PySpark, and Scala).
  • Migration pipeline can be scaled fearlessly with Amazon Keyspaces and AWS Glue.
  • Built-in pipeline monitoring to make sure of in-migration continuity.
  • AWS Glue ETL jobs make it possible to perform bulk data extraction from Apache Cassandra and ingest to Amazon Keyspaces.

In this post, we’ll take you through William Hill’s journey of building the migration pipeline from scratch to migrate the Apache Cassandra workload to Amazon Keyspaces by leveraging AWS Glue ETL with DataStax Spark Cassandra connector.

For the purpose of this post, let’s look at a typical Cassandra Network setup on AWS and the mechanism used to establish the connection with AWS Glue ETL. The migration solution described also works for Apache Cassandra hosted on on-premises clusters.

Architecture overview

The architecture demonstrates the migration environment that requires Amazon Keyspaces, AWS Glue, Amazon Simple Storage Service (Amazon S3), and the Apache Cassandra cluster. To avoid a high CPU utilization/saturation on the Apache Cassandra cluster during the migration process, you might want to deploy another Cassandra datacenter to isolate your production from the migration workload to make the migration process seamless for your customers.

Amazon S3 is used as a staging area while migrating data from Apache Cassandra to Amazon Keyspaces. This minimizes the I/O load on the Cassandra cluster serving live production traffic, because a failed upload to Amazon Keyspaces can be retried from Amazon S3 without rereading the data from Cassandra.

Prerequisites

The Apache Cassandra cluster is hosted on Amazon Elastic Compute Cloud (Amazon EC2) instances, spread across three Availability Zones, in private subnets. AWS Glue ETL is hosted on Amazon Virtual Private Cloud (Amazon VPC), so an AWS Glue Studio custom connector and connection must be set up for it to communicate with the Apache Cassandra nodes hosted on the private subnets in the customer VPC. The DataStax Spark Cassandra Connector must be downloaded and saved to an Amazon S3 bucket: s3://$MIGRATION_BUCKET/jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar.

Let’s create an AWS Glue Studio custom connector named cassandra_connection and its corresponding connection named conn-cassandra-custom for AWS region us-east-1.

For the connector created, create an AWS Glue Studio connection and populate it with the network information (VPC and subnet) that allows AWS Glue ETL to establish a connection with Apache Cassandra.

  • Name: conn-cassandra-custom
  • Network Options

Let’s begin by creating a target keyspace named target_keyspace and a target table named target_table in Amazon Keyspaces, using the Amazon Keyspaces console or cqlsh.

CREATE KEYSPACE target_keyspace WITH replication = {'class': 'SingleRegionStrategy'};

CREATE TABLE target_keyspace.target_table (
    userid      uuid,
    level       text,
    gameid      int,
    description text,
    nickname    text,
    zip         text,
    email       text,
    updatetime  text,
    PRIMARY KEY (userid, level, gameid)
) WITH default_time_to_live = 0 AND CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PROVISIONED',
		'write_capacity_units':76388,
		'read_capacity_units':3612
	}
} AND CLUSTERING ORDER BY (level ASC, gameid ASC);

After the table has been created, switch the table to on-demand mode to pre-warm the table and avoid AWS Glue ETL job throttling failures. The following script will update the throughput mode.

ALTER TABLE target_keyspace.target_table 
WITH CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PAY_PER_REQUEST'
	}
};

Let’s go ahead and create two Amazon S3 buckets to support the migration process. The first bucket (s3://your-spark-cassandra-connector-bucket-name) should store the Spark Cassandra connector assembly jar file and the Cassandra and Keyspaces configuration files.

The second bucket (s3://your-migration-stage-bucket-name) will be used to store intermediate parquet files to identify the delta between the Cassandra cluster and the Amazon Keyspaces table to track changes between subsequent executions of the AWS Glue ETL jobs.

In the following KeyspacesConnector.conf, set your contact points to connect to Amazon Keyspaces, and replace the username and the password with your AWS credentials.

Using the RateLimitingRequestThrottler, we can make sure that requests don’t exceed the configured Keyspaces capacity. The G.1X worker type creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1,000 requests per second. With this configuration and G.1X workers, you’ll achieve 1,000 requests per second per AWS Glue worker. Adjust max-requests-per-second to fit your workload, and increase the number of workers to scale throughput to a table.
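The sizing arithmetic in the paragraph above can be sketched as follows. This is a rough estimate under the stated one-executor-per-worker assumption; it ignores any worker reserved for the Spark driver, and the target rate in the example is hypothetical.

```python
import math


def job_request_rate(workers: int, max_requests_per_second: int) -> int:
    """Upper bound on requests per second for the whole job.

    Assumes one executor per worker, each throttled independently.
    """
    return workers * max_requests_per_second


def workers_needed(target_rate: int, max_requests_per_second: int) -> int:
    """Smallest number of workers whose combined throttle limit reaches target_rate."""
    return math.ceil(target_rate / max_requests_per_second)


# Two workers throttled at 1,000 requests per second each.
print(job_request_rate(2, 1000))      # 2000
# Hypothetical target of 25,000 requests per second.
print(workers_needed(25_000, 1000))   # 25
```

Keep the resulting rate at or below the write capacity available on the target table, otherwise the job will still be throttled by Amazon Keyspaces.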

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "us-east-1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
    advanced.throttler = {
       class = RateLimitingRequestThrottler
       max-requests-per-second = 1000
       max-queue-size = 50000
       drain-interval = 1 millisecond
    }
    advanced.ssl-engine-factory {
      class = DefaultSslEngineFactory
      hostname-validation = false
    }
    advanced.connection.pool.local.size = 1
}

Similarly, create a CassandraConnector.conf file, set the contact points to connect to the Cassandra cluster, and replace the username and the password respectively.

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["127.0.0.1:9042"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "datacenter1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
}

Build AWS Glue ETL migration pipeline with Amazon Keyspaces

To build a reliable, consistent delta-upload pipeline, let’s decouple the migration process into two AWS Glue ETL jobs.

  • CassandraToS3 Glue ETL: Read data from the Apache Cassandra cluster and transfer the migration workload to Amazon S3 in the Apache Parquet format. To identify incremental changes in the Cassandra tables, the job stores separate parquet files with primary keys with an updated timestamp.
  • S3toKeyspaces Glue ETL: Uploads the migration workload from Amazon S3 to Amazon Keyspaces. During the first run, the ETL uploads the complete data set from Amazon S3 to Amazon Keyspaces, and for the subsequent run calculates the incremental changes by comparing the updated timestamp across two subsequent runs and calculating the incremental difference. The job also takes care of inserting new records, updating existing records, and deleting records based on the incremental difference.
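The incremental comparison described for S3toKeyspaces can be sketched in plain Python. This is an illustration of the idea rather than the job's actual Spark code: each snapshot is modeled as a dictionary from primary key to write time, and the sample keys and timestamps are made up.

```python
from typing import Dict, NamedTuple, Set, Tuple

# Primary key of the example table: (userid, level, gameid).
Key = Tuple[str, str, int]


class Delta(NamedTuple):
    inserts: Set[Key]
    updates: Set[Key]
    deletes: Set[Key]


def compute_delta(current: Dict[Key, int], new: Dict[Key, int]) -> Delta:
    """Compare two snapshots that map primary key -> write time."""
    inserts = set(new.keys() - current.keys())   # keys only in the new snapshot
    deletes = set(current.keys() - new.keys())   # keys that disappeared
    # Keys present in both snapshots whose write time moved forward.
    updates = {k for k in new.keys() & current.keys() if new[k] > current[k]}
    return Delta(inserts, updates, deletes)


previous_run = {("u1", "gold", 1): 100, ("u2", "silver", 1): 100, ("u3", "gold", 2): 100}
latest_run = {("u1", "gold", 1): 100, ("u2", "silver", 1): 250, ("u4", "bronze", 3): 260}

delta = compute_delta(previous_run, latest_run)
print(delta.inserts)  # {('u4', 'bronze', 3)}
print(delta.updates)  # {('u2', 'silver', 1)}
print(delta.deletes)  # {('u3', 'gold', 2)}
```

Unchanged rows (here u1) fall into none of the three sets, so subsequent runs only send the changed rows to Amazon Keyspaces.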

In this example, we’ll use Scala to write the AWS Glue ETL, but you can also use PySpark.

Let’s go ahead and create an AWS Glue ETL job named CassandraToS3 with the following job parameters:

aws glue create-job \
    --name "CassandraToS3" \
    --role "GlueKeyspacesMigration" \
    --description "Offload data from the Cassandra to S3" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --connections "conn-cassandra-custom" \
    --command "Name=glueetl,ScriptLocation=s3://$MIGRATION_BUCKET/scripts/CassandraToS3.scala" \
    --max-retries 0 \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"source_keyspace",
        "--TABLE_NAME":"source_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/CassandraConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=CassandraConnector.conf",
        "--class":"GlueApp"
    }'

The CassandraToS3 Glue ETL job reads data from the Apache Cassandra table source_keyspace.source_table and writes it to the S3 bucket in the Apache Parquet format. The job rotates the parquet files to help identify delta changes in the data between consecutive job executions. To identify inserts, updates, and deletes, you must know the primary key and the columns’ write times (updated timestamp) in the Cassandra cluster up front. In our example, the primary key consists of the columns userid, level, and gameid, and the write time column is updatetime. If you have multiple updated columns, then you must use more than one write time column with an aggregation function. For example, for email and updatetime, take the maximum value between the write times for email and updatetime.

The AWS Glue Spark script offloads data to Amazon S3 using the spark-cassandra-connector. It takes five parameters, matching the job arguments above: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.

To upload the data from Amazon S3 to Amazon Keyspaces, you must create an S3toKeyspaces Glue ETL job. Its Glue Spark code reads the parquet files that the CassandraToS3 Glue job wrote to the Amazon S3 bucket, identifies inserts, updates, and deletes, and executes the corresponding requests against the target table in Amazon Keyspaces. The script takes the same five parameters: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.
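The aggregation over several write time columns can be sketched as a small helper. This is an illustration in plain Python, not the job's code. It assumes write times are integers, as returned by Cassandra's WRITETIME function, and that a column that was never written yields None.

```python
from typing import Optional


def row_write_time(*column_write_times: Optional[int]) -> Optional[int]:
    """Return the latest write time across the tracked columns of one row."""
    known = [t for t in column_write_times if t is not None]
    return max(known) if known else None


# Made-up write times for the email and updatetime columns of one row.
print(row_write_time(1_650_000_000_000_000, 1_650_000_500_000_000))  # 1650000500000000
print(row_write_time(None, 1_650_000_500_000_000))                   # 1650000500000000
```

Taking the maximum means a change to any tracked column moves the row's timestamp forward, so the row is picked up as an update on the next run.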

Let’s go ahead and create our second AWS Glue ETL job S3toKeyspaces with the following job parameters:

aws glue create-job \
    --name "S3toKeyspaces" \
    --role "GlueKeyspacesMigration" \
    --description "Push data to Amazon Keyspaces" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --command "Name=glueetl,ScriptLocation=s3://amazon-keyspaces-backups/scripts/S3toKeyspaces.scala" \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"target_keyspace",
        "--TABLE_NAME":"target_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/KeyspacesConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=KeyspacesConnector.conf",
        "--class":"GlueApp"
    }'

Job scheduling

The final step is to configure AWS Glue triggers or Amazon EventBridge, depending on your scheduling needs, to start the S3toKeyspaces Glue ETL job when the CassandraToS3 job has succeeded. You can also run CassandraToS3 on a schedule, for example every 15 minutes, by configuring the trigger’s schedule option.
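One way to express both triggers is through the AWS Glue CreateTrigger API. The following sketch only builds the request parameters as Python dictionaries, so it runs without AWS access. The trigger names are hypothetical, and it is an illustration rather than the original post's example.

```python
from typing import Any, Dict


def scheduled_trigger(name: str, job_name: str, every_minutes: int) -> Dict[str, Any]:
    """Parameters for a trigger that starts job_name every N minutes."""
    if not 1 <= every_minutes <= 59:
        raise ValueError("every_minutes must be between 1 and 59")
    return {
        "Name": name,
        "Type": "SCHEDULED",
        # AWS cron fields: minutes hours day-of-month month day-of-week year.
        "Schedule": f"cron(0/{every_minutes} * * * ? *)",
        "Actions": [{"JobName": job_name}],
        "StartOnCreation": True,
    }


def on_success_trigger(name: str, watched_job: str, next_job: str) -> Dict[str, Any]:
    """Parameters for a trigger that starts next_job after watched_job succeeds."""
    return {
        "Name": name,
        "Type": "CONDITIONAL",
        "Predicate": {
            "Conditions": [
                {"LogicalOperator": "EQUALS", "JobName": watched_job, "State": "SUCCEEDED"}
            ]
        },
        "Actions": [{"JobName": next_job}],
        "StartOnCreation": True,
    }


# Hypothetical trigger names.
offload = scheduled_trigger("cassandra-to-s3-every-15-min", "CassandraToS3", 15)
upload = on_success_trigger("s3-to-keyspaces-on-success", "CassandraToS3", "S3toKeyspaces")
print(offload["Schedule"])  # cron(0/15 * * * ? *)
```

With boto3 installed and credentials configured, each dictionary could be passed as boto3.client("glue").create_trigger(**offload).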

Job tuning

The following Spark settings are recommended as a starting point with Amazon Keyspaces. You can increase them later as appropriate for your workload.

  • Use a Spark partition size (which groups multiple Cassandra rows) smaller than 8 MB to avoid replaying large Spark tasks during a task failure.
  • Use a low concurrent number of writes per DPU with a large number of retries. Add the following options to the job parameters: --conf spark.cassandra.query.retry.count=500 --conf spark.cassandra.output.concurrent.writes=3.
  • Set spark.task.maxFailures to a bounded value. For example, you can start from 32 and increase as needed. This option can help you increase the number of task retries during the table pre-warm stage. Add the following option to the job parameters: --conf spark.task.maxFailures=32
  • Another recommendation is to turn off batching to improve random access patterns. Add the following options to the job parameters:
    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.batch.grouping.key=none
    spark.cassandra.output.batch.grouping.buffer.size=100
  • Randomize your workload. Amazon Keyspaces partitions data using partition keys. Although Amazon Keyspaces has built-in logic to help load balance requests for the same partition key, loading the data is faster and more efficient if you randomize the order because you can take advantage of the built-in load balancing of writing to different partitions. To spread the writes across the partitions evenly, you must randomize the data in the dataframe. You might use a rand function to shuffle rows in the dataframe.
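The randomization in the last recommendation can be illustrated with plain Python. This sketch shuffles a list of rows so that writes for the same partition key are no longer adjacent. The row layout is made up, and in a Spark job the same effect is usually achieved by ordering the dataframe by a random column, for example df.orderBy(rand()) in PySpark.

```python
import random
from typing import List, Optional, Sequence, Tuple

Row = Tuple[str, int]  # (partition key, value); illustrative layout only


def shuffle_rows(rows: Sequence[Row], seed: Optional[int] = None) -> List[Row]:
    """Return a shuffled copy of rows, leaving the input untouched."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    return shuffled


# Rows arrive grouped by partition key, which concentrates writes on one partition at a time.
ordered = [("user-a", i) for i in range(3)] + [("user-b", i) for i in range(3)]
randomized = shuffle_rows(ordered, seed=42)
print(randomized)
```

The shuffled list contains exactly the same rows, so the final table contents are unchanged; only the write order differs.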

Summary

William Hill was able to migrate their workload from Apache Cassandra to Amazon Keyspaces at scale using AWS Glue, without needing to make any changes to their application tech stack. The adoption of Amazon Keyspaces has given them the headroom to focus on their application and customer experience: with Amazon Keyspaces there’s no need to manage servers, and they get a highly scalable, secure solution that performs at scale and can handle sudden spikes in demand.

In this post, you saw how to use AWS Glue to migrate the Cassandra workload to Amazon Keyspaces while keeping your Cassandra source databases completely functional during the migration process. When your applications are ready, you can choose to cut over your applications to Amazon Keyspaces with minimal (sub-minute) replication lag between the Cassandra cluster and Amazon Keyspaces. You can also use a similar pipeline to replicate the data back to the Cassandra cluster from Amazon Keyspaces to maintain data consistency, if needed. Here you can find the documents and code to help accelerate your migration to Amazon Keyspaces.


About the Authors

Nikolai Kolesnikov is a Senior Data Architect and helps AWS Professional Services customers build highly-scalable applications using Amazon Keyspaces. He also leads Amazon Keyspaces ProServe customer engagements.

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having experience in building his own startup and working with enterprises, he brings a unique perspective to getting people, business, and technology to work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enables them to build scalable data and advanced analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys marathons, tech meetups, and meditation retreats.

Analyzing Amazon SES event data with AWS Analytics Services

Post Syndicated from Oscar Mendoza original https://aws.amazon.com/blogs/messaging-and-targeting/analyzing-amazon-ses-event-data-with-aws-analytics-services/

In this post, we walk through using AWS services such as Amazon Kinesis Data Firehose, Amazon Athena, and Amazon QuickSight to monitor Amazon SES email sending events with the granularity and level of detail required to gain insights into how your customers engage with the emails you send.

Nowadays, email marketers rely on internal applications to create their campaigns and other communications, such as newsletters or promotional content. From those activities, they need to collect as much information as possible to analyze and improve their pipeline and get better interaction with customers. Data such as bounces, rejections, successful receptions, delivery delays, complaints, or open rates can be a powerful tool for understanding customers. However, applications usually work with high-level data points, without the detailed logging or granular information that could further improve the effectiveness of their campaigns.

Amazon Simple Email Service (SES) is a smart tool for companies that want a cost-effective, flexible, and scalable email service solution that integrates easily with their own products. Amazon SES provides methods to control your sending activity through built-in integration with Amazon CloudWatch Metrics, and also provides a mechanism to collect email sending event data.

In this post, we propose an architecture and step-by-step guide to track your email sending activities at a granular level, where you can configure several types of email sending events, including sends, deliveries, opens, clicks, bounces, complaints, rejections, rendering failures, and delivery delays. We will use the configuration set feature of Amazon SES to send detailed logging to our analytics services to store, query and create dashboards for a detailed view.

Overview of solution

This architecture uses Amazon SES built-in features and AWS analytics services to provide a quick and cost-effective solution to address your mail tracking requirements. The following services will be implemented or configured:

The following diagram shows the architecture of the solution:

Figure 1. Serverless Architecture to Analyze Amazon SES events

The flow of events starts when a customer uses Amazon SES to send an email. Each of those send events is captured by the configuration set feature, which forwards the events to a Kinesis Data Firehose delivery stream that buffers and stores them in an Amazon S3 bucket.

After the events are stored, you need to create a database and table schema and store them in the AWS Glue Data Catalog so that Amazon Athena can properly query those events on S3. Finally, we use Amazon QuickSight to create interactive dashboards to search and visualize all your sending activity at an email level of detail.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Walkthrough

Step 1: Use AWS CloudFormation to deploy some additional prerequisites

You can get started with our sample AWS CloudFormation template, which includes some prerequisites. This template creates an Amazon S3 bucket, a Kinesis Data Firehose delivery stream, and the IAM role that Amazon SES needs to access Amazon Kinesis Data Firehose.

To download the CloudFormation template, run one of the following commands, depending on your operating system:

In Windows:

curl https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml -o SES-Blog-PreRequisites.yml

In macOS:

wget https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml

To deploy the template, use the following AWS CLI command:

aws cloudformation deploy --template-file ./SES-Blog-PreRequisites.yml --stack-name ses-dashboard-prerequisites --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you can see the IAM service role and the delivery stream on the stack Outputs tab. You use these resources in the following steps.

IAM Service role and Delivery Stream created by CloudFormation template

Figure 2. CloudFormation template outputs
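If you prefer to read the stack outputs programmatically rather than from the console, the following is a minimal sketch. It assumes the response shape of the CloudFormation DescribeStacks API (a `Stacks` list whose entries carry `Outputs` with `OutputKey` and `OutputValue`). The output key names and values in the sample are hypothetical placeholders, not the names the template actually uses.

```python
def stack_outputs(describe_stacks_response):
    """Flatten the Outputs of the first stack into a {key: value} dict."""
    stacks = describe_stacks_response.get("Stacks", [])
    if not stacks:
        return {}
    return {
        out["OutputKey"]: out["OutputValue"]
        for out in stacks[0].get("Outputs", [])
    }


# Hypothetical response, trimmed to the fields used above.
sample_response = {
    "Stacks": [
        {
            "StackName": "ses-dashboard-prerequisites",
            "Outputs": [
                {"OutputKey": "ExampleRoleArn",
                 "OutputValue": "arn:aws:iam::111122223333:role/example-role"},
                {"OutputKey": "ExampleDeliveryStream",
                 "OutputValue": "example-delivery-stream"},
            ],
        }
    ]
}

outputs = stack_outputs(sample_response)
print(outputs["ExampleDeliveryStream"])
```

With boto3 installed and credentials configured, the response could come from `boto3.client("cloudformation").describe_stacks(StackName="ses-dashboard-prerequisites")`.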

Step 2: Creating a configuration set in SES and setting the default configuration set for a verified identity

SES can track the number of send, delivery, open, click, bounce, and complaint events for each email you send. You can use event publishing to send information about these events to other AWS services. In this case, we send the events to Kinesis Data Firehose. To do this, a configuration set is required.
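The same event destination can also be described in code. The sketch below only builds the request parameters as a plain dictionary, following the parameter names of the Amazon SES API v2 CreateConfigurationSetEventDestination operation as we understand them. The configuration set name, destination name, and ARNs are hypothetical placeholders.

```python
def firehose_event_destination(config_set, destination_name,
                               role_arn, stream_arn, event_types):
    """Build request parameters for a Kinesis Data Firehose event destination."""
    return {
        "ConfigurationSetName": config_set,
        "EventDestinationName": destination_name,
        "EventDestination": {
            "Enabled": True,
            # Sorted only to make the output deterministic.
            "MatchingEventTypes": sorted(event_types),
            "KinesisFirehoseDestination": {
                "IamRoleArn": role_arn,
                "DeliveryStreamArn": stream_arn,
            },
        },
    }


# All names and ARNs below are placeholders for illustration.
params = firehose_event_destination(
    config_set="example-config-set",
    destination_name="example-firehose-destination",
    role_arn="arn:aws:iam::111122223333:role/example-role",
    stream_arn="arn:aws:firehose:us-east-1:111122223333:deliverystream/example-stream",
    event_types={"SEND", "DELIVERY", "OPEN", "CLICK", "BOUNCE", "COMPLAINT"},
)
print(params["EventDestination"]["MatchingEventTypes"])
```

With boto3, these parameters could be passed as `boto3.client("sesv2").create_configuration_set_event_destination(**params)`. Check the parameter names against the current API reference before relying on them. The console steps that follow achieve the same result.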

To create a configuration set, complete the following steps:

  1. On the AWS Console, choose the Amazon Simple Email Service.
  2. Choose Configuration sets.
  3. Click on Create set.

    Create a configuration set in Amazon SES

    Figure 3. Amazon SES Create Configuration Set

  4. Set a Configuration set name.
  5. Leave the other settings at their default values.

    Write a name for your configuration set

    Figure 4. Configuration Set Name

  6. Once the configuration set is created, select Event destinations

    Figure 5. Configuration set created successfully

  7. Click on Add destination
  8. Select the event types you would like to analyze and then click on next.

    Figure 6. Sending Events to analyze

  9. Select Amazon Kinesis Data Firehose as the destination, choose the delivery stream and the IAM role created previously, and click on Next. On the review page, click on Add destination.

    Figure 7. Destination for Amazon SES sending events

  10. Once you have created the configuration set and added the event destination, you can define the Default configuration set for the verified identity (domain or email address). In the SES console, choose Verified identities.

    Figure 8. Amazon SES Verified Identity

  11. Choose the verified identity from which you want to collect events and select Configuration set. Click on Edit.

    Figure 9. Edit Configuration Set for Verified Identity

  12. Click on the checkbox Assign a default configuration set and choose the configuration set created previously.

    Figure 10. Assign default configuration set

  13. Once you have completed the previous steps, your events will be sent to Amazon S3. Due to the buffer configuration of the Kinesis Data Firehose delivery stream, the data is loaded to Amazon S3 every 5 minutes or every 5 MiB, whichever comes first. You can check the structure created in the bucket and see the JSON logs with the SES event data.

    Figure 11. Amazon S3 bucket structure
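The buffering rule in the last step means that a batch is written as soon as either limit is reached. A small illustration of that rule, using the 5 MiB and 5 minute values mentioned above, follows. The function and constant names are ours, for illustration only; this is not Firehose code.

```python
BUFFER_SIZE_BYTES = 5 * 1024 * 1024   # 5 MiB
BUFFER_INTERVAL_SECONDS = 5 * 60      # 5 minutes


def should_flush(buffered_bytes, seconds_since_last_flush):
    """Return True when either buffering limit has been reached."""
    return (buffered_bytes >= BUFFER_SIZE_BYTES
            or seconds_since_last_flush >= BUFFER_INTERVAL_SECONDS)


# A low-volume sender waits for the time limit ...
print(should_flush(buffered_bytes=20_000, seconds_since_last_flush=120))   # False
print(should_flush(buffered_bytes=20_000, seconds_since_last_flush=300))   # True
# ... while a high-volume sender reaches the size limit first.
print(should_flush(buffered_bytes=6 * 1024 * 1024, seconds_since_last_flush=10))  # True
```

In practice, with a low sending volume you may wait up to five minutes before the first objects appear in the bucket.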

Step 3: Using Amazon Athena to query the SES event logs

Amazon SES publishes email sending event records to Amazon Kinesis Data Firehose in JSON format. The top-level JSON object contains an eventType string, a mail object, and either a Bounce, Complaint, Delivery, Send, Reject, Open, Click, Rendering Failure, or DeliveryDelay object, depending on the type of event.
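To make that record structure concrete, here is a minimal sketch that reads one event record and returns the event type, the message ID, and the event-specific object. The sample record is a hypothetical, heavily trimmed event written for illustration; real records carry many more fields.

```python
import json


def summarize_event(record_json):
    """Return (event type, message ID, event-specific object) for one record."""
    record = json.loads(record_json)
    event_type = record["eventType"]
    # Find the object whose key matches the event type, ignoring case.
    # Event types whose object key differs from the type name yield None.
    details = next(
        (value for key, value in record.items()
         if key.lower() == event_type.lower()),
        None,
    )
    return event_type, record["mail"]["messageId"], details


# Hypothetical, trimmed sample record.
sample = json.dumps({
    "eventType": "Bounce",
    "mail": {"messageId": "EXAMPLE-MESSAGE-ID", "source": "sender@example.com"},
    "bounce": {
        "bounceType": "Permanent",
        "bouncedRecipients": [{"emailAddress": "recipient@example.com"}],
    },
})

event_type, message_id, details = summarize_event(sample)
print(event_type, message_id, details["bounceType"])
```

The Athena table defined in the next step exposes the same layout as columns: eventType, mail, and one column per event-specific object.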

  1. To simplify the analysis of email sending events, create the sesmaster table by running the following script in Amazon Athena. Don’t forget to replace the location in the script with your own bucket containing the email sending event data.
    CREATE EXTERNAL TABLE sesmaster (
    eventType string,
    complaint struct < arrivaldate: string,
    complainedrecipients: array < struct < emailaddress: string >>,
    complaintfeedbacktype: string,
    feedbackid: string,
    `timestamp`: string,
    useragent: string >,
    bounce struct < bouncedrecipients: array < struct < action: string,
    diagnosticcode: string,
    emailaddress: string,
    status: string >>,
    bouncesubtype: string,
    bouncetype: string,
    feedbackid: string,
    reportingmta: string,
    `timestamp`: string >,
    mail struct < `timestamp`: string,
    source: string,
    sourcearn: string,
    sendingaccountid: string,
    messageid: string,
    destination: string,
    headerstruncated: boolean,
    headers: array < struct < name: string,
    value: string >>,
    commonheaders: struct < `from`: array < string >,
    to: array < string >,
    messageid: string,
    subject: string >,
    tags: struct < ses_source_tls_version: string,
    ses_operation: string,
    ses_configurationset: string,
    ses_source_ip: string,
    ses_outgoing_ip: string,
    ses_from_domain: string,
    ses_caller_identity: string >>,
    send string,
    delivery struct < processingtimemillis: int,
    recipients: array < string >,
    reportingmta: string,
    smtpresponse: string,
    `timestamp`: string >,
    open struct < ipaddress: string,
    `timestamp`: string,
    userAgent: string >,
    reject struct < reason: string >,
    click struct < ipAddress: string,
    `timestamp`: string,
    userAgent: string,
    link: string >
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    "mapping.ses_caller_identity" = "ses:caller-identity",
    "mapping.ses_configurationset" = "ses:configuration-set",
    "mapping.ses_from_domain" = "ses:from-domain",
    "mapping.ses_operation" = "ses:operation",
    "mapping.ses_outgoing_ip" = "ses:outgoing-ip",
    "mapping.ses_source_ip" = "ses:source-ip",
    "mapping.ses_source_tls_version" = "ses:source-tls-version"
    )
    LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/'
    

    The sesmaster table uses the org.openx.data.jsonserde.JsonSerDe SerDe library to deserialize the JSON data.

    We have leveraged the support for JSON arrays and maps and the support for nested data structures. Those features ease the process of preparation and visualization of data.

    In the sesmaster table, the following mappings were applied to avoid errors caused by JSON field names that contain colons:

    • "mapping.ses_caller_identity" = "ses:caller-identity"
    • "mapping.ses_configurationset" = "ses:configuration-set"
    • "mapping.ses_from_domain" = "ses:from-domain"
    • "mapping.ses_operation" = "ses:operation"
    • "mapping.ses_outgoing_ip" = "ses:outgoing-ip"
    • "mapping.ses_source_ip" = "ses:source-ip"
    • "mapping.ses_source_tls_version" = "ses:source-tls-version"

  2. Once the sesmaster table is ready, it is a good strategy to create curated views of its data. The first view, called vwSESMaster, contains all the email sending event records and all the fields that have a single value per event. Create the vwSESMaster view by running the following script in Amazon Athena.
    CREATE OR REPLACE VIEW vwSESMaster AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , delivery.processingtimemillis as deliveryprocessingtimemillis
    , delivery.reportingmta as deliveryreportingmta
    , delivery.smtpresponse as deliverysmtpresponse
    , delivery.timestamp as deliverytimestamp
    , delivery.recipients[1] as deliveryrecipient
    , open.ipaddress as openipaddress
    , open.timestamp as opentimestamp
    , open.userAgent as openuseragent
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , click.ipAddress as clickipaddress
    , click.timestamp as clicktimestamp
    , click.userAgent as clickuseragent
    , click.link as clicklink
    , complaint.timestamp as complainttimestamp
    , complaint.userAgent as complaintuseragent
    , complaint.complaintFeedbackType as complaintcomplaintfeedbacktype
    , complaint.arrivalDate as complaintarrivaldate
    , reject.reason as rejectreason
    FROM
    sesmaster

    The sesmaster table contains some fields that are represented by nested arrays, so it is necessary to flatten them into multiple rows. The event types and the fields that need to be flattened are the following:

    • Event type SEND: field mail.commonHeaders.to
    • Event type BOUNCE: field bounce.bouncedrecipients
    • Event type COMPLAINT: field complaint.complainedrecipients

    To flatten those arrays into multiple rows, we used CROSS JOIN in conjunction with the UNNEST operator, applying the following strategy to all three event types:

    • Create an intermediate view with mail.messageId and the field to be flattened.
    • Create another intermediate view with the array flattened into multiple rows.
    • Create the final view by joining the sesmaster table with the second intermediate view on event type and mail.messageId.

    To create those views, follow the next steps.

  3. Run the following scripts in Amazon Athena to flatten the mail.commonHeaders.to array in the SEND event type.
    CREATE OR REPLACE VIEW vwSendMailTmpSendTo AS 
    SELECT
    mail.messageId as messageid
    , mail.commonHeaders.to as recipients
    FROM
    sesmaster
    WHERE 
    eventtype='Send'
    
    CREATE OR REPLACE VIEW vwsendmailrecipients AS 
    SELECT
    messageid
    , recipient
    FROM
    ("vwSendMailTmpSendTo"
    CROSS JOIN UNNEST(recipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwSentMails AS
    SELECT 
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , dest.recipient as mailto
    FROM
    sesmaster as sm
    ,vwsendmailrecipients as dest
    WHERE
    sm.eventtype = 'Send'
    and sm.mail.messageid = dest.messageid
  4. Run the following scripts in Amazon Athena to flatten the bounce.bouncedrecipients array in the BOUNCE event type.
    CREATE OR REPLACE VIEW vwbouncemailtmprecipients AS 
    SELECT
    mail.messageId as messageid
    , bounce.bouncedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Bounce')
    
    CREATE OR REPLACE VIEW vwbouncemailrecipients AS 
    SELECT
    messageid
    , recipient.action
    , recipient.diagnosticcode
    , recipient.emailaddress
    FROM
    (vwbouncemailtmprecipients
    CROSS JOIN UNNEST(bouncedrecipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwBouncedMails AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , bd.action as bounceaction
    , bd.diagnosticcode as bouncediagnosticcode
    , bd.emailaddress as bounceemailaddress
    FROM
    sesmaster as sm
    ,vwbouncemailrecipients as bd
    WHERE
    sm.eventtype = 'Bounce'
    and sm.mail.messageid = bd.messageid
    
  5. Run the following scripts in Amazon Athena to flatten the complaint.complainedrecipients array in the COMPLAINT event type.
    CREATE OR REPLACE VIEW vwcomplainttmprecipients AS 
    SELECT
    mail.messageId as messageid
    , complaint.complainedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Complaint')
    
    CREATE OR REPLACE VIEW vwcomplainedrecipients AS 
    SELECT
    messageid
    , recipient.emailaddress
    FROM
    (vwcomplainttmprecipients 
    CROSS JOIN UNNEST(complainedrecipients) t (recipient))
    

    At the end we have one table and four views which can be used in Amazon QuickSight to analyze email sending events:

    • Table sesmaster
    • View vwSESMaster
    • View vwSentMails
    • View vwBouncedMails
    • View vwComplainedemails
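To make the flattening strategy used in the views above concrete, the following sketch shows the same idea outside SQL: each input row carries an array, and the output has one row per array element, which is what CROSS JOIN UNNEST produces. The message IDs and addresses are made-up sample data, and the `unnest` helper is our own illustration, not an Athena API.

```python
def unnest(rows, array_field, alias):
    """Yield one output row per element of each row's array field."""
    for row in rows:
        for element in row[array_field]:
            flat = {k: v for k, v in row.items() if k != array_field}
            flat[alias] = element
            yield flat


# Made-up rows shaped like the vwSendMailTmpSendTo view (messageid, recipients).
send_events = [
    {"messageid": "msg-1", "recipients": ["a@example.com", "b@example.com"]},
    {"messageid": "msg-2", "recipients": ["c@example.com"]},
    {"messageid": "msg-3", "recipients": []},  # empty array: no output rows
]

flattened = list(unnest(send_events, "recipients", "recipient"))
for row in flattened:
    print(row)
```

Three rows come out: two for msg-1 and one for msg-2. As with CROSS JOIN UNNEST, a row with an empty array contributes nothing, which is worth keeping in mind when counts in the flattened views differ from counts in sesmaster.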

Step 4: Analyze and visualize data with Amazon QuickSight

In this blog post, we use Amazon QuickSight to analyze and visualize email sending events from the sesmaster table and the four curated views created previously. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first select our table and our views to create new data sources from Athena, and then we use these data sources to populate the visualizations. The visualization we create here is just an example; feel free to create your own based on your information needs.

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

  1. On the Amazon QuickSight home page, choose Datasets from the menu on the left side, then choose New dataset from the upper-right corner, and pick Athena as the data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

    Figure 12. Create New Athena Data Source

  2. In the following dialog box, select the Catalog and the Database containing your sesmaster table and curated views. Let’s select the sesmaster table in order to create some basic Key Performance Indicators. Select the table sesmaster and click on Select.

    Figure 13. Select Sesmaster Table

  3. Our sesmaster table now is a data source for Amazon QuickSight and we can turn to visualizing the data.

    Figure 14. QuickSight Visualize Data

  4. You can see the list of fields on the left. The canvas on the right is still empty. Before we populate it with data, let’s select Key Performance Indicator from the available visual types.

    Figure 15. QuickSight Visual Types

  5. To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the field send onto the value well and use count as aggregation.

    Figure 16. Add Send field to visualization

  6. Add another visual from the upper-left side and select Key Performance Indicator as the visual type.

    Figure 17. Add a new visual

    Figure 18. Key Performance Indicator Visual Type

  7. Put the field Delivery onto the value well and use count as aggregation.

    Figure 19. Add Delivery Field to visualization

  8. Repeat the same procedure (steps 6 and 7) to count the number of Open, Click, Bounce, Complaint, and Reject events. After resizing and rearranging the visuals, you should get an analysis like the one shown in the following image.

    Figure 20. Preview of Key Performance Indicators

  9. Let’s add another dataset by clicking the pencil on the right of the current Dataset.

    Figure 21. Add a New Dataset

  10. On the following dialog box, select Add Dataset.

    Figure 22. Add a New Dataset

  11. Select the view called vwsesmaster and click Select.

    Figure 23. Add vwsesmaster dataset

    Now you can see all the available fields of the vwsesmaster view.

    Figure 24. New fields from vwsesmaster dataset

  12. Let’s create a new visual and select the Table visual type.

    Figure 25. QuickSight Visual Types

  13. Drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the fields eventtype, mailmessageid, and mailsubject onto the Group By well, but you can add as many fields as you need.

    Figure 26. Add eventtype, mailmessageid and mailsubject fields

  14. Now let’s create a filter for this visual in order to filter by type of event. Be sure you select the table and then click on Filter on the left menu.

    Figure 27. Add a Filter

  15. Click on Create One and select the field eventtype on the popup window. Now select the eventtype filter to see the following options.

    Figure 28. Create eventtype filter

  16. Click on the dots on the right of the eventtype filter and select Add to Sheet.

    Figure 29. Add filter to sheet

  17. Leave all the default values, scroll down and select Apply

    Figure 30. Apply filters with default values

  18. Now you can filter the vwsesmaster view by eventtype.

    Figure 31. Filter vwsesmasterview by eventtype

  19. You can continue customizing your visualization with all the available data in the sesmaster table, the vwsesmaster view and even add more datasets to include data from the vwSentMails, vwBouncedMails, and vwComplainedemails views. Below, you can see some other visualizations created from those views.

    Figure 32. Final visualization 1

    Figure 33. Final visualization 2

    Figure 34. Final visualization 3

Clean up

To avoid ongoing charges, clean up the resources you created as part of this post:

  1. Delete the visualizations created in Amazon QuickSight.
  2. Unsubscribe from Amazon QuickSight if you are not using it for other projects.
  3. Delete the views and tables created in Amazon Athena.
  4. Delete the Amazon SES configuration set.
  5. Delete the Amazon SES events stored in S3.
  6. Delete the CloudFormation stack in order to delete the Amazon Kinesis Delivery Stream.

Conclusion

In this blog post, we showed how you can use AWS native services and features to quickly create an email tracking solution based on Amazon SES events and get a more detailed view of your sending activities. This solution uses a fully serverless architecture, so there is no underlying infrastructure to manage, and it gives you the flexibility to use the solution for small, medium, or intense Amazon SES usage.

We showed you some samples of dashboards and analyses that can be built for most customer requirements, but of course you can evolve this solution and customize it according to your needs, adding or removing charts, filters, or events in the dashboard. Please refer to the following documentation for the available Amazon SES events, their structure, and also how to create analyses and dashboards on Amazon QuickSight:

From a performance and cost efficiency perspective, there are still several configurations that can improve the solution, for example using a columnar file format like Parquet, compressing with Snappy, or setting your S3 partition strategy according to your email sending usage. Another improvement could be importing data into SPICE to read data in Amazon QuickSight. Using SPICE results in the data being loaded from Athena only once, until it is either manually refreshed or automatically refreshed using a schedule.

You can use this walkthrough to configure your first SES dashboard and start visualizing event details. You can adjust the services described in this blog post according to your company requirements.

About the authors

Oscar Mendoza is a Solutions Architect at AWS based in Bogotá, Colombia. Oscar works with our customers to provide guidance in architectural best practices and to build Well Architected solutions on the AWS platform. He enjoys spending time with his family and his dog and playing music.
Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.
Santiago Benavídez is a Solutions Architect at AWS based in Buenos Aires, Argentina, with more than 13 years of experience in IT, currently helping DNB/ISV customers to achieve their business goals using the breadth and depth of AWS services, designing highly available, resilient and cost-effective architectures.

Migrate from Snowflake to Amazon Redshift using AWS Glue Python shell

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/migrate-from-snowflake-to-amazon-redshift-using-aws-glue-python-shell/

As the most widely used cloud data warehouse, Amazon Redshift makes it simple and cost-effective to analyze your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to analyze exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics without having to manage the data warehouse infrastructure. It natively integrates with other AWS services, facilitating the process of building enterprise-grade analytics applications in a manner that is not only cost-effective, but also avoids point solutions.

We are continuously innovating and releasing new features of Amazon Redshift, enabling the implementation of a wide range of data use cases and meeting requirements with performance and scale. For example, Amazon Redshift Serverless allows you to run and scale analytics workloads without having to provision and manage data warehouse clusters. Other features that help power analytics at scale with Amazon Redshift include automatic concurrency scaling for read and write queries, automatic workload management (WLM) for concurrency scaling, automatic table optimization, the new RA3 instances with managed storage to scale cloud data warehouses and reduce costs, cross-Region data sharing, data exchange, and the SUPER data type to store semi-structured data or documents as values. For the latest feature releases for Amazon Redshift, see Amazon Redshift What’s New. In addition to improving performance and scale, you can also gain up to three times better price performance with Amazon Redshift than other cloud data warehouses.

To take advantage of the performance, security, and scale of Amazon Redshift, customers are looking to migrate their data from their existing cloud warehouse in a way that is both cost optimized and performant. This post describes how to migrate a large volume of data from Snowflake to Amazon Redshift using AWS Glue Python shell in a manner that meets both these goals.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, allowing you to analyze your data in minutes instead of weeks or months. AWS Glue supports the ability to use a Python shell job to run Python scripts as a shell, enabling you to author ETL processes in a familiar language. In addition, AWS Glue allows you to manage ETL jobs using AWS Glue workflows, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and AWS Step Functions, automating and facilitating the orchestration of ETL steps.

Solution overview

The following architecture shows how an AWS Glue Python shell job migrates the data from Snowflake to Amazon Redshift in this solution.

Architecture

The solution is comprised of two stages:

  • Extract – The first part of the solution extracts data from Snowflake into an Amazon Simple Storage Service (Amazon S3) data lake
  • Load – The second part of the solution reads the data from the same S3 bucket and loads it into Amazon Redshift

For both stages, we connect the AWS Glue Python shell jobs to Snowflake and Amazon Redshift using database connectors for Python. The first AWS Glue Python shell job reads a SQL file from an S3 bucket to run the relevant COPY commands on the Snowflake database using Snowflake compute capacity and parallelism to migrate the data to Amazon S3. When this is complete, the second AWS Glue Python shell job reads another SQL file, and runs the corresponding COPY commands on the Amazon Redshift database using Redshift compute capacity and parallelism to load the data from the same S3 bucket.
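The core of each job, as described above, is to take the text of a SQL file and run its statements in order on the target database. The following is a minimal sketch of that loop, assuming the connector exposes a DB-API style `cursor().execute()`. Reading the file from Amazon S3 is omitted, `sqlite3` stands in for the warehouse connection so the sketch runs anywhere, and the helper names are ours rather than taken from the solution's scripts.

```python
import sqlite3


def split_statements(sql_text):
    """Split a SQL script on ';' (naive: assumes no ';' inside string literals)."""
    return [part.strip() for part in sql_text.split(";") if part.strip()]


def run_script(connection, sql_text):
    """Run each statement in order and return how many were executed."""
    cursor = connection.cursor()
    statements = split_statements(sql_text)
    for statement in statements:
        cursor.execute(statement)
    connection.commit()
    return len(statements)


# Stand-in script; the real files contain COPY commands for the warehouse.
script = """
CREATE TABLE nation (n_nationkey INTEGER, n_name TEXT);
INSERT INTO nation VALUES (1, 'ARGENTINA');
INSERT INTO nation VALUES (2, 'BRAZIL');
"""

conn = sqlite3.connect(":memory:")
executed = run_script(conn, script)
row_count = conn.execute("SELECT COUNT(*) FROM nation").fetchone()[0]
print(executed, row_count)
```

Because the statements themselves run inside the warehouse, the Python process only needs enough resources to submit them and wait for completion.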

Both jobs are orchestrated using AWS Glue workflows, as shown in the following screenshot. The workflow pushes data processing logic down to the respective data warehouses by running COPY commands on the databases themselves, minimizing the processing capacity required by AWS Glue to just the resources needed to run the Python scripts. The COPY commands load data in parallel both to and from Amazon S3, providing one of the fastest and most scalable mechanisms to transfer data from Snowflake to Amazon Redshift.

Because all heavy lifting around data processing is pushed down to the data warehouses, this solution is designed to provide a cost-optimized and highly performant mechanism to migrate a large volume of data from Snowflake to Amazon Redshift with ease.

Glue Workflow

The entire solution is packaged in an AWS CloudFormation template for simplicity of deployment and automatic provisioning of most of the required resources and permissions.

The high-level steps to implement the solution are as follows:

  1. Generate the Snowflake SQL file.
  2. Deploy the CloudFormation template to provision the required resources and permissions.
  3. Provide Snowflake access to the newly created S3 bucket.
  4. Run the AWS Glue workflow to migrate the data.

Prerequisites

Before you get started, you can optionally build the latest version of the Snowflake Connector for Python package locally and generate the wheel (.whl) package. For instructions, refer to How to build.

If you don’t provide the latest version of the package, the CloudFormation template uses a pre-built .whl file that may not be on the most current version of Snowflake Connector for Python.

By default, the CloudFormation template migrates data from all tables in the TPCH_SF1 schema of the SNOWFLAKE_SAMPLE_DATA database, which is a sample dataset provided by Snowflake when an account is created. The following stored procedure is used to dynamically generate the Snowflake COPY commands required to migrate the dataset to Amazon S3. It accepts the database name, schema name, and stage name as the parameters.

CREATE OR REPLACE PROCEDURE generate_copy(db_name VARCHAR, schema_name VARCHAR, stage_name VARCHAR)
   returns varchar not null
   language javascript
   as
   $$
   // Procedure arguments are referenced in uppercase inside the JavaScript body.
   var return_value = "";
   var sql_query = "select table_catalog, table_schema, lower(table_name) as table_name from " + DB_NAME + ".information_schema.tables where table_schema = '" + SCHEMA_NAME + "'";
   var sql_statement = snowflake.createStatement({ sqlText: sql_query });
   /* Creates result set */
   var result_scan = sql_statement.execute();
   /* Appends one COPY INTO statement per table */
   while (result_scan.next()) {
       return_value += "\n";
       return_value += "COPY INTO @";
       return_value += STAGE_NAME;
       return_value += "/";
       return_value += result_scan.getColumnValue(3);
       return_value += "/";
       return_value += "\n";
       return_value += "FROM ";
       return_value += result_scan.getColumnValue(1);
       return_value += "." + result_scan.getColumnValue(2);
       return_value += "." + result_scan.getColumnValue(3);
       return_value += "\n";
       return_value += "FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '|' COMPRESSION = GZIP)";
       return_value += "\n";
       return_value += "OVERWRITE = TRUE;";
       return_value += "\n";
   }
   return return_value;
   $$
;
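To preview what the procedure returns without running it, the following sketch reproduces the same string-building logic outside Snowflake. It is an illustration only: the table names are passed in as a list instead of being read from information_schema, and the stage name `example_stage` is a hypothetical placeholder.

```python
def generate_copy(db_name, schema_name, stage_name, table_names):
    """Build one Snowflake COPY INTO statement per table, as the procedure does."""
    statements = []
    for table in table_names:
        table = table.lower()  # the procedure lowercases table names
        statements.append(
            f"COPY INTO @{stage_name}/{table}/\n"
            f"FROM {db_name}.{schema_name}.{table}\n"
            "FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '|' COMPRESSION = GZIP)\n"
            "OVERWRITE = TRUE;"
        )
    return "\n\n".join(statements)


copy_sql = generate_copy(
    "SNOWFLAKE_SAMPLE_DATA", "TPCH_SF1", "example_stage", ["NATION", "REGION"]
)
print(copy_sql)
```

Each table is unloaded to its own folder under the stage as pipe-delimited, GZIP-compressed CSV. In Snowflake itself, the procedure would be invoked with a CALL statement that passes the database, schema, and stage names.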

Deploy the required resources and permissions using AWS CloudFormation

You can use the provided CloudFormation template to deploy this solution. This template automatically provisions an Amazon Redshift cluster with your desired configuration in a private subnet, maintaining a high standard of security.

  1. Sign in to the AWS Management Console, preferably as an admin user.
  2. Select your desired Region, preferably the same Region where your Snowflake instance is provisioned.
  3. Choose Launch Stack:
  4. Choose Next.
  5. For Stack name, enter a meaningful name for the stack, for example, blog-resources.

The Parameters section is divided into two subsections: Source Snowflake Infrastructure and Target Redshift Configuration.

  1. For Snowflake Unload SQL Script, the default is the S3 location (URI) of a SQL file that migrates the sample data in the TPCH_SF1 schema of the SNOWFLAKE_SAMPLE_DATA database.
  2. For Data S3 Bucket, enter a prefix for the name of the S3 bucket that is automatically provisioned to stage the Snowflake data, for example, sf-migrated-data.
  3. For Snowflake Driver, if applicable, enter the S3 location (URI) of the .whl package built earlier as a prerequisite. By default, it uses a pre-built .whl file.
  4. For Snowflake Account Name, enter your Snowflake account name.

You can use the following query in Snowflake to return your Snowflake account name:

SELECT CURRENT_ACCOUNT();
  1. For Snowflake Username, enter your user name to connect to the Snowflake account.
  2. For Snowflake Password, enter the password for the preceding user.
  3. For Snowflake Warehouse Name, enter the warehouse name for running the SQL queries.

Make sure the aforementioned user has access to the warehouse.

  1. For Snowflake Database Name, enter the database name. The default is SNOWFLAKE_SAMPLE_DATA.
  2. For Snowflake Schema Name, enter the schema name. The default is TPCH_SF1.

  1. For VPC CIDR Block, enter the desired CIDR block for the VPC that hosts the Redshift cluster. The default is 10.0.0.0/16.
  2. For Subnet 1 CIDR Block, enter the CIDR block of the first subnet. The default is 10.0.0.0/24.
  3. For Subnet 2 CIDR Block, enter the CIDR block of the second subnet. The default is 10.0.1.0/24.
  4. For Redshift Load SQL Script, the default is the S3 location (URI) of a SQL file that migrates the sample data from Amazon S3 to Amazon Redshift.

The following database view in Redshift is used to dynamically generate Redshift COPY commands required to migrate the dataset from Amazon S3. It accepts the schema name as the filter criteria.

CREATE OR REPLACE VIEW v_generate_copy
AS
SELECT
    schemaname ,
    tablename  ,
    seq        ,
    ddl
FROM
    (
        SELECT
            table_id   ,
            schemaname ,
            tablename  ,
            seq        ,
            ddl
        FROM
            (
                --COPY TABLE
                SELECT
                    c.oid::bigint  as table_id   ,
                    n.nspname      AS schemaname ,
                    c.relname      AS tablename  ,
                    0              AS seq        ,
                    'COPY ' + n.nspname + '.' + c.relname + ' FROM ' AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r'
                --COPY TABLE continued                
                UNION                
                SELECT
                    c.oid::bigint as table_id   ,
                    n.nspname     AS schemaname ,
                    c.relname     AS tablename  ,
                    2             AS seq        ,
                    '''${' + '2}' + c.relname + '/'' iam_role ''${' + '1}'' gzip delimiter ''|'' EMPTYASNULL REGION ''us-east-1''' AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r'
                --END SEMICOLON                
                UNION                
                SELECT
                    c.oid::bigint as table_id  ,
                    n.nspname     AS schemaname,
                    c.relname     AS tablename ,
                    600000005     AS seq       ,
                    ';'           AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r' 
             )
        ORDER BY
            table_id  ,
            schemaname,
            tablename ,
            seq 
    );

SELECT ddl
FROM v_generate_copy
WHERE schemaname = 'tpch_sf1';
  1. For Redshift Database Name, enter your desired database name, for example, dev.
  2. For Number of Redshift Nodes, enter the desired compute nodes, for example, 2.
  3. For Redshift Node Type, choose the desired node type, for example, ra3.4xlarge.
  4. For Redshift Password, enter your desired password with the following constraints: it must be 8–64 characters in length, and contain at least one uppercase letter, one lowercase letter, and one number.
  5. For Redshift Port, enter the Amazon Redshift port number to connect to. The default port is 5439.
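
If you script the deployment, the password rules listed above can be checked before you create the stack. The following is a minimal sketch that covers only the constraints stated in this post; the function name is hypothetical, and Amazon Redshift may enforce further character restrictions that are not modeled here.

```python
import re


def is_valid_redshift_password(password):
    """Check the constraints listed for the Redshift Password parameter."""
    return (
        8 <= len(password) <= 64                         # 8-64 characters
        and re.search(r"[A-Z]", password) is not None    # at least one uppercase letter
        and re.search(r"[a-z]", password) is not None    # at least one lowercase letter
        and re.search(r"[0-9]", password) is not None    # at least one number
    )
```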

  1. Choose Next.
  2. Review and choose Create stack.

It takes around 5 minutes for the template to finish creating all resources and permissions. Most of the resources have the prefix of the stack name you specified for easy identification of the resources later. For more details on the deployed resources, see the appendix at the end of this post.

Create an IAM role and external Amazon S3 stage for Snowflake access to the data S3 bucket

For Snowflake to access the TargetDataS3Bucket created earlier by the CloudFormation template, you must create an AWS Identity and Access Management (IAM) role and an external Amazon S3 stage that give Snowflake access to the S3 bucket. For instructions, refer to Configuring Secure Access to Amazon S3.

When you create an external stage in Snowflake, use the value for TargetDataS3Bucket on the Outputs tab of your deployed CloudFormation stack for the Amazon S3 URL of your stage.

Make sure to name the external stage unload_to_s3 if you’re migrating the sample data using the default scripts provided in the CloudFormation template.

Convert Snowflake tables to Amazon Redshift

You can simply run the following DDL statements to create TPCH_SF1 schema objects in Amazon Redshift. You can also use AWS Schema Conversion Tool (AWS SCT) to convert Snowflake custom objects to Amazon Redshift. For instructions on converting your schema, refer to Accelerate Snowflake to Amazon Redshift migration using AWS Schema Conversion Tool.

CREATE SCHEMA TPCH_SF1;
SET SEARCH_PATH to TPCH_SF1;
CREATE TABLE customer (
  c_custkey int8 not null ,
  c_name varchar(25) not null,
  c_address varchar(40) not null,
  c_nationkey int4 not null,
  c_phone char(15) not null,
  c_acctbal numeric(12,2) not null,
  c_mktsegment char(10) not null,
  c_comment varchar(117) not null,
  Primary Key(C_CUSTKEY)
) ;

CREATE TABLE lineitem (
  l_orderkey int8 not null ,
  l_partkey int8 not null,
  l_suppkey int4 not null,
  l_linenumber int4 not null,
  l_quantity numeric(12,2) not null,
  l_extendedprice numeric(12,2) not null,
  l_discount numeric(12,2) not null,
  l_tax numeric(12,2) not null,
  l_returnflag char(1) not null,
  l_linestatus char(1) not null,
  l_shipdate date not null ,
  l_commitdate date not null,
  l_receiptdate date not null,
  l_shipinstruct char(25) not null,
  l_shipmode char(10) not null,
  l_comment varchar(44) not null,
  Primary Key(L_ORDERKEY, L_LINENUMBER)
)  ;

CREATE TABLE nation (
  n_nationkey int4 not null,
  n_name char(25) not null ,
  n_regionkey int4 not null,
  n_comment varchar(152) not null,
  Primary Key(N_NATIONKEY)                                
) ;

CREATE TABLE orders (
  o_orderkey int8 not null,
  o_custkey int8 not null,
  o_orderstatus char(1) not null,
  o_totalprice numeric(12,2) not null,
  o_orderdate date not null,
  o_orderpriority char(15) not null,
  o_clerk char(15) not null,
  o_shippriority int4 not null,
  o_comment varchar(79) not null,
  Primary Key(O_ORDERKEY)
) ;

CREATE TABLE part (
  p_partkey int8 not null ,
  p_name varchar(55) not null,
  p_mfgr char(25) not null,
  p_brand char(10) not null,
  p_type varchar(25) not null,
  p_size int4 not null,
  p_container char(10) not null,
  p_retailprice numeric(12,2) not null,
  p_comment varchar(23) not null,
  PRIMARY KEY (P_PARTKEY)
) ;

CREATE TABLE partsupp (
  ps_partkey int8 not null,
  ps_suppkey int4 not null,
  ps_availqty int4 not null,
  ps_supplycost numeric(12,2) not null,
  ps_comment varchar(199) not null,
  Primary Key(PS_PARTKEY, PS_SUPPKEY)
) ;

CREATE TABLE region (
  r_regionkey int4 not null,
  r_name char(25) not null ,
  r_comment varchar(152) not null,
  Primary Key(R_REGIONKEY)                             
) ;

CREATE TABLE supplier (
  s_suppkey int4 not null,
  s_name char(25) not null,
  s_address varchar(40) not null,
  s_nationkey int4 not null,
  s_phone char(15) not null,
  s_acctbal numeric(12,2) not null,
  s_comment varchar(101) not null,
  Primary Key(S_SUPPKEY)
);

Run an AWS Glue workflow for data migration

When you’re ready to start the data migration, complete the following steps:

  1. On the AWS Glue console, choose Workflows in the navigation pane.
  2. Select the workflow to run (<stack name>snowflake-to-redshift-migration).
  3. On the Actions menu, choose Run.
  4. To check the status of the workflow, choose the workflow and on the History tab, select the Run ID and choose View run details.
  5. When the workflow is complete, navigate to the Amazon Redshift console and launch the Amazon Redshift query editor v2 to verify the successful migration of data.
  6. Run the following query in Amazon Redshift to get row counts of all tables migrated from Snowflake to Amazon Redshift. Make sure to adjust the table_schema value accordingly if you’re not migrating the sample data.
SELECT tab.table_schema,
       tab.table_name,
       nvl(tinf.tbl_rows,0) tbl_rows,
       nvl(tinf.size,0) size
FROM svv_tables tab
LEFT JOIN svv_table_info tinf 
          on tab.table_schema = tinf.schema 
          and tab.table_name = tinf."table"
WHERE tab.table_type = 'BASE TABLE'
      and tab.table_schema in ('tpch_sf1')
ORDER BY tbl_rows;

  1. Run the following query in Snowflake to compare and validate the data:
USE DATABASE snowflake_sample_data;
SELECT  TABLE_CATALOG,
        TABLE_SCHEMA,
        TABLE_NAME,
        ROW_COUNT,
        BYTES AS SIZE,
        COMMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'TPCH_SF1'
ORDER BY ROW_COUNT;
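
Once you have the results of both queries, comparing them by hand is error-prone for schemas with many tables. The following sketch shows one way to compare the counts in Python. It assumes you have already loaded each result set into a dictionary of table name to row count; the function name is hypothetical. Table names are lowercased before comparison because Snowflake reports them in uppercase and Amazon Redshift in lowercase.

```python
def compare_row_counts(snowflake_counts, redshift_counts):
    """Return {table: (snowflake_rows, redshift_rows)} for tables whose counts differ."""
    source = {name.lower(): rows for name, rows in snowflake_counts.items()}
    target = {name.lower(): rows for name, rows in redshift_counts.items()}
    mismatches = {}
    for table in sorted(set(source) | set(target)):
        # A table missing on one side shows up with None for that side
        if source.get(table) != target.get(table):
            mismatches[table] = (source.get(table), target.get(table))
    return mismatches
```

An empty dictionary means every table has the same row count on both sides.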

Clean up

To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack by navigating to the AWS CloudFormation console, selecting the stack blog-resources, and choosing Delete.

Conclusion

In this post, we discussed how to perform an efficient, fast, and cost-effective migration from Snowflake to Amazon Redshift. Migrations from one data warehouse environment to another can typically be very time-consuming and resource-intensive; this solution uses the power of cloud-based compute by pushing down the processing to the respective warehouses. Orchestrating this migration with the AWS Glue Python shell provides additional cost optimization.

With this solution, you can facilitate your migration from Snowflake to Amazon Redshift. If you’re interested in further exploring the potential of using Amazon Redshift, please reach out to your AWS Account Team for a proof of concept.

Appendix: Resources deployed by AWS CloudFormation

The CloudFormation stack deploys the following resources in your AWS account:

  • Networking resources – Amazon Virtual Private Cloud (Amazon VPC), subnets, ACL, and security group.
  • Amazon S3 bucket – This is referenced as TargetDataS3Bucket on the Outputs tab of the CloudFormation stack. This bucket holds the data being migrated from Snowflake to Amazon Redshift.
  • AWS Secrets Manager secrets – Two secrets in AWS Secrets Manager store credentials for Snowflake and Amazon Redshift.
  • VPC endpoints – The two VPC endpoints are deployed to establish a private connection from VPC resources like AWS Glue to services that run outside of the VPC, such as Secrets Manager and Amazon S3.
  • IAM roles – IAM roles for AWS Glue, Lambda, and Amazon Redshift. If the CloudFormation template is to be deployed in a production environment, you need to adjust the IAM policies so they’re not as permissive as presented in this post (which were set for simplicity and demonstration). Particularly, AWS Glue and Amazon Redshift don’t require all the actions granted in the *FullAccess policies, which would be considered overly permissive.
  • Amazon Redshift cluster – An Amazon Redshift cluster is created in a private subnet, which isn’t publicly accessible.
  • AWS Glue connection – The connection for Amazon Redshift makes sure that the AWS Glue job runs within the same VPC as Amazon Redshift. This also ensures that AWS Glue can access the Amazon Redshift cluster in a private subnet.
  • AWS Glue jobs – Two AWS Glue Python shell jobs are created:
    • <stack name>-glue-snowflake-unload – The first job runs the SQL scripts in Snowflake to copy data from the source database to Amazon S3. The Python script is available in S3. The Snowflake job accepts two parameters:
      • SQLSCRIPT – The Amazon S3 location of the SQL script to run in Snowflake to migrate data to Amazon S3. This is referenced as the Snowflake Unload SQL Script parameter in the input section of the CloudFormation template.
      • SECRET – The Secrets Manager ARN that stores Snowflake connection details.
    • <stack name>-glue-redshift-load – The second job runs another SQL script in Amazon Redshift to copy data from Amazon S3 to the target Amazon Redshift database. The Python script is available in S3. The Amazon Redshift job accepts three parameters:
      • SQLSCRIPT – The Amazon S3 location of the SQL script to run in Amazon Redshift to migrate data from Amazon S3. If you provide a custom SQL script to migrate the Snowflake data to Amazon S3 (as mentioned in the prerequisites), the file location is referenced as LoadFileLocation on the Outputs tab of the CloudFormation stack.
      • SECRET – The Secrets Manager ARN that stores Amazon Redshift connection details.
      • PARAMS – This includes any additional parameters required for the SQL script, including the Amazon Redshift IAM role used in the COPY commands and the S3 bucket staging the Snowflake data. Multiple parameter values can be provided separated by a comma.
  • AWS Glue workflow – The orchestration of Snowflake and Amazon Redshift AWS Glue Python shell jobs is managed via an AWS Glue workflow. The workflow <stack name>snowflake-to-redshift-migration runs later for actual migration of data.
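
The job's Python script is not reproduced in this post, so the following is only a sketch of how the PARAMS value could be applied to the ${1} and ${2} placeholders that appear in the generated COPY commands. It assumes positional, 1-based substitution from the comma-separated list; the function name and the example values are hypothetical.

```python
import re


def substitute_params(sql, params):
    """Replace ${1}, ${2}, ... in sql with the comma-separated values in params."""
    values = [value.strip() for value in params.split(",")]

    def replace(match):
        index = int(match.group(1)) - 1  # placeholders are 1-based
        if index < 0 or index >= len(values):
            raise ValueError(f"No value supplied for placeholder {match.group(0)}")
        return values[index]

    return re.sub(r"\$\{(\d+)\}", replace, sql)


if __name__ == "__main__":
    copy_sql = "COPY tpch_sf1.nation FROM '${2}nation/' iam_role '${1}' gzip delimiter '|'"
    # Hypothetical role ARN and bucket, in the order the placeholders expect
    print(substitute_params(
        copy_sql, "arn:aws:iam::123456789012:role/example-role, s3://example-bucket/"
    ))
```

Raising an error for a missing value makes a misconfigured PARAMS string fail before any COPY command reaches the cluster.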

About the Authors

Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Julia Beck is an Analytics Specialist Solutions Architect at AWS. She supports customers in validating analytics solutions by architecting proof of concept workloads designed to meet their specific needs.

Orchestrating AWS Glue crawlers using AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/orchestrating-aws-glue-crawlers-using-aws-step-functions/

This blog post is written by Justin Callison, General Manager, AWS Workflow.

Organizations generate terabytes of data every day in a variety of semistructured formats. AWS Glue and Amazon Athena can give you a simpler and more cost-effective way to analyze this data with no infrastructure to manage. AWS Glue crawlers identify the schema of your data and manage the metadata required to analyze the data in place, without the need to transform this data and load it into a data warehouse.

The timing of when your crawlers run and complete is important. You must ensure the crawler runs after your data has updated and before you query it with Athena or analyze with an AWS Glue job. If not, your analysis may experience errors or return incomplete results.

In this blog, you learn how to use AWS Step Functions, a low-code visual workflow service that integrates with over 220 AWS services. The service orchestrates your crawlers to control when they start, confirm completion, and combine them into end-to-end, serverless data processing workflows.

Using Step Functions to orchestrate multiple AWS Glue crawlers provides a number of benefits when compared to implementing a solution directly with code. First, the workflow provides an instant visual understanding of the application and of any errors that might occur during execution. Second, Step Functions’ ability to run nested workflows inside a Map state helps to decouple and reuse application components with native array iteration. Finally, the Step Functions Wait state lets the workflow periodically poll the status of the crawl job without incurring additional cost for idle wait time.

Deploying the example

With this example, you create three datasets in Amazon S3, then use Step Functions to orchestrate AWS Glue crawlers to analyze the datasets and make them available to query using Athena.

You deploy the example with AWS CloudFormation using the following steps:

  1. Download the template.yaml file from here.
  2. Log in to the AWS Management Console and go to AWS CloudFormation.
  3. Navigate to Stacks -> Create stack and select With new resources (standard).
  4. Select Template is ready and Upload a template file, then Choose File and select the template.yaml file that you downloaded in Step 1 and choose Next.
  5. Enter a stack name, such as glue-stepfunctions-demo, and choose Next.
  6. Choose Next, check the acknowledgement boxes in the Capabilities and transforms section, then choose Create stack.
  7. After deployment, the status updates to CREATE_COMPLETE.

Create your datasets

Navigate to Step Functions in the AWS Management Console and select the create-dataset state machine from the list. This state machine uses Express Workflows and the Parallel state to build three datasets concurrently in S3. The first two datasets include information by user and location respectively and include files per day over the 5-year period from 2016 to 2020. The third dataset is a simpler, all-time summary of data by location.

To create the datasets, you choose Start execution from the toolbar for the create-dataset state machine, then choose Start execution again in the dialog box. This runs the state machine and creates the datasets in S3.

Navigate to the S3 console and view the glue-demo-databucket created for this example. In this bucket, in a folder named data, there are three subfolders, each containing a dataset.

The all-time-location-summaries folder contains a set of JSON files, one for each location.

The daily-user-summaries and daily-location-summaries contain a folder structure with nested folders for each year, month, and date. In addition to making this data easier to navigate via the console, this folder structure provides hints to AWS Glue that it can use to partition this dataset and make it more efficient to query.
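
To make the idea of folder names acting as partition hints concrete, the following sketch extracts partition values from an S3 object key. It assumes Hive-style name=value folder names (for example, year=2016), which this post does not state explicitly; the function name and the example key are hypothetical.

```python
def parse_partitions(key):
    """Extract Hive-style name=value partition segments from an S3 object key."""
    partitions = {}
    for segment in key.split("/")[:-1]:  # skip the file name itself
        name, sep, value = segment.partition("=")
        if sep and name:
            partitions[name] = value
    return partitions


if __name__ == "__main__":
    print(parse_partitions(
        "data/daily-user-summaries/year=2016/month=01/date=15/summary.json"
    ))
```

A query that filters on one of these values only needs to read the objects under the matching folders.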

Crawling

You now use AWS Glue crawlers to analyze these datasets and make them available to query. Navigate to the AWS Glue console and select Crawlers to see the list of crawlers that you created when you deployed this example. Select the daily-user-summaries crawler to view its details and note that it has tags assigned to indicate metadata such as the datatype of the data and whether the dataset is-partitioned.

Now, return to the Step Functions console and view the run-crawlers-with-tags state machine. This state machine uses AWS SDK service integrations to get a list of all crawlers matching the tag criteria you enter. It then uses the Map state and the optimized service integration for Step Functions to execute the run-crawler state machine for each of the matching crawlers concurrently. The run-crawler state machine starts each crawler and monitors status until the crawler completes. Once each of the individual crawlers has completed, the run-crawlers-with-tags state machine also completes.
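
The start-then-poll logic of the run-crawler state machine can be illustrated in Python. This is not how the example is implemented (it uses Step Functions states, not code); it is a sketch of the same control flow. The glue_client argument is expected to be a boto3 AWS Glue client, and the function name, poll interval, and injectable sleep parameter are assumptions made for illustration.

```python
import time


def run_crawler(glue_client, crawler_name, poll_seconds=30, sleep=time.sleep):
    """Start a crawler and block until it returns to the READY state."""
    glue_client.start_crawler(Name=crawler_name)
    while True:
        # A crawler reports RUNNING, then STOPPING, then READY when it is done
        state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
        if state == "READY":
            return state
        sleep(poll_seconds)
```

Unlike the Wait state in Step Functions, this loop keeps a process running while it waits, which is one reason the state machine approach is attractive.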

To initiate the crawlers:

  1. Choose Start execution from the top of the page when viewing the run-crawlers-with-tags state machine.
  2. Provide the following as Input:
    {"tags": {"datatype": "json"}}
  3. Choose Start execution.
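
The same execution can also be started programmatically. The following sketch assumes a boto3 Step Functions client is passed in as sfn_client and that you have looked up the ARN of the run-crawlers-with-tags state machine yourself; the function name is hypothetical.

```python
import json


def start_crawl(sfn_client, state_machine_arn, tags):
    """Start run-crawlers-with-tags with the given tag filter and return the execution ARN."""
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        # Same shape as the console input shown above
        input=json.dumps({"tags": tags}),
    )
    return response["executionArn"]
```

Calling start_crawl(client, arn, {"datatype": "json"}) sends the same input as the console steps.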

After 2-3 minutes, the execution finishes with a Succeeded status once all three crawlers have completed. During this time, you can navigate to the run-crawler state machine to view the individual, nested executions per crawler or to the AWS Glue console to see the status of the crawlers.

Querying the data using Amazon Athena

Now, navigate to the Athena console where you can see the database and tables created by your crawlers. Note that AWS Glue recognized the partitioning scheme and included fields for year, month, and date in addition to user and usage fields for the data contained in the JSON files.

If you have not used Athena in this account before, you see a message instructing you to set a query result location. Choose View settings -> Manage -> Browse S3 and select the athena-results bucket that you created when you deployed the example. Choose Save then return to the Editor to continue.

You can now run queries such as the following, to calculate the total usage for all users over 5 years.

SELECT SUM(usage) all_time_usage FROM "daily_user_summaries"

You can also add filters, as shown in the following example, which limit results to those from 2016.

SELECT SUM(usage) all_time_usage FROM "daily_user_summaries" WHERE year = '2016'

Note this second query scanned only 17% as much data (133 KB vs 797 KB) and completed faster. This is because Athena used the partitioning information to avoid querying the full dataset. While the differences in this example are small, for real-world datasets with terabytes of data, your cost and latency savings from partitioning data can be substantial.

The disadvantage of a partitioning scheme is that new folders are not included in query results until you add new partitions. Re-running your crawler identifies and adds the new partitions, and using Step Functions to orchestrate these crawlers makes that task simpler.

Extending the example

You can use these example state machines as they are in your AWS accounts to manage your existing crawlers. You can use Amazon S3 event notifications with Amazon EventBridge to trigger crawlers based on data changes. With the Optimized service integration for Amazon Athena, you can extend your workflows to execute queries against these crawled datasets. And you can use these examples to integrate crawler execution into your end-to-end data processing workflows, creating reliable, auditable workflows from ingestion through to analysis.

Conclusion

In this blog post, you learn how to use Step Functions to orchestrate AWS Glue crawlers. You deploy an example that generates three datasets, then uses Step Functions to start and coordinate crawler runs that analyze this data and make it available to query using Athena.

To learn more about Step Functions, visit Serverless Land.

Accelerate Amazon DynamoDB data access in AWS Glue jobs using the new AWS Glue DynamoDB Export connector

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/accelerate-amazon-dynamodb-data-access-in-aws-glue-jobs-using-the-new-aws-glue-dynamodb-elt-connector/

Modern data architectures encourage the integration of data lakes, data warehouses, and purpose-built data stores, enabling unified governance and easy data movement. With a modern data architecture on AWS, you can store data in a data lake and use a ring of purpose-built data services around the lake, allowing you to make decisions with speed and agility.

To achieve a modern data architecture, AWS Glue is the key service that integrates data over a data lake, data warehouse, and purpose-built data stores. AWS Glue simplifies data movement like inside-out, outside-in, or around the perimeter. A powerful purpose-built data store is Amazon DynamoDB, which is widely used by hundreds of thousands of companies, including Amazon.com. It’s common to move data from DynamoDB to a data lake built on top of Amazon Simple Storage Service (Amazon S3). Many customers move data from DynamoDB to Amazon S3 using AWS Glue extract, transform, and load (ETL) jobs.

Today, we’re pleased to announce the general availability of a new AWS Glue DynamoDB export connector. It’s built on top of the DynamoDB table export feature. It’s a scalable and cost-efficient way to read large DynamoDB table data in AWS Glue ETL jobs. This post describes the benefit of this new export connector and its use cases.

The following are typical use cases to read from DynamoDB tables using AWS Glue ETL jobs:

  • Move the data from DynamoDB tables to different data stores
  • Integrate the data with other services and applications
  • Retain historical snapshots for auditing
  • Build an S3 data lake from the DynamoDB data and analyze the data from various services, such as Amazon Athena, Amazon Redshift, and Amazon SageMaker

The new AWS Glue DynamoDB export connector

The old version of the AWS Glue DynamoDB connector reads DynamoDB tables through the DynamoDB Scan API. Instead, the new AWS Glue DynamoDB export connector reads DynamoDB data from a snapshot that is exported from the DynamoDB table. This approach has the following benefits:

  • It doesn’t consume read capacity units of the source DynamoDB tables
  • The read performance is consistent for large DynamoDB tables

For large DynamoDB tables (more than 100 GB), this new connector is significantly faster than the traditional connector.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance.
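
Point-in-time recovery can be enabled from the console or through the DynamoDB API. The following sketch uses the UpdateContinuousBackups operation and assumes a boto3 DynamoDB client is passed in as dynamodb_client; the function name is hypothetical.

```python
def enable_pitr(dynamodb_client, table_name):
    """Turn on point-in-time recovery for a table and return the reported status."""
    response = dynamodb_client.update_continuous_backups(
        TableName=table_name,
        PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
    )
    description = response["ContinuousBackupsDescription"]
    # The status is "ENABLED" or "DISABLED"
    return description["PointInTimeRecoveryDescription"]["PointInTimeRecoveryStatus"]
```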

How to use the new connector on AWS Glue Studio Visual Editor

AWS Glue Studio Visual Editor is a graphical interface that makes it easy to create, run, and monitor AWS Glue ETL jobs in AWS Glue. The new DynamoDB export connector is available on AWS Glue Studio Visual Editor. You can choose Amazon DynamoDB as the source.

After you choose Create, you see the visual Directed Acyclic Graph (DAG). Here, you can choose your DynamoDB table that exists in this account or Region. This allows you to select DynamoDB tables (with PITR enabled) directly as a source in AWS Glue Studio. This provides a one-click export from any of your DynamoDB tables to Amazon S3. You can also easily add any data sources and targets or transformations to the DAG. For example, it allows you to join two different DynamoDB tables and export the result to Amazon S3, as shown in the following screenshot.

The following two connection options are added automatically. Together they define the S3 location used to store temporary data during the DynamoDB export phase. You can set S3 bucket lifecycle policies to expire this temporary data.

  • dynamodb.s3.bucket – The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – The S3 prefix to store temporary data during DynamoDB export

How to use the new connector on the job script code

You can use the new export connector when you create an AWS Glue DynamicFrame in the job script code by configuring the following connection options:

  • dynamodb.export – (Required) You need to set this to ddb or s3
  • dynamodb.tableArn – (Required) Your source DynamoDB table ARN
  • dynamodb.unnestDDBJson – (Optional) If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is false.
  • dynamodb.s3.bucket – (Optional) The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – (Optional) The S3 prefix to store temporary data during DynamoDB export

The following is the sample Python code to create a DynamicFrame using the new export connector:

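from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Create the GlueContext used to build DynamicFrames
glue_context = GlueContext(SparkContext.getOrCreate())

# Trigger a new export of the table and read it into a DynamicFrame
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        # Full table ARN, not just the table name (placeholder values)
        "dynamodb.tableArn": "arn:aws:dynamodb:<region>:<account-id>:table/test_source",
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "<bucket name>",
        "dynamodb.s3.prefix": "<bucket prefix>"
    }
)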

The new export connector doesn’t require configurations related to AWS Glue job parallelism, unlike the old connector. Now you no longer need to change the configuration when you scale out the AWS Glue job. It also doesn’t require any configuration regarding DynamoDB table read/write capacity and its capacity mode (on demand or provisioned).
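
Because the options are plain key-value pairs, it can help to build and validate them in one place before passing them to from_options. The following helper is a sketch based only on the option list in this post; the function name and its defaults are assumptions, and it does not call AWS Glue itself.

```python
def build_export_options(export, table_arn, unnest=False, bucket=None, prefix=None):
    """Build connection_options for the DynamoDB export connector."""
    if export not in ("ddb", "s3"):
        raise ValueError("dynamodb.export must be 'ddb' or 's3'")
    if not table_arn:
        raise ValueError("dynamodb.tableArn is required")
    options = {
        "dynamodb.export": export,
        "dynamodb.tableArn": table_arn,
        "dynamodb.unnestDDBJson": unnest,
    }
    # The S3 location is optional, so only include it when provided
    if bucket is not None:
        options["dynamodb.s3.bucket"] = bucket
    if prefix is not None:
        options["dynamodb.s3.prefix"] = prefix
    return options
```

The returned dictionary can be passed as the connection_options argument shown in the sample code.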

DynamoDB table schema handling

By default, the new export connector reads data in the DynamoDB JSON structure that is present in exports. The following is an example schema of the frame using the Amazon Customer Review Dataset:

root
|-- Item: struct (nullable = true)
| |-- product_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- total_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_title: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- star_rating: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- customer_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- marketplace: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- helpful_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- review_headline: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- review_date: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- vine: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_body: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- verified_purchase: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- product_category: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- year: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_parent: struct (nullable = true)
| | |-- S: string (nullable = true)

To read DynamoDB item columns without handling nested data, you can set dynamodb.unnestDDBJson to True. The following is an example of the schema of the same data where dynamodb.unnestDDBJson is set to True:

root
|-- product_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- review_headline: string (nullable = true)
|-- review_date: string (nullable = true)
|-- vine: string (nullable = true)
|-- review_body: string (nullable = true)
|-- verified_purchase: string (nullable = true)
|-- product_category: string (nullable = true)
|-- year: string (nullable = true)
|-- product_parent: string (nullable = true)
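
To show what the unnest step does to a single record, the following sketch flattens one item in DynamoDB JSON form into a plain dictionary. It illustrates the shape of the transformation only and is not the connector's implementation. It handles the scalar type tags seen in the schema above (S, N, and NULL) and does not recurse into nested maps or lists. Numbers stay as strings, matching the unnested schema shown above. The function name and sample values are hypothetical.

```python
def unnest_ddb_json(record):
    """Flatten {"Item": {attr: {type: value}}} into {attr: value}."""
    flat = {}
    for name, typed_value in record["Item"].items():
        # Each attribute is a single-entry dict such as {"S": "text"}
        (type_tag, value), = typed_value.items()
        flat[name] = None if type_tag == "NULL" else value
    return flat


if __name__ == "__main__":
    item = {"Item": {
        "product_id": {"S": "example-id"},
        "total_votes": {"N": "3"},
        "review_body": {"NULL": True},
    }}
    print(unnest_ddb_json(item))
```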

Data freshness

Data freshness is a measure of how stale the data is compared with the live tables in the original source. In the new export connector, the option dynamodb.export impacts data freshness.

When dynamodb.export is set to ddb, the AWS Glue job invokes a new export and then reads the export placed in an S3 bucket into a DynamicFrame. It reads an export of the live table, so the data can be fresh. On the other hand, when dynamodb.export is set to s3, the AWS Glue job skips invoking a new export and directly reads an export already placed in an S3 bucket. It reads an export of the past table, so the data can be stale, but you avoid the overhead of triggering an export.

The following table explains the data freshness and pros and cons of each option.

| Connector | dynamodb.export config | Data freshness | Data source | Pros | Cons |
| --- | --- | --- | --- | --- | --- |
| New export connector | s3 | Stale | Export of the past table | RCU is not consumed; can skip triggering exports | Data can be stale |
| New export connector | ddb | Fresh | Export of the live table | Data can be fresh; RCU is not consumed | Overhead to trigger exports and wait for completion |
| Old connector | N/A | Most fresh | Scan of the live tables | Data can be fresh | Read capacity unit (RCU) is consumed |

Performance

The following benchmark shows the performance improvements between the old version of the AWS Glue DynamoDB connector and the new export connector. The comparison uses the DynamoDB tables storing the TPC-DS benchmark dataset with different scales from 10 MB to 2 TB. The sample Spark job reads from the DynamoDB table and calculates the count of the items. All the Spark jobs are run on AWS Glue 3.0, G.2X, 60 workers.

The following chart compares AWS Glue job duration between the old connector and the new export connector. For small DynamoDB tables, the old connector is faster. For tables larger than 80 GB, the new export connector is faster. In other words, the DynamoDB export connector is recommended for jobs that take the old connector more than 5–10 minutes to run. The chart also shows that the duration of the new export connector increases slowly as data size increases, whereas the duration of the old connector increases rapidly. This makes the new export connector especially suitable for larger tables.

The following chart compares dollar cost between the old connector and the new export connector. It contains the AWS Glue DPU hour cost summed with the cost for reading data from DynamoDB. For the old connector, we include the read request cost. For the new export connector, we include the cost in the DynamoDB data export to Amazon S3. Both are calculated in DynamoDB on-demand capacity mode.

With AWS Glue Auto Scaling

AWS Glue Auto Scaling is a new feature to automatically resize computing resources for better performance at lower cost. You can take advantage of AWS Glue Auto Scaling with the new DynamoDB export connector.

As the following chart shows, with AWS Glue Auto Scaling, the duration of the new export connector is shorter than the old connector when the size of the source DynamoDB table is 100 GB or more. It shows a similar trend without AWS Glue Auto Scaling.

You get the cost benefits because only the Spark driver is active for most of the DynamoDB export (which is nearly 30% of the total job duration with the old scan-based connector).

Conclusion

AWS Glue is a key service to integrate with multiple data stores. At AWS, we keep improving the performance and cost-efficiency of our services. In this post, we announced the availability of the new AWS Glue DynamoDB export connector. With this new connector, you can easily integrate your large data on DynamoDB tables with different data stores. It helps you read the large tables faster from AWS Glue jobs at lower cost.

The new AWS Glue DynamoDB export connector is now generally available in all supported Glue Regions. Let’s start using the new AWS Glue DynamoDB export connector today! We are looking forward to your feedback and stories on how you utilize the connector for your needs.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Neil Gupta is a Software Development Engineer on the AWS Glue team. He enjoys tackling big data problems and learning more about distributed systems.

Andrew Kim is a Software Development Engineer on the AWS Glue team. His passion is building scalable and effective solutions to challenging problems and working with distributed systems.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizing Apache Spark for performance and reliability.

Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/

Nowadays, many customers have built their data lakes as the core of their data analytic systems. In a typical use case of data lakes, many concurrent queries run to retrieve consistent snapshots of business insights by aggregating query results. A large volume of data constantly comes from different data sources into the data lakes. There is also a common demand to reflect the changes occurring in the data sources into the data lakes. This means that not only inserts but also updates and deletes need to be replicated into the data lakes.

Apache Iceberg provides the capability of ACID transactions on your data lakes, which allows concurrent queries to add or delete records in isolation from existing queries while preserving read consistency. Iceberg is an open table format designed for large analytic workloads on huge datasets. You can perform ACID transactions against your data lakes by using simple SQL expressions. It also enables time travel, rollback, hidden partitioning, and schema evolution changes, such as adding, dropping, renaming, updating, and reordering columns.

AWS Glue is one of the key elements to building data lakes. It extracts data from multiple sources and ingests your data to your data lake built on Amazon Simple Storage Service (Amazon S3) using both batch and streaming jobs. To expand the accessibility of your AWS Glue extract, transform, and load (ETL) jobs to Iceberg, AWS Glue provides an Apache Iceberg connector. The connector allows you to build Iceberg tables on your data lakes and run Iceberg operations such as ACID transactions, time travel, rollbacks, and so on from your AWS Glue ETL jobs.

In this post, we give an overview of how to set up the Iceberg connector for AWS Glue and configure the relevant resources to use Iceberg with AWS Glue jobs. We also demonstrate how to run typical Iceberg operations on AWS Glue interactive sessions with an example use case.

Apache Iceberg connector for AWS Glue

With the Apache Iceberg connector for AWS Glue, you can take advantage of the following Iceberg capabilities:

  • Basic operations on Iceberg tables – This includes creating Iceberg tables in the AWS Glue Data Catalog and inserting, updating, and deleting records with ACID transactions in the Iceberg tables
  • Inserting and updating records – You can run UPSERT (update and insert) queries for your Iceberg table
  • Time travel on Iceberg tables – You can read a specific version of an Iceberg table from table snapshots that Iceberg manages
  • Rollback of table versions – You can revert an Iceberg table back to a specific version of the table

Iceberg offers additional useful capabilities such as hidden partitioning; schema evolution with add, drop, update, and rename support; automatic data compaction; and more. For more details about Iceberg, refer to the Apache Iceberg documentation.

Next, we demonstrate how the Apache Iceberg connector for AWS Glue works for each Iceberg capability based on an example use case.

Overview of example customer scenario

Let’s assume that an ecommerce company sells products on its online platform. Customers can buy products and write reviews for each product. Customers can add, update, or delete their reviews at any time. The customer reviews are an important source for analyzing customer sentiment and business trends.

In this scenario, we have the following teams in our organization:

  • Data engineering team – Responsible for building and managing data platforms.
  • Data analyst team – Responsible for analyzing customer reviews and creating business reports. This team queries the reviews daily, creates a business intelligence (BI) report, and shares it with the sales team.
  • Customer support team – Responsible for replying to customer inquiries. This team queries the reviews when they get inquiries about the reviews.

Our solution has the following requirements:

  • Query scalability is important because the website is huge.
  • Individual customer reviews can be added, updated, and deleted.
  • The data analyst team needs to use both notebooks and ad hoc queries for their analysis.
  • The customer support team sometimes needs to view the history of the customer reviews.
  • Customer reviews can always be added, updated, and deleted, even while one of the teams is querying the reviews for analysis. This means that any result in a query isn’t affected by uncommitted customer review write operations.
  • Any changes in customer reviews that are made by the organization’s various teams need to be reflected in BI reports and query results.

In this post, we build a data lake of customer review data on top of Amazon S3. To meet these requirements, we introduce Apache Iceberg to enable adding, updating, and deleting records; ACID transactions; and time travel queries. We also use an AWS Glue Studio notebook to integrate and query the data at scale. First, we set up the connector so we can create an AWS Glue connection for Iceberg.

Set up the Apache Iceberg connector and create the Iceberg connection

To use Apache Iceberg with AWS Glue jobs, we first set up the Apache Iceberg connector for AWS Glue and create an AWS Glue connection with the connector. Complete the following steps:

  1. Navigate to the Apache Iceberg connector for AWS Glue page in AWS Marketplace.
  2. Choose Continue to Subscribe.
  3. Review the information under Terms and Conditions, and choose Accept Terms to continue.
  4. When the subscription is complete, choose Continue to Configuration.
  5. For Fulfillment option, choose Glue 3.0. (1.0 and 2.0 are also available options.)
  6. For Software version, choose the latest software version.

As of this writing, 0.12.0-2 is the latest version of the Apache Iceberg connector for AWS Glue.

  7. Choose Continue to Launch.
  8. Choose Usage instructions.
  9. Choose Activate the Glue connector from AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  10. For Name, enter a name for your connection (for example, iceberg-connection).
  11. Choose Create connection and activate connector.

A message appears that the connection was successfully added, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

We use a provided AWS CloudFormation template to set up Iceberg configuration for AWS Glue. AWS CloudFormation creates the following resources:

  • An S3 bucket to store an Iceberg configuration file and actual data
  • An AWS Lambda function to generate an Iceberg configuration file based on parameters provided by a user for the CloudFormation template, and to clean up the resources created through this post
  • AWS Identity and Access Management (IAM) roles and policies with necessary permissions
  • An AWS Glue database in the Data Catalog to register Iceberg tables

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack.
  2. For DynamoDBTableName, enter a name for an Amazon DynamoDB table that is created automatically when AWS Glue creates an Iceberg table.

This table is used for an AWS Glue job to obtain a commit lock to avoid concurrently modifying records in Iceberg tables. For more details about commit locking, refer to DynamoDB for Commit Locking. Note that you shouldn’t specify the name of an existing table.

  3. For IcebergDatabaseName, enter a name for the AWS Glue database that is created in the Data Catalog and used for registering Iceberg tables.
  4. Choose Next.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Create stack.

Start an AWS Glue Studio notebook to use Apache Iceberg

After you launch the CloudFormation stack, you create an AWS Glue Studio notebook to perform Iceberg operations. Complete the following steps:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Under Create job, select Jupyter Notebook.
  4. Select Upload and edit an existing notebook and upload iceberg-with-glue.ipynb.
  5. Choose Create.
  6. For Job name, enter a name.
  7. For IAM role, choose IcebergConnectorGlueJobRole, which was created via the CloudFormation template.
  8. Choose Start notebook job.

The process takes a few minutes to complete, after which you can see an AWS Glue Studio notebook view.

  9. Choose Save to save the notebook.

Set up the Iceberg configuration

To set up the Iceberg configuration, complete the following steps:

  1. Run the following cells with multiple options (magics). Note that you set your connection name for the %connections magic in the cell.

For more information, refer to Configuring AWS Glue Interactive Sessions for Jupyter and AWS Glue Studio notebooks.

A message Session <session-id> has been created appears when your AWS Glue Studio notebook is ready.

In the last cell in this section, you load your Iceberg configuration, which you specified when launching the CloudFormation stack. The Iceberg configuration includes a warehouse path for the actual Iceberg data, a DynamoDB table name for commit locking, a database name for your Iceberg tables, and more.

To load the configuration, set the S3 bucket name that was created via the CloudFormation stack.

  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose the stack you created.
  4. On the Outputs tab, copy the S3 bucket name.
  5. Set the S3 bucket name as the S3_BUCKET parameter in your notebook.
  6. Run the cell and load the Iceberg configuration that you set.

Initialize the job with Iceberg configurations

In this section, we continue running cells to initialize a SparkSession.

  1. Set an Iceberg warehouse path and a DynamoDB table name for Iceberg commit locking from the user_config parameter.
  2. Initialize a SparkSession by setting the Iceberg configurations.
  3. With the SparkSession object, create SparkContext and GlueContext objects.

The following screenshot shows the relevant section in the notebook.

We provide the details of each parameter that you configure for the SparkSession in the appendix of this post.

For this post, we demonstrate setting the Spark configuration for Iceberg. You can also set the configuration as AWS Glue job parameters. For more information, refer to the Usage Information section in the Iceberg connector product page.
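The initialization steps above can be sketched roughly as follows. The configuration keys and values come from the appendix of this post. The helper names, the catalog name glue_catalog, and the sample warehouse path and lock table name are illustrative assumptions, not the notebook's actual code.

```python
def iceberg_spark_conf(catalog, warehouse_path, dynamodb_table):
    """Return the Spark settings listed in the appendix as a dict (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse_path,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        f"{prefix}.lock.table": dynamodb_table,
        "spark.sql.extensions": (
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        ),
        "spark.sql.session.timeZone": "UTC",
    }


def build_session(conf):
    """Create a SparkSession with the given settings applied."""
    # Imported lazily so the dict builder above works without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Sample values only; use the warehouse path and table name from your stack.
conf = iceberg_spark_conf("glue_catalog", "s3://your-bucket/data", "myGlueLockTable")
for key, value in conf.items():
    print(key, "=", value)

# In the notebook, the remaining steps would look along these lines:
# from awsglue.context import GlueContext
# spark = build_session(conf)
# sc = spark.sparkContext
# glue_context = GlueContext(sc)
```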

Use case walkthrough

To walk through our use case, we use two tables: acr_iceberg and acr_iceberg_report. The table acr_iceberg contains the customer review data. The table acr_iceberg_report contains BI analysis results based on the customer review data. All changes to acr_iceberg also impact acr_iceberg_report. The table acr_iceberg_report needs to be updated daily, right before sharing business reports with stakeholders.

To demonstrate this use case, we walk through the following typical steps:

  1. A data engineering team registers the acr_iceberg and acr_iceberg_report tables in the Glue Data Catalog.
  2. Customers (ecommerce users) add reviews to products in the Industrial_Supplies category. These reviews are added to the Iceberg table.
  3. A customer requests to update their reviews. We simulate updating the customer review in the acr_iceberg table.
  4. We reflect the customer’s request of the updated review in acr_iceberg into acr_iceberg_report.
  5. We revert the customer’s request of the updated review for the customer review table acr_iceberg, and reflect the reversion in acr_iceberg_report.

1. Create Iceberg tables of customer reviews and BI reports

In this step, the data engineering team creates the acr_iceberg Iceberg table for customer reviews data (based on the Amazon Customer Reviews Dataset), and the team creates the acr_iceberg_report Iceberg table for BI reports.

Create the acr_iceberg table for customer reviews

The following code first extracts the Amazon customer reviews, which are stored in a public S3 bucket. Then it creates an Iceberg table of the customer reviews and loads these reviews into your specified S3 bucket (created via the CloudFormation stack). Note that the script loads only part of the dataset to keep the load time short.

# Loading the dataset and creating an Iceberg table. This will take about 3-5 minutes.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

Regarding the tableProperty parameter, we specify format version 2 to make the table version compatible with Amazon Athena. For more information about Athena support for Iceberg tables, refer to Considerations and limitations. To learn more about the difference between Iceberg table versions 1 and 2, refer to Appendix E: Format version changes.

Let’s run the following cells. Running the second cell takes around 3–5 minutes.

After you run the cells, the acr_iceberg table is available in your specified database in the Glue Data Catalog.

You can also see the actual data and metadata of the Iceberg table in the S3 bucket that is created through the CloudFormation stack. Iceberg creates the table and writes actual data and relevant metadata that includes table schema, table version information, and so on. See the following objects in your S3 bucket:

$ aws s3 ls 's3://your-bucket/data/' --recursive
YYYY-MM-dd hh:mm:ss   83616660 data/iceberg_blog_default.db/acr_iceberg/data/00000-44-c2983230-c43a-4f4a-9b89-1f7c13e59645-00001.parquet
YYYY-MM-dd hh:mm:ss   83247771 
...
YYYY-MM-dd hh:mm:ss       5134 data/iceberg_blog_default.db/acr_iceberg/metadata/00000-bc5d3ea2-280f-4e28-a71f-4c2b749ed637.metadata.json
YYYY-MM-dd hh:mm:ss     116950 data/iceberg_blog_default.db/acr_iceberg/metadata/411308cd-1f4d-4535-9444-f6b56a56697f-m0.avro
YYYY-MM-dd hh:mm:ss       3821 data/iceberg_blog_default.db/acr_iceberg/metadata/snap-6122957686233868728-1-411308cd-1f4d-4535-9444-f6b56a56697f.avro

The job tries to create a DynamoDB table, which you specified in the CloudFormation stack (in the following screenshot, its name is myGlueLockTable), if it doesn’t exist already. As we discussed earlier, the DynamoDB table is used for commit locking for Iceberg tables.

Create the acr_iceberg_report Iceberg table for BI reports

The data engineering team also creates the acr_iceberg_report table for BI reports in the Glue Data Catalog. This table initially has the following records.

comment_count avg_star product_category
1240 4.20729367860598 Camera
95 4.80167540490342 Industrial_Supplies
663 3.80123467540571 PC

To create the table, run the following cell.

The two Iceberg tables have been created. Let’s check the acr_iceberg table records by running a query.

Determine the average star rating for each product category by querying the Iceberg table

You can see the Iceberg table records by using a SELECT statement. In this section, we query the acr_iceberg table with an ad hoc query to simulate viewing the current BI report data.

Run the following cell in the notebook to get the aggregated number of customer comments and mean star rating for each product_category.
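The cell itself is not reproduced here. A query along the following lines would produce that aggregation. The helper name and the catalog and database values are assumptions; the column names match the report table shown earlier. The helper only builds the statement so it can be inspected without a Spark session.

```python
def category_summary_sql(catalog, database, table):
    """Build a query that counts comments and averages star ratings per category."""
    return (
        "SELECT count(*) AS comment_count, "
        "avg(star_rating) AS avg_star, "
        "product_category "
        f"FROM {catalog}.{database}.{table} "
        "GROUP BY product_category"
    )


# Assumed catalog and database names; substitute your own.
query = category_summary_sql("glue_catalog", "iceberg_blog_default", "acr_iceberg")
print(query)

# In the notebook, with an active SparkSession named spark:
# spark.sql(query).show()
```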

The cell output has the following results.

Another way to query Iceberg tables is using Amazon Athena (when you use Athena with Iceberg tables, you need to set up the Iceberg environment) or Amazon EMR.

2. Add customer reviews in the Iceberg table

In this section, customers add comments for some products in the Industrial Supplies product category, and we add these comments to the acr_iceberg table. To demonstrate this scenario, we create a Spark DataFrame based on the following new customer reviews and then add them to the table with an INSERT statement.

marketplace customer_id review_id product_id product_parent product_title star_rating helpful_votes total_votes vine verified_purchase review_headline review_body review_date year product_category
US 12345689 ISB35E4556F144 I00EDBY7X8 989172340 plastic containers 5 0 0 N Y Five Stars Great product! 2022-02-01 2022 Industrial_Supplies
US 78901234 IS4392CD4C3C4 I00D7JFOPC 952000001 battery tester 3 0 0 N Y nice one, but it broke some days later nope 2022-02-01 2022 Industrial_Supplies
US 12345123 IS97B103F8B24C I002LHA74O 818426953 spray bottle 2 1 1 N N Two Stars the bottle isn’t as big as pictured. 2022-02-01 2022 Industrial_Supplies
US 23000093 ISAB4268D46F3X I00ARPLCGY 562945918 3d printer 5 3 3 N Y Super great very useful 2022-02-01 2022 Industrial_Supplies
US 89874312 ISAB4268137V2Y I80ARDQCY 564669018 circuit board 4 0 0 Y Y Great, but a little bit expensive you should buy this, but note the price 2022-02-01 2022 Industrial_Supplies

Run the following cells in the notebook to insert the customer comments to the Iceberg table. The process takes about 1 minute.
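As a minimal sketch of what those cells might do, the following builds a DataFrame from two of the new reviews and appends it with an INSERT statement. The helper name, the temporary view name, the target table identifier, and the Python types chosen for each column are assumptions; the column types must match the actual table schema.

```python
COLUMNS = [
    "marketplace", "customer_id", "review_id", "product_id", "product_parent",
    "product_title", "star_rating", "helpful_votes", "total_votes", "vine",
    "verified_purchase", "review_headline", "review_body", "review_date",
    "year", "product_category",
]

# Two of the five reviews from the table above (types are assumed).
NEW_REVIEWS = [
    ("US", "12345689", "ISB35E4556F144", "I00EDBY7X8", "989172340",
     "plastic containers", 5, 0, 0, "N", "Y",
     "Five Stars", "Great product!", "2022-02-01", "2022", "Industrial_Supplies"),
    ("US", "78901234", "IS4392CD4C3C4", "I00D7JFOPC", "952000001",
     "battery tester", 3, 0, 0, "N", "Y",
     "nice one, but it broke some days later", "nope",
     "2022-02-01", "2022", "Industrial_Supplies"),
]


def insert_reviews(spark, rows, target_table, view_name="new_reviews"):
    """Register rows as a temporary view and append them to the Iceberg table."""
    df = spark.createDataFrame(rows, schema=COLUMNS)
    df.createOrReplaceTempView(view_name)
    # INSERT ... SELECT * matches columns by position, so COLUMNS must follow
    # the column order of the target table.
    spark.sql(f"INSERT INTO {target_table} SELECT * FROM {view_name}")


print(len(NEW_REVIEWS), "rows with", len(COLUMNS), "columns each")

# In the notebook, with an active SparkSession named spark:
# insert_reviews(spark, NEW_REVIEWS, "glue_catalog.iceberg_blog_default.acr_iceberg")
```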

Run the next cell to see an addition to the product category Industrial_Supplies with 5 under comment_count.

3. Update a customer review in the Iceberg table

In the previous section, we added new customer reviews to the acr_iceberg Iceberg table. In this section, a customer requests an update of their review. Specifically, customer 78901234 requests the following update of the review ID IS4392CD4C3C4.

  • change star_rating from 3 to 5
  • update the review_headline from nice one, but it broke some days later to very good

We update the customer comment by using an UPDATE query by running the following cell.
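An UPDATE statement for this request could look like the following sketch. The helper name and the table identifier are assumptions, and customer_id and review_id are treated as string columns here, which may differ from the actual schema.

```python
def update_review_sql(target_table, customer_id, review_id, star_rating, review_headline):
    """Build an UPDATE statement that changes one review's rating and headline."""
    headline = review_headline.replace("'", "''")  # escape single quotes for SQL
    return (
        f"UPDATE {target_table} "
        f"SET star_rating = {int(star_rating)}, review_headline = '{headline}' "
        f"WHERE customer_id = '{customer_id}' AND review_id = '{review_id}'"
    )


statement = update_review_sql(
    "glue_catalog.iceberg_blog_default.acr_iceberg",  # assumed identifier
    customer_id="78901234",
    review_id="IS4392CD4C3C4",
    star_rating=5,
    review_headline="very good",
)
print(statement)

# In the notebook, with an active SparkSession named spark:
# spark.sql(statement)
```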

We can review the updated record by running the next cell as follows.

Also, when you run this cell for the reporting table, you can see the updated avg_star column value for the Industrial_Supplies product category. Specifically, the avg_star value has been updated from 3.8 to 4.2 as a result of the star_rating changing from 3 to 5:

4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query

In this section, we reflect the changes in the acr_iceberg table into the BI report table acr_iceberg_report. To do so, we run the MERGE INTO query and combine the two tables based on the condition of the product_category column in each table. This query works as follows:

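  • When the product_category value exists in both tables, the query updates the existing record: comment_count becomes the sum of the two values, and avg_star becomes the average of the two values
  • When the product_category value exists only in the aggregated result, the query inserts it as a new record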

This MERGE INTO operation is also referred to as an UPSERT (update and insert).
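To make the matched and not-matched branches concrete, the following sketch pairs a MERGE INTO statement of the kind described above with a plain Python emulation of its effect. The statement text, the table identifiers, and the Books row are illustrative assumptions; the Industrial_Supplies values come from this post (4.2 is the mean of the five star ratings 5, 5, 2, 5, and 4 after the update).

```python
MERGE_SQL = """
MERGE INTO glue_catalog.iceberg_blog_default.acr_iceberg_report t
USING (
    SELECT count(*) AS comment_count,
           avg(star_rating) AS avg_star,
           product_category
    FROM glue_catalog.iceberg_blog_default.acr_iceberg
    GROUP BY product_category
) s
ON t.product_category = s.product_category
WHEN MATCHED THEN UPDATE SET
    t.comment_count = t.comment_count + s.comment_count,
    t.avg_star = (t.avg_star + s.avg_star) / 2
WHEN NOT MATCHED THEN INSERT *
"""


def merge_reports(report, aggregated):
    """Emulate the upsert: update categories that match, insert the rest."""
    merged = {category: dict(row) for category, row in report.items()}
    for category, src in aggregated.items():
        if category in merged:
            tgt = merged[category]
            tgt["comment_count"] += src["comment_count"]
            tgt["avg_star"] = (tgt["avg_star"] + src["avg_star"]) / 2
        else:
            merged[category] = dict(src)
    return merged


report = {
    "Industrial_Supplies": {"comment_count": 95, "avg_star": 4.80167540490342},
}
aggregated = {
    "Industrial_Supplies": {"comment_count": 5, "avg_star": 4.2},
    "Books": {"comment_count": 10, "avg_star": 4.0},  # hypothetical category
}
merged = merge_reports(report, aggregated)
print(merged["Industrial_Supplies"]["comment_count"])

# In the notebook, with an active SparkSession named spark:
# spark.sql(MERGE_SQL)
```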

Run the following cell to reflect the update of customer reviews in the acr_iceberg table into the acr_iceberg_report BI table.

After the MERGE INTO query is complete, you can see the updated acr_iceberg_report table by running the following cell.

The MERGE INTO query performed the following changes:

  • In the Camera, Industrial_Supplies, and PC product categories, each comment_count is the sum between the initial value of the acr_iceberg_report table and the aggregated table value. For example, in the Industrial_Supplies product category row, the comment_count 100 is calculated by 95 (in the initial version of acr_iceberg_report) + 5 (in the aggregated report table).
  • In addition to comment_count, the avg_star in the Camera, Industrial_Supplies, or PC product category row is also computed by averaging between each avg_star value in acr_iceberg_report and in the aggregated table.
  • In other product categories, each comment_count and avg_star is the same as each value in the aggregated table, which means that each value in the aggregated table is inserted into the acr_iceberg_report table.

5. Roll back the Iceberg tables and reflect changes in the BI report table

In this section, the customer who requested the update of the review now requests to revert the updated review.

Iceberg records a new table version (snapshot) for each operation on an Iceberg table. We can see information about each table version by inspecting the table, and we can also time travel or roll back a table to an older version.

To complete the customer request to revert the updated review, we need to revert the table version of acr_iceberg to the earlier version when we first added the reviews. Additionally, we need to update the acr_iceberg_report table to reflect the rollback of the acr_iceberg table version. Specifically, we need to perform the following three steps to complete these operations:

  1. Check the history of table changes of acr_iceberg and acr_iceberg_report to get each table snapshot.
  2. Roll back acr_iceberg to the version when we first inserted records, and also roll back the acr_iceberg_report table to the initial version to discard the customer review update.
  3. Merge the acr_iceberg table with the acr_iceberg_report table again.

Get the metadata of each report table

As a first step, we check table versions by inspecting the table. Run the following cells.
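Iceberg exposes table versions through metadata tables such as snapshots and history. A query of the following kind lists the snapshot IDs and the operation behind each one. The helper name and the catalog and database values are assumptions, and the notebook's own cells may differ.

```python
def snapshots_sql(catalog, database, table):
    """Build a query against the Iceberg snapshots metadata table."""
    return (
        "SELECT committed_at, snapshot_id, parent_id, operation "
        f"FROM {catalog}.{database}.{table}.snapshots "
        "ORDER BY committed_at"
    )


# Assumed catalog and database names; substitute your own.
queries = [
    snapshots_sql("glue_catalog", "iceberg_blog_default", name)
    for name in ("acr_iceberg", "acr_iceberg_report")
]
for query in queries:
    print(query)

# In the notebook, with an active SparkSession named spark:
# for query in queries:
#     spark.sql(query).show(truncate=False)
```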

Now you can see the following table versions in acr_iceberg and acr_iceberg_report:

  • acr_iceberg has three versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The second oldest one is the record insertion, which shows the append operation
    • The latest one is the update, which shows the overwrite operation
  • acr_iceberg_report has two versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The other one is from the MERGE INTO query in the previous section, which shows the overwrite operation

As shown in the following screenshot, we roll back acr_iceberg to the version created when we inserted the records, based on the customer revert request. We also roll back acr_iceberg_report to its initial version to discard the MERGE INTO operation in the previous section.

Roll back the acr_iceberg and acr_iceberg_report tables

Based on your snapshot IDs, you can roll back each table version:

  • For acr_iceberg, use the second-oldest snapshot_id (in this example, 5440744662350048750) and replace <Type snapshot_id in ace_iceberg table> in the following cell with this snapshot_id.
  • For acr_iceberg_report table, use the initial snapshot_id (in this example, 7958428388396549892) and replace <Type snaphost_id in ace_iceberg_report table> in the following cell with this snapshot_id.

After you specify the snapshot_id for each rollback query, run the following cells.
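Iceberg provides a rollback_to_snapshot stored procedure for Spark, which requires the Iceberg SQL extensions to be enabled. A rollback call for each table might look like the following sketch. The helper name and the catalog and database values are assumptions; the snapshot IDs are the example values from this post and will differ in your environment.

```python
def rollback_sql(catalog, database, table, snapshot_id):
    """Build a CALL statement that rolls a table back to the given snapshot."""
    return (
        f"CALL {catalog}.system.rollback_to_snapshot("
        f"'{database}.{table}', {int(snapshot_id)})"
    )


# Example snapshot IDs from this post; use the IDs from your own tables.
rollbacks = [
    rollback_sql("glue_catalog", "iceberg_blog_default", "acr_iceberg",
                 5440744662350048750),
    rollback_sql("glue_catalog", "iceberg_blog_default", "acr_iceberg_report",
                 7958428388396549892),
]
for statement in rollbacks:
    print(statement)

# In the notebook, with an active SparkSession named spark:
# for statement in rollbacks:
#     spark.sql(statement).show()
```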

When this step is complete, you can see the previous and current snapshot IDs of each table.

Each Iceberg table has been reverted to the specific version now.

Reflect changes in acr_iceberg into acr_iceberg_report again

We reflect the acr_iceberg table reversion into the current acr_iceberg_report table. To complete this, run the following cell.

After you rerun the MERGE INTO query, run the following cell to see the new table records. When we compare the table records, we observe that the avg_star value for Industrial_Supplies is lower than the avg_star value in the previous version of the table.

You were able to reflect a customer’s request of reverting their updated review on the BI report table. Specifically, you can get the updated avg_star record in the Industrial_Supplies product category.

Clean up

To clean up all resources that you created, delete the CloudFormation stack.

Conclusion

In this post, we walked through using the Apache Iceberg connector with AWS Glue ETL jobs. We created an Iceberg table built on Amazon S3, and ran queries such as reading the Iceberg table data, inserting a record, merging two tables, and time travel.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the operations Iceberg supports. Refer to the Apache Iceberg documentation for information about more operations.

Appendix: Spark configurations to use Apache Iceberg on AWS Glue

As we mentioned earlier, the notebook sets up a Spark configuration to integrate Iceberg with AWS Glue. The following table shows what each parameter defines.

Spark configuration key Value Description
spark.sql.catalog.{CATALOG} org.apache.iceberg.spark.SparkCatalog Specifies a Spark catalog interface that communicates with Iceberg tables.
spark.sql.catalog.{CATALOG}.warehouse {WAREHOUSE_PATH} A warehouse path for jobs to write Iceberg metadata and actual data.
spark.sql.catalog.{CATALOG}.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog The implementation of the Spark catalog class to communicate between Iceberg tables and the AWS Glue Data Catalog.
spark.sql.catalog.{CATALOG}.io-impl org.apache.iceberg.aws.s3.S3FileIO Used for Iceberg to communicate with Amazon S3.
spark.sql.catalog.{CATALOG}.lock-impl org.apache.iceberg.aws.glue.DynamoLockManager Used for Iceberg to manage table locks.
spark.sql.catalog.{CATALOG}.lock.table {DYNAMODB_TABLE} A DynamoDB table name to store table locks.
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions The implementation that enables Spark to run Iceberg-specific SQL commands.
spark.sql.session.timeZone UTC Sets the time zone of the Spark environment to UTC for further Iceberg time travel queries. The epoch time is in the UTC time zone.

About the Author

Tomohiro Tanaka is a Cloud Support Engineer at Amazon Web Services. He builds Glue connectors such as Apache Iceberg connector and TPC-DS connector. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he also enjoys coffee breaks with his colleagues and making coffee at home.