Tag Archives: Analytics

Build, Test and Deploy ETL solutions using AWS Glue and AWS CDK based CI/CD pipelines

Post Syndicated from Puneet Babbar original https://aws.amazon.com/blogs/big-data/build-test-and-deploy-etl-solutions-using-aws-glue-and-aws-cdk-based-ci-cd-pipelines/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. Because it’s serverless, there’s no infrastructure to set up or manage.

This post provides a step-by-step guide to build a continuous integration and continuous delivery (CI/CD) pipeline using AWS CodeCommit, AWS CodeBuild, and AWS CodePipeline to define, test, provision, and manage changes of AWS Glue based data pipelines using the AWS Cloud Development Kit (AWS CDK).

The AWS CDK is an open-source software development framework for defining cloud infrastructure as code using familiar programming languages and provisioning it through AWS CloudFormation. It provides you with high-level components called constructs that preconfigure cloud resources with proven defaults, cutting down boilerplate code and allowing for faster development in a safe, repeatable manner.

Solution overview

The solution constructs a CI/CD pipeline with multiple stages. The CI/CD pipeline constructs a data pipeline using COVID-19 Harmonized Data managed by Talend / Stitch. The data pipeline crawls the datasets provided by neherlab from the public Amazon Simple Storage Service (Amazon S3) bucket, exposes the public datasets in the AWS Glue Data Catalog so they’re available for SQL queries using Amazon Athena, performs ETL (extract, transform, and load) transformations to denormalize the datasets to a table, and makes the denormalized table available in the Data Catalog.

The solution is designed as follows:

  • A data engineer deploys the initial solution. The solution creates two stacks:
    • cdk-covid19-glue-stack-pipeline – This stack creates the CI/CD infrastructure as shown in the architectural diagram (labeled Tool Chain).
    • cdk-covid19-glue-stack – The cdk-covid19-glue-stack-pipeline stack deploys the cdk-covid19-glue-stack stack to create the AWS Glue based data pipeline as shown in the diagram (labeled ETL).
  • The data engineer makes changes on cdk-covid19-glue-stack (when a change in the ETL application is required).
  • The data engineer pushes the change to a CodeCommit repository (generated in the cdk-covid19-glue-stack-pipeline stack).
  • The pipeline is automatically triggered by the push, and deploys and updates all the resources in the cdk-covid19-glue-stack stack.

At the time of publishing of this post, the AWS CDK has two versions of the AWS Glue module: @aws-cdk/aws-glue and @aws-cdk/aws-glue-alpha, containing L1 constructs and L2 constructs, respectively. At this time, the @aws-cdk/aws-glue-alpha module is still in an experimental stage. We use the stable @aws-cdk/aws-glue module for the purpose of this post.

The following diagram shows all the components in the solution.

BDB-2467-architecture-diagram

Figure 1 – Architecture diagram

The data pipeline consists of an AWS Glue workflow, triggers, jobs, and crawlers. The AWS Glue job uses an AWS Identity and Access Management (IAM) role with appropriate permissions to read and write data to an S3 bucket. AWS Glue crawlers crawl the data available in the S3 bucket, update the AWS Glue Data Catalog with the metadata, and create tables. You can run SQL queries on these tables using Athena. For ease of identification, we followed the naming convention for triggers to start with t_*, crawlers with c_*, and jobs with j_*. A CI/CD pipeline based on CodeCommit, CodeBuild, and CodePipeline builds, tests and deploys the solution. The complete infrastructure is created using the AWS CDK.

The following table lists the tables created by this solution that you can query using Athena.

Table Name | Description | Dataset Location | Access | Location
neherlab_case_counts | Total number of cases | s3://covid19-harmonized-dataset/covid19tos3/neherlab_case_counts/ | Read | Public
neherlab_country_codes | Country code | s3://covid19-harmonized-dataset/covid19tos3/neherlab_country_codes/ | Read | Public
neherlab_icu_capacity | Intensive Care Unit (ICU) capacity | s3://covid19-harmonized-dataset/covid19tos3/neherlab_icu_capacity/ | Read | Public
neherlab_population | Population | s3://covid19-harmonized-dataset/covid19tos3/neherlab_population/ | Read | Public
neherlab_denormalized | Denormalized table that combines all the preceding tables into one table | s3://<your-S3-bucket-name>/neherlab_denormalized | Read/Write | Reader’s AWS account

Anatomy of the AWS CDK application

In this section, we visit key concepts and anatomy of the AWS CDK application, review the important sections of the code, and discuss how the AWS CDK reduces complexity of the solution as compared to AWS CloudFormation.

An AWS CDK app defines one or more stacks. Stacks (equivalent to CloudFormation stacks) contain constructs, each of which defines one or more concrete AWS resources. Each stack in the AWS CDK app is associated with an environment. An environment is the target AWS account ID and Region into which the stack is intended to be deployed.

In the AWS CDK, the top-most object is the AWS CDK app, which can contain multiple stacks, whereas in AWS CloudFormation the top-level object is the stack. Given this difference, you can define all the stacks required for the application in the AWS CDK app. In AWS Glue based ETL projects, developers need to define multiple data pipelines by subject area or business logic. In AWS CloudFormation, we can achieve this by writing multiple CloudFormation stacks and often deploying them independently. In some cases, developers write nested stacks, which over time become very large and complicated to maintain. In the AWS CDK, all stacks are deployed from the AWS CDK app, increasing modularity of the code and allowing developers to identify all the data pipelines associated with an application easily.

Our AWS CDK application consists of four main files:

  • app.py – This is the AWS CDK app and the entry point for the AWS CDK application
  • pipeline.py – The pipeline.py stack, invoked by app.py, creates the CI/CD pipeline
  • etl/infrastructure.py – The etl/infrastructure.py stack, invoked by pipeline.py, creates the AWS Glue based data pipeline
  • default-config.yaml – The configuration file contains the AWS account ID and Region.

The AWS CDK application reads the configuration from the default-config.yaml file, sets the environment information (AWS account ID and Region), and invokes the PipelineCDKStack class in pipeline.py. Let’s break down the preceding line and discuss the benefits of this design.

For every application, we want to deploy in pre-production environments and a production environment. The application in all the environments will have different configurations, such as the size of the deployed resources. In the AWS CDK, every stack has a property called env, which defines the stack’s target environment. This property receives the AWS account ID and Region for the given stack.

Lines 26–34 in app.py show the aforementioned details:

# Initiating the CodePipeline stack
PipelineCDKStack(
    app,
    "PipelineCDKStack",
    config=config,
    env=env,
    stack_name=config["codepipeline"]["pipelineStackName"]
)

The env=env line sets the target AWS account ID and Region for PipelineCDKStack. This design allows an AWS CDK app to be deployed in multiple environments at once and increases the parity of the application across all environments. For our example, if we want to deploy PipelineCDKStack in multiple environments, such as development, test, and production, we simply instantiate PipelineCDKStack once per environment after populating the env variable with the target AWS account ID and Region. This was more difficult in AWS CloudFormation, where developers usually needed to deploy the stack for each environment individually. The AWS CDK also lets you pass the stage at the command line. We look into this option and its usage in a later section.
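As a rough illustration of the per-environment idea, the following standard-library sketch maps stage names to a target account ID and Region. The configuration keys (awsAccountId, awsRegion), the stage names, and the account IDs are hypothetical placeholders and are not taken from the sample repository's default-config.yaml.

```python
from typing import Dict, NamedTuple


class TargetEnv(NamedTuple):
    """Target AWS account ID and Region for one deployment stage."""
    account: str
    region: str


def resolve_envs(config: Dict[str, Dict[str, str]]) -> Dict[str, TargetEnv]:
    """Map each stage name to the environment its stack should target."""
    return {
        stage: TargetEnv(account=str(settings["awsAccountId"]),
                         region=settings["awsRegion"])
        for stage, settings in config.items()
    }


# Hypothetical configuration: key names and account IDs are placeholders.
SAMPLE_CONFIG = {
    "development": {"awsAccountId": "111111111111", "awsRegion": "us-east-1"},
    "test": {"awsAccountId": "222222222222", "awsRegion": "us-east-1"},
    "production": {"awsAccountId": "333333333333", "awsRegion": "eu-west-1"},
}

for stage, env in resolve_envs(SAMPLE_CONFIG).items():
    print(f"{stage}: account={env.account} region={env.region}")
```

In an AWS CDK app, each resolved pair would typically be turned into the env argument of one PipelineCDKStack instantiation, so that the same stack definition targets several accounts or Regions.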

Coming back to the AWS CDK application, the PipelineCDKStack class in pipeline.py uses the aws_cdk.pipelines construct library to create continuous delivery of AWS CDK applications. The AWS CDK provides multiple opinionated construct libraries like aws_cdk.pipelines to reduce boilerplate code in an application. The pipeline.py file creates the CodeCommit repository, populates the repository with the sample code, and creates a pipeline with the necessary AWS CDK stages for CodePipeline to run the CdkGlueBlogStack class from the etl/infrastructure.py file.

Line 99 in pipeline.py invokes the CdkGlueBlogStack class.

The CdkGlueBlogStack class in etl/infrastructure.py creates the crawlers, jobs, database, triggers, and workflow to provision the AWS Glue based data pipeline.

Refer to line 539 for creating a crawler using the CfnCrawler construct, line 564 for creating jobs using the CfnJob construct, and line 168 for creating the workflow using the CfnWorkflow construct. We use the CfnTrigger construct to stitch together multiple triggers to create the workflow. The AWS CDK L1 constructs expose all the available AWS CloudFormation resources and entities using methods from popular programming languages. This allows developers to use popular programming languages to provision resources instead of working with JSON or YAML files in AWS CloudFormation.
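To make the relationship between L1 constructs and CloudFormation more concrete, the sketch below builds, in plain Python, the kind of AWS::Glue::Crawler resource definition that an L1 construct such as CfnCrawler ultimately synthesizes to. It does not use the AWS CDK itself; the crawler name and role ARN are hypothetical, and the database name is assumed from the sample Athena query later in this post.

```python
import json


def crawler_resource(name: str, role_arn: str, database: str, s3_path: str) -> dict:
    """Return a CloudFormation-style definition of one AWS Glue crawler."""
    return {
        "Type": "AWS::Glue::Crawler",
        "Properties": {
            "Name": name,
            "Role": role_arn,
            "DatabaseName": database,
            # A crawler can have several targets; this sketch uses one S3 path.
            "Targets": {"S3Targets": [{"Path": s3_path}]},
        },
    }


# Hypothetical name and role ARN; the S3 path is one of the public datasets.
resource = crawler_resource(
    name="neherlab-case-counts",
    role_arn="arn:aws:iam::111122223333:role/example-glue-crawler-role",
    database="covid19db",
    s3_path="s3://covid19-harmonized-dataset/covid19tos3/neherlab_case_counts/",
)
print(json.dumps(resource, indent=2))
```

With the AWS CDK, the same properties are passed as keyword arguments to the construct, and cdk synth produces the template, so the JSON or YAML never has to be written by hand.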

Refer to etl/infrastructure.py for additional details.

Walkthrough of the CI/CD pipeline

In this section, we walk through the various stages of the CI/CD pipeline. Refer to CDK Pipelines: Continuous delivery for AWS CDK applications for additional information.

  • Source – This stage fetches the source of the AWS CDK app from the CodeCommit repo and triggers the pipeline every time a new commit is made.
  • Build – This stage compiles the code (if necessary), runs the tests, and performs a cdk synth. The output of the step is a cloud assembly, which is used to perform all the actions in the rest of the pipeline. The pytest is run using the amazon/aws-glue-libs:glue_libs_3.0.0_image_01 Docker image. This image comes with all the required libraries to run tests for AWS Glue version 3.0 jobs using a Docker container. Refer to Develop and test AWS Glue version 3.0 jobs locally using a Docker container for additional information.
  • UpdatePipeline – This stage modifies the pipeline if necessary. For example, if the code is updated to add a new deployment stage to the pipeline or add a new asset to your application, the pipeline is automatically updated to reflect the changes.
  • Assets – This stage prepares and publishes all AWS CDK assets of the app to Amazon S3 and all Docker images to Amazon Elastic Container Registry (Amazon ECR). When the AWS CDK deploys an app that references assets (either directly by the app code or through a library), the AWS CDK CLI first prepares and publishes the assets to Amazon S3 using a CodeBuild job. This AWS Glue solution creates four assets.
  • CDKGlueStage – This stage deploys the assets to the AWS account. In this case, the pipeline deploys the AWS CDK template etl/infrastructure.py to create all the AWS Glue artifacts.

Code

The code can be found at AWS Samples on GitHub.

Prerequisites

This post assumes you have the following:

Deploy the solution

To deploy the solution, complete the following steps:

  • Download the source code from the AWS Samples GitHub repository to the client machine:
$ git clone git@github.com:aws-samples/aws-glue-cdk-cicd.git
  • Create the virtual environment:
$ cd aws-glue-cdk-cicd 
$ python3 -m venv .venv

This step creates a Python virtual environment specific to the project on the client machine. We use a virtual environment in order to isolate the Python environment for this project and not install software globally.

  • Activate the virtual environment according to your OS:
    • On MacOS and Linux, use the following code:
$ source .venv/bin/activate
    • On a Windows platform, use the following code:
% .venv\Scripts\activate.bat

After this step, the subsequent steps run within the bounds of the virtual environment on the client machine and interact with the AWS account as needed.

  • Install the required dependencies described in requirements.txt to the virtual environment:
$ pip install -r requirements.txt
  • Bootstrap the AWS CDK app:
$ cdk bootstrap

This step populates a given environment (AWS account ID and Region) with resources required by the AWS CDK to perform deployments into the environment. Refer to Bootstrapping for additional information. At this step, you can see the CloudFormation stack CDKToolkit on the AWS CloudFormation console.

  • Synthesize the CloudFormation template for the specified stacks:
$ cdk synth # optional if not default (-c stage=default)

You can verify the CloudFormation templates to identify the resources to be deployed in the next step.

  • Deploy the AWS resources (CI/CD pipeline and AWS Glue based data pipeline):
$ cdk deploy # optional if not default (-c stage=default)

At this step, you can see CloudFormation stacks cdk-covid19-glue-stack-pipeline and cdk-covid19-glue-stack on the AWS CloudFormation console. The cdk-covid19-glue-stack-pipeline stack gets deployed first, which in turn deploys cdk-covid19-glue-stack to create the AWS Glue pipeline.

Verify the solution

When all the previous steps are complete, you can check for the created artifacts.

CloudFormation stacks

You can confirm the existence of the stacks on the AWS CloudFormation console. As shown in the following screenshot, the CloudFormation stacks have been created and deployed by cdk bootstrap and cdk deploy.

BDB-2467-cloudformation-stacks

Figure 2 – AWS CloudFormation stacks

CodePipeline pipeline

On the CodePipeline console, check for the cdk-covid19-glue pipeline.

BDB-2467-code-pipeline-summary

Figure 3 – AWS CodePipeline summary view

You can open the pipeline for a detailed view.

BDB-2467-code-pipeline-detailed

Figure 4 – AWS CodePipeline detailed view

AWS Glue workflow

To validate the AWS Glue workflow and its components, complete the following steps:

  • On the AWS Glue console, choose Workflows in the navigation pane.
  • Confirm the presence of the Covid_19 workflow.
BDB-2467-glue-workflow-summary

Figure 5 – AWS Glue Workflow summary view

You can select the workflow for a detailed view.

BDB-2467-glue-workflow-detailed

Figure 6 – AWS Glue Workflow detailed view

  • Choose Triggers in the navigation pane and check for the presence of seven t_* triggers.
BDB-2467-glue-triggers

Figure 7 – AWS Glue Triggers

  • Choose Jobs in the navigation pane and check for the presence of three j_* jobs.
BDB-2467-glue-jobs

Figure 8 – AWS Glue Jobs

The jobs perform the following tasks:

    • etlScripts/j_emit_start_event.py – A Python job that starts the workflow and creates the event
    • etlScripts/j_neherlab_denorm.py – A Spark ETL job to transform the data and create a denormalized view by combining all the base data together in Parquet format
    • etlScripts/j_emit_ended_event.py – A Python job that ends the workflow and creates the specific event
  • Choose Crawlers in the navigation pane and check for the presence of five neherlab-* crawlers.
BDB-2467-glue-crawlers

Figure 9 – AWS Glue Crawlers

Execute the solution

  • The solution creates a scheduled AWS Glue workflow that runs at 10:00 AM UTC on day 1 of every month. A scheduled workflow can also be triggered on demand. For the purpose of this post, we run the workflow on demand using the following command from the AWS CLI. If the workflow starts successfully, the command returns the run ID. For instructions on how to run and monitor a workflow in AWS Glue, refer to Running and monitoring a workflow in AWS Glue.
aws glue start-workflow-run --name Covid_19
  • You can verify the status of a workflow run by running the following command from the AWS CLI, using the run ID returned by the previous command. A successfully completed Covid_19 workflow should return a value of 7 for SucceededActions and 0 for FailedActions.
aws glue get-workflow-run --name Covid_19 --run-id <run_ID>
  • A sample output of the above command is provided below.
{
    "Run": {
        "Name": "Covid_19",
        "WorkflowRunId": "wr_c8855e82ab42b2455b0e00cf3f12c81f957447abd55a573c087e717f54a4e8be",
        "WorkflowRunProperties": {},
        "StartedOn": "2022-09-20T22:13:40.500000-04:00",
        "CompletedOn": "2022-09-20T22:21:39.545000-04:00",
        "Status": "COMPLETED",
        "Statistics": {
            "TotalActions": 7,
            "TimeoutActions": 0,
            "FailedActions": 0,
            "StoppedActions": 0,
            "SucceededActions": 7,
            "RunningActions": 0
        }
    }
}
  • (Optional) To verify the status of the workflow run using AWS Glue console, choose Workflows in the navigation pane, select the Covid_19 workflow, click on the History tab, select the latest row and click on View run details. A successfully completed workflow is marked in green check marks. Please refer to the Legend section in the below screenshot for additional statuses.

    BDB-2467-glue-workflow-success

    Figure 10 – AWS Glue Workflow successful run
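The check described in the steps above can also be scripted. The following is a minimal sketch, using only the Python standard library, that inspects the JSON returned by aws glue get-workflow-run and decides whether the run finished cleanly. The function name is hypothetical, and the sample response is an abbreviated copy of the output shown earlier.

```python
import json


def workflow_run_succeeded(response: dict) -> bool:
    """Return True if the run completed and every action succeeded."""
    run = response["Run"]
    stats = run["Statistics"]
    return (
        run["Status"] == "COMPLETED"
        and stats["FailedActions"] == 0
        and stats["SucceededActions"] == stats["TotalActions"]
    )


# Abbreviated copy of the sample get-workflow-run output.
SAMPLE_RESPONSE = json.loads("""
{
    "Run": {
        "Name": "Covid_19",
        "Status": "COMPLETED",
        "Statistics": {
            "TotalActions": 7,
            "TimeoutActions": 0,
            "FailedActions": 0,
            "StoppedActions": 0,
            "SucceededActions": 7,
            "RunningActions": 0
        }
    }
}
""")

print(workflow_run_succeeded(SAMPLE_RESPONSE))
```

Requiring SucceededActions to equal TotalActions also covers timed-out and stopped actions, because any of those would leave the succeeded count below the total.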

Check the output

  • When the workflow is complete, navigate to the Athena console to check that the neherlab_denormalized table was created and populated. You can run SQL queries against all five tables to check the data. A sample SQL query is provided below.
SELECT "country", "location", "date", "cases", "deaths", "ecdc-countries",
        "acute_care", "acute_care_per_100K", "critical_care", "critical_care_per_100K" 
FROM "AwsDataCatalog"."covid19db"."neherlab_denormalized"
limit 10;
BDB-2467-athena

Figure 11 – Amazon Athena
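To check all five tables rather than one, a small helper can generate a row-count query per table for pasting into the Athena query editor. This is only a sketch: it assumes that all five tables are registered in the covid19db database used by the sample query above, and the helper name is hypothetical.

```python
# Table names as listed earlier in this post.
TABLES = [
    "neherlab_case_counts",
    "neherlab_country_codes",
    "neherlab_icu_capacity",
    "neherlab_population",
    "neherlab_denormalized",
]


def count_query(table: str, database: str = "covid19db",
                catalog: str = "AwsDataCatalog") -> str:
    """Build a row-count query for one table in the Data Catalog."""
    return f'SELECT COUNT(*) AS row_count FROM "{catalog}"."{database}"."{table}";'


for table in TABLES:
    print(count_query(table))
```

A non-zero row_count for each table is a quick sign that the crawlers and the denormalization job produced data.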

Clean up

To clean up the resources created in this post, delete the AWS CloudFormation stacks in the following order:

  • cdk-covid19-glue-stack
  • cdk-covid19-glue-stack-pipeline
  • CDKToolkit

Then delete all associated S3 buckets:

  • cdk-covid19-glue-stack-p-pipelineartifactsbucketa-*
  • cdk-*-assets-<AWS_ACCOUNT_ID>-<AWS_REGION>
  • covid19-glue-config-<AWS_ACCOUNT_ID>-<AWS_REGION>
  • neherlab-denormalized-dataset-<AWS_ACCOUNT_ID>-<AWS_REGION>
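Because some of these bucket names contain generated parts, it can help to filter a bucket listing against the patterns before deleting anything. The sketch below uses only the Python standard library; the function names, account ID, Region, and sample bucket names are hypothetical, and the list of bucket names would come from your own account.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def bucket_patterns(account_id: str, region: str) -> List[str]:
    """Return the bucket name patterns listed above for one account and Region."""
    return [
        "cdk-covid19-glue-stack-p-pipelineartifactsbucketa-*",
        f"cdk-*-assets-{account_id}-{region}",
        f"covid19-glue-config-{account_id}-{region}",
        f"neherlab-denormalized-dataset-{account_id}-{region}",
    ]


def buckets_to_delete(bucket_names: Iterable[str], account_id: str,
                      region: str) -> List[str]:
    """Keep only the bucket names that match one of the cleanup patterns."""
    patterns = bucket_patterns(account_id, region)
    return [name for name in bucket_names
            if any(fnmatchcase(name, pattern) for pattern in patterns)]


# Hypothetical bucket listing for illustration only.
SAMPLE_BUCKETS = [
    "cdk-covid19-glue-stack-p-pipelineartifactsbucketa-abc123",
    "cdk-hnb659fds-assets-111122223333-us-east-1",
    "covid19-glue-config-111122223333-us-east-1",
    "neherlab-denormalized-dataset-111122223333-us-east-1",
    "my-unrelated-bucket",
]

for name in buckets_to_delete(SAMPLE_BUCKETS, "111122223333", "us-east-1"):
    print(name)
```

Review the filtered list before deleting: the cdk-*-assets bucket is created by cdk bootstrap and may be shared with other AWS CDK applications in the same account and Region.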

Conclusion

In this post, we demonstrated a step-by-step guide to define, test, provision, and manage changes to an AWS Glue based ETL solution using the AWS CDK. We used an AWS Glue example, which has all the components to build a complex ETL solution, and demonstrated how to integrate individual AWS Glue components into a frictionless CI/CD pipeline. We encourage you to use this post and associated code as the starting point to build your own CI/CD pipelines for AWS Glue based ETL solutions.


About the authors

Puneet Babbar is a Data Architect at AWS, specialized in big data and AI/ML. He is passionate about building products, in particular products that help customers get more out of their data. During his spare time, he loves to spend time with his family and engage in outdoor activities including hiking, running, and skating. Connect with him on LinkedIn.

Suvojit Dasgupta is a Sr. Lakehouse Architect at Amazon Web Services. He works with customers to design and build data solutions on AWS.

Justin Kuskowski is a Principal DevOps Consultant at Amazon Web Services. He works directly with AWS customers to provide guidance and technical assistance around improving their value stream, which ultimately reduces product time to market and leads to a better customer experience. Outside of work, Justin enjoys traveling the country to watch his two kids play soccer and spending time with his family and friends wake surfing on the lakes in Michigan.

New Hands-On Course for Business Analysts – Practical Decision Making using No-Code ML on AWS

Post Syndicated from Antje Barth original https://aws.amazon.com/blogs/aws/new-hands-on-course-for-business-analysts-practical-decision-making-using-no-code-ml-on-aws/

Artificial intelligence (AI) is all around us. AI sends certain emails to our spam folders. It powers autocorrect, which helps us fix typos when we text. And now we can use it to solve business problems.

In business, data-driven insights have become increasingly valuable. These insights are often discovered with the help of machine learning (ML), a subset of AI and the foundation of complex AI systems. And ML technology has come a long way. Today, you don’t need to be a data scientist or computer engineer to gain insights. With the help of no-code ML tools such as Amazon SageMaker Canvas, you can now achieve effective business outcomes using ML without writing a single line of code. You can better understand patterns, trends, and what’s likely to happen in the future. And that means making better business decisions!

Today, I’m happy to announce that AWS and Coursera are launching the new hands-on course Practical Decision Making using No-Code ML on AWS. This five-hour course is designed to demystify AI/ML and give anyone with a spreadsheet the ability to solve real-life business problems.

Practical Decision Making on Coursera

Course Highlights
Over the course of three lessons, you will learn how to address your business problem using ML, how to build and understand an ML model without any code, and how to use ML to extract value to make better decisions. Each lesson walks you through real-life business scenarios and hands-on exercises using Amazon SageMaker Canvas, a visual, no-code ML tool.

Lesson 1 – How To Address Your Business Problem Using ML
In the first lesson, you will learn how to address your business problem using ML without knowing data science. You will be able to describe the four stages of analytics and discuss the high-level concepts of AI/ML.

Practical Data Science - Prescriptive Analytics

This lesson will also introduce you to automated machine learning (AutoML) and how AutoML can help you generate insights based on common business use cases. You will then practice forming business questions around the most common machine learning problem types.

Practical Decision Making - Forming ML questions

For example, imagine you are a business analyst at a ticketing company. You manage ticket sales for large venues—concerts, sporting events, and so on. Let’s assume you want to predict cash flow. A question to solve with ML could be: “How can you better forecast ticket sales?” This is an example of time series forecasting. You will also explore numeric and category ML problems throughout the course. They will help you answer business questions such as “What’s the likely annual revenue for a customer?” and “Will this customer buy another ticket in the next three months?”.

Next, you will learn about the iterative process of asking questions for machine learning to make the questions more explicit and explore how to pick the highest value problems to work on.

Practical Decision Making - Value vs. Ease

The first lesson wraps up with a deep dive on how time influences your data across forecasting and nonforecasting business problems and how to set up your data for each ML problem type.

Lesson 2 – Build and Understand an ML Model Without Any Code
In the second lesson, you learn how to build and understand an ML model without any code using Amazon SageMaker Canvas. You will focus on a customer churn example with synthetically generated data from a cellular services company. The problem question is, “Which customers are most likely to cancel their service next month?”

Practical Decision Making - Customer Churn Example

You will learn how to import data and start exploring it. This lesson will explain how to select the right configuration, pick the target column, and show you how to prepare your data for ML.

SageMaker Canvas also recently introduced new visualizations for exploratory data analysis (EDA), including scatter plots, bar charts, and box plots. These visualizations help you analyze the relationships between features in your data sets and comprehend your data better.

Practical Decision Making - SageMaker Canvas Scatter Plot

After a final data validation, you can preview the model. This shows you right away how accurate the model might be and, on average, which features or columns have the greatest relative impact on model predictions. Once you are done preparing and validating the data, you can go ahead and build the model.

Practical Decision Making - Model Evaluation

Next, you will learn how to evaluate the performance of the model. You will be able to describe the difference between training data and test data splits and how they are used to derive the model’s accuracy score. The lesson also discusses additional performance metrics and how you can apply domain knowledge to decide if the model is performing well. Once you understand how to evaluate the performance metrics, you have the foundation for making better business decisions.

The second lesson wraps up with some common gotchas to watch out for and shows how to iterate on the model to keep improving performance. You will be able to describe the concept of data leakage as a result of memorization versus generalization and additional model flaws to avoid. You will also learn how to iterate on questions, included features, and sample sizes to keep increasing model performance.

Lesson 3 – Extract Value From ML
In the third lesson, you learn how to extract value from ML to make better decisions. You will be able to generate and read predictions, including predictions on a single row of a spreadsheet, called a single prediction, and predictions on the entire spreadsheet, called batch prediction. You will be able to understand what is impacting predictions and play with different scenarios.

Next, you will learn how to share insights and predictions with others. You will learn how to take visuals from the product, such as feature importance charts or scoring diagrams, and share the insights through presentations or business reports.

The third lesson wraps up with how to collaborate with the data science team or a team member with machine learning expertise. When you build your model using SageMaker Canvas, you can choose either a Quick build or a Standard build. The Quick build usually takes 2–15 minutes and limits the input dataset to a maximum of 50,000 rows. The Standard build usually takes 2–4 hours and generally has a higher accuracy. SageMaker Canvas makes it easy to share a standard build model. In the process, you can reveal the model’s behind-the-scenes complexity down to the code level.

Once you have the trained model open, you can click on the Share button. This creates a link that can be opened in SageMaker Studio, an integrated development environment used by data science teams.

Practical Decision Making - Share Model

In SageMaker Studio, you can see the transformations to the input data set and detailed information about scoring and artifacts, like the model object. You can also see the Python notebooks for data exploration and feature engineering.

Practical Decision Making - SageMaker Studio

Hands-On Exercises
This course includes seven hands-on labs to put your learning into practice. You will have the opportunity to use no-code ML with SageMaker Canvas to solve real-world challenges based on publicly available datasets.

The labs focus on different business problems across industries, including retail, financial services, manufacturing, healthcare, and life sciences, as well as transport and logistics.

You will have the opportunity to work on customer churn predictions, housing price predictions, sales forecasting, loan predictions, diabetic patient readmission prediction, machine failure predictions, and supply chain delivery on-time predictions.

Register Today
Practical Decision Making using No-Code ML on AWS is a five-hour course for business analysts and anyone who wants to learn how to solve real-life business problems using no-code ML.

Sign up for Practical Decision Making using No-Code ML on AWS today at Coursera!

— Antje

How AWS Data Lab helped BMW Financial Services design and build a multi-account modern data architecture

Post Syndicated from Rahul Shaurya original https://aws.amazon.com/blogs/big-data/how-aws-data-lab-helped-bmw-financial-services-design-and-build-a-multi-account-modern-data-architecture/

This post is co-written by Martin Zoellner, Thomas Ehrlich and Veronika Bogusch from BMW Group.

BMW Group and AWS announced a comprehensive strategic collaboration in 2020. The goal of the collaboration is to further accelerate BMW Group’s pace of innovation by placing data and analytics at the center of its decision-making. A key element of the collaboration is the further development of the Cloud Data Hub (CDH) of BMW Group. This is the central platform for managing company-wide data and data solutions in the cloud. At the AWS re:Invent 2019 session, BMW and AWS demonstrated the new Cloud Data Hub platform by outlining different archetypes of data platforms and then walking through the journey of building BMW Group’s Cloud Data Hub. To learn more about the Cloud Data Hub, refer to BMW Cloud Data Hub: A reference implementation of the modern data architecture on AWS.

As part of this collaboration, BMW Group is migrating hundreds of data sources across several data domains to the Cloud Data Hub. Several of these sources pertain to BMW Financial Services.

In this post, we talk about how the AWS Data Lab is helping BMW Financial Services build a regulatory reporting application for one of the European BMW markets using the Cloud Data Hub on AWS.

Solution overview

In the context of regulatory reporting, BMW Financial Services works with critical financial services data that contains personally identifiable information (PII). We need to provide monthly insights on our financial data to one of the European national regulators, and we also need to be compliant with the Schrems II and GDPR regulations as we process PII data. This requires the PII to be pseudonymized when it’s loaded into the Cloud Data Hub, and it has to be processed further in pseudonymized form. For an overview of the pseudonymization process, check out Build a pseudonymization service on AWS to protect sensitive data.

To address these requirements in a precise and efficient way, BMW Financial Services decided to engage with the AWS Data Lab. The AWS Data Lab has two offerings: the Design Lab and the Build Lab.

Design Lab

The Design Lab is a 1-to-2-day engagement for customers who need a real-world architecture recommendation based on AWS expertise, but aren’t ready to build. In the case of BMW Financial Services, before beginning the build phase, it was key to get all the stakeholders in the same room and record all the functional and non-functional requirements introduced by all the different parties that might influence the data platform—from owners of the various data sources to end-users that would use the platform to run analytics and get business insights. Within the scope of the Design Lab, we discussed three use cases:

  • Regulatory reporting – The top priority for BMW Financial Services was the regulatory reporting use case, which involves collecting and calculating data and reports that will be declared to the National Regulator.
  • Local data warehouse – For this use case, we need to calculate and store all key performance indicators (KPIs) and key value indicators (KVIs) that will be defined during the project. The historical data needs to be stored, but we need to apply a pseudonymization process to respect GDPR directives. Moreover, historical data has to be accessed on a daily basis through a Tableau visualization tool. Regarding the structure, it would be valuable to define two levels (at minimum): one at the contract level to justify the calculation of all KPIs, and another at an aggregated level to optimize restitutions. Personal data is limited in the application, but a reidentification process must be possible for authorized consumption patterns.
  • Accounting details – This use case is based on the BMW accounting tool IFT, which provides the accounting balance at the contract level from all local market applications. It must run at least once a month. However, if some issues are identified on IFT during closing, we must be able to restart it and erase the previous run. When the month-end closing is complete, this use case has to keep the last accounting balance version generated during the month and store it. In parallel, all accounting balance versions have to be accessible by other applications for queries and be able to retrieve the information for 24 months.

Design Lab Solution Architecture

Based on these requirements, we developed the following architecture during the Design Lab.

This solution contains the following components:

  1. The main data source that hydrates our three use cases is already available in the Cloud Data Hub. The Cloud Data Hub uses AWS Lake Formation resource links to grant access to the dataset to the consumer accounts.
  2. For standard, periodic ETL (extract, transform, and load) jobs that involve operations such as converting data types, or creating labels based on numerical values or Boolean flags based on a label, we used AWS Glue ETL jobs.
  3. For historical ETL jobs or more complex calculations such as in the accounting details use case, which may involve huge joins with custom configurations and tuning, we recommended using Amazon EMR. This gives you the opportunity to control cluster configurations at a fine-grained level.
  4. To store job metadata that enables features such as reprocessing inputs or rerunning failed jobs, we recommended building a data registry. The goal of the data registry is to create a centralized inventory for any data being ingested in the data lake. A schedule-based AWS Lambda function could be triggered to register data landing on the semantic layer of the Cloud Data Hub in a centralized metadata store. We recommended using Amazon DynamoDB for the data registry.
  5. Amazon Simple Storage Service (Amazon S3) serves as the storage mechanism that powers the regulatory reporting use case using the data management framework Apache Hudi. Apache Hudi is useful for our use cases because we need to develop data pipelines where record-level insert, update, upsert, and delete capabilities are desirable. Hudi tables are supported by both Amazon EMR and AWS Glue jobs via the Hudi connector, along with query engines such as Amazon Athena and Amazon Redshift Spectrum.
  6. As part of the data storing process in the regulatory reporting S3 bucket, we can populate the AWS Glue Data Catalog with the required metadata.
  7. Athena provides an ad hoc query environment for interactive analysis of data stored in Amazon S3 using standard SQL. It has an out-of-the-box integration with the AWS Glue Data Catalog.
  8. For the data warehousing use case, we need to first de-normalize data to create a dimensional model that enables optimized analytical queries. For that conversion, we use AWS Glue ETL jobs.
  9. Dimensional data marts in Amazon Redshift enable our dashboard and self-service reporting needs. Data in Amazon Redshift is organized into several subject areas that are aligned with the business needs, and a dimensional model allows for cross-subject area analysis.
  10. As a by-product of creating an Amazon Redshift cluster, we can use Redshift Spectrum to access data in the regulatory reporting bucket of the architecture. It acts as a front to access more granular data without actually loading it in the Amazon Redshift cluster.
  11. The data provided to the Cloud Data Hub contains personal data that is pseudonymized. However, we need our pseudonymized columns to be re-personalized when visualizing them on Tableau or when generating CSV reports. Both Athena and Amazon Redshift support Lambda UDFs, which can be used to access Cloud Data Hub PII APIs to re-personalize the pseudonymized columns before presenting them to end-users.
  12. Both Athena and Amazon Redshift can be accessed via JDBC (Java Database Connectivity) to provide access to data consumers.
  13. We can use a Python shell job in AWS Glue to run a query against either of our analytics solutions, convert the results to the required CSV format, and store them to a BMW secured folder.
  14. Any business intelligence (BI) tool deployed on premises can connect to both Athena and Amazon Redshift and use their query engines to perform any heavy computation before it receives the final data to fuel its dashboards.
  15. For the data pipeline orchestration, we recommended using AWS Step Functions because of its low-code development experience and its full integration with all the other components discussed.
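
To make the data registry from component 4 more concrete, the following is a minimal sketch of what a registry entry and a reprocessing lookup could look like. The attribute names (dataset, object_key, status) and the status values are hypothetical and were not part of the Design Lab output. The sketch only builds and filters plain dictionaries; writing them to DynamoDB from a scheduled Lambda function is left out.

```python
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class RegistryEntry:
    """One row of the hypothetical data registry table."""
    dataset: str                 # assumed partition key: logical dataset name
    object_key: str              # assumed sort key: S3 key of the landed object
    registered_at: str           # ISO-8601 registration timestamp
    status: str = "REGISTERED"   # assumed states: REGISTERED, PROCESSED, FAILED


def register(dataset, object_key, now):
    """Build the item a scheduled function could write to the registry."""
    return asdict(RegistryEntry(dataset, object_key, now.isoformat()))


def pending_inputs(items):
    """Return the object keys that still need to be processed or rerun."""
    return sorted(i["object_key"] for i in items if i["status"] != "PROCESSED")


# Example data; dataset names and keys are made up for illustration.
items = [
    register("contracts", "contracts/2022/10/part-0.parquet",
             datetime(2022, 10, 1, tzinfo=timezone.utc)),
    register("contracts", "contracts/2022/11/part-0.parquet",
             datetime(2022, 11, 1, tzinfo=timezone.utc)),
]
items[0]["status"] = "PROCESSED"   # first input was handled successfully
items[1]["status"] = "FAILED"      # second job failed and must be rerun
print(pending_inputs(items))
```

Keeping the processing status next to each landed object is what lets an orchestrator decide which inputs to rerun without rescanning the data lake.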

With the preceding architecture as our long-term target state, we concluded the Design Lab and decided to return for a Build Lab to accelerate solution development.

Preparing for Build Lab

The typical preparation of a Build Lab that follows a Design Lab involves identifying a few examples of common use case patterns, typically the more complex ones. To maximize the success in the Build Lab, we reduce the long-term target architecture to a subset of components that addresses those examples and can be implemented within a 3-to-5-day intense sprint.

For a successful Build Lab, we also need to identify and resolve any external dependencies, such as network connectivity to data sources and targets. If that isn’t feasible, then we find meaningful ways to mock them. For instance, to make the prototype closer to what the production environment would look like, we decided to use separate AWS accounts for each use case, based on the existing team structure of BMW, and use a consumer S3 bucket instead of BMW network-attached storage (NAS).

Build Lab

The BMW team set aside 4 days for their Build Lab. During that time, their dedicated Data Lab Architect worked alongside the team, helping them to build the following prototype architecture.

Build Lab Solution

This solution includes the following components:

  1. The first step was to synchronize the AWS Glue Data Catalog of the Cloud Data Hub and regulatory reporting accounts.
  2. AWS Glue jobs running on the regulatory reporting account had access to the data in the Cloud Data Hub resource accounts. During the Build Lab, the BMW team implemented ETL jobs for six tables, addressing insert, update, and delete record requirements using Hudi.
  3. The results of the ETL jobs are stored in the data lake layer, in the regulatory reporting S3 bucket, as Hudi tables that are catalogued in the AWS Glue Data Catalog and can be consumed by multiple AWS services. The bucket is encrypted using AWS Key Management Service (AWS KMS).
  4. Athena is used to run exploratory queries on the data lake.
  5. To demonstrate the cross-account consumption pattern, we created an Amazon Redshift cluster in the local data warehouse account, created external tables from the Data Catalog, and used Redshift Spectrum to query the data. To enable cross-account connectivity between the subnet group of the Data Catalog of the regulatory reporting account and the subnet group of the Amazon Redshift cluster on the local data warehouse account, we had to enable VPC peering. To accelerate and optimize the implementation of these configurations during the Build Lab, we received support from an AWS networking subject matter expert, who ran a valuable session during which the BMW team came to understand the networking details of the architecture.
  6. For data consumption, the BMW team implemented an AWS Glue Python shell job that connected to Amazon Redshift or Athena using a JDBC connection, ran a query, and stored the results in the reporting bucket as a CSV file, which would later be accessible by the end-users.
  7. End-users can also directly connect to both Athena and Amazon Redshift using a JDBC connection.
  8. We decided to orchestrate the AWS Glue ETL jobs using AWS Glue Workflows. We used the resulting workflow for the end-of-lab demo.
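
The CSV export in component 6 can be sketched as follows. This is not the code written during the Build Lab; it only illustrates the conversion step, assuming the query result is available as a list of column names and a list of row tuples. The function name rows_to_csv and the sample values are hypothetical, and the JDBC query and the upload to the reporting bucket are omitted.

```python
import csv
import io


def rows_to_csv(columns, rows, delimiter=","):
    """Serialize a query result (column names plus row tuples) to CSV text."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter, lineterminator="\n")
    writer.writerow(columns)     # header line
    writer.writerows(rows)       # one line per result row
    return buffer.getvalue()


# Stand-in for the rows a database cursor would return; values are made up.
columns = ["contract_id", "balance"]
rows = [(1001, "250.00"), (1002, "0.00")]
report = rows_to_csv(columns, rows)
print(report)
```

Using the csv module instead of manual string joins means that values containing the delimiter are quoted correctly.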

With that, we completed all the goals we had set up and concluded the 4-day Build Lab.

Conclusion

In this post, we walked you through the journey the BMW Financial Services team took with the AWS Data Lab team to participate in a Design Lab to identify a best-fit architecture for their use cases, and the subsequent Build Lab to implement prototypes for regulatory reporting in one of the European BMW markets.

To learn more about how AWS Data Lab can help you turn your ideas into solutions, visit AWS Data Lab.

Special thanks to everyone who contributed to the success of the Design and Build Lab: Lionel Mbenda, Mario Robert Tutunea, Marius Abalarus, Maria Dejoie.


About the authors

Martin Zoellner is an IT Specialist at BMW Group. His role in the project is Subject Matter Expert for DevOps and ETL/SW Architecture.

Thomas Ehrlich is the functional maintenance manager of the Regulatory Reporting application in one of the European BMW markets.

Veronika Bogusch is an IT Specialist at BMW. She initiated the rebuild of the Financial Services Batch Integration Layer via the Cloud Data Hub. The ingested data assets are the base for the Regulatory Reporting use case described in this article.

George Komninos is a solutions architect for the Amazon Web Services (AWS) Data Lab. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent three years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Rahul Shaurya is a Senior Big Data Architect with AWS Professional Services. He helps and works closely with customers building data platforms and analytical applications on AWS. Outside of work, Rahul loves taking long walks with his dog Barney.

Customize Amazon QuickSight dashboards with the new bookmarks functionality

Post Syndicated from Mei Qu original https://aws.amazon.com/blogs/big-data/customize-amazon-quicksight-dashboards-with-the-new-bookmarks-functionality/

Amazon QuickSight users can now add bookmarks in dashboards to save customized dashboard preferences into a list of bookmarks. Bookmarks give you one-click access to specific views of the dashboard without having to manually make multiple filter and parameter changes every time. Combined with the “Share this view” functionality, you can also now share your bookmark views with other readers for easy collaboration and discussion.

In this post, we introduce the bookmark functionality and its capabilities, and demonstrate typical use cases. We also discuss several scenarios extending the usage of bookmarks for sharing and presentation.

Create a bookmark

Open the published dashboard that you want to view, then change the filters or controls, or select the sheet that you want. For example, you can filter to the Region that interests you, or you can select a specific date range using a sheet control on the dashboard.

To create a bookmark of this view, choose the bookmark icon, then choose Add bookmark.

Enter a name, then choose Save.

The bookmark is saved, and the dashboard name on the banner updates with the bookmark name.

You can return to the original dashboard view that the author published at any time by going back to the bookmark icon and choosing Original dashboard.

Rename or delete a bookmark

To rename or delete a bookmark, on the bookmark pane, choose the context menu (three vertical dots) and choose Rename or Delete.

Although you can make any of these edits to bookmarks you have created, the Original dashboard view can’t be renamed, deleted, or updated, because it’s what the author has published.

Update a bookmark

At any time, you can change a bookmark dashboard view and update the bookmark to always reflect those changes.

After you make your edits, on the banner, you can see Modified next to the bookmark you’re currently editing.

To save your updates, on the bookmarks pane, choose the context menu and choose Update.

Make a bookmark the default view

After you create a bookmark, you can set it as the default view of the dashboard that you see when you open the dashboard in a new session. This doesn’t affect anyone else’s view of the dashboard. You can also set the Original dashboard that the author published as the default view. When a default view is set, any time that you open the dashboard, the bookmark view is presented to you, regardless of the changes you made during your last session. However, if no default bookmark has been set, QuickSight will persist your current filter selections when you leave the dashboard. This way, readers can still pick up where they left off and don’t have to re-select filters.

To do this, in the Bookmarks pane, choose the context menu (the three dots) for the bookmark that you want to set as your default view, then choose Set as default.

Share a bookmark

After you create a bookmark, you can share a URL link for the view with others who have permission to view the dashboard. They can then save that view as their own bookmark. The shared view is a snapshot of your bookmark, meaning that if you make any updates to your bookmark after generating the URL link, the shared view doesn’t change.

To do this, choose the bookmark that you want to share so that the dashboard updates to that view. Choose the share icon, then choose Share this view. You can then copy the generated URL to share it with others.

Presentation with bookmarks

One extended use case of the new bookmarks feature is the ability to create a presentation through visuals in a much more elegant fashion. Previously, you may have had to change multiple filters and controls before arriving at your next presentation. Now you can save each presentation as a bookmark beforehand and just go through each bookmark like a slideshow. By creating a snapshot for each of the configurations you want to show, you create a smoother and cleaner experience for your audience.

For example, let’s prepare a presentation through bookmarks using a Restaurant Operations dashboard. We can start with the global view of all the states and their related KPIs.

Best vs. worst performing restaurant

We’re interested in analyzing the difference between the best performing and worst performing store based on revenue. If we had to filter this dashboard on the fly, it would be much more time-consuming because we would have to manually enter both store names into the filter. However, with bookmarks, we can pre-filter to the two addresses, save the bookmark, and be automatically directed to the desired stores when we select it.

Worst performing restaurant analysis

When comparing the best and worst performing stores, we see that the worst performing store (10 Resort Plz) had a below average revenue amount in December 2021. We’re interested in seeing how we can better boost sales around the holiday season and if there is something the owner lacked in that month. We can create a bookmark that directs us to the Owner View with the filters selected to December 2021 and the address and city pre-filtered.

Pennsylvania December 2021 analysis

After looking at this bookmark, we found that in the month of December, the call for support score in the restaurant category was high. Additionally, in the week of December 19, 2021, product sales were at the monthly low. We want to see how these KPIs compare to other stores in the same state for that week. Is it just a seasonal trend, or do we need to look even deeper and take action? Again, we can create a bookmark that gives us this comparison on a state level.

From the last bookmark, we can see that the other store in the same state (Pennsylvania) also had lower than average product sales in the week of December 19, 2021. For the purpose of this demo, we can go ahead and stop here, but we could create even more bookmarks to help answer additional questions, such as if we see this trend across other cities and states.

With these three bookmarks, we have created a top-down presentation looking at the worst performing store and its relevant metrics and comparisons. If we wanted to present this, it would be as simple as cycling through the bookmarks we’ve created to put together a cohesive presentation instead of having distracting onscreen filtering.

Conclusion

In this post, we discussed how to create, modify, and delete bookmarks, along with setting a bookmark as the default view and sharing bookmarks. We also demonstrated an extended use case of presentation with bookmarks. The new bookmarks functionality is now generally available in all supported QuickSight Regions.

We’re looking forward to your feedback and stories on how you apply bookmarks for your business needs.

Did you know that you can also ask questions of your data in natural language with QuickSight Q? Watch this video to learn more.


About the Authors

Mei Qu is a Business Intelligence Engineer on the AWS QuickSight ProServe Team. She helps customers and partners leverage their data to develop key business insights and monitor ongoing initiatives. She also develops QuickSight proof of concept projects, delivers technical workshops, and supports implementation projects.

Emily Zhu is a Senior Product Manager at Amazon QuickSight, AWS’s cloud-native, fully managed SaaS BI service. She leads the development of QuickSight analytics and query capabilities. Before joining AWS, she was working in the Amazon Prime Air drone delivery program and the Boeing company as senior strategist for several years. Emily is passionate about potentials of cloud-based BI solutions and looks forward to helping customers advance in their data-driven strategy-making.

Get a quick start with Apache Hudi, Apache Iceberg, and Delta Lake with Amazon EMR on EKS

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/get-a-quick-start-with-apache-hudi-apache-iceberg-and-delta-lake-with-amazon-emr-on-eks/

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can keep your data as is in your object store or file-based storage without having to first structure the data. Additionally, you can run different types of analytics against your loosely formatted data lake—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML) to guide better decisions. Due to the flexibility and cost effectiveness that a data lake offers, it’s very popular with customers who are looking to implement data analytics and AI/ML use cases.

Due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records from a data lake. Another challenge is making concurrent changes to the data lake. Implementing these tasks is time consuming and costly.

In this post, we explore three open-source transactional file formats: Apache Hudi, Apache Iceberg, and Delta Lake to help us to overcome these data lake challenges. We focus on how to get started with these data storage frameworks via a real-world use case. As an example, we demonstrate how to handle incremental data change in a data lake by implementing a Slowly Changing Dimension Type 2 solution (SCD2) with Hudi, Iceberg, and Delta Lake, then deploy the applications with Amazon EMR on EKS.

ACID challenge in data lakes

In analytics, the data lake plays an important role as an immutable and agile data storage layer. Unlike traditional data warehouses or data mart implementations, we make no assumptions on the data schema in a data lake and can define whatever schemas our use cases require. It’s up to the downstream consumption layer to make sense of that data for their own purposes.

One of the most common challenges is supporting ACID (Atomicity, Consistency, Isolation, Durability) transactions in a data lake. For example, how do we run queries that return consistent and up-to-date results while new data is continuously being ingested or existing data is being modified?

Let’s try to understand the data problem with a real-world scenario. Assume we centralize customer contact datasets from multiple sources to an Amazon Simple Storage Service (Amazon S3)-backed data lake, and we want to keep all the historical records for analysis and reporting. We face the following challenges:

  • We keep creating append-only files in Amazon S3 to track the contact data changes (insert, update, delete) in near-real time.
  • Consistency and atomicity aren’t guaranteed because we just dump data files from multiple sources without knowing whether the entire operation is successful or not.
  • We don’t have an isolation guarantee whenever multiple workloads are simultaneously reading and writing to the same target contact table.
  • We track every single activity at source, including duplicates caused by the retry mechanism and accidental data changes that are then reverted. This leads to the creation of a large volume of append-only files. The performance of extract, transform, and load (ETL) jobs decreases as all the data files are read each time.
  • We have to shorten the file retention period to reduce the data scan and improve read performance.
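
To illustrate why append-only change files become expensive, the following sketch rebuilds the current contact table by replaying every change file in order. The record layout and the operation names are hypothetical and only stand in for the files that land in Amazon S3.

```python
def current_state(change_files):
    """Replay append-only change files in order to rebuild the latest table."""
    table = {}
    for changes in change_files:            # every file is read on every run
        for op, record in changes:
            if op in ("insert", "update"):
                table[record["id"]] = record
            elif op == "delete":
                table.pop(record["id"], None)
    return table


# Three change files, oldest first; contents are made up for illustration.
change_files = [
    [("insert", {"id": 1, "email": "a@example.com"}),
     ("insert", {"id": 2, "email": "b@example.com"})],
    [("update", {"id": 1, "email": "a.new@example.com"})],
    [("delete", {"id": 2})],
]
print(current_state(change_files))
```

The work grows with the total number of change files rather than with the size of the latest change, and a reader that runs while a file is only partially written sees an inconsistent result. These are the gaps that transactional table formats are meant to close.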

In this post, we walk through a simple SCD2 ETL example designed for solving the ACID transaction problem with the help of Hudi, Iceberg, and Delta Lake. We also show how to deploy the ACID solution with EMR on EKS and query the results with Amazon Athena.

Custom library dependencies with EMR on EKS

By default, Hudi and Iceberg are supported by Amazon EMR as out-of-the-box features. For this demonstration, we use EMR on EKS release 6.8.0, which contains Apache Iceberg 0.14.0-amzn-0 and Apache Hudi 0.11.1-amzn-0. To find out the latest and past versions that Amazon EMR supports, check out the Hudi release history and the Iceberg release history tables. The runtime binary files of these frameworks can be found in Spark’s class path location within each EMR on EKS image. See Amazon EMR on EKS release versions for the list of supported versions and applications.

As of this writing, Amazon EMR does not include Delta Lake by default. There are two ways to make it available in EMR on EKS:

  • At the application level – You install Delta libraries by setting the Spark configuration spark.jars or the --jars command-line argument in your submission script. The JAR files will be downloaded and distributed to each Spark Executor and Driver pod when starting a job.
  • At the Docker container level – You can customize an EMR on EKS image by packaging Delta dependencies into a single Docker container that promotes portability and simplifies dependency management for each workload.

Other custom library dependencies can be managed the same way as for Delta Lake—passing a comma-separated list of JAR files in the Spark configuration at job submission, or packaging all the required libraries into a Docker image.
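
As a minimal sketch of the application-level approach, the following helper assembles a spark-submit parameter string with a comma-separated --jars list. The helper name and the JAR locations are hypothetical; substitute the artifacts and versions that match your Spark release.

```python
def spark_submit_parameters(jars, conf=None):
    """Build a spark-submit parameter string with a comma-separated --jars list."""
    parts = []
    if jars:
        parts.append("--jars " + ",".join(jars))
    for key, value in (conf or {}).items():
        parts.append(f"--conf {key}={value}")
    return " ".join(parts)


# Hypothetical JAR locations, used only to show the expected format.
delta_jars = [
    "s3://my-bucket/jars/delta-core.jar",
    "s3://my-bucket/jars/delta-storage.jar",
]
params = spark_submit_parameters(delta_jars, {"spark.executor.memory": "2G"})
print(params)
```

The resulting string can be passed as the sparkSubmitParameters value of an EMR on EKS job submission.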

Solution overview

The solution provides two sample CSV files as the data source: initial_contact.csv and update_contacts.csv. They were generated by a Python script with the Faker package. For more details, check out the tutorial on GitHub.

The following diagram describes a high-level architecture of the solution and different services being used.

The workflow steps are as follows:

  1. Ingest the first CSV file from a source S3 bucket. The data is being processed by running a Spark ETL job with EMR on EKS. The application contains either the Hudi, Iceberg, or Delta framework.
  2. Store the initial table in Hudi, Iceberg, or Delta file format in a target S3 bucket (curated). We use the AWS Glue Data Catalog as the Hive metastore. Optionally, you can configure Amazon DynamoDB as a lock manager for the concurrency controls.
  3. Ingest a second CSV file that contains new records and some changes to the existing ones.
  4. Perform SCD2 via Hudi, Iceberg, or Delta in the Spark ETL job.
  5. Query the Hudi, Iceberg, or Delta table stored on the target S3 bucket in Athena.

To simplify the demo, we have combined steps 1–4 into a single Spark application.
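
Before looking at the framework-specific jobs, it can help to see the SCD2 logic of steps 1–4 on plain Python dictionaries. The following is a simplified sketch, not the code from the sample project: it compares the tracked columns directly, skips unchanged records (an assumption of this sketch), and uses string dates for readability. The column names mirror the contact data used in this post.

```python
def apply_scd2(table, updates, now, tracked=("name", "email", "state")):
    """Return a new table with changed rows expired and new versions appended."""
    result = [dict(row) for row in table]            # do not mutate the input
    current = {r["id"]: r for r in result if r["iscurrent"] == 1}
    for upd in updates:
        old = current.get(upd["id"])
        if old is not None:
            if all(old[c] == upd[c] for c in tracked):
                continue                             # unchanged, nothing to do
            old["valid_to"] = now                    # expire the old version
            old["iscurrent"] = 0
        new_row = {**upd, "valid_from": now, "valid_to": None, "iscurrent": 1}
        result.append(new_row)
        current[upd["id"]] = new_row
    return result


# Initial load, then an incremental file with one change and one new contact.
initial = apply_scd2([], [
    {"id": 1, "name": "Ann", "email": "ann@example.com", "state": "NY"},
], now="2022-01-01")
table = apply_scd2(initial, [
    {"id": 1, "name": "Ann", "email": "ann@example.com", "state": "CA"},  # changed
    {"id": 2, "name": "Bob", "email": "bob@example.com", "state": "TX"},  # new
], now="2022-02-01")
for row in table:
    print(row["id"], row["state"], row["valid_from"], row["valid_to"], row["iscurrent"])
```

Each contact keeps its full history: the superseded row receives a valid_to value and iscurrent = 0, and the latest version stays open-ended with iscurrent = 1.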

Prerequisites

Install the following tools:

curl -fsSL -o get_helm.sh \
https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3

chmod 700 get_helm.sh
export DESIRED_VERSION=v3.8.2
./get_helm.sh 
helm version

For a quick start, you can use AWS CloudShell, which already includes the AWS CLI and kubectl.

Clone the project

Download the sample project either to your computer or the CloudShell console:

git clone https://github.com/aws-samples/emr-on-eks-hudi-iceberg-delta
cd emr-on-eks-hudi-iceberg-delta

Set up the environment

Run the following blog_provision.sh script to set up a test environment. The infrastructure deployment includes the following resources:

  • A new S3 bucket to store sample data and job code.
  • An Amazon Elastic Kubernetes Service (Amazon EKS) cluster (version 1.21) in a new VPC across two Availability Zones.
  • An EMR virtual cluster in the same VPC, registered to the emr namespace in Amazon EKS.
  • An AWS Identity and Access Management (IAM) job execution role that contains DynamoDB access, because we use DynamoDB to provide concurrency controls that ensure atomic transactions with the Hudi and Iceberg tables.

export AWS_REGION=us-east-1
export EKSCLUSTER_NAME=eks-quickstart
./blog_provision.sh
# Upload sample contact data to S3
export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
aws s3 sync data s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/data

Job execution role

The provisioning includes an IAM job execution role called emr-on-eks-quickstart-execution-role that allows your EMR on EKS jobs access to the required AWS services. It contains AWS Glue permissions because we use the Data Catalog as our metastore.

See the following code:

{
    "Effect": "Allow",
    "Action": ["glue:Get*","glue:BatchCreatePartition","glue:UpdateTable","glue:CreateTable"],
    "Resource": [
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:catalog",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:database/*",
        "arn:aws:glue:${AWS_REGION}:${ACCOUNTID}:table/*"
    ]
}

Additionally, the role contains DynamoDB permissions, because we use the service as the lock manager. It provides concurrency controls that ensure atomic transactions with our Hudi and Iceberg tables. If a DynamoDB table with the given name doesn’t exist, a new table is created with the billing mode set as pay-per-request. More details can be found in the following framework examples.

{
    "Sid": "DDBLockManager",
    "Effect": "Allow",
    "Action": [
        "dynamodb:DescribeTable",
        "dynamodb:CreateTable",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:GetItem",
        "dynamodb:BatchGetItem"
    ],
    "Resource": [
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myIcebergLockTable/index/*",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable",
        "arn:aws:dynamodb:${AWS_REGION}:${ACCOUNTID}:table/myHudiLockTable/index/*"
    ]
}
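
If you manage more lock tables, the Resource list can be generated instead of typed by hand. The following sketch assumes a hypothetical helper name and uses a placeholder account ID; the ARN layout follows the policy statement above.

```python
import json


def lock_table_resources(region, account_id, table_names):
    """Return the table ARN and the index ARN pattern for each lock table."""
    resources = []
    for name in table_names:
        base = f"arn:aws:dynamodb:{region}:{account_id}:table/{name}"
        resources.extend([base, f"{base}/index/*"])
    return resources


# 111122223333 is a placeholder account ID.
resources = lock_table_resources(
    "us-east-1", "111122223333", ["myIcebergLockTable", "myHudiLockTable"]
)
print(json.dumps({"Resource": resources}, indent=4))
```

Serializing the list with the json module keeps the fragment well-formed regardless of how many tables are added.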

Example 1: Run Apache Hudi with EMR on EKS

The following steps provide a quick start for you to implement SCD Type 2 data processing with the Hudi framework. To learn more, refer to Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR.

The following code snippet demonstrates the SCD Type 2 implementation logic. It creates Hudi tables in a default database in the AWS Glue Data Catalog. The full version is in the script hudi_scd_script.py.

# Read incremental contact CSV file with extra SCD columns
delta_csv_df = spark.read.schema(contact_schema).format("csv")\
.load(f"s3://{S3_BUCKET_NAME}/.../update_contacts.csv")\
.withColumn("ts", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_from", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_to", lit("").cast(TimestampType()))\
.withColumn("checksum",md5(concat(col("name"),col("email"),col("state"))))\
.withColumn('iscurrent', lit(1).cast("int"))

## Find existing records to be expired
join_cond = [initial_hudi_df.checksum != delta_csv_df.checksum,
             initial_hudi_df.id == delta_csv_df.id,
             initial_hudi_df.iscurrent == 1]
contact_to_update_df = (initial_hudi_df.join(delta_csv_df, join_cond)
                      .select(initial_hudi_df.id,
                                ....
                              initial_hudi_df.valid_from,
                              delta_csv_df.valid_from.alias('valid_to'),
                              initial_hudi_df.checksum
                              )
                      .withColumn('iscurrent', lit(0).cast("int"))
                      )
                      
merged_contact_df = delta_csv_df.unionByName(contact_to_update_df)

# Upsert
merged_contact_df.write.format('org.apache.hudi')\
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .options(**hudiOptions) \
                    .mode('append')\
                    .save(TABLE_LOCATION)
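
The join condition in the preceding snippet relies on the checksum column to detect changed contacts. The following standard-library sketch mimics that column outside of Spark so the behavior is easy to inspect. The helper name row_checksum is hypothetical. It returns None when any input is missing, which mirrors how Spark's concat returns NULL if any argument is NULL.

```python
import hashlib


def row_checksum(name, email, state):
    """Mimic md5(concat(name, email, state)); None if any input is None."""
    if name is None or email is None or state is None:
        return None      # Spark's concat yields NULL when any input is NULL
    return hashlib.md5((name + email + state).encode("utf-8")).hexdigest()


old = row_checksum("Ann", "ann@example.com", "NY")
same = row_checksum("Ann", "ann@example.com", "NY")
moved = row_checksum("Ann", "ann@example.com", "CA")
print(old == same, old != moved)
```

Because a NULL checksum never compares as different in a SQL join condition, records with missing tracked values would not be picked up as changed. If your data contains such values, consider normalizing them before hashing.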

In the job script, the hudiOptions were set to use the AWS Glue Data Catalog and enable the DynamoDB-based Optimistic Concurrency Control (OCC). For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.

hudiOptions = {
    ....
    # sync to Glue catalog
    "hoodie.datasource.hive_sync.mode":"hms",
    ....
    # DynamoDB based locking mechanisms
    "hoodie.write.concurrency.mode":"optimistic_concurrency_control", #default is SINGLE_WRITER
    "hoodie.cleaner.policy.failed.writes":"LAZY", #Hudi will delete any files written by failed writes to re-claim space
    "hoodie.write.lock.provider":"org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table":"myHudiLockTable",
    "hoodie.write.lock.dynamodb.partition_key":"tablename",
    "hoodie.write.lock.dynamodb.region": REGION,
    "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{REGION}.amazonaws.com"
}

  1. Upload the job scripts to Amazon S3:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync hudi/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit Hudi jobs with EMR on EKS to create SCD2 tables:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./hudi/hudi_submit_cow.sh
    ./hudi/hudi_submit_mor.sh

    Hudi supports two table types: Copy on Write (CoW) and Merge on Read (MoR). The following is the code snippet to create a CoW table. For the complete job scripts for each table type, refer to hudi_submit_cow.sh and hudi_submit_mor.sh.

    aws emr-containers start-job-run \
      --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
      --name em68-hudi-cow \
      --execution-role-arn $EMR_ROLE_ARN \
      --release-label emr-6.8.0-latest \
      --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://'$S3BUCKET'/blog/hudi_scd_script.py",
          "entryPointArguments":["'$AWS_REGION'","'$S3BUCKET'","COW"],
          "sparkSubmitParameters": "--jars local:///usr/lib/hudi/hudi-spark-bundle.jar,local:///usr/lib/spark/external/lib/spark-avro.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
      --configuration-overrides '{
        "applicationConfiguration": [
          {
            "classification": "spark-defaults", 
            "properties": {
              "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
              "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
             }}
        ]}'

  3. Check the job status on the EMR virtual cluster console.
  4. Query the output in Athena:

    select * from hudi_contact_cow where id=103

    select * from hudi_contact_mor_rt where id=103

Example 2: Run Apache Iceberg with EMR on EKS

Starting with Amazon EMR version 6.6.0, you can use Apache Spark 3 on EMR on EKS with the Iceberg table format. For more information on how Iceberg works in an immutable data lake, see Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR.

The sample job creates an Iceberg table iceberg_contact in the default database of AWS Glue. The full version is in the iceberg_scd_script.py script. The following code snippet shows the SCD2 type of MERGE operation:

# Read incremental CSV file with extra SCD2 columns
spark.read.schema(contact_schema)\
.format("csv").options(header=False,delimiter=",")\
.load(f"s3://{S3_BUCKET_NAME}/blog/data/update_contacts.csv")\
.withColumn(……)\
.createOrReplaceTempView('iceberg_contact_update')

# Update existing records which are changed in the update file
contact_update_qry = """
    WITH contact_to_update AS (
          SELECT target.*
          FROM glue_catalog.default.iceberg_contact AS target
          JOIN iceberg_contact_update AS source 
          ON target.id = source.id
          WHERE target.checksum != source.checksum
            AND target.iscurrent = 1
        UNION
          SELECT * FROM iceberg_contact_update
    ),contact_updated AS (
        SELECT *, LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from) AS eff_from
        FROM contact_to_update
    )
    SELECT id,name,email,state,ts,valid_from,
        CAST(COALESCE(eff_from, null) AS Timestamp) AS valid_to,
        CASE WHEN eff_from IS NULL THEN 1 ELSE 0 END AS iscurrent,
        checksum
    FROM contact_updated
"""
# Upsert
spark.sql(f"""
    MERGE INTO glue_catalog.default.iceberg_contact tgt
    USING ({contact_update_qry}) src
    ON tgt.id = src.id
    AND tgt.checksum = src.checksum
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

As demonstrated earlier when discussing the job execution role, the role emr-on-eks-quickstart-execution-role is granted access to the required DynamoDB table myIcebergLockTable, which is used to obtain locks on Iceberg tables when multiple concurrent write operations target a single table. For more information on Iceberg’s lock manager, refer to DynamoDB Lock Manager.
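
To make the window logic in contact_update_qry easier to follow, here is a minimal pure-Python sketch of the same idea, with no Spark or Iceberg involved. The function name apply_scd2 and the sample rows are hypothetical; the sketch only illustrates how taking the next valid_from per id (the LEAD window function) yields valid_to and iscurrent.

```python
from collections import defaultdict


def apply_scd2(rows):
    """Derive valid_to and iscurrent for every version of a record.

    rows: list of dicts with at least 'id' and 'valid_from' keys.
    Mirrors LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from).
    """
    by_id = defaultdict(list)
    for row in rows:
        by_id[row["id"]].append(row)

    result = []
    for key in sorted(by_id):
        versions = sorted(by_id[key], key=lambda r: r["valid_from"])
        for i, row in enumerate(versions):
            # The next version's valid_from closes the current version.
            next_from = versions[i + 1]["valid_from"] if i + 1 < len(versions) else None
            result.append({
                **row,
                "valid_to": next_from,
                "iscurrent": 1 if next_from is None else 0,
            })
    return result


# Hypothetical sample data: contact 103 moved state once.
history = apply_scd2([
    {"id": 103, "state": "NY", "valid_from": "2022-01-01 00:00:00"},
    {"id": 103, "state": "CA", "valid_from": "2022-06-01 00:00:00"},
    {"id": 104, "state": "TX", "valid_from": "2022-01-01 00:00:00"},
])
for row in history:
    print(row)
```

The older version of contact 103 is closed with the newer version's valid_from, and only the latest version of each id keeps iscurrent = 1.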

  1. Upload the application scripts to the example S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync iceberg/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS to create an SCD2 Iceberg table:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    
    ./iceberg/iceberg_submit.sh

    The full version code is in the iceberg_submit.sh script. The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-iceberg \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
        "sparkSubmitJobDriver": {
        "entryPoint": "s3://'$S3BUCKET'/blog/iceberg_scd_script.py",
        "entryPointArguments":["'$S3BUCKET'"],
        "sparkSubmitParameters": "--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
        "applicationConfiguration": [
        {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.glue_catalog.warehouse": "s3://'$S3BUCKET'/iceberg/",
        "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.glue_catalog.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.glue_catalog.lock.table": "myIcebergLockTable"
        }}
    ]}'

  3. Check the job status on the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from iceberg_contact where id=103

Example 3: Run open-source Delta Lake with EMR on EKS

Delta Lake 2.1.x is compatible with Apache Spark 3.3.x. Check out the compatibility list for other versions of Delta Lake and Spark. In this post, we use Amazon EMR release 6.8 (Spark 3.3.0) to demonstrate the SCD2 implementation in a data lake.

The following is the Delta code snippet to load the initial dataset; the incremental load MERGE logic is very similar to the Iceberg example. As a one-off task, two tables should be set up on the same data:

  • The Delta table delta_table_contact – Defined on the TABLE_LOCATION at ‘s3://{S3_BUCKET_NAME}/delta/delta_contact’. The MERGE/UPSERT operation must be implemented on the Delta destination table. Athena can’t query this table directly. Instead, it reads from a manifest file stored in the same location, which is a text file listing the data files to read when querying the table. The manifest is exposed through the Athena table described in the next bullet.
  • The Athena table delta_contact – Defined on the manifest location s3://{S3_BUCKET_NAME}/delta/delta_contact/_symlink_format_manifest/. All read operations from Athena must use this table.

The full version code is in the delta_scd_script.py script. The code snippet is as follows:

# Read initial contact CSV file and create a Delta table with extra SCD2 columns
df_intial_csv = spark.read.schema(contact_schema)\
 .format("csv")\
 .options(header=False,delimiter=",")\
 .load(f"s3://{S3_BUCKET_NAME}/.../initial_contacts.csv")\
 .withColumn(.........)\
 .write.format("delta")\
 .mode("overwrite")\
 .save(TABLE_LOCATION)

spark.sql(f"""CREATE TABLE IF NOT EXISTS delta_table_contact USING DELTA LOCATION '{TABLE_LOCATION}'""")
spark.sql("GENERATE symlink_format_manifest FOR TABLE delta_table_contact")
spark.sql("ALTER TABLE delta_table_contact SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")

# Create a queriable table in Athena
spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS default.delta_contact (
     ....
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{TABLE_LOCATION}/_symlink_format_manifest/'""")

The SQL statement GENERATE symlink_format_manifest FOR TABLE ... is a required step to set up Athena for Delta Lake. Whenever the data in a Delta table is updated, you must regenerate the manifests. Therefore, we use ALTER TABLE .... SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true) to automate the manifest refresh as a one-off setup.
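
As a rough illustration of what Athena reads through the manifest, the following Python sketch derives the manifest directory from a table location and parses manifest text into a list of data file paths. The helper names, the bucket name, and the Parquet file names are hypothetical, and the sketch assumes the simple case of one data file path per line.

```python
def manifest_location(table_location):
    """Return the symlink manifest directory for a Delta table location."""
    return table_location.rstrip("/") + "/_symlink_format_manifest/"


def parse_manifest(manifest_text):
    """Return the data file paths listed in a manifest, one per non-empty line."""
    return [line.strip() for line in manifest_text.splitlines() if line.strip()]


# Hypothetical manifest contents; real file names are generated by Delta Lake.
sample_manifest = """\
s3://example-bucket/delta/delta_contact/part-00000-aaaa.snappy.parquet
s3://example-bucket/delta/delta_contact/part-00001-bbbb.snappy.parquet
"""

location = manifest_location("s3://example-bucket/delta/delta_contact")
files = parse_manifest(sample_manifest)
print(location)
print(files)
```

Because the manifest is only a list of files, it goes stale whenever a MERGE rewrites data files, which is why the automatic refresh property matters.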

  1. Upload the Delta sample scripts to the S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync delta/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/

  2. Submit the job with EMR on EKS:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1
    ./delta/delta_submit.sh

    The full version code is in the delta_submit.sh script. The open-source Delta JAR files must be included in the spark.jars. Alternatively, follow the instructions in How to customize Docker images and build a custom EMR on EKS image to accommodate the Delta dependencies.

    "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar"

    The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-delta \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
       "sparkSubmitJobDriver": {
       "entryPoint": "s3://'$S3BUCKET'/blog/delta_scd_script.py",
       "entryPointArguments":["'$S3BUCKET'"],
       "sparkSubmitParameters": "--conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
       "applicationConfiguration": [
       {
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.0/delta-storage-2.1.0.jar"
        }}
    ]}'

  3. Check the job status from the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from delta_contact where id=103
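
The start-job-run commands in this post embed JSON inside shell quotes, which is easy to break with a stray or mistyped quote. As one possible alternative, the following Python sketch builds the same two JSON arguments for the Delta job with json.dumps and quotes them for the shell. The helper names and the bucket name example-bucket are hypothetical; the property values are copied from the command above.

```python
import json
import shlex


def build_job_driver(bucket, script, spark_params):
    """Build the --job-driver payload for a Spark script stored under blog/."""
    return {
        "sparkSubmitJobDriver": {
            "entryPoint": f"s3://{bucket}/blog/{script}",
            "entryPointArguments": [bucket],
            "sparkSubmitParameters": spark_params,
        }
    }


def build_overrides(properties):
    """Build the --configuration-overrides payload with spark-defaults."""
    return {
        "applicationConfiguration": [
            {"classification": "spark-defaults", "properties": properties}
        ]
    }


delta_version = "2.1.0"
maven = "https://repo1.maven.org/maven2/io/delta"
properties = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.hadoop.hive.metastore.client.factory.class":
        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
    "spark.jars": ",".join([
        f"{maven}/delta-core_2.12/{delta_version}/delta-core_2.12-{delta_version}.jar",
        f"{maven}/delta-storage/{delta_version}/delta-storage-{delta_version}.jar",
    ]),
}

driver_json = json.dumps(build_job_driver(
    "example-bucket",  # hypothetical bucket name
    "delta_scd_script.py",
    "--conf spark.executor.memory=2G --conf spark.executor.cores=2",
))
overrides_json = json.dumps(build_overrides(properties))

# Print shell-safe arguments that could be pasted after the other CLI options.
print("--job-driver", shlex.quote(driver_json))
print("--configuration-overrides", shlex.quote(overrides_json))
```

Generating the payloads this way keeps the Delta version in one variable, so the two JAR URLs cannot drift apart.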

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script (change the Region if necessary):

export EMRCLUSTER_NAME=emr-on-eks-quickstart
export AWS_REGION=us-east-1
./clean_up.sh

Conclusion

Implementing an ACID-compliant data lake with EMR on EKS enables you to focus more on delivering business value instead of worrying about managing complexity and reliability at the data storage layer.

This post presented three different transactional storage frameworks that can meet your ACID needs. They ensure you never read partial data (Atomicity). The read/write isolation allows you to see consistent snapshots of the data, even if an update occurs at the same time (Consistency and Isolation). All the transactions are stored directly to the underlying Amazon S3-backed data lake, which is designed for 11 9’s of durability (Durability).

For more information, check out the sample GitHub repository used in this post and the EMR on EKS Workshop. They will get you started with running your familiar transactional framework with EMR on EKS. If you want to dive deep into each storage format, check out the following posts:


About the authors

Amir Shenavandeh is a Sr Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering, and DataOps.

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Run a data processing job on Amazon EMR Serverless with AWS Step Functions

Post Syndicated from Siva Ramani original https://aws.amazon.com/blogs/big-data/run-a-data-processing-job-on-amazon-emr-serverless-with-aws-step-functions/

Several infrastructure as code (IaC) frameworks are available today to help you define your infrastructure, such as the AWS Cloud Development Kit (AWS CDK) or Terraform by HashiCorp. Terraform, an AWS Partner Network (APN) Advanced Technology Partner and member of the AWS DevOps Competency, is an IaC tool similar to AWS CloudFormation that allows you to create, update, and version your AWS infrastructure. Terraform provides friendly syntax (similar to AWS CloudFormation) along with other features like planning (visibility to see the changes before they actually happen), graphing, and the ability to create templates to break infrastructure configurations into smaller chunks, which allows better maintenance and reusability. We use the capabilities and features of Terraform to build an API-based ingestion process into AWS. Let’s get started!

In this post, we showcase how to build and orchestrate a Scala Spark application using Amazon EMR Serverless, AWS Step Functions, and Terraform. In this end-to-end solution, we run a Spark job on EMR Serverless that processes sample clickstream data in an Amazon Simple Storage Service (Amazon S3) bucket and stores the aggregation results in Amazon S3.

With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications. You will continue to get the benefits of Amazon EMR, such as open source compatibility, concurrency, and optimized runtime performance for popular data frameworks. EMR Serverless is suitable for customers who want ease in operating applications using open-source frameworks. It offers quick job startup, automatic capacity management, and straightforward cost controls.

Solution overview

We provide the Terraform infrastructure definition and the source code for an AWS Lambda function using sample customer user clicks for online website inputs, which are ingested into an Amazon Kinesis Data Firehose delivery stream. The solution uses Kinesis Data Firehose to convert the incoming data into a Parquet file (an open-source file format for Hadoop) before pushing it to Amazon S3 using the AWS Glue Data Catalog. The generated output S3 Parquet file logs are then processed by an EMR Serverless process, which outputs a report detailing aggregate clickstream statistics in an S3 bucket. The EMR Serverless operation is triggered using Step Functions. The sample architecture and code are spun up as shown in the following diagram.

emr serverless application

The provided samples have the source code for building the infrastructure using Terraform for running the Amazon EMR application. Setup scripts are provided to create the sample ingestion using Lambda for the incoming application logs. For a similar ingestion pattern sample, refer to Provision AWS infrastructure using Terraform (By HashiCorp): an example of web application logging customer data.

The following are the high-level steps and AWS services used in this solution:

  • The provided application code is packaged and built using Apache Maven.
  • Terraform commands are used to deploy the infrastructure in AWS.
  • The EMR Serverless application provides the option to submit a Spark job.
  • The solution uses two Lambda functions:
    • Ingestion – This function processes the incoming request and pushes the data into the Kinesis Data Firehose delivery stream.
    • EMR Start Job – This function starts the EMR Serverless application. The EMR job process converts the ingested user click logs into output in another S3 bucket.
  • Step Functions triggers the EMR Start Job Lambda function, which submits the application to EMR Serverless for processing of the ingested log files.
  • The solution uses four S3 buckets:
    • Kinesis Data Firehose delivery bucket – Stores the ingested application logs in Parquet file format.
    • Loggregator source bucket – Stores the Scala code and JAR for running the EMR job.
    • Loggregator output bucket – Stores the EMR processed output.
    • EMR Serverless logs bucket – Stores the EMR process application logs.
  • Sample invoke commands (run as part of the initial setup process) insert the data using the ingestion Lambda function. The Kinesis Data Firehose delivery stream converts the incoming stream into a Parquet file and stores it in an S3 bucket.

For this solution, we made the following design decisions:

  • We use Step Functions and Lambda in this use case to trigger the EMR Serverless application. In a real-world use case, the data processing application could be long running and may exceed Lambda’s timeout limits. In this case, you can use tools like Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Amazon MWAA is a managed orchestration service that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale.
  • The Lambda code and EMR Serverless log aggregation code are developed using Java and Scala, respectively. You can use any supported languages in these use cases.
  • The AWS Command Line Interface (AWS CLI) V2 is required for querying EMR Serverless applications from the command line. You can also view these from the AWS Management Console. We provide a sample AWS CLI command to test the solution later in this post.

Prerequisites

To use this solution, you must complete the following prerequisites:

  • Install the AWS CLI. For this post, we used version 2.7.18. This is required in order to query the aws emr-serverless AWS CLI commands from your local machine. Optionally, all the AWS services used in this post can be viewed and operated via the console.
  • Make sure Java is installed and JDK/JRE 8 is set in the environment path of your machine. For instructions, see the Java Development Kit.
  • Install Apache Maven. The Java Lambda functions are built using mvn package and are deployed using Terraform into AWS.
  • Install the Scala Build Tool. For this post, we used version 1.4.7. Make sure to download and install based on your operating system needs.
  • Set up Terraform. For steps, see Terraform downloads. We use version 1.2.5 for this post.
  • Have an AWS account.

Configure the solution

To spin up the infrastructure and the application, complete the following steps:

  1. Clone the following GitHub repository.
    The provided exec.sh shell script builds the Java application JAR (for the Lambda ingestion function) and the Scala application JAR (for the EMR processing) and deploys the AWS infrastructure that is needed for this use case.
  2. Run the following commands:
    $ chmod +x exec.sh
    $ ./exec.sh

    To run the commands individually, set the application deployment Region and account number, as shown in the following example:

    $ APP_DIR=$PWD
    $ APP_PREFIX=clicklogger
    $ STAGE_NAME=dev
    $ REGION=us-east-1
    $ ACCOUNT_ID=$(aws sts get-caller-identity | jq -r '.Account')

    The following commands build the Lambda application JAR with Maven and the Scala application package with sbt:

    $ cd $APP_DIR/source/clicklogger
    $ mvn clean package
    $ sbt reload
    $ sbt compile
    $ sbt package

  3. Deploy the AWS infrastructure using Terraform:
    $ terraform init
    $ terraform plan
    $ terraform apply --auto-approve

Test the solution

After you build and deploy the application, you can insert sample data for Amazon EMR processing. We use the following code as an example. The exec.sh script has multiple sample insertions for Lambda. The ingested logs are used by the EMR Serverless application job.

The sample AWS CLI invoke command inserts sample data for the application logs:

aws lambda invoke --function-name clicklogger-dev-ingestion-lambda --cli-binary-format raw-in-base64-out --payload '{"requestid":"OAP-guid-001","contextid":"OAP-ctxt-001","callerid":"OrderingApplication","component":"login","action":"load","type":"webpage"}' out
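
If you want to generate several sample payloads instead of typing them by hand, the following Python sketch builds JSON strings in the same shape as the payload above. The helper name make_click_event and the list of required fields are assumptions inferred from the sample payload, not a schema published by the solution.

```python
import json

# Field names taken from the sample payload; treating all of them as required
# is an assumption made for this sketch.
REQUIRED_FIELDS = ("requestid", "contextid", "callerid", "component", "action", "type")


def make_click_event(seq, component, action="load", caller="OrderingApplication"):
    """Build one click event dict with sequential request and context IDs."""
    event = {
        "requestid": f"OAP-guid-{seq:03d}",
        "contextid": f"OAP-ctxt-{seq:03d}",
        "callerid": caller,
        "component": component,
        "action": action,
        "type": "webpage",
    }
    missing = [field for field in REQUIRED_FIELDS if not event.get(field)]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return event


# Compact separators produce the same single-line form used with --payload.
payloads = [
    json.dumps(make_click_event(i, component), separators=(",", ":"))
    for i, component in enumerate(["login", "products", "checkout"], start=1)
]
for payload in payloads:
    print(payload)
```

Each printed line can be passed as the --payload value of the invoke command.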

To validate the deployments, complete the following steps:

  1. On the Amazon S3 console, navigate to the bucket created as part of the infrastructure setup.
  2. Choose the bucket to view the files.
    You should see that data from the ingested stream was converted into a Parquet file.
  3. Choose the file to view the data.
    The following screenshot shows an example of our bucket contents.
    Now you can run Step Functions to validate the EMR Serverless application.
  4. On the Step Functions console, open clicklogger-dev-state-machine.
    The state machine shows the steps to run that trigger the Lambda function and EMR Serverless application, as shown in the following diagram.
  5. Run the state machine.
  6. After the state machine runs successfully, navigate to the clicklogger-dev-output-bucket on the Amazon S3 console to see the output files.
  7. Use the AWS CLI to check the deployed EMR Serverless application:
    $ aws emr-serverless list-applications \
          | jq -r '.applications[] | select(.name=="clicklogger-dev-loggregrator-emr-<Your-Account-Number>").id'

  8. On the Amazon EMR console, choose Serverless in the navigation pane.
  9. Select clicklogger-dev-studio and choose Manage applications.
  10. The application created by the stack is named clicklogger-dev-loggregator-emr-<Your-Account-Number>.
    Now you can review the EMR Serverless application output.
  11. On the Amazon S3 console, open the output bucket (us-east-1-clicklogger-dev-loggregator-output-).
    The EMR Serverless application writes the output based on the date partition, such as 2022/07/28/response.md. The following code shows an example of the file output:

    |*createdTime*|*callerid*|*component*|*count*
    |------------|-----------|-----------|-------
    *07-28-2022*|OrderingApplication|checkout|2
    *07-28-2022*|OrderingApplication|login|2
    *07-28-2022*|OrderingApplication|products|2
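
To show the kind of aggregation behind this report, here is a small Python sketch that counts events per date, caller, and component and renders rows in the same layout. This is an illustration only: the actual job in this post is a Scala Spark application, and the createdTime field on the input events and the helper names are assumptions.

```python
from collections import Counter


def aggregate_clicks(events):
    """Count events per (createdTime, callerid, component), sorted by key."""
    counts = Counter(
        (e["createdTime"], e["callerid"], e["component"]) for e in events
    )
    return sorted(counts.items())


def render_report(rows):
    """Render aggregated rows in the pipe-delimited layout shown above."""
    lines = [
        "|*createdTime*|*callerid*|*component*|*count*",
        "|------------|-----------|-----------|-------",
    ]
    for (created, caller, component), count in rows:
        lines.append(f"*{created}*|{caller}|{component}|{count}")
    return "\n".join(lines)


# Hypothetical input: two events for each of three components.
events = [
    {"createdTime": "07-28-2022", "callerid": "OrderingApplication", "component": c}
    for c in ["login", "checkout", "products", "login", "products", "checkout"]
]
report = render_report(aggregate_clicks(events))
print(report)
```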

Clean up

The provided ./cleanup.sh script has the required steps to delete all the files from the S3 buckets that were created as part of this post. The terraform destroy command cleans up the AWS infrastructure that you created earlier. See the following code:

$ chmod +x cleanup.sh
$ ./cleanup.sh

To do the steps manually, you can also delete the resources via the AWS CLI:

# CLI commands to delete the Amazon S3 buckets

aws s3 rb s3://clicklogger-dev-emr-serverless-logs-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-firehose-delivery-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-loggregator-output-bucket-<your-account-number> --force
aws s3 rb s3://clicklogger-dev-loggregator-source-bucket-<your-account-number> --force

# Destroy the AWS Infrastructure 
terraform destroy --auto-approve

Conclusion

In this post, we built, deployed, and ran a data processing Spark job in EMR Serverless that interacts with various AWS services. We walked through deploying a Lambda function packaged with Java using Maven, and Scala application code for the EMR Serverless application, triggered with Step Functions and provisioned with infrastructure as code. You can use any combination of applicable programming languages to build your Lambda functions and EMR job application. EMR Serverless can be triggered manually, automated, or orchestrated using AWS services like Step Functions and Amazon MWAA.

We encourage you to test this example and see for yourself how this overall application design works within AWS. Then, it’s just a matter of swapping in your own code base, packaging it, and letting EMR Serverless handle the process efficiently.

If you implement this example and run into any issues, or have any questions or feedback about this post, please leave a comment!


About the Authors

Sivasubramanian Ramani (Siva Ramani) is a Sr Cloud Application Architect at Amazon Web Services. His expertise is in application optimization & modernization, serverless solutions and using Microsoft application workloads with AWS.

Naveen Balaraman is a Sr Cloud Application Architect at Amazon Web Services. He is passionate about Containers, serverless Applications, Architecting Microservices and helping customers leverage the power of AWS cloud.

Upgrade Amazon EMR Hive Metastore from 5.X to 6.X

Post Syndicated from Jianwei Li original https://aws.amazon.com/blogs/big-data/upgrade-amazon-emr-hive-metastore-from-5-x-to-6-x/

If you are currently running Amazon EMR 5.X clusters, consider moving to Amazon EMR 6.X because it includes new features that help you improve performance and optimize cost. For instance, Apache Hive is two times faster with LLAP on Amazon EMR 6.X, and Spark 3 reduces costs by 40%. Additionally, Amazon EMR 6.x releases include Trino, a fast distributed SQL engine, and Iceberg, a high-performance open data format for petabyte-scale tables.

To upgrade Amazon EMR clusters from 5.X to 6.X release, a Hive Metastore upgrade is the first step before applications such as Hive and Spark can be migrated. This post provides guidance on how to upgrade Amazon EMR Hive Metastore from 5.X to 6.X as well as migration of Hive Metastore to the AWS Glue Data Catalog. As Hive 3 Metastore is compatible with Hive 2 applications, you can continue to use Amazon EMR 5.X with the upgraded Hive Metastore.

Solution overview

In the following section, we provide steps to upgrade the Hive Metastore schema using MySQL as the backend. For any other backends (such as MariaDB, Oracle, or SQL Server), update the commands accordingly.

There are two options to upgrade the Amazon EMR Hive Metastore:

  • Upgrade the Hive Metastore schema from 2.X to 3.X by using the Hive Schema Tool
  • Migrate the Hive Metastore to the AWS Glue Data Catalog

We walk through the steps for both options.

Pre-upgrade prerequisites

Before upgrading the Hive Metastore, you must complete the following prerequisite steps:

  1. Verify the Hive Metastore database is running and accessible.
    You should be able to run Hive DDL and DML queries successfully. Any errors or issues must be fixed before proceeding with the upgrade process. Use the following sample queries to test the database:

    create table testtable1 (id int, name string);
    insert into testtable1 values (1001, "user1");
    select * from testtable1;

  2. To get the Metastore schema version in the current EMR 5.X cluster, run the following command in the primary node:
    sudo hive --service schematool -dbType mysql -info

    The following code shows our sample output:

    $ sudo hive --service schematool -dbType mysql -info
    Metastore connection URL: jdbc:mysql://xxxxxxx.us-east-1.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true
    Metastore Connection Driver : org.mariadb.jdbc.Driver
    Metastore connection User: admin
    Hive distribution version: 2.3.0
    Metastore schema version:  2.3.0

  3. Stop the Metastore service and restrict access to the Metastore MySQL database.
    It’s very important that no one else accesses or modifies the contents of the Metastore database while you’re performing the schema upgrade. To stop the Metastore, use the following commands:

    $ sudo stop hive-server2
    $ sudo stop hive-hcatalog-server

    For Amazon EMR release 5.30 and 6.0 onwards (Amazon Linux 2 is the operating system for the Amazon EMR 5.30+ and 6.x release series), use the following commands:

    $ sudo systemctl stop hive-server2.service
    $ sudo systemctl stop hive-hcatalog-server.service

    You can also note the total number of databases and tables present in the Hive Metastore before the upgrade, and verify the number of databases and tables after the upgrade.

  4. To get the total number of tables and databases before the upgrade, run the following commands after connecting to the external Metastore database (assuming the Hive Metadata DB name is hive):
    $ mysql -u <username> -h <mysqlhost> --password
    mysql> use hive;
    mysql> select count(*) from DBS;
    mysql> select count(*) from TBLS;

  5. Take a backup or snapshot of the Hive database.
    This allows you to revert any changes made during the upgrade process if something goes wrong. If you’re using Amazon Relational Database Service (Amazon RDS), refer to Backing up and restoring an Amazon RDS instance for instructions.
  6. Take note of the Hive table storage location if data is stored in HDFS.

If all the table data is on Amazon Simple Storage Service (Amazon S3), then no action is needed. If HDFS is used as the storage layer for Hive databases and tables, then take a note of them. You will need to copy the files on HDFS to a similar path on the new cluster, and then verify or update the location attribute for databases and tables on the new cluster accordingly.
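
If you want to script the version check from step 2, the following Python sketch parses the schematool -info output and reports whether the Metastore schema is older than the Hive distribution. The function names are hypothetical, and the sketch assumes purely numeric dotted versions such as 2.3.0, as in the sample output above.

```python
import re

VERSION_LINE = re.compile(
    r"\s*(Hive distribution version|Metastore schema version):\s*(\S+)"
)


def parse_schematool_info(output):
    """Extract the two version fields from `schematool -info` output."""
    versions = {}
    for line in output.splitlines():
        match = VERSION_LINE.match(line)
        if match:
            versions[match.group(1)] = match.group(2)
    return versions


def needs_upgrade(versions):
    """Return True when the schema version is older than the Hive version."""
    def as_tuple(version):
        # Assumes numeric dotted versions; suffixes would need extra handling.
        return tuple(int(part) for part in version.split("."))

    schema = as_tuple(versions["Metastore schema version"])
    hive = as_tuple(versions["Hive distribution version"])
    return schema < hive


sample = """\
Metastore connection URL: jdbc:mysql://xxxxxxx.us-east-1.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true
Metastore Connection Driver : org.mariadb.jdbc.Driver
Metastore connection User: admin
Hive distribution version: 2.3.0
Metastore schema version:  2.3.0
"""

info = parse_schematool_info(sample)
print(info, needs_upgrade(info))
```

On the EMR 5.X cluster both versions match, so no upgrade is reported. Running the same check from a cluster whose Hive distribution is 3.x against a 2.3.0 schema would report that an upgrade is needed.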

Upgrade the Amazon EMR Hive Metastore schema with the Hive Schema Tool

In this approach, you use the persistent Hive Metastore on a remote database (Amazon RDS for MySQL or Amazon Aurora MySQL-Compatible Edition). The following diagram shows the upgrade procedure.

EMR Hive Metastore Upgrade

To upgrade the Amazon EMR Hive Metastore from 5.X (Hive version 2.X) to 6.X (Hive version 3.X), we can use the Hive Schema Tool. The Hive Schema Tool is an offline tool for Metastore schema manipulation. You can use it to initialize, upgrade, and validate the Metastore schema. Run the following command to show the available options for the Hive Schema Tool:

sudo hive --service schematool -help

Be sure to complete the prerequisites mentioned earlier, including taking a backup or snapshot, before proceeding with the next steps.

  1. Note down the details of the existing Hive external Metastore to be upgraded.
    This includes the RDS for MySQL endpoint host name, database name (for this post, hive), user name, and password. You can do this through one of the following options:

    • Get the Hive Metastore DB information from the Hive configuration file – Log in to the EMR 5.X primary node, open the file /etc/hive/conf/hive-site.xml, and note the four properties:
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
        <description>username to use against metastore database</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.mariadb.jdbc.Driver</value>
        <description>username to use against metastore database</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>{username}</value>
        <description>username to use against metastore database</description>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>{password}</value>
        <description>password to use against metastore database</description>
      </property>

    • Get the Hive Metastore DB information from the Amazon EMR console – Navigate to the EMR 5.X cluster, choose the Configurations tab, and note down the Metastore DB information.
      EMR Console for Configuration
  1. Create a new EMR 6.X cluster.
    To use the Hive Schema Tool, we need to create an EMR 6.X cluster. You can create a new EMR 6.X cluster via the Amazon EMR console or the AWS Command Line Interface (AWS CLI), without specifying external Hive Metastore details. This lets the EMR 6.X cluster launch successfully using the default Hive Metastore. For more information about EMR cluster management, refer to Plan and configure clusters.
  2. After your new EMR 6.X cluster is launched successfully and is in the waiting state, SSH to the EMR 6.X primary node and take a backup of /etc/hive/conf/hive-site.xml:
    sudo cp /etc/hive/conf/hive-site.xml /etc/hive/conf/hive-site.xml.bak

  3. Stop Hive services:
    sudo systemctl stop hive-hcatalog-server.service
    sudo systemctl stop hive-server2.service

    Now you update the Hive configuration and point it to the old Hive Metastore database.

  4. Modify /etc/hive/conf/hive-site.xml and update the properties with the values you collected earlier:
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>org.mariadb.jdbc.Driver</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>{username}</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>{password}</value>
      <description>password to use against metastore database</description>
    </property>

  5. On the same or new SSH session, run the Hive Schema Tool to check that the Metastore is pointing to the old Metastore database:
    sudo hive --service schemaTool -dbType mysql -info

    The output should look as follows (old-hostname, old-dbname, and old-username are the values you changed):

    Metastore connection URL:     jdbc:mysql://{old-hostname}:3306/{old-dbname}?createDatabaseIfNotExist=true
    Metastore Connection Driver :     org.mariadb.jdbc.Driver
    Metastore connection User:     {old-username}
    Hive distribution version:     3.1.0
    Metastore schema version:      2.3.0
    schemaTool completed

    You can upgrade the Hive Metastore by passing the -upgradeSchema option to the Hive Schema Tool. The tool figures out the SQL scripts required to initialize or upgrade the schema and then runs those scripts against the backend database.

  6. Run the upgradeSchema command with -dryRun, which only lists the SQL scripts needed during the actual run:
    sudo hive --service schemaTool -dbType mysql -upgradeSchema -dryRun

    The output should look like the following code. It shows the Metastore upgrade path from the old version to the new version. You can find the upgrade order on the GitHub repo. In case of failure during the upgrade process, these scripts can be run manually in the same order.

    Metastore connection URL:     jdbc:mysql://{old-hostname}:3306/{old-dbname}?createDatabaseIfNotExist=true
    Metastore Connection Driver :     org.mariadb.jdbc.Driver
    Metastore connection User:     {old-username}
    Hive distribution version:     3.1.0
    Metastore schema version:      2.3.0
    schemaTool completed

  7. To upgrade the Hive Metastore schema, run the Hive Schema Tool with -upgradeSchema:
    sudo hive --service schemaTool -dbType mysql -upgradeSchema

    The output should look like the following code:

    Starting upgrade metastore schema from version 2.3.0 to 3.1.0
    Upgrade script upgrade-2.3.0-to-3.0.0.mysql.sql
    ...
    Completed upgrade-2.3.0-to-3.0.0.mysql.sql
    Upgrade script upgrade-3.0.0-to-3.1.0.mysql.sql
    ...
    Completed upgrade-3.0.0-to-3.1.0.mysql.sql
    schemaTool completed

    In case of any issues or failures, you can run the preceding command with the -verbose option. This prints all the queries in the order they run, along with their output.

    sudo hive --service schemaTool -verbose -dbType mysql -upgradeSchema

    If you encounter any failures during this process and you want to upgrade your Hive Metastore by running the SQL yourself, refer to Upgrading Hive Metastore.

    If HDFS was used as storage for the Hive warehouse or any Hive DB location, you need to update the NameNode alias or URI with the new cluster’s HDFS alias.

  8. Use the following command to update the HDFS NameNode alias (replace <new-loc> and <old-loc> with the HDFS root locations of the new and old clusters, respectively):
    hive --service metatool -updateLocation <new-loc> <old-loc>

    You can run the following command on any EMR cluster node to get the HDFS NameNode alias:

    hdfs getconf -confKey dfs.namenode.rpc-address

    First, you can run the command with the -dryRun option, which displays all the changes without persisting them. For example:

    $ hive --service metatool -updateLocation hdfs://ip-172-31-50-80.ec2.internal:8020 hdfs://ip-172-31-87-188.ec2.internal:8020 -dryRun
    Initializing HiveMetaTool..
    Looking for LOCATION_URI field in DBS table to update..
    Dry Run of updateLocation on table DBS..
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testdb.db new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testdb.db
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testdb_2.db new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testdb_2.db
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse
    Found 3 records in DBS table to update
    Looking for LOCATION field in SDS table to update..
    Dry Run of updateLocation on table SDS..
    old location: hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testtable1 new location: hdfs://ip-172-31-50-80.ec2.internal:8020/user/hive/warehouse/testtable1
    Found 1 records in SDS table to update

    However, if the new location needs to be changed to a different HDFS or S3 path, then use the following approach.

    First, connect to the remote Hive Metastore database and run the following query to list the locations of all the tables in a specific Hive database. Replace <HiveMetastore_DB> with the name of the database used for the Hive Metastore in the external database (for this post, hive) and 'default' with the name of the Hive database:

    mysql> SELECT d.NAME as DB_NAME, t.TBL_NAME, t.TBL_TYPE, s.LOCATION FROM <HiveMetastore_DB>.TBLS t 
    JOIN <HiveMetastore_DB>.DBS d ON t.DB_ID = d.DB_ID JOIN <HiveMetastore_DB>.SDS s 
    ON t.SD_ID = s.SD_ID AND d.NAME='default';

    Identify the tables whose location needs to be updated. Then run the ALTER TABLE command to update the table locations. You can prepare a script or a chain of ALTER TABLE commands to update the locations for multiple tables.

    ALTER TABLE <table_name> SET LOCATION "<new_location>";

  9. Start and check the status of Hive Metastore and HiveServer2:
    sudo systemctl start hive-hcatalog-server.service
    sudo systemctl start hive-server2.service
    sudo systemctl status hive-hcatalog-server.service
    sudo systemctl status hive-server2.service
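
The chain of ALTER TABLE commands mentioned in step 8 can be generated rather than typed by hand. The following Python sketch is one possible approach and is not part of the original procedure. It assumes you have exported the (table name, location) pairs returned by the metastore query; the function name build_alter_statements and the bucket s3://example-bucket are hypothetical. It only prints statements for review and doesn't connect to Hive.

```python
def build_alter_statements(tables, old_prefix, new_prefix):
    """Return one ALTER TABLE statement per table stored under old_prefix.

    tables is an iterable of (table_name, location) pairs, for example the
    TBL_NAME and LOCATION columns exported from the metastore query.
    """
    old_root = old_prefix.rstrip("/") + "/"
    new_root = new_prefix.rstrip("/") + "/"
    statements = []
    for table_name, location in tables:
        if not location.startswith(old_root):
            continue  # leave tables stored elsewhere untouched
        new_location = new_root + location[len(old_root):]
        statements.append(
            f'ALTER TABLE {table_name} SET LOCATION "{new_location}";'
        )
    return statements


# Example input: the location comes from the dry-run output shown earlier;
# the target bucket is a made-up placeholder.
rows = [
    ("testtable1",
     "hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse/testtable1"),
]
for statement in build_alter_statements(
    rows,
    "hdfs://ip-172-31-87-188.ec2.internal:8020/user/hive/warehouse",
    "s3://example-bucket/warehouse",
):
    print(statement)
```

Review the generated statements before you run them in Hive.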

Post-upgrade validation

Perform the following post-upgrade steps:

  1. Confirm the Hive Metastore schema is upgraded to the new version:
    sudo hive --service schemaTool -dbType mysql -validate

    The output should look like the following code:

    Starting metastore validation
    
    Validating schema version
    Succeeded in schema version validation.
    [SUCCESS]
    Validating sequence number for SEQUENCE_TABLE
    Succeeded in sequence number validation for SEQUENCE_TABLE.
    [SUCCESS]
    Validating metastore schema tables
    Succeeded in schema table validation.
    [SUCCESS]
    Validating DFS locations
    Succeeded in DFS location validation.
    [SUCCESS]
    Validating columns for incorrect NULL values.
    Succeeded in column validation for incorrect NULL values.
    [SUCCESS]
    Done with metastore validation: [SUCCESS]
    schemaTool completed

  2. Run the following Hive Schema Tool command to query the Hive schema version and verify that it’s upgraded:
    $ sudo hive --service schemaTool -dbType mysql -info
    Metastore connection URL:        jdbc:mysql://<host>:3306/<hivemetastore-dbname>?createDatabaseIfNotExist=true
    Metastore Connection Driver :    org.mariadb.jdbc.Driver
    Metastore connection User:       <username>
    Hive distribution version:       3.1.0
    Metastore schema version:        3.1.0

  3. Run some DML queries against old tables and ensure they are running successfully.
  4. Verify the table and database counts using the same commands mentioned in the prerequisites section, and compare the counts.
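
The version check in step 2 can also be automated. The following Python sketch is an illustration only: it assumes you have captured the text printed by the -info command (for example, by redirecting it to a file), and the function names parse_schema_info and schema_is_upgraded are hypothetical. It treats the upgrade as complete when the reported schema version equals the Hive distribution version.

```python
def parse_schema_info(output):
    """Collect the 'label: value' lines printed by schemaTool -info."""
    info = {}
    for line in output.splitlines():
        label, sep, value = line.partition(":")
        if sep and value.strip():
            # Normalize spacing and case so lookups don't depend on layout.
            info[" ".join(label.split()).lower()] = value.strip()
    return info


def schema_is_upgraded(output):
    """True when the metastore schema version matches the Hive distribution."""
    info = parse_schema_info(output)
    distribution = info.get("hive distribution version")
    schema = info.get("metastore schema version")
    return distribution is not None and distribution == schema


sample = """\
Metastore Connection Driver :    org.mariadb.jdbc.Driver
Hive distribution version:       3.1.0
Metastore schema version:        3.1.0
"""
print(schema_is_upgraded(sample))
```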

The Hive Metastore schema migration process is complete, and you can start working on your new EMR cluster. If for some reason you want to relaunch the EMR cluster, then you just need to provide the Hive Metastore remote database that we upgraded in the previous steps using the options on the Amazon EMR Configurations tab.

Migrate the Amazon EMR Hive Metastore to the AWS Glue Data Catalog

The AWS Glue Data Catalog is flexible and reliable, and can reduce your operational costs. Moreover, the Data Catalog supports different versions of EMR clusters. Therefore, when you migrate your Amazon EMR 5.X Hive Metastore to the Data Catalog, you can use the same Data Catalog with any new EMR 5.8+ cluster, including Amazon EMR 6.x. There are some factors you should consider when using this approach; refer to Considerations when using AWS Glue Data Catalog for more information. The following diagram shows the upgrade procedure.
EMR Hive Metastore Migrate to Glue Data Catalog
To migrate your Hive Metastore to the Data Catalog, you can use the Hive Metastore migration script from GitHub. The following are the major steps for a direct migration.

Make sure all the table data is stored in Amazon S3 and not HDFS. Otherwise, tables migrated to the Data Catalog will have the table location pointing to HDFS, and you can’t query the table. You can check your table data location by connecting to the MySQL database and running the following SQL:

select SD_ID, LOCATION from SDS where LOCATION like 'hdfs%';

Make sure to complete the prerequisite steps mentioned earlier before proceeding with the migration. Ensure the EMR 5.X cluster is in a waiting state and all its components are in service.

  1. Note down the details of the existing EMR 5.X cluster Hive Metastore database to be upgraded.
    As mentioned before, this includes the endpoint host name, database name, user name, and password. You can do this through one of the following options:

    • Get the Hive Metastore DB information from the Hive configuration file – Log in to the Amazon EMR 5.X primary node, open the file /etc/hive/conf/hive-site.xml, and note the following properties:
    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://{hostname}:3306/{dbname}?createDatabaseIfNotExist=true</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>{username}</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>{password}</value>
      <description>password to use against metastore database</description>
    </property>

    • Get the Hive Metastore DB information from the Amazon EMR console – Navigate to the Amazon EMR 5.X cluster, choose the Configurations tab, and note down the Metastore DB information.

    EMR Console for Configuration

  2. On the AWS Glue console, create a connection to the Hive Metastore as a JDBC data source.
    Use the connection JDBC URL, user name, and password you gathered in the previous step. Specify the VPC, subnet, and security group associated with your Hive Metastore. You can find these on the Amazon EMR console if the Hive Metastore is on the EMR primary node, or on the Amazon RDS console if the Metastore is an RDS instance.
  3. Download two extract, transform, and load (ETL) job scripts from GitHub and upload them to an S3 bucket:
    wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/Hive_metastore_migration/src/hive_metastore_migration.py
    wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/Hive_metastore_migration/src/import_into_datacatalog.py

    If you configured AWS Glue to access Amazon S3 from a VPC endpoint, you must upload the script to a bucket in the same AWS Region where your job runs.

    Now you must create a job on the AWS Glue console to extract metadata from your Hive Metastore to migrate it to the Data Catalog.

  4. On the AWS Glue console, choose Jobs in the navigation pane.
  5. Choose Create job.
  6. Select Spark script editor.
  7. For Options, select Upload and edit an existing script.
  8. Choose Choose file and upload the import_into_datacatalog.py script you downloaded earlier.
  9. Choose Create.
    Glue Job script editor
  10. On the Job details tab, enter a job name (for example, Import-Hive-Metastore-To-Glue).
  11. For IAM Role, choose a role.
  12. For Type, choose Spark.
    Glue ETL Job details
  13. For Glue version, choose Glue 3.0.
  14. For Language, choose Python 3.
  15. For Worker type, choose G.1X.
  16. For Requested number of workers, enter 2.
    Glue ETL Job details
  17. In the Advanced properties section, for Script filename, enter import_into_datacatalog.py.
  18. For Script path, enter the S3 path you used earlier (just the parent folder).
    Glue ETL Job details
  19. Under Connections, choose the connection you created earlier.
    Glue ETL Job details
  20. For Python library path, enter the S3 path you used earlier for the file hive_metastore_migration.py.
  21. Under Job parameters, enter the following key-value pairs:
    • --mode: from-jdbc
    • --connection-name: EMR-Hive-Metastore
    • --region: us-west-2

    Glue ETL Job details

  22. Choose Save to save the job.
  23. Run the job on demand on the AWS Glue console.

If the job runs successfully, Run status should show as Succeeded. When the job is finished, the metadata from the Hive Metastore is visible on the AWS Glue console. Check the databases and tables listed to verify that they were migrated correctly.
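
The console settings from the preceding steps can also be written down as a job definition. The following Python sketch is a hedged illustration rather than the method this post uses: it builds a dictionary whose keys follow the AWS Glue CreateJob request, which you could pass to the boto3 Glue client's create_job call if boto3 and credentials are available (not exercised here). The function name build_glue_job_request, the role name ExampleGlueRole, and the S3 folder s3://example-bucket/scripts are hypothetical placeholders.

```python
def build_glue_job_request(job_name, role, script_dir, connection_name, region):
    """Mirror the console settings above as a CreateJob-style dictionary."""
    script_dir = script_dir.rstrip("/")
    return {
        "Name": job_name,
        "Role": role,
        "Command": {
            "Name": "glueetl",  # Spark job type
            "ScriptLocation": f"{script_dir}/import_into_datacatalog.py",
            "PythonVersion": "3",
        },
        "GlueVersion": "3.0",
        "WorkerType": "G.1X",
        "NumberOfWorkers": 2,
        "Connections": {"Connections": [connection_name]},
        "DefaultArguments": {
            # Python library path from the console maps to --extra-py-files.
            "--extra-py-files": f"{script_dir}/hive_metastore_migration.py",
            "--mode": "from-jdbc",
            "--connection-name": connection_name,
            "--region": region,
        },
    }


request = build_glue_job_request(
    "Import-Hive-Metastore-To-Glue",
    "ExampleGlueRole",               # hypothetical IAM role name
    "s3://example-bucket/scripts",   # hypothetical upload location
    "EMR-Hive-Metastore",
    "us-west-2",
)
print(request["DefaultArguments"])
```

Keeping the definition in code makes it easier to recreate the job in another account or Region with the same parameters.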

Known issues

In some cases where the Hive Metastore schema version is on a very old release or if some required metadata tables are missing, the upgrade process may fail. In this case, you can use the following steps to identify and fix the issue. Run the schemaTool upgradeSchema command with verbose as follows:

sudo hive --service schemaTool -verbose -dbType mysql -upgradeSchema

This prints all the queries being run in order and their output:

jdbc:mysql://metastore.xxxx.us-west-1> CREATE INDEX PCS_STATS_IDX ON PART_COL_STATS (DB_NAME,TABLE_NAME,COLUMN_NAME,PARTITION_NAME) USING BTREE
Error: (conn=6831922) Duplicate key name 'PCS_STATS_IDX' (state=42000,code=1061)
Closing: 0: jdbc:mysql://metastore.xxxx.us-west-1.rds.amazonaws.com:3306/hive?createDatabaseIfNotExist=true
org.apache.hadoop.hive.metastore.HiveMetaException: Schema initialization FAILED! Metastore state would be inconsistent !!
Underlying cause: java.io.IOException : Schema script failed, errorcode 2

Note down the query and the error message, then take the required steps to address the issue. For example, depending on the error message, you may have to create the missing table or alter an existing table. Then you can either rerun the schemaTool upgradeSchema command or manually run the remaining queries required for the upgrade. You can get the complete scripts that schemaTool runs from /usr/lib/hive/scripts/metastore/upgrade/mysql/ on the primary node or from GitHub.
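
When the verbose output is long, pairing each error with the statement that triggered it can be scripted. The following Python sketch is a rough illustration under stated assumptions: it expects the verbose output saved as text, assumes each statement is echoed on a single line after a prompt that contains jdbc: and ends with "> " (as in the excerpt above), and uses the hypothetical function name find_failed_statements. Statements that span several lines aren't handled.

```python
def find_failed_statements(log_text):
    """Pair each 'Error:' line with the last statement echoed before it."""
    failures = []
    last_statement = None
    for line in log_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("Error:"):
            message = stripped[len("Error:"):].strip()
            failures.append((last_statement, message))
            continue
        prompt, sep, statement = stripped.partition("> ")
        if sep and "jdbc:" in prompt:
            last_statement = statement  # most recent echoed SQL statement
    return failures


log = """\
jdbc:mysql://metastore.xxxx.us-west-1> CREATE INDEX PCS_STATS_IDX ON PART_COL_STATS (DB_NAME,TABLE_NAME,COLUMN_NAME,PARTITION_NAME) USING BTREE
Error: (conn=6831922) Duplicate key name 'PCS_STATS_IDX' (state=42000,code=1061)
"""
for statement, error in find_failed_statements(log):
    print(statement)
    print("  ->", error)
```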

Clean up

Running additional EMR clusters to perform the upgrade activity in your AWS account may incur additional charges. When you complete the Hive Metastore upgrade successfully, we recommend deleting the additional EMR clusters to save cost.

Conclusion

To upgrade Amazon EMR from 5.X to 6.X and take advantage of some features from Hive 3.X or Spark SQL 3.X, you have to upgrade the Hive Metastore first. If you’re using the AWS Glue Data Catalog as your Hive Metastore, you don’t need to do anything because the Data Catalog supports both Amazon EMR versions. If you’re using a MySQL database as the external Hive Metastore, you can upgrade by following the steps outlined in this post, or you can migrate your Hive Metastore to the Data Catalog.

There are some functional differences between the different versions of Hive, Spark, and Flink. If you have applications running on Amazon EMR 5.X, make sure to test them in Amazon EMR 6.X and validate their functional compatibility. We will cover application upgrades for Amazon EMR components in a future post.


About the authors

Jianwei Li is a Senior Analytics Specialist TAM. He provides consulting services for AWS enterprise support customers to design and build modern data platforms. He has more than 10 years of experience in the big data and analytics domain. In his spare time, he likes running and hiking.

Narayanan Venkateswaran is an Engineer in the AWS EMR group. He works on developing Hive in EMR. He has over 17 years of work experience in the industry across several companies including Sun Microsystems, Microsoft, Amazon, and Oracle. Narayanan also holds a PhD in databases with a focus on horizontal scalability in relational stores.

Partha Sarathi is an Analytics Specialist TAM at AWS based in Sydney, Australia. He brings 15+ years of technology expertise and helps enterprise customers optimize analytics workloads. He has worked extensively on both on-premises and cloud big data workloads, along with various ETL platforms, in his previous roles. He also actively conducts proactive operational reviews around analytics services such as Amazon EMR, Redshift, and OpenSearch.

Krish is an Enterprise Support Manager responsible for leading a team of specialists in EMEA focused on big data and analytics, databases, networking, and security. He is also an expert in helping enterprise customers modernize their data platforms and inspiring them to implement operational best practices. In his spare time, he enjoys spending time with his family, travelling, and video games.

Enable self-service visual data integration and analysis for fund performance using AWS Glue Studio and Amazon QuickSight

Post Syndicated from Rajeshkumar Karuppaswamy original https://aws.amazon.com/blogs/big-data/enable-self-service-visual-data-integration-and-analysis-for-fund-performance-using-aws-glue-studio-and-amazon-quicksight/

IMM (Institutional Money Market) is a mutual fund that invests in highly liquid instruments, cash, and cash equivalents. IMM funds are large financial intermediaries that are crucial to financial stability in the US. Due to their criticality, IMM funds are highly regulated under the securities laws, notably Rule 2a-7, which states that during market stress, fund managers can impose a liquidity fee of up to 2% or redemption gates (a delay in processing redemptions) if the fund’s weekly liquid assets drop below 30% of its total assets. The liquidity fees and gates allow money market funds to stop heavy redemptions in times of market volatility.

Traditional banks use legacy systems and rely on monolithic architectures. Typically, data and business logic are tightly coupled on the same mainframe machines. It’s hard for analysts and fund managers to perform self-service analysis and gather real-time analytics from these legacy systems. They work from the previous night’s report and struggle to keep up with market fluctuations. The slightest modification to the reports on these legacy systems involves significant cost, time, and dependency on the software development team. Due to these limitations, analysts and fund managers can’t respond effectively to market trends and face a tremendous challenge in adhering to the regulatory requirements of monitoring market volatility.

Over the last few years, many banks have adopted the cloud. Banks have migrated their legacy workloads to reduce cost, improve their competitive advantage, and address competition from FinTech and startups. As part of the cloud strategy, many mainframe applications got re-platformed or re-architected to a more efficient database platform. However, many opportunities exist in modernizing the application. One such option is to enable self-service to run real-time analytics. AWS offers various services that help such use cases. In this post, we demonstrate how to analyze fund performance visually using AWS Glue Studio and QuickSight in a self-service fashion.

The aim of the post is to assist operations analysts and fund managers to self-service their data analysis needs without previous coding experience. This post demonstrates how AWS Glue Studio reduces the software development team’s dependency and helps analysts and fund managers perform near-real-time analytics. This post also illustrates how to build visualizations and quickly get business insights using Amazon QuickSight.

Solution overview

Most banks record their daily trading transactions activity in relational database systems. A relational database keeps the ledger of daily transactions that involves many buys and sells of IMM funds. We use the mock trades data and a simulated Morningstar data feed to demonstrate our use case.

The following sample Amazon Relational Database Service (Amazon RDS) instance records daily IMM trades, and Morningstar market data gets stored in Amazon Simple Storage Service (Amazon S3). With AWS Glue Studio, analysts and fund managers can analyze the IMM trades in near-real time and compare them with market observations from Morningstar. They can then review the data in Amazon Athena, and use QuickSight to visualize and further analyze the trade patterns and market trends.

This near-real-time, self-service analysis enables fund managers to respond quickly to market volatility and apply fees or gates on IMM funds to comply with Rule 2a-7 regulatory requirements.

The following diagram illustrates the solution architecture.

Provision resources with AWS CloudFormation

To create your resources for this use case, we deploy an AWS CloudFormation template. Complete the following steps:

  1. Choose Launch Stack (in us-east-1):
  2. Choose Next three times to reach the Review step.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack.

Create an AWS Glue connection

You create an AWS Glue connection to access the MySQL database created by the CloudFormation template. An AWS Glue crawler uses the connection in the next step.

  1. On the AWS Glue console, under Databases in the navigation pane, choose Connections.
  2. Choose Add connection.
  3. For Connection name, enter Trade-Analysis.
  4. For Connection type, choose JDBC.
  5. Choose Next.
  6. For JDBC URL, enter your URL.
    To connect to an Amazon RDS for MySQL data store with a DBDEV database, use the following code:

    jdbc:mysql://xxx-cluster.cluster-xxx.us-east-1.rds.amazonaws.com:3306/DBDEV

    For more details, see AWS Glue connection properties. Refer to the CloudFormation fund-analysis stack Outputs tab to get the Amazon RDS ARN.

    The next step requires you to first retrieve your MySQL database user name and password via AWS Secrets Manager.

  7. On the Secrets Manager console, choose Secrets in the navigation pane.
  8. Choose the secret rds-secret-fund-analysis.
  9. Choose Retrieve secret value to get the user name and password.
  10. Return to the connection configuration and enter the user name and password.
  11. For VPC, choose the VPC ending with fund-analysis.
  12. For Subnet and Security groups, choose the values ending with fund-analysis.
  13. Choose Next and Finish to complete the connection setup.
  14. Select the connection you created and choose Test Connection.
  15. For IAM role, choose the role AWSGlueServiceRole-Studio.

For more details about using AWS Identity and Access Management (IAM), refer to Setting up for AWS Glue Studio.
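
For reference, the connection configured on the console corresponds to a small set of properties. The following Python sketch is an illustration only, not the procedure this post follows: it assembles the JDBC URL and a dictionary shaped like the ConnectionInput structure of the AWS Glue CreateConnection API. The function names, the subnet and security group IDs, the Availability Zone, and the credentials shown are hypothetical placeholders; in practice the user name and password come from the Secrets Manager secret described earlier.

```python
def build_jdbc_url(host, database, port=3306):
    """Build a MySQL JDBC URL; note there is no space after 'jdbc:'."""
    return f"jdbc:mysql://{host}:{port}/{database}"


def build_connection_input(name, host, database, username, password,
                           subnet_id, security_group_ids, availability_zone):
    """Return a ConnectionInput-style dictionary for a JDBC connection."""
    return {
        "Name": name,
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": build_jdbc_url(host, database),
            "USERNAME": username,
            "PASSWORD": password,
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }


connection = build_connection_input(
    "Trade-Analysis",
    "xxx-cluster.cluster-xxx.us-east-1.rds.amazonaws.com",
    "DBDEV",
    "example_user",           # placeholder; read from Secrets Manager
    "example_password",       # placeholder; read from Secrets Manager
    "subnet-0example",        # hypothetical subnet ID
    ["sg-0example"],          # hypothetical security group ID
    "us-east-1a",             # hypothetical Availability Zone
)
print(connection["ConnectionProperties"]["JDBC_CONNECTION_URL"])
```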

Create and run AWS Glue crawlers

In this step, you create two crawlers. The crawlers connect to a data store, determine the schema for your data, and then create metadata tables in your AWS Glue Data Catalog.

Crawl MySQL data stores

The first crawler creates metadata for the MySQL data stores. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Add crawler.
  3. For Crawler name, enter Trades Crawlers.
  4. Choose Next.
  5. For Crawler source type, choose Data stores.
  6. For Repeat crawls of S3 data stores, choose Crawl all folders.
  7. Choose Next.
  8. For Choose a data store, choose JDBC.
  9. For Connection, choose Trade-Analysis.
  10. For Include path, enter the MySQL database name (DBDEV).
  11. Choose Next.
  12. For Add another data store, choose No.
  13. Choose Next.
  14. For the IAM role to access the data stores, choose the role AWSGlueServiceRole-Studio.
  15. For Frequency, choose Run on demand.
  16. Choose Add database.
  17. For Database name, enter trade_analysis_db.
  18. Choose Create.
  19. Choose Next.
  20. Review all the steps and choose Finish to create your crawler.
  21. Select the Trades Crawlers crawler and choose Run crawler to get the metadata.

Crawl Amazon S3 data stores

Now you configure a crawler to create metadata for the Amazon S3 data stores.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Add crawler.
  3. For Crawler name, enter Ratings.
  4. Choose Next.
  5. For Crawler source type, choose Data stores.
  6. For Repeat crawls of S3 data stores, choose Crawl all folders.
  7. Choose Next.
  8. For Choose a data store, choose S3.
  9. For Connection, choose Trade-Analysis.
  10. For Include path, enter s3://aws-bigdata-blog/artifacts/analyze_fund_performance_using_glue/Morningstar.csv.
  11. Choose Next.
  12. For Add another data store, choose No.
  13. Choose Next.
  14. For the IAM role to access the data stores, choose the role AWSGlueServiceRole-Studio.
  15. For Frequency, choose Run on demand.
  16. Choose Add database.
  17. For Database name, enter trade_analysis_db.
  18. Review all the steps and choose Finish to create your crawler.
  19. Select the Ratings crawler and choose Run crawler to get the metadata.
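
The two crawlers differ only in their targets, which the following Python sketch makes explicit. It is a hedged illustration, not the method used in this post: it builds dictionaries shaped like the AWS Glue CreateCrawler request, and the function name build_crawler_request is hypothetical. Leaving out a schedule corresponds to Run on demand. The JDBC include path DBDEV/% is an assumption: the steps above enter only the database name, and the /% suffix is the usual way to match every table in it.

```python
def build_crawler_request(name, role, database, connection=None,
                          jdbc_path=None, s3_path=None):
    """Return a CreateCrawler-style dictionary for a JDBC or S3 target."""
    targets = {}
    if jdbc_path is not None:
        if connection is None:
            raise ValueError("a JDBC target needs a connection name")
        targets["JdbcTargets"] = [
            {"ConnectionName": connection, "Path": jdbc_path}
        ]
    if s3_path is not None:
        s3_target = {"Path": s3_path}
        if connection is not None:
            s3_target["ConnectionName"] = connection
        targets["S3Targets"] = [s3_target]
    if not targets:
        raise ValueError("provide jdbc_path or s3_path")
    # No "Schedule" key: the crawler runs on demand.
    return {"Name": name, "Role": role, "DatabaseName": database,
            "Targets": targets}


trades = build_crawler_request(
    "Trades Crawlers", "AWSGlueServiceRole-Studio", "trade_analysis_db",
    connection="Trade-Analysis", jdbc_path="DBDEV/%",
)
ratings = build_crawler_request(
    "Ratings", "AWSGlueServiceRole-Studio", "trade_analysis_db",
    connection="Trade-Analysis",
    s3_path="s3://aws-bigdata-blog/artifacts/analyze_fund_performance_using_glue/Morningstar.csv",
)
print(trades["Targets"])
print(ratings["Targets"])
```

Both crawlers write to the same database, trade_analysis_db, so their tables appear side by side in the Data Catalog.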

Review crawler output

To review the output of your two crawlers, navigate to the Databases page on the AWS Glue console.

You can review the database trade_analysis_db created in previous steps and the contents of the metadata tables.

Create a job using AWS Glue Studio

A job is the AWS Glue component that allows the implementation of business logic to transform data as part of the extract, transform, and load (ETL) process. For more information, see Adding jobs in AWS Glue.

To create an AWS Glue job using AWS Glue Studio, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose AWS Glue Studio.
  2. Choose Create and manage jobs.
  3. Choose View jobs.
    AWS Glue Studio supports different sources. For this post, you use two AWS Glue tables as data sources and one S3 bucket as the destination.
  4. In the Create job section, select Visual with a blank canvas.
  5. Choose Create.

    This takes you to the visual editor to create an AWS Glue job.
  6. Change the job name from Untitled Job to Trade-Analysis-Job.

You now have an AWS Glue job ready to filter, join, and aggregate data from two different sources.

Add two data sources

For this post, you use two AWS Glue tables as data sources: Trades and Ratings, which you created earlier.

  1. On the AWS Glue Studio console, on the Source menu, choose MySQL.
  2. On the Node properties tab, for Name, enter Trades.
  3. For Node type, choose MySQL.
  4. On the Data Source properties – MySQL tab, for Database, choose trade_analysis_db.
  5. For Table, choose dbdev_mft_actvitity.
    Before adding the second data source to the analysis job, be sure that the node you just created isn’t selected.
  6. On the Source menu, choose Amazon S3.
  7. On the Node properties tab, for Name, enter Ratings.
  8. For Node type, choose Amazon S3.
  9. On the Data Source properties – S3 tab, for Database, choose trade_analysis_db.
  10. For Table, choose morning_star_csv.
    You now have two AWS Glue tables as the data sources for the AWS Glue job. The Data preview tab helps you sample your data without having to save or run the job. The preview runs each transform in your job so you can test and debug your transformations.
  11. Choose the Ratings node and on the Data preview tab, choose Start data preview session.
  12. Choose the AWSGlueServiceRole-Studio IAM role and choose Confirm to sample the data.

Data previews are available for each source, target, and transform node in the visual editor, so you can verify the results step by step for other nodes.

Join two tables

A transform is the AWS Glue Studio component where the data is modified. You can use the transforms that are part of this service or write custom code. To add transforms, complete the following steps:

  1. On the Transform menu, choose Join.
  2. On the Node properties tab, for Name, enter trades and ratings join.
  3. For Node type, choose Join.
  4. For Node parents, choose the Trades and Ratings data sources.
  5. On the Transform tab, for Join type, choose Outer join.
  6. Choose the common column between the tables to establish the connection.
  7. For Join conditions, choose symbol from the Trades table and mor_rating_fund_symbol from the Ratings table.
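
To make the effect of the Outer join choice concrete, the following plain-Python sketch reproduces the semantics on a few rows. It illustrates the join logic only and is not what AWS Glue Studio generates. The join columns symbol and mor_rating_fund_symbol come from the steps above; the function name full_outer_join, the other column names, and the sample rows are hypothetical.

```python
def full_outer_join(left_rows, right_rows, left_key, right_key):
    """Full outer join of two lists of dicts; missing sides become None."""
    left_cols = sorted({col for row in left_rows for col in row})
    right_cols = sorted({col for row in right_rows for col in row})

    # Index the right side by its join key for quick lookups.
    right_index = {}
    for row in right_rows:
        right_index.setdefault(row[right_key], []).append(row)

    joined, matched_keys = [], set()
    for left in left_rows:
        matches = right_index.get(left[left_key], [])
        if matches:
            matched_keys.add(left[left_key])
            joined.extend({**left, **right} for right in matches)
        else:
            joined.append({**left, **dict.fromkeys(right_cols)})

    # Keep right-side rows that no left row matched.
    for key, rows in right_index.items():
        if key not in matched_keys:
            joined.extend({**dict.fromkeys(left_cols), **row} for row in rows)
    return joined


trades = [{"symbol": "AAA", "quantity": 100},
          {"symbol": "BBB", "quantity": 50}]
ratings = [{"mor_rating_fund_symbol": "AAA", "rating": 5},
           {"mor_rating_fund_symbol": "CCC", "rating": 3}]
for row in full_outer_join(trades, ratings, "symbol", "mor_rating_fund_symbol"):
    print(row)
```

With an outer join, trades without a rating and ratings without a trade both survive, with the missing side left empty.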

Add a target

Before adding the target to store the result, be sure that the node you just created isn’t selected. To add the target, complete the following steps:

  1. On the Target menu, choose Amazon S3.
  2. On the Node properties tab, for Name, enter trades ratings merged.
  3. For Node type, choose Amazon S3 for writing outputs.
  4. For Node parents, choose trades and ratings join.
  5. On the Data target properties – S3 tab, for Format, choose Parquet.
  6. For Compression type, choose None.
  7. For S3 target location, enter s3://glue-studio-blog-{Your Account ID as a 12-digit number}/.
  8. For Data catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  9. For Database, choose trade_analysis_db.
  10. For Table name, enter tradesratingsmerged.

Configure the job

When the logic behind the job is complete, you must set the parameters for the job run. In this section, you configure the job by selecting components such as the IAM role and the AWS Glue version you use to run the job.

  1. Choose the Job details tab.
  2. For Job bookmark, choose Disable.
  3. For Number of retries, optionally enter 0.
  4. Choose Save.
  5. When the job is saved, choose Run.

Monitor the job

AWS Glue Studio offers a job monitoring dashboard that provides comprehensive information about your jobs. You can get job statistics and see detailed information about the job and the job status when running.

  1. In the AWS Glue Studio navigation pane, choose Monitoring.
  2. Change the date range to 1 hour using the Date range selector to get the recently submitted job.
    The Job runs summary section displays the current state of the job run. The status of the job could be Running, Canceled, Success, or Failed. The Job run success rate section provides the estimated DPU usage for jobs, and gives you a summary of the performance of the job. Job type breakdown and Worker type breakdown contain additional information about the job.
  3. To get more details about the job run, choose View run details.

Review the results using Athena

To view the data in Athena, complete the following steps:

  1. Navigate to the Athena console, where you can see the database and tables created by your crawlers.

    If you haven’t used Athena in this account before, a message appears instructing you to set a query result location.
  2. Choose Settings, Manage, Browse S3, and select any bucket that you created.
  3. Choose Save and return to the editor to continue.
  4. In the Data section, expand Tables to see the tables you created with the AWS Glue crawlers.
  5. Choose the options menu (three dots) next to one of the tables and choose Preview Table.

The following screenshot shows an example of the data.

Create a QuickSight dashboard and visualizations

To set up QuickSight for the first time, sign up for a QuickSight subscription and allow connections to Athena.

To create a dashboard in QuickSight based on the AWS Glue Data Catalog tables you created, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Create a new QuickSight dataset called Fund-Analysis with Athena as the data source.
  4. In the Choose your table section, choose AwsDataCatalog for Catalog and choose trade_analysis_db for Database.
  5. For Tables, select the tradesratingsmerged table to visualize.
  6. Choose Select.
  7. Import the data into SPICE.
    SPICE is an in-memory engine that QuickSight uses to perform advanced calculations and improve performance. Importing the data into SPICE can save time and money. When using SPICE, you can refresh your datasets either fully or incrementally. As of this writing, you can schedule incremental refreshes up to every 15 minutes. For more information, refer to Refreshing SPICE data. For near-real-time analysis, select Directly query your data instead.
  8. Choose Visualize.

    After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.
  9. To analyze the market performance from the Morningstar file, choose the clustered bar combo chart under Visual types.
  10. Drag Fund_Symbol from Fields list to X-axis.
  11. Drag Ratings to Y-axis and Lines.
  12. Choose the default title, then choose Edit title to change the title to “Market Analysis.”
    The following QuickSight dashboard was created using a custom theme, which is why the colors may appear different than yours.
  13. To display the Morningstar details in tabular form, add a visual to create additional graphs.
  14. Choose the table visual under Visual types.
  15. Drag Fund Symbol and Fund Names to Group by.
  16. Drag Ratings, Historical Earnings, and LT Earnings to Value.

    In QuickSight, up until this point, you analyzed the market performance reported by Morningstar. Let’s analyze the near-real-time daily trade activities.
  17. Add a visual to create additional graphs.
  18. Choose the clustered bar combo chart under Visual types.
  19. Drag Fund_Symbol from Fields list to X-axis and Trade Amount to Y-axis.
  20. Choose the default title, then choose Edit title to change the title to “Daily Transactions.”
  21. To display the daily trades in tabular form, add a visual to create additional graphs.
  22. Drag Trade Date, Customer Name, Fund Name, Fund Symbol, and Buy/Sell to Group by.
  23. Drag Trade Amount to Value.

The following screenshot shows a complete dashboard. This compares the market observation reported in the street against the daily trades happening in the bank.

In the Market Analysis section of the dashboard, GMFXXD funds were performing well based on the previous night’s feed from Morningstar. However, the Daily Transactions section of the dashboard shows that customers were selling their positions from the funds. Relying only on the previous night’s batch report could mislead fund managers or operations analysts when they decide how to act.

Near-real-time analytics using AWS Glue Studio and QuickSight can enable fund managers and analysts to self-serve and impose fees or gates on those IMM funds.

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, and AWS Glue job.

Conclusion

In this post, you learned how to use AWS Glue Studio to analyze data from different sources with no previous coding experience and how to build visualizations and get business insights using QuickSight. You can use AWS Glue Studio and QuickSight to speed up the analytics process and allow different personas to transform data with no development experience.

For more information about AWS Glue Studio, see the AWS Glue Studio User Guide. For information about QuickSight, refer to the Amazon QuickSight User Guide.


About the authors

Rajeshkumar Karuppaswamy is a Customer Solutions Manager at AWS. In this role, Rajeshkumar works with AWS customers to drive cloud strategy and provides thought leadership to help businesses achieve speed and agility and drive innovation. His areas of interest are AI & ML, analytics, and data engineering.

Richa Kaul is a Senior Leader in Customer Solutions serving Financial Services customers. She is based out of New York. She has extensive experience in large scale cloud transformation, employee excellence, and next generation digital solutions. She and her team focus on optimizing value of cloud by building performant, resilient and agile solutions. Richa enjoys multi sports like triathlons, music, and learning about new technologies.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. This summer, he enjoyed goldfish scooping with his children.

Talk to your data: Query your data lake with Amazon QuickSight Q

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/talk-to-your-data-query-your-data-lake-with-amazon-quicksight-q/

Amazon QuickSight Q uses machine learning (ML) and natural language technology to empower you to ask business questions about your data and get answers instantly. You can simply enter your questions (for example, “What is the year-over-year sales trend?”) and get the answer in seconds in the form of a QuickSight visual.

Some business questions can’t be answered through existing business intelligence (BI) dashboards. It can take days or weeks for the BI team to accommodate these needs and refine their solution. Because Q doesn’t depend on prebuilt dashboards or reports to answer questions, it removes the need for BI teams to create or update dashboards every time a new business question arises. You can ask questions and receive answers in the form of visuals in seconds directly from within QuickSight or from web applications and portals. Q empowers every business user to self-serve and get insights faster, regardless of their background or skillset.

In this post, we walk you through the steps to configure Q using an Olympic Games public dataset and demonstrate how an end-user can ask simple questions directly from Q in an interactive manner and receive answers in seconds.

You can interactively play with the Olympic dashboard and Q search bar in the following interactive demo.

Solution overview

We use Olympic games public datasets to configure a Q topic and discuss tips and tricks on how to make further configurations on the topic that enable Q to provide prompt answers using ML-powered, natural language query (NLQ) capabilities that empower you to ask questions about data using everyday business language.

The video from Data Con LA provides a high-level demonstration of the capabilities covered in this post.

Additionally, we discuss the following:

  • Best practices for data modeling of a Q topic
  • How to perform data cleansing using AWS Glue DataBrew, SQL, or an Amazon SageMaker Jupyter notebook on datasets to build a Q topic

We use multiple publicly available datasets from Kaggle. The datasets have historical information about athletes, including name, ID, age, weight, country, and medals.

We use the 2020 Olympic datasets and historical data. We also use the datasets Introduction of Women Olympic Sport and Women of Olympic Games to determine the participation of women athletes in Olympics and discover trends. The QuickSight datasets created using these public data files are added to a Q topic, as shown in the following screenshot. We provide details on creating QuickSight datasets later in this post.

Prerequisites

To follow along with the solution presented in this post, you must have access to the following:

Create solution resources

The public datasets in Kaggle can’t be directly utilized to create a Q topic. We have already cleansed the raw data and have provided the cleansed datasets in the GitHub repo. If you are interested in learning more about data cleansing, we discuss three different data cleansing methods at the end of this post.

To create your resources, complete the following steps:

  1. Create an S3 bucket called olympicsdata.
  2. Create a folder for each data file, as shown in the following screenshot.
  3. Upload the data files from the GitHub repo into their respective folders.
  4. Deploy the provided CloudFormation template and provide the necessary information.

The template creates an Athena database and tables, as shown in the following screenshot.

The template also creates the QuickSight data source athena-olympics and datasets.

Create datasets in QuickSight

To build the Q topic, we need to combine the datasets, because each table contains only partial data. Joining these tables helps answer questions across all the features of the 2020 Olympics.

We create the Olympics 2021 dataset by joining the tables Medals_athletes_2021, Athletes_full_2021, Coach_full_2021, and Tech_official_2021.

The following screenshot shows the joins for our complete dataset.

Medals_athletes_2021 is the main table, with the following join conditions:

  • Left outer join athletes_full_2021 on athlete_name, discipline_code, and country_code
  • Left outer join coach_full_2021 on country, discipline, and event
  • Left outer join tech_official_2021 on discipline
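To make the join conditions concrete, the following sketch reproduces them with Python's built-in `sqlite3` module on a tiny in-memory database. Only the table names and join keys come from this post; the other columns (`medal_type`, `birth_date`, `coach_name`, `official_name`) and all sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE medals_athletes_2021 (athlete_name TEXT, discipline_code TEXT, country_code TEXT,
                                   country TEXT, discipline TEXT, event TEXT, medal_type TEXT);
CREATE TABLE athletes_full_2021 (athlete_name TEXT, discipline_code TEXT, country_code TEXT, birth_date TEXT);
CREATE TABLE coach_full_2021 (coach_name TEXT, country TEXT, discipline TEXT, event TEXT);
CREATE TABLE tech_official_2021 (official_name TEXT, discipline TEXT);

-- Made-up sample rows: Athlete B has no matching athlete or coach record.
INSERT INTO medals_athletes_2021 VALUES
  ('Athlete A', 'D1', 'AAA', 'Country A', 'Discipline 1', 'Event 1', 'Gold Medal'),
  ('Athlete B', 'D2', 'BBB', 'Country B', 'Discipline 2', 'Event 2', 'Silver Medal');
INSERT INTO athletes_full_2021 VALUES ('Athlete A', 'D1', 'AAA', '2000-01-01');
INSERT INTO coach_full_2021 VALUES ('Coach X', 'Country A', 'Discipline 1', 'Event 1');
INSERT INTO tech_official_2021 VALUES ('Official Y', 'Discipline 2');
""")

# Medals_athletes_2021 is the main table; every medal row is kept even when
# a joined table has no match (the missing side is returned as NULL/None).
rows = conn.execute("""
SELECT m.athlete_name, m.medal_type, a.birth_date, c.coach_name, t.official_name
FROM medals_athletes_2021 m
LEFT OUTER JOIN athletes_full_2021 a
  ON m.athlete_name = a.athlete_name
 AND m.discipline_code = a.discipline_code
 AND m.country_code = a.country_code
LEFT OUTER JOIN coach_full_2021 c
  ON m.country = c.country AND m.discipline = c.discipline AND m.event = c.event
LEFT OUTER JOIN tech_official_2021 t
  ON m.discipline = t.discipline
ORDER BY m.athlete_name
""").fetchall()

for row in rows:
    print(row)
```

Because the joins are left outer joins, a medal row with no matching athlete, coach, or technical official is still kept, with empty values for the missing side.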

Finally, we have the following datasets that we use for our Q topic:

  • Olympics 2021 Details
  • Medals 2021
  • Olympics History (created using the Olympics table)
  • Introduction of Women Olympics Sports
  • Women in the Olympic Movement

Create a Q topic

Topics are collections of one or more datasets that represent a subject area that your business users can ask questions about. In QuickSight, you can create and manage topics on the Topics page. When you create a topic, your business users can ask questions about it in the Q search bar.

When you create topics in Q, you can add multiple datasets to them and then configure all the fields in the datasets to make them natural language-friendly. This enables Q to provide your business users with the correct visualizations and answers to their questions.

The following are data modeling best practices for Q topics:

  • Reduce the number of datasets by consolidating the data. Any given question can only hit one dataset, so only include multiple datasets if they are related enough to be part of the same topic, but distinct enough that you can ask a question against them independently.
  • For naming conventions, provide a meaningful name or alias (synonym) of a field to allow the end-user to easily query it.
  • If a field appears in different datasets, make sure that this field has the same name across different datasets.
  • Validate data consistency. For example, the total value of a metric that aggregates from different datasets should be consistent.
  • For fields that don’t request on-the-fly calculations, for example, metrics with distributive functions (sum, max, min, and so on), push down the calculation into a data warehouse.
  • For fields that request on-the-fly calculations, create the calculated field in the QuickSight dataset or Q topic. If other topics or dashboards might reuse the same field, create it in the datasets.
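The data consistency check in the list above can be automated before the datasets are added to a topic. The following is a minimal sketch in plain Python; the dataset names, the `medals` metric, and the sample rows are hypothetical and only illustrate the idea of comparing the total of one metric across datasets.

```python
def metric_totals(datasets, metric):
    """Return the total of `metric` for each dataset, keyed by dataset name."""
    return {name: sum(row[metric] for row in rows) for name, rows in datasets.items()}


def is_consistent(datasets, metric):
    """True when every dataset reports the same total for `metric`."""
    return len(set(metric_totals(datasets, metric).values())) <= 1


# Hypothetical aggregates of the same medal count at two different grains.
datasets = {
    "medals_by_country": [
        {"country": "Country A", "medals": 3},
        {"country": "Country B", "medals": 2},
    ],
    "medals_by_sport": [
        {"sport": "Sport X", "medals": 4},
        {"sport": "Sport Y", "medals": 1},
    ],
}

print(metric_totals(datasets, "medals"))
print(is_consistent(datasets, "medals"))
```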

To create a topic, complete the following steps:

  1. On the QuickSight console, choose Topics in the navigation pane.
  2. Choose New topic.
  3. For Topic name, enter a name.
  4. For Description, enter a description.
  5. Choose Save.
  6. On the Add data to topic page that opens, choose Datasets, and then select the datasets that we created in the previous section.
  7. Choose Add data to create the topic.

Enhance the topic

In this section, we discuss various ways that you can enhance the topic.

Add calculated fields to a topic dataset

You can add new fields to a dataset in a topic by creating calculated fields.

For example, we have the column Age in our Olympics dataset. We can create a calculated field to group age into different ranges using the ifelse function. This calculated field can help us ask a question like “How many athletes for each age group?”

  1. Choose Add calculated field.
  2. In the calculation editor, enter the following syntax:
    ifelse(
    Age<=20, '0-20',
    Age>20 and Age <=40, '21-40',
    Age>40 and Age<=60, '41-60',
    '60+'
    )

  3. Name the calculated field Age Groups.
  4. Choose Save.

The calculated field is added to the list of fields in the topic.
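To check the bucket boundaries before saving the calculated field, the same logic can be mirrored in Python. This is a sketch of the ifelse expression above, assuming Age holds whole numbers; the sample ages are made up.

```python
from collections import Counter


def age_group(age):
    """Mirror the ifelse expression: the first matching condition wins."""
    if age <= 20:
        return "0-20"
    if age <= 40:
        return "21-40"
    if age <= 60:
        return "41-60"
    return "60+"


# Made-up ages, chosen to exercise each boundary.
ages = [17, 20, 21, 35, 40, 41, 62]
counts = Counter(age_group(age) for age in ages)
print(counts)
```

The resulting counts per group correspond to what a question like “How many athletes for each age group?” would aggregate.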

Add filters to a topic dataset

Let’s say a lot of analysis is expected on the dataset for the summer season. We can add a filter to allow for easy selection of this value. Furthermore, if we want to allow analysis against data for the summer season only, we can choose to always apply this filter or apply it as the default choice, but allow users to ask questions about other seasons as well.

  1. Choose Add filter.
  2. For Name, enter Summer.
  3. Choose the Women in the Olympic Movement dataset.
  4. Choose the Olympics Season field.
  5. Choose Custom filter list for Filter type and set the rule as include.
  6. Enter Summer under Values.
  7. Choose Apply always, unless a question results in an explicit filter from the dataset.
  8. Choose Save.

The filter is added to the list of fields in the topic.
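The effect of the Apply always, unless a question results in an explicit filter from the dataset option can be pictured with a small Python sketch. This only illustrates the behavior described above and is not how Q is implemented; the function name and sample rows are hypothetical.

```python
def apply_season_filter(rows, explicit_season=None, default_season="Summer"):
    """Use the season named in the question if there is one, else the default filter."""
    season = explicit_season if explicit_season is not None else default_season
    return [row for row in rows if row["Olympics Season"] == season]


# Made-up rows for illustration.
rows = [
    {"Olympics Season": "Summer", "Year": 2016},
    {"Olympics Season": "Winter", "Year": 2018},
    {"Olympics Season": "Summer", "Year": 2020},
]

# No season in the question: the default Summer filter applies.
print(apply_season_filter(rows))
# The question names a season explicitly: that filter replaces the default.
print(apply_season_filter(rows, explicit_season="Winter"))
```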

Add named entities to a topic dataset

We can define named entities if we need to show users a combination of fields. For example, when someone asks for player details, it makes sense to show them player name, age, country, sport, and medal. We can make this happen by defining a named entity.

  1. Choose Add named entity.
  2. Choose the Olympics dataset.
  3. Enter Player Profile for Name.
  4. Enter Information of Player for Description.
  5. Choose Add field.
  6. Choose Player Name from the list.
  7. Choose Add field again and add the fields Age, Countries, Sport, and Medal.
    The fields are listed in the order in which they appear in answers. To move a field, choose the six dots next to the name and drag and drop the field to the order that you want.
  8. Choose Save.

The named entity is added to the list of fields in the topic.

Make Q topics natural language-friendly

To help Q interpret your data and better answer your readers’ questions, provide as much information about your datasets and their associated fields as possible.

To make the topic more natural language-friendly, use the following procedures.

Rename fields

You can make your field names more user-friendly in your topics by renaming them and adding descriptions.

Q uses field names to understand the fields and link them to terms in your readers’ questions. When your field names are user-friendly, it’s easier for Q to draw links between the data and a reader’s question. These friendly names are also presented to readers as part of the answer to their question to provide additional context.

Let’s rename the birth date field from the athlete dataset as Athlete Birth Date. Because we have multiple birth date fields in the topics for coach, athlete, and tech roles, renaming the athletes’ birth date field helps Q easily link to the data field when we ask questions regarding athletes’ birth dates.

  1. On the Fields page, choose the down arrow at far right of the Birth Date field to expand it.
  2. Choose the pencil icon next to the field name.
  3. Rename the field to Athlete Birth Date.

Add synonyms to fields in a topic

Even if you update your field names to be user-friendly and provide a description for them, your readers might still use different names to refer to them. For example, a player name field might be referred to as player, players, or sportsman in your reader’s questions.

To help Q make sense of these terms and map them to the correct fields, you can add one or more synonyms to your fields. Doing this improves Q’s accuracy.

  1. On the Fields page, under Synonyms, choose the pencil icon for Player Name.
  2. Enter player and sportsman as synonyms.

Add synonyms to field values

Like we did for field names, we can add synonyms for category values as well.

  1. Choose the Gender field’s row to expand it.
  2. Choose Configure value synonyms, then choose Add.
  3. Choose the pencil icon next to the F value.
  4. Add the synonym Female.
  5. Repeat these steps to add the synonym Male for M.
  6. Choose Done.

Assign field roles

Every field in your dataset is either a dimension or a measure. Knowing whether a field is a dimension or a measure determines what operations Q can and can’t perform on a field.

For example, setting the field Age as a dimension means that Q doesn’t try to aggregate it as it does measures.

  1. On the Fields page, expand the Age field.
  2. For Role, choose Dimension.

Set field aggregations

Setting field aggregations tells Q which function should or shouldn’t be used when those fields are aggregated across multiple rows. You can set a default aggregation for a field, and specify aggregations that aren’t allowed.

A default aggregation is the aggregation that’s applied when there’s no explicit aggregation function mentioned or identified in a reader’s question. For example, let’s ask Q “Show total number of events.” In this case, Q uses the field Total Events, which has a default aggregation of Sum, to answer the question.

  1. On the Fields page, expand the Total Events field.
  2. For Default aggregation, choose Sum.
  3. For Not allowed aggregation, choose Average.

Specify field semantic types

Providing more details on the field context will help Q answer more natural language questions. For example, users might ask “Who won the most medals?” We haven’t set any semantic information for any fields in our dataset yet, so Q doesn’t know what fields to associate with “who.” Let’s see how we can enable Q to tackle this question.

  1. On the Fields page, expand the Player Name field.
  2. For Semantic Type, choose Person.

This enables Q to surface Player Name as an option when answering “who”-based questions.

Exclude unused or unnecessary fields

Fields from all included datasets are displayed by default. However, we have a few fields like Short name of Country, URL Coach Full 2021, and URL Tech Official 2021 that we don’t need in our topic. We can exclude unnecessary fields from the topic to prevent them from showing up in results by choosing the slider next to each field.

Ask questions with Q

After we create and configure our topic, we can now interact with Q by entering questions in the Q search bar.

For example, let’s enter show total medals by country. Q presents an answer to your question as a visual.

You can see how Q interpreted your question in the description at the visual’s upper left. Here you can see the fields, aggregations, topic filters, and datasets used to answer the question. The topic filter na is applied on the Medal attribute, which excludes na values from the aggregation. For more information on topic filters, see Adding filters to a topic dataset.

Q displays the results using the visual type best suited to convey the information. However, Q also gives you the flexibility to view results in other visual types by choosing the Visual icon.

As another example, let’s enter who is the oldest player in basketball. Q presents an answer to your question as a visual.

Sometimes Q might not interpret your question the way you wanted. When this happens, you can provide feedback on the answer or make suggestions for corrections to the answer. For more information about providing answer feedback, see Providing feedback about QuickSight Q topics. For more information about correcting answers, see Correcting wrong answers provided by Amazon QuickSight Q.

Conclusion

In this post, we showed you how to configure Q using an Olympic Games public dataset so that end-users can ask simple questions directly from Q in an interactive manner and receive answers in seconds. If you have any feedback or questions, please leave them in the comments section.

Appendix 1: Types of questions supported by Q

Let’s look at samples of each question type that Q can answer using the topic created earlier in this post.

Try the following questions or your own questions and continue enhancing the topic to improve accuracy of responses.

Question Type | Example
Dimensional Group Bys | show total medals by country
Dimensional Filters (Include) | show total medals for united states
Date Group Bys | show yearly trend of women participants
Multi Metrics | number of women events compared to total events
KPI-Based Period over Periods (PoPs) | how many women participants in 2018 over 2016
Relative Date Filters | show total medals for united states in the last 5 years
Time Range Filters | list of women sports introduced since 2016
Top/Bottom Filter | show me the top 3 player with gold medal
Sort Order | show top 3 countries with maximum medals
Aggregate Metrics Filter | show teams that won more than 50 medals
List Questions | list the women sports by year in which they are introduced
OR filters | Show player who got gold or silver medal
Percent of Total | Percentage of players by country
Where Questions | where are the most number of medals
When Questions | when women volleyball introduced into olympic games
Who Questions | who is the oldest player in basketball
Exclude Questions | show countries with highest medals excluding united states

Appendix 2: Data cleansing

In this section, we provide three options for data cleansing: SQL, DataBrew, and Python.

Option 1: SQL

For our first option, we discuss how to create Athena tables on the downloaded Excel or CSV files and then perform the data cleansing using SQL. This option is suitable for those who use Athena tables as a data source for QuickSight datasets and are comfortable using SQL.

The SQL queries to create Athena tables are available in the GitHub repo. In these queries, we perform data cleansing by renaming columns, changing the data type of some columns, and removing duplicate rows. Proper naming conventions and accurate data types help Q efficiently link the questions to the data fields and provide accurate answers.

Use the following sample DDL query to create an Athena table for women_introduction_to_olympics:

CREATE EXTERNAL TABLE women_introduction_to_olympics(
year string,
sport string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://<<s3 bucket name>>/womeninolympics/introduction_of_women_olympic_sports'
TBLPROPERTIES (
'has_encrypted_data'='false')

In our data files, there are a few columns that are common across more than one dataset but have different column names. For example, gender is available as gender or sex, country is available as country or team or team/noc, and person names have a role prefix in one dataset but not in other datasets. We rename such columns using SQL to maintain consistent column names.
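The renaming rule can be expressed as a simple lookup. The post performs this step in SQL; the following Python sketch only illustrates the mapping, using the alias pairs named in the paragraph above. The helper name and the sample row are hypothetical.

```python
# Aliases taken from the examples above: sex -> gender, team and team/noc -> country.
COLUMN_ALIASES = {
    "sex": "gender",
    "team": "country",
    "team/noc": "country",
}


def normalize_columns(row):
    """Lowercase each column name and replace known aliases with the common name.

    Assumes a row never contains two columns that map to the same name.
    """
    normalized = {}
    for name, value in row.items():
        key = name.strip().lower()
        normalized[COLUMN_ALIASES.get(key, key)] = value
    return normalized


# Made-up row for illustration.
print(normalize_columns({"Name": "Athlete A", "Sex": "F", "Team/NOC": "Country A"}))
```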

Additionally, we need to change other demographic columns like age, height, and weight to the INT data type, so that they don’t get imported as String.
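The type change can be sketched in Python as well. The helper below is an illustration rather than the SQL used in the GitHub repo; it assumes that missing values appear as empty strings or the text NA, and it truncates any fractional part when converting to an integer.

```python
def to_int(value):
    """Convert a text value to int, returning None for missing values."""
    text = (value or "").strip()
    if text == "" or text.upper() == "NA":  # assumed markers for missing data
        return None
    # Going through float() accepts values such as "63.5"; int() then truncates.
    return int(float(text))


def cast_demographics(row, columns=("age", "height", "weight")):
    """Return a copy of the row with the demographic columns cast to integers."""
    converted = dict(row)
    for column in columns:
        if column in converted:
            converted[column] = to_int(converted[column])
    return converted


# Made-up row for illustration.
print(cast_demographics({"name": "Athlete A", "age": "24", "height": "NA", "weight": "63.5"}))
```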

The following columns from the data files have been transformed using SQL.

Data File | Original Column | New Column
medals | Discipline | Sport
medals | Medal_date (timestamp) | Medal_date (date)
Athletes | name | athlete_name
Athletes | gender | athlete_gender
Athletes | birth_date | athlete_birth_date
Athletes | birth_place | athlete_birth_place
Athletes | birth_country | athlete_birth_country
Coaches | name | coach_name
Coaches | gender | coach_gender
Coaches | birth_date | coach_birth_date
Coaches | function | coach_function
Athlete_events (history) | Team | country
Athlete_events (history) | NOC | country_code
Athlete_events (history) | Age (String) | Age (Integer)
Athlete_events (history) | Height (String) | Height (Integer)
Athlete_events (history) | Weight (String) | Weight (Integer)

Option 2: DataBrew

In this section, we discuss a data cleansing option using DataBrew. DataBrew is a visual data preparation tool that makes it easy to clean and prepare data with no prior coding knowledge. You can directly load the results into an S3 bucket or load the data by uploading an Excel or CSV file.

For our example, we walk you through the steps to implement data cleansing on the medals_athletes_2021 dataset. You can follow the same process to perform any necessary data cleaning on other datasets as well.

Create a new dataset in DataBrew using medals_athletes.csv and then create a DataBrew project and implement the following recipes to cleanse the data in the medals_athletes_2021 dataset.

  1. Delete empty rows in the athlete_name column.
  2. Delete empty rows in the medal_type column.
  3. Delete duplicate rows in the dataset.
  4. Rename discipline to Sport.
  5. Delete the column discipline_code.
  6. Split the column medal_type on a single delimiter.
  7. Delete the column medal_type_2, which was created as a result of step 6.
  8. Rename medal_type_1 to medal_type.
  9. Change the data type of column medal_date from timestamp to date.
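To see what the recipe does to each row, the nine steps can be mirrored in plain Python. This is a sketch for illustration, not a DataBrew export. It assumes the delimiter in medal_type is a single space (for example, Gold Medal) and that medal_date starts with a YYYY-MM-DD date; the sample rows are made up.

```python
from datetime import datetime


def cleanse(rows):
    """Apply the recipe steps to a list of dict rows and return the cleansed rows."""
    cleaned, seen = [], set()
    for row in rows:
        # Steps 1-2: delete rows with an empty athlete_name or medal_type.
        if not row["athlete_name"].strip() or not row["medal_type"].strip():
            continue
        # Step 3: delete duplicate rows.
        key = tuple(sorted(row.items()))
        if key in seen:
            continue
        seen.add(key)
        out = dict(row)
        # Step 4: rename discipline to Sport.
        out["Sport"] = out.pop("discipline")
        # Step 5: delete the column discipline_code.
        out.pop("discipline_code", None)
        # Steps 6-8: split medal_type on the delimiter and keep only the first part.
        out["medal_type"] = out["medal_type"].split(" ", 1)[0]
        # Step 9: keep only the date part of the timestamp.
        date_part = out["medal_date"].split(" ")[0]
        out["medal_date"] = datetime.strptime(date_part, "%Y-%m-%d").date().isoformat()
        cleaned.append(out)
    return cleaned


# Made-up rows: one valid row, its exact duplicate, and one with an empty medal_type.
sample = [
    {"athlete_name": "Athlete A", "medal_type": "Gold Medal", "discipline": "Archery",
     "discipline_code": "ARC", "medal_date": "2021-07-24 00:00:00.0"},
    {"athlete_name": "Athlete A", "medal_type": "Gold Medal", "discipline": "Archery",
     "discipline_code": "ARC", "medal_date": "2021-07-24 00:00:00.0"},
    {"athlete_name": "Athlete B", "medal_type": "", "discipline": "Archery",
     "discipline_code": "ARC", "medal_date": "2021-07-24 00:00:00.0"},
]
cleaned = cleanse(sample)
print(cleaned)
```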

After you create the recipe, publish it and create a job to output the results in your desired destination. You can create QuickSight SPICE datasets by importing the cleaned CSV file.

Option 3: Python

In this section, we discuss data cleansing on the medals_athletes_2021 dataset using the Python libraries NumPy and pandas. You can follow the same process to perform any necessary data cleansing on other datasets as well. The sample Python code is available on GitHub. This option is suitable for someone who is comfortable processing the data using Python.

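  1. Delete the column discipline_code:
    # drop() returns a new DataFrame, so assign the result back
    olympic = olympic.drop(columns='discipline_code')

  2. Rename the column discipline to sport:
    # rename() also returns a new DataFrame
    olympic = olympic.rename(columns={'discipline': 'sport'})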

You can create QuickSight SPICE datasets by importing the cleansed CSV.

Appendix 3: Data cleansing and modeling in the QuickSight data preparation layer

In this section, we discuss one more method of data cleansing that you can perform from the QuickSight data preparation layer, in addition to the methods discussed previously. Using SQL, DataBrew, or Python has advantages because you can prepare and clean the data outside QuickSight so other AWS services can use the cleansed results. Additionally, you can automate the scripts. However, Q authors have to learn other tools and programming languages to take advantage of these options.

Cleansing data in the QuickSight dataset preparation stage allows non-technical Q authors to build the application end to end in QuickSight with a codeless method.

The QuickSight dataset stores any data preparation done on the data, so that the prepared data can be reused in multiple analyses and topics.

We have provided a few examples for data cleansing in the QuickSight data preparation layer.

Change a field name

Let’s rename the name field in Athletes_full_2021 to athlete_name.

  1. In the data preview pane, choose the edit icon on the field that you want to change.
  2. For Name, enter a new name.
  3. Choose Apply.

Change a field data type

You can change the data type of any field from the data source in the QuickSight data preparation layer using the following procedure.

  1. In the data preview pane, choose the edit icon on the field you want to change (for example, birth_date).
  2. Choose Change data type and choose Date.

This converts the string field to a date field.

Appendix 4: Information about the tables

The following table illustrates the scope of each table in the dataset.

Table Name | Link | Table Data Scope
medals | https://www.kaggle.com/piterfm/tokyo-2020-olympics?select=medals.csv | Information about medals won by each athlete and the corresponding event and country details
athletes | https://www.kaggle.com/piterfm/tokyo-2020-olympics?select=athletes.csv | Details about each athlete, such as demographic and country
coaches | https://www.kaggle.com/piterfm/tokyo-2020-olympics?select=coaches.csv | Details about each coach, such as demographic and country
technical_officials | https://www.kaggle.com/piterfm/tokyo-2020-olympics?select=technical_officials.csv | Details about each technical official, such as demographic and country
athlete_events | https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results | Historical information of Olympic games
Introduction_of_Women_Olympics_Sports | https://data.world/sports/women-in-the-olympic-games | Information on when the women Olympic sports were introduced
womens_participation_in_the_olympic | https://data.world/sports/women-in-the-olympic-games | Information on participation of women in Olympic sports

About the authors

Ying Wang is a Manager of Software Development Engineer. She has 12 years of experience in data analytics and data science. In her data architect life, she helped customers with enterprise data architecture solutions to scale their data analytics in the cloud. Currently, she helps customers unlock the power of data with QuickSight from the engineering and product side by delivering new features.

Ginni Malik is a Data & ML Engineer with AWS Professional Services. She assists customers by architecting enterprise level data lake solutions to scale their data analytics in the cloud. She is a travel enthusiast and likes to run half-marathons.

Niharika Katnapally is a QuickSight Business Intelligence Engineer with AWS Professional Services. She assists customers by developing QuickSight dashboards to help them gain insights into their data and make data driven business decisions.

Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/design-considerations-for-amazon-emr-on-eks-in-a-multi-tenant-amazon-eks-environment/

Many AWS customers use Amazon Elastic Kubernetes Service (Amazon EKS) in order to take advantage of Kubernetes without the burden of managing the Kubernetes control plane. With Kubernetes, you can centrally manage your workloads and offer administrators a multi-tenant environment where they can create, update, scale, and secure workloads using a single API. Kubernetes also allows you to improve resource utilization, reduce cost, and simplify infrastructure management to support different application deployments. This model is beneficial for those running Apache Spark workloads, for several reasons. For example, it allows you to have multiple Spark environments running concurrently with different configurations and dependencies that are segregated from each other through Kubernetes multi-tenancy features. In addition, the same cluster can be used for various workloads such as machine learning (ML), application hosting, and data streaming, thereby reducing the operational overhead of managing multiple clusters.

AWS offers Amazon EMR on EKS, a managed service that enables you to run your Apache Spark workloads on Amazon EKS. This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. When you run Spark jobs on EMR on EKS and not on self-managed Apache Spark on Kubernetes, you can take advantage of automated provisioning, scaling, faster runtimes, and the development and debugging tools that Amazon EMR provides.

In this post, we show how to configure and run EMR on EKS in a multi-tenant EKS cluster that can be used by your various teams. We tackle multi-tenancy through four topics: network, resource management, cost management, and security.

Concepts

Throughout this post, we use terminology that is specific to EMR on EKS, Spark, or Kubernetes:

  • Multi-tenancy – Multi-tenancy in Kubernetes can come in three forms: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy means each business unit or group of applications gets a dedicated Kubernetes cluster; there is no sharing of the control plane. This model is out of scope for this post. Soft multi-tenancy is where pods might share the same underlying compute resource (node) and are logically separated using Kubernetes constructs such as namespaces, resource quotas, or network policies. Another way to achieve multi-tenancy in Kubernetes is to assign pods to specific nodes that are pre-provisioned and allocated to a specific team. In this case, we talk about sole multi-tenancy. Unless your security posture requires you to use hard or sole multi-tenancy, you would want to consider using soft multi-tenancy for the following reasons:
    • Soft multi-tenancy avoids underutilization of resources and waste of compute resources.
    • There is a limited number of managed node groups that can be used by Amazon EKS, so for large deployments, this limit can quickly become a constraint.
    • In sole multi-tenancy, there is a high chance of ghost nodes with no pods scheduled on them due to misconfiguration, because we force pods onto dedicated nodes with labels, taints and tolerations, and anti-affinity rules.
  • Namespace – Namespaces are core in Kubernetes and a pillar to implement soft multi-tenancy. With namespaces, you can divide the cluster into logical partitions. These partitions are then referenced in quotas, network policies, service accounts, and other constructs that help isolate environments in Kubernetes.
  • Virtual cluster – An EMR virtual cluster is mapped to a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace on an EKS cluster. Virtual clusters don’t create any active resources that contribute to your bill or require lifecycle management outside the service.
  • Pod template – In EMR on EKS, you can provide a pod template to control pod placement, or define a sidecar container. This pod template can be defined for executor pods and driver pods, and stored in an Amazon Simple Storage Service (Amazon S3) bucket. The S3 locations are then submitted as part of the applicationConfiguration object that is part of configurationOverrides, as defined in the EMR on EKS job submission API.
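
As an illustration of the pod template concept, the following Python sketch assembles a job submission request that points to pod templates stored in Amazon S3. It only builds the request body as a dictionary and does not call any AWS API. The function name, the job name, the S3 URIs, and the release label are placeholders chosen for this example; the two Spark properties are the standard Spark on Kubernetes settings for pod template files.

```python
import json


def build_job_request(virtual_cluster_id, execution_role_arn, entry_point,
                      driver_template_uri, executor_template_uri,
                      release_label="emr-6.8.0-latest"):
    """Return a job submission request body that references pod templates in S3.

    All argument values are supplied by the caller; the defaults and the job
    name below are placeholders for illustration only.
    """
    return {
        "name": "pod-template-example",  # hypothetical job name
        "virtualClusterId": virtual_cluster_id,
        "executionRoleArn": execution_role_arn,
        "releaseLabel": release_label,
        "jobDriver": {"sparkSubmitJobDriver": {"entryPoint": entry_point}},
        "configurationOverrides": {
            "applicationConfiguration": [
                {
                    "classification": "spark-defaults",
                    "properties": {
                        # Standard Spark on Kubernetes pod template settings
                        "spark.kubernetes.driver.podTemplateFile": driver_template_uri,
                        "spark.kubernetes.executor.podTemplateFile": executor_template_uri,
                    },
                }
            ]
        },
    }


if __name__ == "__main__":
    request = build_job_request(
        "<VIRTUAL_CLUSTER_ID>",
        "<EXECUTION_ROLE_ARN>",
        "s3://<BUCKET>/app.py",
        "s3://<BUCKET>/driver-template.yaml",
        "s3://<BUCKET>/executor-template.yaml",
    )
    print(json.dumps(request, indent=2))
```

Keeping the template locations in the spark-defaults classification means the same request shape can be reused across teams, with only the S3 URIs changing per tenant.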

Security considerations

In this section, we address security from different angles. We first discuss how to protect the IAM role that is used for running the job. Then we address how to protect secrets used in jobs, and finally we discuss how you can protect data while it is processed by Spark.

IAM role protection

A job submitted to EMR on EKS needs an AWS Identity and Access Management (IAM) execution role to interact with AWS resources, for example with Amazon S3 to get data, with Amazon CloudWatch Logs to publish logs, or with AWS Key Management Service (AWS KMS) to use an encryption key. It’s a best practice in AWS to apply least privilege for IAM roles. In Amazon EKS, this is achieved through IAM roles for service accounts (IRSA). This mechanism allows a pod to assume an IAM role at the pod level and not at the node level, while using short-term credentials that are provided through the EKS OIDC provider.

IRSA creates a trust relationship between the EKS OIDC provider and the IAM role. This method allows only pods with a service account (annotated with an IAM role ARN) to assume a role that has a trust policy with the EKS OIDC provider. However, this isn’t enough, because it would allow any pod with a service account within the EKS cluster that is annotated with a role ARN to assume the execution role. This must be further scoped down using a condition on the role trust policy. The condition allows the role to be assumed only if the calling service account is the one used for running a job associated with the virtual cluster. The following code shows the structure of the condition to add to the trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "<OIDC_PROVIDER_ARN>"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringLike": {
                    "<OIDC_PROVIDER>:sub": "system:serviceaccount:<NAMESPACE>:emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>"
                }
            }
        }
    ]
}

To scope down the trust policy using the service account condition, you need to run the following command with the AWS CLI:

aws emr-containers update-role-trust-policy \
--cluster-name cluster \
--namespace namespace \
--role-name iam_role_name_for_job_execution

The command adds the service accounts that will be used by the Spark client, Jupyter Enterprise Gateway, Spark kernel, driver, or executor. The service account names have the following structure: emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>.
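
To make the shape of the scoped-down condition concrete, here is a minimal Python sketch that assembles the trust policy statement from its parts. It is an illustration only, not a replacement for the CLI command: the function name is hypothetical, and the base36-encoded role name is taken as an input rather than computed, because the encoding is performed by the service tooling.

```python
import json


def build_trust_statement(oidc_provider_arn, oidc_provider, namespace,
                          account_id, encoded_role_name):
    """Build a trust policy statement limited to EMR on EKS service accounts.

    encoded_role_name is assumed to be the base36-encoded role name already
    produced elsewhere; this sketch does not derive it.
    """
    service_account_pattern = (
        f"system:serviceaccount:{namespace}:"
        f"emr-containers-sa-*-*-{account_id}-{encoded_role_name}"
    )
    return {
        "Effect": "Allow",
        "Principal": {"Federated": oidc_provider_arn},
        "Action": "sts:AssumeRoleWithWebIdentity",
        "Condition": {
            "StringLike": {f"{oidc_provider}:sub": service_account_pattern}
        },
    }


if __name__ == "__main__":
    statement = build_trust_statement(
        "<OIDC_PROVIDER_ARN>", "<OIDC_PROVIDER>", "<NAMESPACE>",
        "<AWS_ACCOUNT_ID>", "<BASE36_ENCODED_ROLE_NAME>",
    )
    print(json.dumps({"Version": "2012-10-17", "Statement": [statement]}, indent=4))
```

Because the namespace is part of the pattern, a service account in another tenant's namespace doesn't match the condition even if it is annotated with the same role ARN.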

In addition to the role segregation offered by IRSA, we recommend blocking access to instance metadata because a pod can still inherit the rights of the instance profile assigned to the worker node. For more information about how you can block access to metadata, refer to Restrict access to the instance profile assigned to the worker node.

Secret protection

Sometimes a Spark job needs to consume data stored in a database or from APIs. Most of the time, these are protected with a password or access key. The most common way to pass these secrets is through environment variables. However, in a multi-tenant environment, this means any user with access to the Kubernetes API can potentially access the secrets in the environment variables if this access isn’t scoped well to the namespaces the user has access to.

To overcome this challenge, we recommend using a secrets store like AWS Secrets Manager that can be mounted through the Secrets Store CSI Driver. The benefit of using Secrets Manager is the ability to use IRSA and allow only the role assumed by the pod access to the given secret, thereby improving your security posture. You can refer to the best practices guide for sample code showing the use of Secrets Manager with EMR on EKS.

Spark data encryption

When a Spark application is running, the driver and executors produce intermediate data. This data is written to the node local storage. Anyone who is able to exec into the pods would be able to read this data. Spark supports encryption of this data, and it can be enabled by passing --conf spark.io.encryption.enabled=true. Because this configuration adds a performance penalty, we recommend enabling data encryption only for workloads that store and access highly sensitive data and in untrusted environments.

Network considerations

In this section, we discuss how to manage networking within the cluster as well as outside the cluster. We first address how Spark handles communication between executors and the driver, and how to secure it. Then we discuss how to restrict network traffic between pods in the EKS cluster and allow only traffic destined to EMR on EKS. Last, we discuss how to restrict traffic from executor and driver pods to external AWS services using security groups.

Network encryption

The communication between the driver and executors uses the RPC protocol and is not encrypted by default. Starting with Spark 3 on Kubernetes-backed clusters, Spark offers a mechanism to encrypt communication using AES encryption.

The driver generates a key and shares it with executors through an environment variable. Because the key is shared this way, potentially any user with access to the Kubernetes API (kubectl) can read the key. We recommend securing access so that only authorized users can have access to the EMR virtual cluster. In addition, you should set up Kubernetes role-based access control in such a way that access to the pod spec in the namespace where the EMR virtual cluster runs is granted to only a few selected service accounts. This method of passing secrets through an environment variable may change in the future with a proposal to use Kubernetes secrets.

Encryption requires RPC authentication to be enabled as well. To enable encryption in transit, use the following parameters in your Spark configuration:

--conf spark.authenticate=true

--conf spark.network.crypto.enabled=true

Note that these are the minimal parameters to set; refer to Encryption for the complete list of parameters.
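
When jobs are submitted programmatically, these flags are usually assembled into a single parameter string. The following Python sketch shows one way to do that; the helper names are hypothetical, and the optional flag for local data encryption reuses the spark.io.encryption.enabled setting mentioned in the Spark data encryption section.

```python
def encryption_conf(encrypt_local_data=False):
    """Return the minimal Spark properties for encryption in transit.

    encrypt_local_data additionally enables encryption of intermediate
    data written to local storage.
    """
    conf = {
        "spark.authenticate": "true",
        "spark.network.crypto.enabled": "true",
    }
    if encrypt_local_data:
        conf["spark.io.encryption.enabled"] = "true"
    return conf


def to_spark_submit_parameters(conf):
    """Render a dict of Spark properties as a '--conf key=value' string."""
    return " ".join(f"--conf {key}={value}" for key, value in conf.items())


if __name__ == "__main__":
    print(to_spark_submit_parameters(encryption_conf()))
```

Keeping the properties in a dictionary makes it easier to apply encryption only to the workloads that need it, which matters given the performance cost.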

Additionally, applying encryption in Spark has a negative impact on processing speed. You should only apply it when there is a compliance or regulatory need.

Securing network traffic within the cluster

In Kubernetes, by default pods can communicate over the network across different namespaces in the same cluster. This behavior is not always desirable in a multi-tenant environment. In some instances, for example in regulated industries, to be compliant you want to enforce strict control over the network and send and receive traffic only from the namespace that you’re interacting with. For EMR on EKS, it would be the namespace associated with the EMR virtual cluster. Kubernetes offers constructs that allow you to implement network policies and define fine-grained control over the pod-to-pod communication. These policies are implemented by the CNI plugin; in Amazon EKS, the default plugin would be the VPC CNI. A policy is defined as follows and is applied with kubectl:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-np-ns1
  namespace: <EMR-VC-NAMESPACE>
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          nsname: <EMR-VC-NAMESPACE>
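
If you manage many tenant namespaces, you might generate one such policy per namespace instead of editing the manifest by hand. The following Python sketch builds the same policy structure as a dictionary and prints it as JSON, which kubectl also accepts. The function name and the policy naming scheme are assumptions for this example; the nsname label key is taken from the policy above and must also be present as a label on the namespace itself for the selector to match.

```python
import json


def namespace_isolation_policy(namespace, label_key="nsname"):
    """Build a NetworkPolicy that only admits ingress from the same namespace.

    The namespace is expected to carry the label {label_key: namespace}.
    """
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "NetworkPolicy",
        "metadata": {
            "name": f"default-np-{namespace}",  # hypothetical naming scheme
            "namespace": namespace,
        },
        "spec": {
            "podSelector": {},  # empty selector: applies to all pods in the namespace
            "policyTypes": ["Ingress", "Egress"],
            "ingress": [
                {"from": [{"namespaceSelector": {"matchLabels": {label_key: namespace}}}]}
            ],
        },
    }


if __name__ == "__main__":
    print(json.dumps(namespace_isolation_policy("<EMR-VC-NAMESPACE>"), indent=2))
```

Note that, as in the manifest above, Egress is listed under policyTypes without any egress rules, which in Kubernetes means outbound traffic from the selected pods is denied unless you add rules for the destinations your jobs need.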

Network traffic outside the cluster

In Amazon EKS, when you deploy pods on Amazon Elastic Compute Cloud (Amazon EC2) instances, all the pods use the security group associated with the node. This can be an issue if your pods (executor pods) are accessing a data source (such as a database) that allows traffic based on the source security group. Database servers often restrict network access to the sources they expect traffic from. In the case of a multi-tenant EKS cluster, this means pods from other teams that shouldn’t have access to the database servers would be able to send traffic to them.

To overcome this challenge, you can use security groups for pods. This feature allows you to assign a specific security group to your pods, thereby controlling the network traffic to your database server or data source. You can also refer to the best practices guide for a reference implementation.

Cost management and chargeback

In a multi-tenant environment, cost management is a critical subject. You have multiple users from various business units, and you need to be able to precisely charge back the cost of the compute resources they have used. At the beginning of the post, we introduced three models of multi-tenancy in Amazon EKS: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy is out of scope because the cost tracking is trivial; all the resources are dedicated to the team using the cluster, which is not the case for sole multi-tenancy and soft multi-tenancy. In the next sections, we discuss how to track the cost for each of these two models.

Soft multi-tenancy

In a soft multi-tenant environment, you can perform chargeback to your data engineering teams based on the resources they consumed and not the nodes allocated. In this method, you use the namespaces associated with the EMR virtual clusters to track how many resources were used for processing jobs. The following diagram illustrates an example.

Diagram 1 – Soft multi-tenancy

Tracking resources based on the namespace isn’t an easy task because jobs are transient in nature and fluctuate in their duration. However, there are partner tools available that allow you to keep track of the resources used, such as Kubecost, CloudZero, Vantage, and many others. For instructions on using Kubecost on Amazon EKS, refer to this blog post on cost monitoring for EKS customers.
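
The underlying idea of namespace-based chargeback can be sketched as a proportional split. The following Python example is a deliberately simplified model, not how any of the tools above compute cost: it assumes you already have a total cluster cost for a period and a single usage figure per namespace (for example, CPU core-hours), and all names and numbers are hypothetical.

```python
def allocate_cost(total_cost, usage_by_namespace):
    """Split total_cost across namespaces in proportion to their usage.

    usage_by_namespace maps a namespace name to a usage figure such as
    CPU core-hours for the billing period.
    """
    total_usage = sum(usage_by_namespace.values())
    if total_usage == 0:
        # Nothing ran during the period, so nothing is charged back.
        return {namespace: 0.0 for namespace in usage_by_namespace}
    return {
        namespace: total_cost * used / total_usage
        for namespace, used in usage_by_namespace.items()
    }


if __name__ == "__main__":
    # Hypothetical figures: a 100.0 bill shared by two team namespaces.
    print(allocate_cost(100.0, {"team-a": 30, "team-b": 10}))
```

A real chargeback model would also need to account for memory, storage, idle capacity, and differing instance prices, which is where the dedicated tools help.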

Sole multi-tenancy

For sole multi-tenancy, the chargeback is done at the instance (node) level. Each team uses a specific set of nodes that are dedicated to it. These nodes aren’t always running, and are spun up using the Kubernetes auto scaling mechanism. The following diagram illustrates an example.

Diagram 2 – Sole multi-tenancy

With sole multi-tenancy, you use a cost allocation tag, which is an AWS mechanism that allows you to track how much each resource has cost. Although the method of sole multi-tenancy isn’t efficient in terms of resource utilization, it provides a simplified strategy for chargebacks. With the cost allocation tag, you can charge back a team based on all the resources they used, like Amazon S3, Amazon DynamoDB, and other AWS resources. The chargeback mechanism based on the cost allocation tag can be augmented using the recently launched AWS Billing Conductor, which allows you to issue bills internally for your team.

Resource management

In this section, we discuss considerations regarding resource management in multi-tenant clusters. We briefly discuss topics like sharing resources gracefully, setting guardrails on resource consumption, techniques for ensuring resources for time-sensitive or critical jobs, meeting quick resource scaling requirements, and finally cost optimization practices with node selectors.

Sharing resources

In a multi-tenant environment, the goal is to share resources like compute and memory for better resource utilization. However, this requires careful capacity management and resource allocation to make sure each tenant gets their fair share. In Kubernetes, resource allocation is controlled and enforced by using ResourceQuota and LimitRange. ResourceQuota limits resources on the namespace level, and LimitRange allows you to make sure that all the containers are submitted with a resource requirement and a limit. In this section, we demonstrate how a data engineer or Kubernetes administrator can set up ResourceQuota and LimitRange configurations.

The administrator creates one ResourceQuota per namespace that provides constraints for aggregate resource consumption:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: compute-resources
  namespace: teamA
spec:
  hard:
    requests.cpu: "1000"
    requests.memory: 4000Gi
    limits.cpu: "2000"
    limits.memory: 6000Gi

For LimitRange, the administrator can review the following sample configuration. We recommend using default and defaultRequest to enforce the limit and request fields on containers. Lastly, when submitting EMR on EKS jobs, data engineers need to make sure the Spark resource parameters are within the range defined by the LimitRange. For example, in the following configuration, the request for spark.executor.cores=7 will fail because the max limit for CPU is 6 per container:

apiVersion: v1
kind: LimitRange
metadata:
  name: cpu-min-max
  namespace: teamA
spec:
  limits:
  - max:
      cpu: "6"
    min:
      cpu: "100m"
    default:
      cpu: "500m"
    defaultRequest:
      cpu: "100m"
    type: Container
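
A quick pre-submission check can catch such a mismatch before the job reaches the cluster. The following Python sketch parses Kubernetes CPU quantities and tests a requested core count against the min and max values from the sample LimitRange above. The function names are hypothetical, and the parser only handles plain core counts and the millicore suffix used in this example.

```python
def parse_cpu(quantity):
    """Convert a Kubernetes CPU quantity such as "6" or "500m" to cores."""
    quantity = str(quantity)
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000  # millicores to cores
    return float(quantity)


def within_limit_range(requested_cores, min_cpu="100m", max_cpu="6"):
    """Return True if the request fits the LimitRange min/max for a container.

    The defaults mirror the sample cpu-min-max LimitRange.
    """
    return parse_cpu(min_cpu) <= parse_cpu(requested_cores) <= parse_cpu(max_cpu)


if __name__ == "__main__":
    for cores in ("4", "7"):
        verdict = "fits" if within_limit_range(cores) else "exceeds the LimitRange"
        print(f"spark.executor.cores={cores} {verdict}")
```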

Priority-based resource allocation

Diagram 3 – Example of resource allocation with priority

Because all the EMR virtual clusters share the same EKS computing platform with limited resources, there will be scenarios in which you need to prioritize time-sensitive jobs. In this case, high-priority jobs can utilize the resources and finish, whereas running low-priority jobs get stopped and any new pods must wait in the queue. EMR on EKS can achieve this with the help of pod templates, where you specify a priority class for the given job.

When pod priority is enabled, the Kubernetes scheduler orders pending pods by their priority and places them in the scheduling queue. As a result, a higher-priority pod may be scheduled sooner than pods with lower priority if its scheduling requirements are met. If this pod can’t be scheduled, the scheduler continues and tries to schedule other lower-priority pods.

The preemptionPolicy field on the PriorityClass defaults to PreemptLowerPriority, and the pods of that PriorityClass can preempt lower-priority pods. If preemptionPolicy is set to Never, pods of that PriorityClass are non-preempting. In other words, they can’t preempt any other pods. When lower-priority pods are preempted, the victim pods get a grace period to finish their work and exit. If the pod doesn’t exit within that grace period, that pod is stopped by the Kubernetes scheduler. Therefore, there is usually a time gap between the point when the scheduler preempts victim pods and the time that a higher-priority pod is scheduled. If you want to minimize this gap, you can set the deletion grace period of lower-priority pods to zero or a small number. You can do this by setting the terminationGracePeriodSeconds option in the victim pod’s YAML.
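
The ordering and preemption rules described above can be illustrated with a toy model. The following Python sketch is not the Kubernetes scheduler; it only mimics two of its rules under simplifying assumptions: pending pods are considered from highest to lowest priority, and a pod may preempt a lower-priority victim unless its preemption policy is Never. The class and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PendingPod:
    name: str
    priority: int
    preemption_policy: str = "PreemptLowerPriority"  # the PriorityClass default


def scheduling_order(pods):
    """Return pending pods ordered from highest to lowest priority."""
    return sorted(pods, key=lambda pod: pod.priority, reverse=True)


def may_preempt(pod, victim_priority):
    """A pod can preempt only strictly lower-priority victims, and never if its policy is Never."""
    return pod.preemption_policy != "Never" and pod.priority > victim_priority


if __name__ == "__main__":
    queue = [
        PendingPod("low-priority-executor", 50),
        PendingPod("high-priority-driver", 100),
    ]
    for pod in scheduling_order(queue):
        print(pod.name, pod.priority)
```

The priority values 100 and 50 match the sample priority classes that follow.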

See the following code samples for priority class:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 100
globalDefault: false
description: "High-priority pods, used for driver pods."

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: low-priority
value: 50
globalDefault: false
description: "Low-priority pods."

One of the key considerations while templatizing the driver pods, especially for low-priority jobs, is to avoid using the same low-priority class for both driver and executor. This saves the driver pods from getting evicted and losing the progress of all their executors in a resource congestion scenario. In this low-priority job example, we have used a high-priority class for driver pod templates and low-priority classes only for executor templates. This way, we can ensure the driver pods are safe during the eviction process of low-priority jobs. In this case, only executors will be evicted, and the driver can bring back the evicted executor pods as resources are freed. See the following code:

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "high-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND
  containers:
  - name: spark-kubernetes-driver # This will be interpreted as Spark driver container

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "low-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
  containers:
  - name: spark-kubernetes-executor # This will be interpreted as Spark executor container

Overprovisioning with priority

Diagram 4 – Example of overprovisioning with priority

As pods wait in a pending state due to limited resource availability, additional capacity can be added to the cluster with Amazon EKS auto scaling. The time it takes to scale the cluster by adding new nodes has to be considered for time-sensitive jobs. Overprovisioning is an option to mitigate the auto scaling delay using temporary pods with negative priority. These pods occupy space in the cluster. When pods with high priority are unschedulable, the temporary pods are preempted to make room. This in turn causes the auto scaler to add new nodes to restore the overprovisioned capacity. Be aware that this is a trade-off because it adds higher cost while minimizing scheduling latency. For more information about overprovisioning best practices, refer to Overprovisioning.

Node selectors

EKS clusters can span multiple Availability Zones in a VPC. A Spark application whose driver and executor pods are distributed across multiple Availability Zones can incur inter-Availability Zone data transfer costs. To minimize or eliminate the data transfer cost, you should configure the job to run in a specific Availability Zone or even on a specific node type with the help of node labels. Amazon EKS places a set of default labels to identify capacity type (On-Demand or Spot Instance), Availability Zone, instance type, and more. In addition, we can use custom labels to meet workload-specific node affinity.

EMR on EKS allows you to choose specific nodes in two ways:

  • At the job level. Refer to EKS Node Placement for more details.
  • At the driver and executor level, using pod templates.

When using pod templates, we recommend using On-Demand Instances for driver pods. You can also consider including Spot Instances for executor pods for workloads that are tolerant of occasional periods when the target capacity is not completely available. Using Spot Instances allows you to save cost for jobs that are not critical and can be terminated. For more details, refer to Define a NodeSelector in PodTemplates.
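
For job-level placement, Spark on Kubernetes exposes node selectors as properties of the form spark.kubernetes.node.selector.[labelKey], which apply to both driver and executor pods. The following Python sketch turns a set of node labels into those properties. The function name and the Availability Zone value are placeholders for this example; topology.kubernetes.io/zone is the standard Kubernetes zone label.

```python
def node_selector_conf(labels):
    """Map node labels to job-level Spark node selector properties."""
    return {
        f"spark.kubernetes.node.selector.{key}": value
        for key, value in labels.items()
    }


if __name__ == "__main__":
    # Hypothetical example: pin the whole job to a single Availability Zone.
    conf = node_selector_conf({"topology.kubernetes.io/zone": "us-east-1a"})
    for key, value in conf.items():
        print(f"--conf {key}={value}")
```

Because these properties affect the driver and executors alike, use pod templates instead when the two need different placement, such as On-Demand capacity for the driver and Spot capacity for executors.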

Conclusion

In this post, we provided guidance on how to design and deploy EMR on EKS in a multi-tenant EKS environment through different lenses: network, security, cost management, and resource management. For any deployment, we recommend the following:

  • Use IRSA with a condition scoped on the EMR on EKS service account
  • Use a secrets manager to store credentials and the Secrets Store CSI Driver to access them in your Spark application
  • Use ResourceQuota and LimitRange to specify the resources that each of your data engineering teams can use and avoid compute resource abuse and starvation
  • Implement a network policy to segregate network traffic between pods

Lastly, if you are considering migrating your Spark workloads to EMR on EKS, you can learn more about design patterns to manage Apache Spark workloads in EMR on EKS in this blog post and about migrating your EMR transient clusters to EMR on EKS in this blog post.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Ajeeb Peter is a Senior Solutions Architect with Amazon Web Services based in Charlotte, North Carolina, where he guides global financial services customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings over 20 years of technology experience in software development, architecture, and analytics from industries like finance and telecom.

Detect and process sensitive data using AWS Glue Studio

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/detect-and-process-sensitive-data-using-aws-glue-studio/

Data lakes offer the possibility of sharing diverse types of data with different teams and roles to cover numerous use cases. This is very important in order to implement a data democratization strategy and incentivize the collaboration between lines of business. When a data lake is being designed, one of the most important aspects to consider is data privacy. Without it, sensitive information could be accessed by the wrong team, which may affect the reliability of a data platform. However, identifying sensitive data inside a data lake could represent a challenge due to the diversity of the data and also its volume.

Earlier this year, AWS Glue announced the new sensitive data detection and processing feature to help you identify and protect sensitive information in a straightforward way using AWS Glue Studio. This feature uses pattern matching and machine learning to automatically recognize personally identifiable information (PII) and other sensitive data at the column or cell level as part of AWS Glue jobs.

Sensitive data detection in AWS Glue identifies a variety of sensitive data like phone and credit card numbers, and also offers the option to create custom identification patterns or entities to cover your specific use cases. Additionally, it helps you take action, such as creating a new column that contains any sensitive data detected as part of a row or redacting the sensitive information before writing records into a data lake.

This post shows how to create an AWS Glue job that identifies sensitive data at the row level. We also show how to create a custom identification pattern to identify case-specific entities.

Overview of solution

To demonstrate how to create an AWS Glue job to identify sensitive data, we use a test dataset with customer comments that contain private data like Social Security number (SSN), phone number, and bank account number. The goal is to create a job that automatically identifies the sensitive data and triggers an action to redact it.

Prerequisites

For this walkthrough, you should have the following prerequisites:

If the AWS account you use to follow this post uses AWS Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1:
  2. Under Parameters, enter a name for your S3 bucket (include your account number).
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.
  5. Wait until the creation of the stack is complete, as shown on the AWS CloudFormation console.

Launching this stack creates AWS resources. You need the following resources from the Outputs tab for the next steps:

  • GlueSenRole – The IAM role to run AWS Glue jobs
  • BucketName – The name of the S3 bucket to store solution-related files
  • GlueDatabase – The AWS Glue database to store the table related to this post

Create and run an AWS Glue job

Let’s first create the dataset that is going to be used as the source of the AWS Glue job:

  1. Open AWS CloudShell.
  2. Run the following command:
    aws s3 cp s3://aws-bigdata-blog/artifacts/gluesendata/sourcedata/customer_comments.csv s3://glue-sendata-blog-<YOUR ACCOUNT NUMBER>/customer_comments/customer_comments.csv


    This action copies the dataset that is going to be used as the input for the AWS Glue job covered in this post.

    Now, let’s create the AWS Glue job.

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with blank canvas.
  3. Choose the Job Details tab to configure the job.
  4. For Name, enter GlueSenJob.
  5. For IAM Role, choose the role GlueSenDataBlogRole.
  6. For Glue version, choose Glue 3.0.
  7. For Job bookmark, choose Disable.

  8. Choose Save.
  9. After the job is saved, choose the Visual tab and on the Source menu, choose Amazon S3.
  10. On the Data source properties -S3 tab, for S3 source type, select S3 location.
  11. Add the S3 location of the file that you copied previously using CloudShell.
  12. Choose Infer schema.

This last action infers the schema and file type of the source for this post.

Now, let’s see what the data looks like.

  1. On the Data preview tab, choose Start data preview session.
  2. For IAM role, choose the role GlueSenDataBlogRole.
  3. Choose Confirm.

This last step may take a couple of minutes to run.

When you review the data, you can see that sensitive data like phone numbers, email addresses, and SSNs are part of the customer comments.

Now let’s identify the sensitive data in the comments dataset and mask it.

  1. On the Transform menu, choose Detect PII.

The AWS Glue sensitive data identification feature allows you to find sensitive data at the row and column level, which covers a diverse number of use cases. For this post, because we scan comments made by customers, we use the row-level scan.

  1. On the Transform tab, select Find sensitive data in each row.
  2. For Types of sensitive information to detect, select Select specific patterns.

Now we need to select the entities or patterns that are going to be identified by the job.

  1. For Selected patterns, choose Browse.
  2. Select the following patterns:
    1. Credit Card
    2. Email Address
    3. IP Address
    4. Mac Address
    5. Person’s Name
    6. Social Security Number (SSN)
    7. US Passport
    8. US Phone
    9. US/Canada bank account
  3. Choose Confirm.

After the sensitive data is identified, AWS Glue offers two options:

  • Enrich data with detection results – Adds a new column to the dataset with the list of the entities or patterns that were identified in that specific row.
  • Redact detected text – Replaces the sensitive data with a custom string. For this post, we use the redaction option.
  1. For Actions, select Redact detected text.
  2. For Replacement text, enter ####.

Let’s see how the dataset looks now.

  1. Check the result data on the Data preview tab.

As you can see, the majority of the sensitive data was redacted, but there is a number on row 11 that isn’t masked. This is because it’s a Canadian permanent resident number, and this pattern isn’t part of the ones that the sensitive data identification feature offers. However, we can add a custom pattern to identify this number.

  1. On the Transform tab, for Selected patterns, choose Create new.

This action opens the Create detection pattern window, where we create the custom pattern to identify the Canadian permanent resident number.

  1. For Pattern name, enter Can_PR_Number.
  2. For Expression, enter the regular expression [P]+[D]+[0]\d\d\d\d\d\d
  3. Choose Validate.
  4. Wait until you get the validation message, then choose Create pattern.
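
Before entering a custom expression in the console, it can help to try it locally. The following Python sketch applies the same regular expression to a made-up comment and replaces matches with the #### replacement text used in this post. It uses Python's re module, so behavior in AWS Glue may differ in edge cases; the sample text, the sample number, and the function name are invented for illustration.

```python
import re

# The expression entered for the Can_PR_Number pattern
CAN_PR_PATTERN = re.compile(r"[P]+[D]+[0]\d\d\d\d\d\d")


def redact(text, replacement="####"):
    """Replace every match of the custom pattern with the replacement text."""
    return CAN_PR_PATTERN.sub(replacement, text)


if __name__ == "__main__":
    # Made-up sample comment; PD0123456 is not a real identifier.
    print(redact("Customer provided PR number PD0123456 during the call."))
```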

Now you can see the new pattern listed under Custom patterns.

  1. On the AWS Glue Studio Console, for Selected patterns, choose Browse.

Now you can see Can_PR_Number as part of the pattern list.

  1. Select Can_PR_Number and choose Confirm.

On the Data preview tab, you can see that the Canadian permanent resident number has been redacted.

Let’s add a destination for the dataset with redacted information.

  1. On the Target menu, choose Amazon S3.
  2. On the Data target properties -S3 tab, for Format, choose Parquet.
  3. For S3 Target Location, enter s3://glue-sendata-blog-<YOUR ACCOUNT ID>/output/redacted_comments/.
  4. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  5. For Database, choose gluesenblog.
  6. For Table name, enter custcomredacted.
  7. Choose Save, then choose Run.

You can view the job run details on the Runs tab.

Wait until the job is complete.

Query the dataset

Now let’s see what the final dataset looks like. To do so, we query the data with Athena. As part of this post, we assume that a query result location for Athena is already configured; if not, refer to Working with query results, recent queries, and output files.

  1. On the Athena console, open the query editor.
  2. For Database, choose the gluesenblog database.
  3. Run the following query:
    SELECT * FROM "gluesenblog"."custcomredacted" limit 15;

  1. Verify the results; you can observe that all the sensitive data is redacted.

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: Datasets, CloudFormation stack, S3 bucket, AWS Glue job, AWS Glue database, and AWS Glue table.

Conclusion

AWS Glue sensitive data detection offers an easy way to identify and process private data, without coding. This feature allows you to detect and redact sensitive data when it’s ingested into a data lake, enforcing data privacy before the data is available to data consumers. AWS Glue sensitive data detection is generally available in all Regions that support AWS Glue.

To learn more and get started using AWS Glue sensitive data detection, refer to Detect and process sensitive data.


About the author

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Identify source schema changes using AWS Glue

Post Syndicated from Narendra Gupta original https://aws.amazon.com/blogs/big-data/identify-source-schema-changes-using-aws-glue/

In today’s world, organizations are collecting an unprecedented amount of data from all kinds of different data sources, such as transactional data stores, clickstreams, log data, IoT data, and more. This data comes in different formats, both structured and unstructured, and its characteristics are often summarized as the three Vs of big data (volume, velocity, and variety). To extract information from the data, it’s usually stored in a data lake built on Amazon Simple Storage Service (Amazon S3). The data lake provides an important characteristic called schema on read, which allows you to bring data into the data lake without worrying about the schema or changes in the schema on the data source. This enables faster ingestion of data or building data pipelines.

However, you may be reading and consuming this data for other use cases, such as pointing to applications, building business intelligence (BI) dashboards in services like Amazon QuickSight, or doing data discovery using a serverless query engine like Amazon Athena. Additionally, you may have built an extract, transform, and load (ETL) data pipeline to populate your data store like a relational database, non-relational database, or data warehouse for further operational and analytical needs. In these cases, you need to define the schema upfront or even keep an account of the changes in the schema, such as adding new columns, deleting existing columns, changing the data type of existing columns, or renaming existing columns, to avoid any failures in your application or issues with your dashboard or reporting.

In many use cases, we have found that the data teams responsible for building the data pipeline don’t have any control of the source schema, and they need to build a solution to identify changes in the source schema in order to be able to build the process or automation around it. This might include sending notifications of changes to the teams dependent on the source schema, building an auditing solution to log all the schema changes, or building an automation or change request process to propagate the change in the source schema to downstream applications such as an ETL tool or BI dashboard. Sometimes, to control the number of schema versions, you may want to delete the older version of the schema when there are no changes detected between it and the newer schema.

For example, assume you’re receiving claim files from different external partners in the form of flat files, and you’ve built a solution to process claims based on these files. However, because these files were sent by external partners, you don’t have much control over the schema and data format. For example, columns such as customer_id and claim_id were changed to customerid and claimid by one partner, and another partner added new columns such as customer_age and earning and kept the rest of the columns the same. You need to identify such changes in advance so you can edit the ETL job to accommodate the changes, such as changing the column name or adding the new columns to process the claims.

In this solution, we showcase a mechanism that simplifies the capture of the schema changes in your data source using an AWS Glue crawler.

Solution overview

An AWS Glue crawler syncs the metadata in the Data Catalog with the existing data, and we compare the resulting table versions to identify schema changes. After we identify the changes, we use Amazon CloudWatch to log them and Amazon Simple Notification Service (Amazon SNS) to notify the application team over email. You can expand this solution to other use cases, such as building an automation that propagates the changes to downstream applications or pipelines to avoid failures caused by schema changes; that automation is out of scope for this post. We also show a way to delete older versions of the schema if there are no changes between the compared schema versions.

If you want to capture the change in an event-driven manner, you can do so by using Amazon EventBridge. However, if you want to capture the schema changes on multiple tables at the same time, based on a specific schedule, you can use the solution in this post.

In our scenario, we have two files, each with a different schema, simulating data that has undergone a schema change. We use an AWS Glue crawler to extract the metadata from data in an S3 bucket. Then we use an AWS Glue ETL job to identify the schema changes recorded in the AWS Glue Data Catalog.

AWS Glue provides a serverless environment to extract, transform, and load a large number of datasets from several sources for analytic purposes. The Data Catalog is a feature within AWS Glue that lets you create a centralized data catalog of metadata by storing and annotating data from different data stores. Examples include object stores like Amazon S3, relational databases like Amazon Aurora PostgreSQL-Compatible Edition, and data warehouses like Amazon Redshift. You can then use that metadata to query and transform the underlying data. You use a crawler to populate the Data Catalog with tables. It can automatically discover new data, extract schema definitions, detect schema changes, and version tables. It can also detect Hive-style partitions on Amazon S3 (for example year=YYYY, month=MM, day=DD).

Amazon S3 serves as the storage for our data lake. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance.

The following diagram illustrates the architecture for this solution.

The workflow includes the following steps:

  1. Copy the first data file to the data folder of the S3 bucket and run the AWS Glue crawler to create a new table in the Data Catalog.
  2. Move the existing file from the data folder to the archived folder.
  3. Copy the second data file with the updated schema to the data folder, then rerun the crawler to create a new version of the table schema.
  4. Run the AWS Glue ETL job to check if there is a new version of the table schema.
  5. The AWS Glue job lists the changes in the schema with the previous version of the schema in CloudWatch Logs. If there are no changes in the schema and the flag to delete older versions is set to true, the job also deletes the older schema versions.
  6. The AWS Glue job notifies all changes in the schema to the application team over email using Amazon SNS.
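The comparison in steps 4 and 5 can be sketched without any AWS calls. The following is a minimal illustration, not the job script itself: the function name diff_columns and the sample column types are assumptions made for this example, and the input lists are shaped like the Columns entries that the Data Catalog returns for a table version.

```python
from typing import Dict, List


def diff_columns(old_cols: List[Dict[str, str]], new_cols: List[Dict[str, str]]) -> List[str]:
    """Compare two column lists shaped like StorageDescriptor['Columns']."""
    old_types = {col["Name"]: col["Type"] for col in old_cols}
    new_types = {col["Name"]: col["Type"] for col in new_cols}
    changes = []

    # Columns in the new version: either newly added or with a changed type.
    for name, new_type in new_types.items():
        if name not in old_types:
            changes.append(f"Added new column '{name}' with data type as '{new_type}'")
        elif old_types[name] != new_type:
            changes.append(
                f"Data type changed for '{name}' from '{old_types[name]}' to '{new_type}'"
            )

    # Columns that exist only in the old version were dropped.
    for name, old_type in old_types.items():
        if name not in new_types:
            changes.append(f"Dropped old column '{name}' with data type as '{old_type}'")
    return changes


# Hypothetical schemas; the column types are assumed, not real crawler output.
old_schema = [
    {"Name": "region", "Type": "string"},
    {"Name": "rep", "Type": "string"},
    {"Name": "units", "Type": "bigint"},
]
new_schema = [
    {"Name": "region", "Type": "bigint"},
    {"Name": "units", "Type": "bigint"},
    {"Name": "total", "Type": "double"},
]

for change in diff_columns(old_schema, new_schema):
    print(change)
```

Because columns are matched by name, a renamed column (for example, customer_id to customerid) shows up as one dropped column plus one added column.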

To build the solution, complete the following steps:

  1. Create an S3 bucket with the data and archived folders to store the new and processed data files.
  2. Create an AWS Glue database and an AWS Glue crawler that crawls the data file in the data folder to create an AWS Glue table in the database.
  3. Create an SNS topic and add an email subscription.
  4. Create an AWS Glue ETL job to compare the two versions of the table schema, list the changes in the schema with the older version of schema, and delete older versions of schema if the flag to delete older versions is set to true. The job also publishes an event in Amazon SNS to notify the changes in the schema to the data teams.

For the purpose of this post, we manually move the data files from the data folder to the archived folder and manually trigger the crawler and ETL job. Depending on your application needs, you can automate and orchestrate this process through AWS Glue workflows.
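If you prefer to script the file move instead of using the console, a sketch along the following lines may help. Amazon S3 has no move operation, so the function copies the object to the new prefix and then deletes the original. The function name move_object is an assumption for this example; the client is passed in so the function can be tried without AWS access.

```python
def move_object(s3_client, bucket: str, key: str, dest_prefix: str) -> str:
    """Move one object to another prefix in the same bucket (copy, then delete)."""
    file_name = key.rsplit("/", 1)[-1]
    dest_key = f"{dest_prefix.rstrip('/')}/{file_name}"

    # Copy the object to its new key first...
    s3_client.copy_object(
        Bucket=bucket,
        Key=dest_key,
        CopySource={"Bucket": bucket, "Key": key},
    )
    # ...and delete the original only after the copy call has returned.
    s3_client.delete_object(Bucket=bucket, Key=key)
    return dest_key


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   move_object(boto3.client("s3"), "aws-blog-sscp-ng-202202",
#               "data/salesdata01.csv", "archived")
```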

Let’s set up the infrastructure required to go through the solution to compare an AWS Glue table version to a version updated with recent schema changes.

Create an S3 bucket and folders

To create an S3 bucket with the data and archived folders to store the new and processed data files, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a DNS-compliant unique name (for example, aws-blog-sscp-ng-202202).
  4. For Region, choose the Region where you want the bucket to reside.
  5. Keep all other settings as default and choose Create bucket.
  6. On the Buckets page, choose the newly created bucket.
  7. Choose Create folder.
  8. For Folder name, enter data.
  9. Leave server-side encryption at its default (disabled).
  10. Choose Create folder.
  11. Repeat these steps to create the archived folder in the same bucket.
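The same bucket and folders can also be created programmatically. The following is a minimal sketch under stated assumptions: the function name create_bucket_with_folders is made up for this example, and the S3 client is passed in by the caller. On Amazon S3, a console folder is a zero-byte object whose key ends with a slash.

```python
def create_bucket_with_folders(s3_client, bucket, region, folders=("data", "archived")):
    """Create a bucket and one empty 'folder' object per entry in folders."""
    params = {"Bucket": bucket}
    # us-east-1 is the default location and must not be sent as a LocationConstraint.
    if region != "us-east-1":
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**params)

    keys = [f"{folder}/" for folder in folders]
    for key in keys:
        # A key ending in "/" with no body is shown as a folder on the console.
        s3_client.put_object(Bucket=bucket, Key=key)
    return keys


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   s3 = boto3.client("s3", region_name="us-west-2")
#   create_bucket_with_folders(s3, "aws-blog-sscp-ng-202202", "us-west-2")
```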

Create an AWS Glue database and crawler

Now we create an AWS Glue database and a crawler that crawls the data file in the data folder to create an AWS Glue table in the new database.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. Enter a name (for example, sscp-database) and description.
  4. Choose Create.
  5. Choose Crawlers in the navigation pane.
  6. Choose Add crawler.
  7. For Crawler name, enter a name (glue-crawler-sscp-sales-data).
  8. Choose Next.
  9. For the crawler source type, choose Data stores.
  10. To repeat crawls of the data stores, choose Crawl all folders.
  11. Choose Next.
  12. For Choose a data store, choose S3.
  13. For Include path, choose the S3 bucket and folder you created (s3://aws-blog-sscp-ng-202202/data).
  14. Choose Next.
  15. On the Add another data store page, choose No, then choose Next.
  16. Choose Create an IAM role and enter a name for the role (for example, sscp-blog).
  17. Choose Next.
  18. Choose Run on Demand, then choose Next.
  19. For Database, choose your AWS Glue database (sscp-database).
  20. For Prefix added to tables, enter a prefix (for example, sscp_sales_).
  21. Expand the Configuration options section and choose Update the table definition in the data catalog.
  22. Leave all other settings as default and choose Next.
  23. Choose Finish to create the crawler.
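For reference, the console choices above map roughly onto the AWS Glue API as sketched below. This is an illustration under assumptions, not an export of the console configuration: the helper names are made up, the IAM role must already exist (the console wizard creates it for you), and the DEPRECATE_IN_DATABASE delete behavior is assumed to match the console default.

```python
def crawler_settings(name, role, database, s3_path, table_prefix):
    """Build keyword arguments for the Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
        "TablePrefix": table_prefix,
        # "Crawl all folders" on the console.
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVERYTHING"},
        # "Update the table definition in the data catalog" on the console.
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DEPRECATE_IN_DATABASE",  # assumed default
        },
    }


def create_database_and_crawler(glue_client, settings, description=None):
    """Create the database named in settings, then the crawler itself."""
    database_input = {"Name": settings["DatabaseName"]}
    if description:
        database_input["Description"] = description
    glue_client.create_database(DatabaseInput=database_input)
    glue_client.create_crawler(**settings)


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   settings = crawler_settings(
#       "glue-crawler-sscp-sales-data", "AWSGlueServiceRole-sscp-blog",
#       "sscp-database", "s3://aws-blog-sscp-ng-202202/data", "sscp_sales_")
#   create_database_and_crawler(boto3.client("glue"), settings)
```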

Create an SNS topic

To create an SNS topic and add an email subscription, complete the following steps:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Choose Create topic.
  3. For Type, choose Standard.
  4. Enter a name for the topic (for example, NotifySchemaChanges).
  5. Leave all other settings as default and choose Create topic.
  6. In the navigation pane, choose Subscriptions.
  7. Choose Create subscription.
  8. For Topic ARN, choose the ARN of the created SNS topic.
  9. For Protocol, choose Email.
  10. For Endpoint, enter the email address to receive notifications.
  11. Leave all other defaults and choose Create subscription. You should receive an email to confirm the subscription.
  12. Choose the link in the email to confirm.
  13. Add the following permission policy to the AWS Glue service role created earlier as part of the crawler creation (AWSGlueServiceRole-sscp-blog) to allow publishing to the SNS topic. Make sure to replace <$SNSTopicARN> in the policy with the actual ARN of the SNS topic.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowEventPublishing",
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "<$SNSTopicARN>"
        }
    ]
}
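These steps can also be scripted. The sketch below is one possible arrangement, with assumptions called out: the function names and the inline policy name AllowSchemaChangePublish are made up for this example, and the SNS and IAM clients are supplied by the caller. The recipient still has to confirm the subscription from the email.

```python
import json


def publish_policy(topic_arn):
    """Return the same policy document as above for a concrete topic ARN."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowEventPublishing",
                "Effect": "Allow",
                "Action": "sns:Publish",
                "Resource": topic_arn,
            }
        ],
    }


def set_up_notifications(sns_client, iam_client, topic_name, email, role_name,
                         policy_name="AllowSchemaChangePublish"):  # hypothetical name
    """Create the topic, subscribe an email address, and let the role publish."""
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    sns_client.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint=email)
    # Attach the policy inline to the existing AWS Glue service role.
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(publish_policy(topic_arn)),
    )
    return topic_arn


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   set_up_notifications(boto3.client("sns"), boto3.client("iam"),
#                        "NotifySchemaChanges", "you@example.com",
#                        "AWSGlueServiceRole-sscp-blog")
```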

Create an AWS Glue ETL job

Now you create an AWS Glue ETL job to compare two schema versions of a table and list the changes in schemas. If there are no changes in the schema and the flag to delete older versions is set to true, the job also deletes any older versions. If there are changes in schema, the job lists changes in the CloudWatch logs and publishes an event in Amazon SNS to notify changes to the data team.

  1. On the AWS Glue console, choose AWS Glue Studio.
  2. Choose Create and manage jobs.
  3. Choose the Python Shell script editor.
  4. Choose Create to create a Python Shell job.
  5. Enter the following code in the script editor field:
    import boto3
    import pandas as pd
    
    # Input parameters:
    # catalog_id - your AWS Glue Data Catalog ID; it is the same as your AWS account ID
    # db_name - name of your AWS Glue database in your AWS Glue Data Catalog
    # table_name - name of the table in your AWS Glue database that you want to check for schema changes
    # topic_arn - ARN of the SNS topic to publish the changes in table schema
    # versions_to_compare - the two versions to compare. 0 is the latest version and 1 is the version prior to the latest version
    # delete_old_versions - if True and there are no changes in the versions compared, the job deletes all old versions except for the latest "number_of_versions_to_retain" versions
    # number_of_versions_to_retain - number of latest versions to keep when delete_old_versions is True and there are no changes in the versions compared
    
    catalog_id = '<$catalog_id>'
    db_name='<$db_name>'
    table_name='<$table_name>'
    topic_arn='<$sns_topic_ARN>'
    versions_to_compare=[0,1]
    delete_old_versions = False
    number_of_versions_to_retain = 2
    
    columns_modified = []
    
    # Function to compare the name and type of columns in new column list with old column list to 
    # find any newly added column and the columns with changed data type
    def findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list):
        for index, row in new_cols_df.iterrows():
            new_col_name = new_cols_df.iloc[index]['Name']
            new_col_type = new_cols_df.iloc[index]['Type']
    
            # Check if a column with the same name exists in the old table but the data type has changed
            if new_col_name in old_col_name_list:
                old_col_idx = old_cols_df.index[old_cols_df['Name']==new_col_name][0]
                old_col_type = old_cols_df.iloc[old_col_idx]['Type']
    
                if old_col_type != new_col_type:
                    columns_modified.append(f"Data type changed for '{new_col_name}' from '{old_col_type}' to '{new_col_type}'")
            # If a column is only in the new column list, it is a newly added column
            else:
                columns_modified.append(f"Added new column '{new_col_name}' with data type as '{new_col_type}'")
    
    # Function to iterate through the list of old columns and check if any column doesn't exist in new columns list to find out dropped columns
    def findDropped(old_cols_df, new_col_name_list):
        for index, row in old_cols_df.iterrows():
            old_col_name = old_cols_df.iloc[index]['Name']
            old_col_type = old_cols_df.iloc[index]['Type']
    
            #check if column doesn't exist in new column list  
            if old_col_name not in new_col_name_list:
                columns_modified.append(f"Dropped old column '{old_col_name}' with data type as '{old_col_type}'")
    
    # Function to publish changes in schema to a SNS topic that can be subscribed to receive email notifications when changes are detected
    def notifyChanges(message_to_send):
        sns = boto3.client('sns')
        # Publish a simple message to the specified SNS topic
        response = sns.publish(
            TopicArn=topic_arn,   
            Message=message_to_send,  
            Subject="DWH Notification: Changes in table schema"
        )
        
    # Function to convert version_id to int to use for sorting the versions
    def version_id(json):
        try:
            return int(json['VersionId'])
        except KeyError:
            return 0
    
    # Function to delete the table versions
    def delele_versions(glue_client, versions_list, number_of_versions_to_retain):
        print("deleting old versions...")
        if len(versions_list) > number_of_versions_to_retain:
            version_id_list = []
            for table_version in versions_list:
                version_id_list.append(int(table_version['VersionId']))
            # Sort the versions in descending order
            version_id_list.sort(reverse=True)
            versions_str_list = [str(x) for x in version_id_list]
            versions_to_delete = versions_str_list[number_of_versions_to_retain:]
            
            del_response = glue_client.batch_delete_table_version(
                DatabaseName=db_name,
                TableName=table_name,
                VersionIds=versions_to_delete
            )
            return del_response
    
    # Call the AWS Glue API to get the list of table versions. This solution assumes that the table has fewer than 100 versions. If you have more than 100 versions, use pagination and loop through each page.
    glue = boto3.client('glue')
    response = glue.get_table_versions(
        CatalogId=catalog_id,
        DatabaseName=db_name,
        TableName=table_name,
        MaxResults=100
    )
    table_versions = response['TableVersions']
    table_versions.sort(key=version_id, reverse=True)
    
    version_count = len(table_versions)
    print(version_count)
    
    # Check that the table versions to compare exist. versions_to_compare holds the positions of the two versions to compare.
    if version_count > max(versions_to_compare):
    
        new_columns = table_versions[versions_to_compare[0]]['Table']['StorageDescriptor']['Columns']
        new_cols_df = pd.DataFrame(new_columns)
    
        old_columns = table_versions[versions_to_compare[1]]['Table']['StorageDescriptor']['Columns']
        old_cols_df = pd.DataFrame(old_columns)
    
        new_col_name_list =  new_cols_df['Name'].tolist()
        old_col_name_list =  old_cols_df['Name'].tolist()
        findAddedUpdated(new_cols_df, old_cols_df, old_col_name_list)
        findDropped(old_cols_df, new_col_name_list)
        if len(columns_modified) > 0: 
            email_msg = f"Following changes are identified in '{table_name}' table of '{db_name}' database of your Datawarehouse. Please review.\n\n"
            print("Job completed! -- below is list of changes.")
            for column_modified in columns_modified:
                email_msg += f"\t{column_modified}\n"
    
            print(email_msg)
            notifyChanges(email_msg)
        else:
            if delete_old_versions:
                delele_versions(glue, table_versions,number_of_versions_to_retain)
            print("Job completed! -- There are no changes in table schema.")
    else:
        print("Job completed! -- Selected table doesn't have the number of versions selected to compare. Please verify the list passed in 'versions_to_compare'")

  6. Enter a name for the job (for example, find-change-job-sscp).
  7. For IAM Role, choose the AWS Glue service role (AWSGlueServiceRole-sscp-blog).
  8. Leave all other defaults and choose Save.
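The script assumes the table has fewer than 100 versions and notes that larger tables need pagination. One way to do that, sketched here under assumptions, is to follow the NextToken value that get_table_versions returns until no token is left. The function name list_all_table_versions is made up for this example, and the AWS Glue client is passed in by the caller.

```python
def list_all_table_versions(glue_client, catalog_id, db_name, table_name, page_size=100):
    """Collect every version of a table by following NextToken across pages."""
    params = {
        "CatalogId": catalog_id,
        "DatabaseName": db_name,
        "TableName": table_name,
        "MaxResults": page_size,
    }
    versions = []
    while True:
        response = glue_client.get_table_versions(**params)
        versions.extend(response["TableVersions"])
        token = response.get("NextToken")
        if not token:
            break
        # Ask for the next page on the following iteration.
        params["NextToken"] = token

    # Newest version first, matching the ordering the script relies on.
    versions.sort(key=lambda version: int(version["VersionId"]), reverse=True)
    return versions


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   table_versions = list_all_table_versions(
#       boto3.client("glue"), "<$catalog_id>", "<$db_name>", "<$table_name>")
```

The returned list can stand in for table_versions in the script, so the rest of the comparison logic stays unchanged.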

Test the solution

We’ve configured the infrastructure to run the solution. Let’s now see it in action. First we upload the first data file and run our crawler to create a new table in the Data Catalog.

  1. Create a CSV file called salesdata01.csv with the following contents:
    ordertime,region,rep,item,units,unitcost
    2022-01-06,US-West,Jones,Pencil,95,1.99
    2022-01-06,US-Central,Kivell,Binder,50,19.99
    2022-01-07,US-Central,Jardine,Pencil,36,4.99
    2022-01-07,US-Central,Gill,Pen,27,19.99
    2022-01-08,US-East,Sorvino,Pencil,56,2.99
    2022-01-08,US-West,Jones,Binder,60,4.99
    2022-01-09,US-Central,Andrews,Pencil,75,1.99
    2022-01-10,US-Central,Jardine,Pencil,90,4.99
    2022-01-11,US-East,Thompson,Pencil,32,1.99
    2022-01-20,US-West,Jones,Binder,60,8.99

  2. On the Amazon S3 console, navigate to the data folder and upload the CSV file.
  3. On the AWS Glue console, choose Crawlers in the navigation pane.
  4. Select your crawler and choose Run crawler.
     The crawler takes a few minutes to complete. It adds a table (sscp_sales_data) in the AWS Glue database (sscp-database).
  5. Verify the created table by choosing Tables in the navigation pane.
     Now we move the existing file in the data folder to the archived folder.
  6. On the Amazon S3 console, navigate to the data folder.
  7. Select the file you uploaded (salesdata01.csv) and on the Actions menu, choose Move.
  8. Move the file to the archived folder.
     Now we copy the second data file with the updated schema to the data folder and rerun the crawler.
  9. Create a CSV file called salesdata02.csv with the following contents. It contains the following changes from the previous version:
    1. The data in the region column is changed from region names to numeric codes (as a result, the data type changes from string to BIGINT).
    2. The rep column is dropped.
    3. The new column total is added.
      ordertime,region,item,units,unitcost,total
      2022-02-01,01,Pencil,35,4.99,174.65
      2022-02-01,03,Desk,2,125,250
      2022-02-02,01,Pen Set,16,15.99,255.84
      2022-02-02,03,Binder,28,8.99,251.72
      2022-02-03,01,Pen,64,8.99,575.36
      2022-02-03,01,Pen,15,19.99,299.85
      2022-02-06,03,Pen Set,96,4.99,479.04
      2022-02-10,03,Pencil,67,1.29,86.43
      2022-02-11,01,Pen Set,74,15.99,1183.26
      2022-02-11,03,Binder,46,8.99,413.54

  10. On the Amazon S3 console, upload the file to the data folder.
  11. On the AWS Glue console, choose Crawlers in the navigation pane.
  12. Select your crawler and choose Run crawler.
      The crawler takes approximately 2 minutes to complete. It updates the schema of the previously created table (sscp_sales_data).
  13. Verify the new version of the table is created on the Tables page.
      Now we run the AWS Glue ETL job to check if there is a new version of the table schema and list the changes in the schema with the previous version of the schema in CloudWatch Logs.
  14. On the AWS Glue console, choose Jobs in the navigation pane.
  15. Select your job (find-change-job-sscp) and on the Actions menu, choose Edit script.
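  16. Change the input parameters at the top of the script (catalog_id, db_name, table_name, and topic_arn, plus versions_to_compare, delete_old_versions, and number_of_versions_to_retain if needed) to match your configuration.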
  17. Choose Save.
  18. Close the script editor.
  19. Select the job again and on the Actions menu, choose Run job.
  20. Leave all default parameters and choose Run job.
  21. To monitor the job status, choose the job and review the History tab.
  22. When the job is complete, choose the Output link to open the CloudWatch logs for the job.
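If you later want to script the crawler and job runs rather than use the console, the following sketch shows one approach. It is an illustration under assumptions: the function names, polling interval, and timeout are made up for this example, and it treats the crawler state READY as meaning the run has finished, which relies on the state having switched to RUNNING by the time of the first poll.

```python
import time


def run_crawler_and_wait(glue_client, crawler_name, poll_seconds=15,
                         timeout_seconds=900, sleep=time.sleep):
    """Start a crawler and poll until it returns to the READY state."""
    glue_client.start_crawler(Name=crawler_name)
    waited = 0
    while waited <= timeout_seconds:
        state = glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"]
        if state == "READY":
            return waited
        sleep(poll_seconds)
        waited += poll_seconds
    raise TimeoutError(f"Crawler {crawler_name} did not finish in {timeout_seconds} seconds")


def run_job(glue_client, job_name):
    """Start the schema comparison job and return the run ID."""
    return glue_client.start_job_run(JobName=job_name)["JobRunId"]


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   glue = boto3.client("glue")
#   run_crawler_and_wait(glue, "glue-crawler-sscp-sales-data")
#   run_job(glue, "find-change-job-sscp")
```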

The log should show the changes identified by the AWS Glue job.

You should also receive an email with details on the changes in the schema. The following is an example of an email received.

We can now review the changes identified by the AWS Glue ETL job and make changes in the downstream data store accordingly before running the job to propagate the data from the S3 bucket to downstream applications. For example, if you have an Amazon Redshift table, after the job lists all the schema changes, you need to connect to the Amazon Redshift database and make these schema changes. Follow the change request process set by your organization before making schema changes in your production system.

The provided Python code takes care of the logic to compare the schema versions. The script takes the AWS Glue Data Catalog ID, AWS Glue database name, and AWS Glue table name as parameters.

The following table lists mappings between Apache Hive and Amazon Redshift data types. You can find similar mappings for other data stores and update your downstream data store accordingly.

Hive data type | Description | Amazon Redshift data type | Description
TINYINT | 1-byte integer | - | -
SMALLINT | Signed two-byte integer | SMALLINT | Signed two-byte integer
INT | Signed four-byte integer | INT | Signed four-byte integer
BIGINT | Signed eight-byte integer | BIGINT | Signed eight-byte integer
DECIMAL | - | - | -
DOUBLE | - | - | -
STRING | - | VARCHAR, CHAR | -
VARCHAR | Length 1 to 65535, available starting with Hive 0.12.0 | VARCHAR | -
CHAR | Maximum length 255, available starting with Hive 0.13.0 | CHAR | -
DATE | year/month/day | DATE | year/month/day
TIMESTAMP | No time zone | TIME | Time without time zone
- | - | TIMETZ | Time with time zone
ARRAY/STRUCTS | - | SUPER | -
BOOLEAN | - | BOOLEAN | -
BINARY | - | VARBYTE | Variable-length binary value

Clean up

When you’re done exploring the solution, clean up the resources you created as part of this walkthrough:

  • AWS Glue ETL job (find-change-job-sscp)
  • AWS Glue crawler (glue-crawler-sscp-sales-data)
  • AWS Glue table (sscp_sales_data)
  • AWS Glue database (sscp-database)
  • IAM role for the crawler and ETL job (AWSGlueServiceRole-sscp-blog)
  • S3 bucket (aws-blog-sscp-ng-202202) with all the files in the data and archived folders
  • SNS topic and subscription (NotifySchemaChanges)

Conclusion

In this post, we showed you how to use AWS services together to detect schema changes in your source data, which you can then use to change your downstream data stores and run ETL jobs to avoid any failures in your data pipeline. We used AWS Glue to understand and catalog the source data schema, AWS Glue APIs to identify schema changes, and Amazon SNS to notify the team about the changes. We also showed you how to delete the older versions of your source data schema using AWS Glue APIs. We used Amazon S3 as our data lake storage tier.

Here you can learn more about AWS Glue.


About the authors

Narendra Gupta is a Specialist Solutions Architect at AWS, helping customers on their cloud journey with a focus on AWS analytics services. Outside of work, Narendra enjoys learning new technologies, watching movies, and visiting new places.

Navnit Shukla is an AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Run Apache Spark with Amazon EMR on EKS backed by Amazon FSx for Lustre storage

Post Syndicated from Vara Bonthu original https://aws.amazon.com/blogs/big-data/run-apache-spark-with-amazon-emr-on-eks-backed-by-amazon-fsx-for-lustre-storage/

Traditionally, Spark workloads have been run on a dedicated setup like a Hadoop stack with YARN or Mesos as a resource manager. Starting with Apache Spark 2.3, Spark added support for Kubernetes as a resource manager. The new Kubernetes scheduler natively supports the submission of Spark jobs to a Kubernetes cluster. Spark on Kubernetes provides simpler administration, better developer experience, easier dependency management with containers, a fine-grained security layer, and optimized resource allocation. As a result, Spark on Kubernetes has gained traction as a high-performance and cost-effective way of running big data and machine learning (ML) workloads.

In AWS, we offer a managed service, Amazon EMR on EKS, to run your Apache Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS). This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. EMR on EKS lets you run Spark applications alongside other application types on the same Amazon EKS cluster to improve resource utilization. In addition, EMR on EKS integrates with Amazon EMR Studio for authoring jobs and the Apache Spark UI for debugging out of the box to simplify infrastructure management.

For storage, EMR on EKS supports node ephemeral storage using hostPath, where the storage is attached to individual nodes, and an Amazon Elastic Block Store (Amazon EBS) volume per executor/driver pod using dynamic Persistent Volume Claims. However, some Spark users are looking for an HDFS-like shared file system to handle specific workloads like time-sensitive applications or streaming analytics. HDFS is best suited for jobs that require highly interactive speed for a large number of files with random access reads, atomic rename operations, and sequential metadata requests.

Amazon FSx for Lustre is a fully managed shared storage option built on the world’s most popular high-performance file system. It offers highly scalable, cost-effective storage, which provides sub-millisecond latencies, millions of IOPS, and throughput of hundreds of gigabytes per second. Its popular use cases include high-performance computing (HPC), financial modeling, video rendering, and machine learning. FSx for Lustre supports two types of deployments:

  • Scratch file systems – These are designed for temporary or short-term storage where the data doesn’t need to be replicated or persisted if a file server fails
  • Persistent file systems – These are suitable for long-term storage where the file server is highly available and the data is replicated within the Availability Zone

In both deployment types, automatic data sync between the mounted file system and Amazon Simple Storage Service (Amazon S3) buckets is supported, helping you offload large volumes of cold and warm data for a more cost-efficient design. It also makes multi-AZ or multi-Region failover possible via Amazon S3 for businesses that require resiliency and availability.

This post demonstrates how to use EMR on EKS to submit Spark jobs with FSx for Lustre as the storage. It can be mounted on Spark driver and executor pods through static and dynamic PersistentVolumeClaims methods.

Static vs. dynamic provisioning

With static provisioning, the FSx for Lustre file system and PersistentVolume (PV) must be created in advance. The following diagram illustrates the static provisioning architecture. The Spark application driver and executor pods refer to an existing static PersistentVolumeClaim (PVC) to mount the FSx for Lustre file system.

Unlike static provisioning, the FSx for Lustre file system and PV don’t need to be pre-created for dynamic provisioning. As shown in the following diagram, the FSx for Lustre CSI driver plugin is deployed to an Amazon EKS cluster to dynamically provision the FSx for Lustre file system with a given PVC. Dynamic provisioning only requires a PVC and the corresponding storage class. After the PVC is created in Kubernetes, the FSx for Lustre CSI driver identifies the storage class and creates the requested file system.

The Spark application driver and executor pods in the architecture refer to an existing dynamic PVC to mount the FSx for Lustre file system.
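To make the dynamic case concrete, the following sketch builds the two Kubernetes objects it needs, a storage class and a PVC, as Python dictionaries that can be dumped to JSON and passed to kubectl apply. It is an illustration under assumptions, not the manifests this deployment uses: the helper names, the subnet and security group IDs, and the 1200Gi request are placeholders, while the provisioner name fsx.csi.aws.com and the PVC, namespace, and storage class names are taken from the command output shown later in this post.

```python
import json


def fsx_storage_class(name, subnet_id, security_group_id):
    """Storage class handled by the FSx for Lustre CSI driver."""
    return {
        "apiVersion": "storage.k8s.io/v1",
        "kind": "StorageClass",
        "metadata": {"name": name},
        "provisioner": "fsx.csi.aws.com",
        "parameters": {
            "subnetId": subnet_id,                  # placeholder value
            "securityGroupIds": security_group_id,  # placeholder value
        },
    }


def dynamic_pvc(name, namespace, storage_class, size="1200Gi"):
    """PVC that asks the CSI driver to create a file system on demand."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # ReadWriteMany lets the driver and executor pods share the volume.
            "accessModes": ["ReadWriteMany"],
            "storageClassName": storage_class,
            "resources": {"requests": {"storage": size}},  # assumed size
        },
    }


manifests = [
    fsx_storage_class("fsx", "subnet-EXAMPLE", "sg-EXAMPLE"),
    dynamic_pvc("fsx-dynamic-pvc", "emr-data-team-a", "fsx"),
]
print(json.dumps(manifests, indent=2))
```

With static provisioning, by contrast, the PV is created up front and points at a file system that already exists, so no storage class parameters are needed to create one.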

Solution overview

In this post, you provision the following resources with Amazon EKS Blueprints for Terraform to run Spark jobs using EMR on EKS:

Prerequisites

Before you build the entire infrastructure, you must have the following prerequisites:

Now you’re ready to deploy the solution.

Clone the GitHub repo

Open your terminal window, change to the home directory, and clone the GitHub repo:

cd ~
git clone https://github.com/aws-ia/terraform-aws-eks-blueprints.git

Then, navigate to the following:

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre

Initialize Terraform

Initialize the project, which downloads plugins that allow Terraform to interact with AWS services:

terraform init

Run terraform plan

Run terraform plan to verify the resources created by this deployment:

export AWS_REGION="<enter-your-region>"
terraform plan

The terraform plan output shows the resources that are created by this plan.

Run terraform apply

Run terraform apply to deploy the resources:

terraform apply --auto-approve

This deployment may take up to 30 minutes to create all the resources.

Verify the resources

Verify the Amazon EKS cluster created by the deployment. The following command displays the cluster details in JSON format:

aws eks describe-cluster --name emr-eks-fsx-lustre

Let’s create a kubeconfig file for the EKS cluster with the following command. This command creates a new cluster context entry with certificate authority data under ~/.kube/config to authenticate with the EKS cluster:

aws eks --region <ENTER_YOUR_REGION> update-kubeconfig --name emr-eks-fsx-lustre

Verify the managed node groups:

aws eks list-nodegroups --cluster-name emr-eks-fsx-lustre

The output should show two node groups:

{
    "nodegroups": [
        "core-node-grp-<some_random_numbers>",
        "spark-node-grp-<some_random_numbers>"
    ]
}
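The same check can be scripted, for example in a deployment smoke test. The following sketch is an illustration under assumptions: the function name missing_node_groups is made up, and the expected prefixes are taken from the sample output above. The EKS client is passed in by the caller.

```python
def missing_node_groups(eks_client, cluster_name,
                        expected_prefixes=("core-node-grp", "spark-node-grp")):
    """Return the expected node group prefixes that the cluster does not have."""
    names = eks_client.list_nodegroups(clusterName=cluster_name)["nodegroups"]
    return [
        prefix
        for prefix in expected_prefixes
        if not any(name.startswith(prefix) for name in names)
    ]


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   missing = missing_node_groups(boto3.client("eks"), "emr-eks-fsx-lustre")
#   if missing:
#       print("Missing node groups:", missing)
```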

List the pods created by the FSx for Lustre CSI driver. The following command shows two controllers and an fsx-csi-node daemonset pod for each node:

kubectl get pods -n kube-system | grep fsx

List the namespace created for emr-data-team-a:

kubectl get ns | grep emr-data-team-a

The output will display the active namespace.

List the FSx storage class, PV, and PVCs created by this deployment. You may notice that fsx-dynamic-pvc is in Pending status because the FSx for Lustre file system for this dynamic PVC is still being created. The status changes to Bound after the file system is created.

#FSx Storage Class
kubectl get storageclasses | grep fsx
  emr-eks-fsx-lustre   fsx.csi.aws.com         Delete          Immediate              false                  109s

# Output of static persistent volume with name fsx-static-pv
kubectl get pv | grep fsx  
  fsx-static-pv                              1000Gi     RWX            Recycle          Bound    emr-data-team-a/fsx-static-pvc       fsx

# Output of static persistent volume claim with name fsx-static-pvc and fsx-dynamic-pvc
kubectl get pvc -n emr-data-team-a | grep fsx
  fsx-dynamic-pvc   Pending                                             fsx            4m56s
  fsx-static-pvc    Bound     fsx-static-pv   1000Gi     RWX            fsx            4m56s
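If you prefer to script this check instead of reading the table by eye, the following is a minimal sketch in Python that parses the text printed by kubectl get pvc and reports which claims are not yet Bound. The function name pvc_status is hypothetical, the sample text is copied from the output above, and the sketch assumes the first two whitespace-separated columns are the claim name and its status.

```python
def pvc_status(kubectl_output):
    """Map each PVC name to its STATUS column from `kubectl get pvc` text output."""
    status = {}
    for line in kubectl_output.splitlines():
        fields = line.split()
        # Skip blank lines and the header row (present when grep is not used).
        if len(fields) < 2 or fields[0] == "NAME":
            continue
        status[fields[0]] = fields[1]
    return status


# Sample copied from the output shown above (assumption: same column layout).
sample = """\
  fsx-dynamic-pvc   Pending                                             fsx            4m56s
  fsx-static-pvc    Bound     fsx-static-pv   1000Gi     RWX            fsx            4m56s
"""

statuses = pvc_status(sample)
pending = [name for name, phase in statuses.items() if phase != "Bound"]
print("Not yet bound:", pending)
```

In practice you would feed the function the captured output of the kubectl command and rerun it until the list of pending claims is empty.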

Log in to the FSx for Lustre console and verify the two file systems created by this deployment:

  • The first file system (emr-eks-fsx-lustre-static) is a persistent file system created with the Terraform resource
  • The second file system (fs-0e77adf20acb4028f) is created by the FSx for Lustre CSI driver dynamically with a dynamic PVC manifest

In this demo, we learn how to use both a statically provisioned and a dynamically provisioned FSx for Lustre file system in EMR on EKS Spark jobs.

Static provisioning

You can create an FSx for Lustre file system using the AWS CLI or any infrastructure as code (IaC) tool. In this example, we used Terraform to create the FSx for Lustre file system with deployment type as PERSISTENT_2. For static provisioning, we must create the FSx for Lustre file system first, followed by the PV and PVCs. After we create all three resources, we can mount the FSx for Lustre file system on a Spark driver and executor pod.

We use the following Terraform code snippet in the deployment to create the FSx for Lustre file system (2400 GB) and the file system association with the S3 bucket for import and export under the /data file system path. Note that this resource refers to a single subnet (single Availability Zone) for creating an FSx for Lustre file system. However, the Spark pods can use this file system across all Availability Zones, unlike the EBS volume, which is Availability Zone specific. In addition, the FSx for Lustre association with the S3 bucket creates a file system directory called /data. The Spark job driver and executor pod templates use this /data directory as a spark-local-dir for scratch space.

# New FSx for Lustre filesystem
resource "aws_fsx_lustre_file_system" "this" {
  deployment_type             = "PERSISTENT_2"
  storage_type                = "SSD"
  per_unit_storage_throughput = "500"
  storage_capacity            = 2400

  subnet_ids         = [module.vpc.private_subnets[0]]
  security_group_ids = [aws_security_group.fsx.id]
  log_configuration {
    level = "WARN_ERROR"
  }
  tags = merge({ "Name" : "${local.name}-static" }, local.tags)
}

# S3 bucket association with FSx for Lustre filesystem
resource "aws_fsx_data_repository_association" "example" {
  file_system_id       = aws_fsx_lustre_file_system.this.id
  data_repository_path = "s3://${aws_s3_bucket.this.id}"
  file_system_path     = "/data" # This directory will be used in Spark podTemplates under volumeMounts as subPath

  s3 {
    auto_export_policy {
      events = ["NEW", "CHANGED", "DELETED"]
    }

    auto_import_policy {
      events = ["NEW", "CHANGED", "DELETED"]
    }
  }
}

Persistent Volume

The following YAML template shows the definition of the PV created by this deployment. For example, running the command kubectl edit pv fsx-static-pv displays the manifest. PVs are cluster-scoped resources, so no namespace is defined in the template. The DevOps or cluster admin teams typically create this.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: fsx-static-pv
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1000Gi
  claimRef:  # PV Claimed by fsx-static-pvc                
    apiVersion: v1
    kind: PersistentVolumeClaim             
    name: fsx-static-pvc
    namespace: emr-data-team-a
    resourceVersion: "5731"
    uid: 9110afc4-c605-440e-b022-190904866f0c
  csi:
    driver: fsx.csi.aws.com
    volumeAttributes:
      dnsname: fs-0a85fd096ef3f0089.fsx.eu-west-1.amazonaws.com # FSx DNS Name
      mountname: fz5jzbmv
    volumeHandle: fs-0a85fd096ef3f0089
  mountOptions:
  - flock
  persistentVolumeReclaimPolicy: Recycle

Persistent Volume Claim

The following YAML template shows the definition of the PVC created by this deployment. For example, running the command kubectl edit pvc fsx-static-pvc -n emr-data-team-a shows the deployed resource.

PVCs are namespace-specific resources typically created by the developers. The emr-data-team-a namespace is defined in the template.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-static-pvc
  namespace: emr-data-team-a
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 1000Gi
  storageClassName: fsx
  volumeMode: Filesystem
  volumeName: fsx-static-pv
status:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 1000Gi
  phase: Bound

Now that we have set up the static FSx for Lustre file system, we can use the PVC in EMR on EKS Spark jobs with pod templates. Key things to note in the template are that the volumes section in the following code is defined as persistentVolumeClaim with the claim name as fsx-static-pvc, and the containers section refers to the unique mountPath folder /static. We also use initContainers in the driver pod template to give correct permissions and ownership to the Hadoop users to be used by EMR on EKS driver and executor pods. Finally, notice that data in the subPath is associated with the S3 bucket sync in the preceding Terraform resource.

We use the following driver pod template:

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-driver
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-static-pvc # Static PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-driver 
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static
          subPath: data # sub folder created in FSx for Lustre filesystem and mapped to s3 bucket sync and export
          readOnly: false
  initContainers:
    - name: spark-init-container-driver  
      image: public.ecr.aws/y4g4v0z7/busybox
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static
      command: ["sh", "-c", "chmod -R 777 /static && chown -hR +999:+1000 /static/data"]
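One detail about the init container's command is worth checking when you adapt this template: sh -c runs only the first string after -c as the script, and any further strings become positional parameters ($0, $1, and so on) rather than additional commands. Several commands therefore need to be joined in a single string, for example with &&. The following Python sketch demonstrates the behavior locally; it assumes a POSIX sh is available on your machine, and the echo commands are placeholders rather than part of the deployment.

```python
import subprocess

# The argument list mirrors a container spec of the form
# command: ["sh", "-c", "<script>", "<extra string>"].
result = subprocess.run(
    ["sh", "-c", 'echo first; echo "name=$0"', "echo second"],
    capture_output=True,
    text=True,
    check=True,
)

# Only the script after -c runs; "echo second" is assigned to $0 and never executed.
print(result.stdout)

# Joining the commands in one string runs both of them.
joined = subprocess.run(
    ["sh", "-c", "echo first && echo second"],
    capture_output=True,
    text=True,
    check=True,
)
print(joined.stdout)
```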

The executor pod template also refers to the same persistentVolumeClaim as fsx-static-pvc and volumeMounts with mountPath as /static. Notice that we don’t use the initContainers section in this template because the required permissions for the file system directory /static/data have been applied by the driver processes already. Because it’s a shared file system, the same permissions apply to the executor process as well.

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-exec
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-static-pvc # Static PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-executor # Don't change this name. EMR on EKS looking for this name
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /static # mountPath name can be anything but this should match with Driver template as well
          subPath: data # sub folder created in FSx for Lustre filesystem and mapped to s3 bucket sync and export
          readOnly: false

Let’s run the sample PySpark script using the preceding pod templates. Navigate to the examples/spark-execute directory and run the shell script (fsx-static-spark.sh):

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre/examples/spark-execute

This shell script expects three input values. EMR_VIRTUAL_CLUSTER_ID and EMR_JOB_EXECUTION_ROLE_ARN can be extracted from the Terraform output values. Additionally, you create an S3 bucket with required permissions. This S3 bucket stores the sample PySpark scripts, pod templates, input and output data generated by this shell script, and the Spark job. Check out the shell script for more details.

EMR_VIRTUAL_CLUSTER_ID=$1     # Terraform output variable: emrcontainers_virtual_cluster_id    
S3_BUCKET=$2                  # This script requires s3 bucket as input parameter e.g., s3://<bucket-name>    
EMR_JOB_EXECUTION_ROLE_ARN=$3 # Terraform output variable: emr_on_eks_role_arn

Let’s run the fsx-static-spark.sh shell script. With two executors, this job takes approximately 6 minutes to process 40 objects with a total size of 1.4 GB. Each object is around 36.4 MB. You can increase the number of objects from 40 to process a larger amount of data. This shell script downloads the public dataset (NY Taxi Trip Data) to your local disk and uploads it to the S3 bucket using Amazon S3 sync. The PySpark job reads the data from the S3 bucket, applies GroupBy on a few fields, and writes back to the S3 bucket to demonstrate the shuffling activity.

./fsx-static-spark.sh <EMR_VIRTUAL_CLUSTER_ID> \
s3://<YOUR_BUCKET_NAME> \
<EMR_JOB_EXECUTION_ROLE_ARN>

You can run the following queries to monitor the Spark job and the usage of the FSx for Lustre file system mounted on the driver and executor pods. Verify the job run events with the following command:

kubectl get pods --namespace=emr-data-team-a -w

You will notice one job object pod, a driver pod, and two executor pods. The Spark executor instances count can be updated in the shell script.

You can also query to monitor the usage of FSx for Lustre mounted file system size. The following command shows the size of the mounted file system growth during the test run:

# Verify the used FSx for Lustre filesystem disk size with executor1
kubectl exec -ti ny-taxi-trip-static-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- df -h

# Verify the files created under /static/data FSx mount
kubectl exec -ti ny-taxi-trip-static-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- ls -lah /static

# Verify the file sync from FSx to S3 bucket. 
aws s3 ls s3://<YOUR_SYNC_BUCKET_NAME_FROM_TERRAFORM_OUTPUT>/

The following diagram shows the output for the preceding commands. The files under the executor are the same as those under the S3 bucket. These files are the same because the S3 sync feature is enabled in the FSx for Lustre file system. This test uses the FSx for Lustre file system for scratch space, so the shuffle files will be deleted from the FSx for Lustre file system and S3 bucket when the test is complete.

This PySpark job writes the aggregated and repartitioned output directly to an S3 bucket location. Instead, you can choose to write to the FSx for Lustre file system path, which eventually syncs to an S3 bucket. The FSx for Lustre file system provides low latency, high throughput, and high IOPS for reading and writing data by multiple Spark jobs. In addition, the data stored on the FSx disk is synced to an S3 bucket for durable storage.

You can monitor the FSx for Lustre file system using Amazon CloudWatch metrics. The following time series graph shows the average stats with a period of 30 seconds.

When the Spark job is complete, you can verify the results in the Spark Web UI from the EMR on EKS console.

You can also verify the FSx for Lustre file system data sync to an S3 bucket.

Dynamic provisioning

So far, we have looked at an FSx for Lustre statically provisioned file system example and its usage with Spark jobs.

We can also provision an FSx for Lustre file system on-demand using the FSx for Lustre CSI driver and Persistent Volume Claim. Whenever you create a PVC with a dynamic volume referring to an FSx storage class, the FSx for Lustre CSI driver automatically provisions the FSx for Lustre file system and the corresponding Persistent Volume. Admin teams (DevOps) are responsible for deploying the FSx for Lustre CSI driver and FSx storage class, and the developers and data engineers (DataOps) are responsible for deploying the PVC, which refers to the FSx storage class.

The following storage class is deployed to Amazon EKS by this Terraform deployment. This dynamic PVC example doesn’t use the Amazon S3 backup association. You can still do that, but it requires an Amazon S3 config in the storage class manifest. Check out Dynamic Provisioning with Data Repository to configure the FSx storage class with the S3 import/export path with the choice of deployment type (SCRATCH_1, SCRATCH_2 and PERSISTENT_1). We have also created a dedicated security group used in this manifest. For more information, refer to File System Access Control with Amazon VPC.

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: fsx
provisioner: fsx.csi.aws.com
parameters:
  securityGroupIds: sg-0c8a656a0bbb17fe2
  subnetId: subnet-03cb3d850193b907b
reclaimPolicy: Delete
volumeBindingMode: Immediate

The following YAML template shows the definition of the dynamic PVC used in this deployment. Running the command kubectl edit pvc fsx-dynamic-pvc -n emr-data-team-a shows the deployed resource. PVCs are namespace-specific resources typically created by the developers, so we define the emr-data-team-a namespace.

Spark can dynamically provision the PVC with claimName using SparkConf (for example, spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName=OnDemand). However, we recommend deploying the PVC before the start of Spark jobs to avoid waiting for the FSx for Lustre file system to be provisioned during the job run. Creating the FSx for Lustre file system takes approximately 10–12 minutes.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-dynamic-pvc
  namespace: emr-data-team-a
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 2000Gi
  storageClassName: fsx # PVC reference to Storage class created by Terraform
  volumeMode: Filesystem
  volumeName: pvc-0da5a625-03ba-48fa-b08e-3f74291c0e5e # Dynamically created Persistent Volume
status:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 2400Gi
  phase: Bound

Now that we have set up the dynamic FSx for Lustre file system, we can use this in EMR on EKS Spark jobs using pod templates. Key things to note in the following template are that the volumes section is defined as persistentVolumeClaim with the claim name as fsx-dynamic-pvc, and the containers section refers to the unique mountPath folder as /dynamic. We also use initContainers in the driver pod template to give correct permissions and ownership to the Hadoop users to be used by EMR on EKS driver and executor processes.

The following is our driver pod template:

# NOTE: PVC created before the start of the Spark job to avoid waiting for 15 mins to create FSx for Lustre filesystem while the job is running
---
apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-driver
  namespace: emr-data-team-a # Namespace used to submit the jobs
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-dynamic-pvc  # Dynamic PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-driver 
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic # FSx SCRATCH_1 filesystem for executors scratch space
          readOnly: false
  initContainers:  # initContainer only used in Driver to set the permissions for dynamically created filesystem.
    - name: spark-init-container-driver  
      image: public.ecr.aws/y4g4v0z7/busybox
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic # FSx Scratch 1 filesystem for executors scratch space
      command: ["sh", "-c", "chmod 777 /dynamic && chown -hR +999:+1000 /dynamic"]

The executor pod template also refers to the same persistentVolumeClaim as fsx-dynamic-pvc and volumeMounts with mountPath as /dynamic:

apiVersion: v1
kind: Pod
metadata:
  name: fsx-taxi-exec
  namespace: emr-data-team-a
spec:
  volumes:
    - name: spark-local-dir-ny-taxi
      persistentVolumeClaim:
        claimName: fsx-dynamic-pvc  # Dynamic PVC pre-created by this example terraform template

  nodeSelector:
    NodeGroupType: spark

  containers:
    - name: spark-kubernetes-executor # Don't change this name. EMR on EKS looking for this name
      volumeMounts:
        - name: spark-local-dir-ny-taxi
          mountPath: /dynamic  # FSx Scratch 1 filesystem for executor’s scratch space
          readOnly: false

Let’s run the sample PySpark script using the preceding pod templates. Navigate to the examples/spark-execute directory and run the shell script (fsx-dynamic-spark.sh). This script is the same as the static provisioning example; the only difference is the pod templates, which refer to the dynamic volumes.

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre/examples/spark-execute

This shell script expects three input values: EMR_VIRTUAL_CLUSTER_ID, EMR_JOB_EXECUTION_ROLE_ARN, and your S3 bucket name. Use the same values used in the previous static provisioning example.

Let’s run the fsx-dynamic-spark.sh shell script:

./fsx-dynamic-spark.sh <EMR_VIRTUAL_CLUSTER_ID> \
s3://<YOUR_BUCKET_NAME> \
<EMR_JOB_EXECUTION_ROLE_ARN>

After the job is triggered, run the following commands to see the output of the job:

# Output of dynamic persistent volume claim fsx-dynamic-pvc
kubectl get pvc -n emr-data-team-a | grep fsx-dynamic-pvc

# Verify the used FSx for Lustre filesystem disk size with executor1
kubectl exec -ti ny-taxi-trip-dynamic-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- df -h

# Verify the files created under /dynamic FSx mount
kubectl exec -ti ny-taxi-trip-dynamic-exec-1 -c spark-kubernetes-executor -n emr-data-team-a -- ls -lah /dynamic

The following screenshot shows the file system mounted under the /dynamic path. We can also see the Spark shuffle files created in the /dynamic folder.

Clean up

To clean up your environment, destroy the Terraform modules in reverse order. Then, empty any S3 buckets created by this module and run the following commands:

cd ~/terraform-aws-eks-blueprints/examples/analytics/emr-eks-fsx-lustre

terraform destroy -target="module.eks_blueprints_kubernetes_addons" -auto-approve

terraform destroy -target="module.eks_blueprints" -auto-approve

terraform destroy -target="module.vpc" -auto-approve

# Finally, destroy any additional resources that are not in the above modules

terraform destroy -auto-approve

Furthermore, log in to the AWS Management Console and delete any S3 buckets or FSx for Lustre file systems created by this deployment to avoid unwanted charges to your AWS account.

Conclusion

In this post, we demonstrated how to mount an FSx for Lustre file system as a PVC to Spark applications with EMR on EKS. We showed two mounting methods: static provisioning and dynamic provisioning via the FSx for Lustre CSI driver. The HDFS-like storage can be used by Spark on a Kubernetes pattern to achieve optimal storage performance. You can use it either as a temporary scratch space to store intermediate data while processing, or as a shared, persistent file system to exchange data for multiple pods in a single job or between multiple Spark jobs.

If you want to try out the full solution or for more EMR on EKS examples, check out our open-sourced project on GitHub.


About the authors

Vara Bonthu is a Senior Open Source Engineer focused on data analytics and containers working with Strategic Accounts. He is passionate about open source, big data, and Kubernetes, and has a substantial development, DevOps, and architecture background.

Karthik Prabhakar is a Senior Analytics Specialist Solutions Architect at AWS, helping strategic customers adopt and run AWS Analytics services.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interest are open-source frameworks and automation, data engineering, and DataOps.

Choose the k-NN algorithm for your billion-scale use case with OpenSearch

Post Syndicated from Jack Mazanec original https://aws.amazon.com/blogs/big-data/choose-the-k-nn-algorithm-for-your-billion-scale-use-case-with-opensearch/

When organizations set out to build machine learning (ML) applications such as natural language processing (NLP) systems, recommendation engines, or search-based systems, k-Nearest Neighbor (k-NN) search is often used at some point in the workflow. As the number of data points reaches the hundreds of millions or even billions, scaling a k-NN search system can be a major challenge. Applying Approximate Nearest Neighbor (ANN) search is a great way to overcome this challenge.

The k-NN problem is relatively simple compared to other ML techniques: given a set of points and a query, find the k nearest points in the set to the query. The naive solution is equally understandable: for each point in the set, compute its distance from the query and keep track of the top k along the way.

K-NN concept

The problem with this naive approach is that it doesn’t scale particularly well. The runtime search complexity is O(N log k), where N is the number of vectors and k is the number of nearest neighbors. Although this may not be noticeable when the set contains thousands of points, it becomes noticeable when the size gets into the millions. Although some exact k-NN algorithms can speed search up, they tend to perform similarly to the naive approach in higher dimensions.
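To make the naive approach concrete, here is a minimal Python sketch of exact k-NN that scans every point and keeps the current top k in a bounded heap, which is where the log k factor comes from. The function names and the sample points are illustrative assumptions, and Euclidean distance is assumed as the distance measure.

```python
import heapq
import math


def l2(a, b):
    """Euclidean distance between two equal-length coordinate sequences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def naive_knn(points, query, k):
    """Return the indices of the k points closest to query, nearest first."""
    heap = []  # max-heap on distance (stored negated), holds at most k entries
    for idx, point in enumerate(points):
        d = l2(point, query)
        if len(heap) < k:
            heapq.heappush(heap, (-d, idx))
        elif d < -heap[0][0]:
            # Closer than the farthest point kept so far: swap it in.
            heapq.heapreplace(heap, (-d, idx))
    return [idx for _, idx in sorted((-neg_d, idx) for neg_d, idx in heap)]


points = [(0, 0), (1, 1), (5, 5), (2, 2)]
print(naive_knn(points, (1.2, 1.2), k=2))
```

Every query touches all N points, which is the cost that ANN algorithms try to avoid.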

Enter ANN search. We can reduce the runtime search latency if we loosen a few constraints on the k-NN problem:

  • Allow indexing to take longer
  • Allow more space to be used at query time
  • Allow the search to return an approximation of the k-NN in the set

Several different algorithms have been discovered to do just that.

OpenSearch is a community-driven, Apache 2.0-licensed, open-source search and analytics suite that makes it easy to ingest, search, visualize, and analyze data. The OpenSearch k-NN plugin provides the ability to use some of these algorithms within an OpenSearch cluster. In this post, we discuss the different algorithms that are supported and run experiments to see some of the trade-offs between them.

Hierarchical Navigable Small Worlds algorithm

The Hierarchical Navigable Small Worlds algorithm (HNSW) is one of the most popular algorithms out there for ANN search. It was the first algorithm that the k-NN plugin supported, using a very efficient implementation from the nmslib similarity search library. It has one of the best query latency vs. recall trade-offs and doesn’t require any training. The core idea of the algorithm is to build a graph with edges connecting index vectors that are close to each other. Then, on search, this graph is partially traversed to find the approximate nearest neighbors to the query vector. To steer the traversal towards the query’s nearest neighbors, the algorithm always visits the closest candidate to the query vector next.

But which vector should the traversal start from? It could just pick a random vector, but for a large index, this might be very far from the query’s actual nearest neighbors, leading to poor results. To pick a vector that is generally close to the query vector to start from, the algorithm builds not just one graph, but a hierarchy of graphs. All vectors are added to the bottom layer, and then a random subset of those are added to the layer above, and then a subset of those are added to the layer above that, and so on.

During search, we start from a random vector in the top layer, partially traverse the graph to find (approximately) the nearest point to the query vector in that layer, and then use this vector as the starting point for our traversal of the layer below. We repeat this until we get to the bottom layer. At the bottom layer, we perform the traversal, but this time, instead of just searching for the nearest neighbor, we keep track of the k-nearest neighbors that are visited along the way.

The following figure illustrates this process (inspired by the image in the original paper Efficient and robust approximate nearest neighbor search using Hierarchical Navigable Small World graphs).

You can tune three parameters for HNSW:

  • m – The maximum number of edges a vector will get in a graph. The higher this number is, the more memory the graph will consume, but the better the search approximation may be.
  • ef_search – The size of the queue of the candidate nodes to visit during traversal. When a node is visited, its neighbors are added to the queue to be visited in the future. When this queue is empty, the traversal will end. A larger value will increase search latency, but may provide better search approximation.
  • ef_construction – Very similar to ef_search. When a node is to be inserted into the graph, the algorithm will find its m edges by querying the graph with the new node as the query vector. This parameter controls the candidate queue size for this traversal. A larger value will increase index latency, but may provide a better search approximation.
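The traversal of a single layer can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not the nmslib implementation: it searches one hand-built graph only (the hierarchy of layers and graph construction are omitted), the parameter ef plays the role of ef_search, and the function names and the toy chain graph are hypothetical.

```python
import heapq
import math


def l2(a, b):
    """Euclidean distance between two equal-length coordinate sequences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def layer_search(graph, vectors, query, entry, k, ef):
    """Best-first traversal of one graph layer, keeping at most ef results."""
    d_entry = l2(vectors[entry], query)
    visited = {entry}
    candidates = [(d_entry, entry)]  # min-heap: closest candidate is visited next
    best = [(-d_entry, entry)]       # max-heap (negated) of the best ef nodes seen
    while candidates:
        d, node = heapq.heappop(candidates)
        if len(best) >= ef and d > -best[0][0]:
            break  # the closest remaining candidate cannot improve the results
        for neighbor in graph[node]:
            if neighbor in visited:
                continue
            visited.add(neighbor)
            dn = l2(vectors[neighbor], query)
            if len(best) < ef or dn < -best[0][0]:
                heapq.heappush(candidates, (dn, neighbor))
                heapq.heappush(best, (-dn, neighbor))
                if len(best) > ef:
                    heapq.heappop(best)  # drop the farthest kept node
    ranked = sorted((-neg_d, idx) for neg_d, idx in best)
    return [idx for _, idx in ranked][:k]


# Toy data: five 1-dimensional vectors linked in a chain 0-1-2-3-4.
vectors = [(0.0,), (1.0,), (2.0,), (3.0,), (4.0,)]
graph = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2, 4], 4: [3]}
print(layer_search(graph, vectors, (3.2,), entry=0, k=2, ef=3))
```

A larger ef lets the traversal keep more candidates alive before it stops, which is the latency versus approximation trade-off described for ef_search.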

For more information on HNSW, you can read through the paper Efficient and robust approximate nearest neighbor search using Hierarchical Navigable Small World graphs.

Memory consumption

Although HNSW provides very good approximate nearest neighbor search at low latencies, it can consume a large amount of memory. Each HNSW graph uses roughly 1.1 * (4 * d + 8 * m) * num_vectors bytes of memory:

  • d is the dimension of the vectors
  • m is the algorithm parameter that controls the number of connections each node will have in a layer
  • num_vectors is the number of vectors in the index

To ensure durability and availability, especially when running production workloads, OpenSearch indexes are recommended to have at least one replica shard. Therefore, the memory requirement is multiplied by (1 + number of replicas). For use cases where the data size is 1 billion vectors of 128 dimensions each and m is set to the default value of 16, the estimated amount of memory required would be:

1.1 * (4 * 128 + 8 * 16) * 1,000,000,000 * 2 = 1,408 GB.
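The same estimate can be reproduced with a small Python helper, which makes it easy to try other values of d and m. The function name and the replicas argument are illustrative assumptions; the formula is the one given above.

```python
def hnsw_memory_bytes(d, m, num_vectors, replicas=0):
    """Estimate HNSW memory: 1.1 * (4 * d + 8 * m) * num_vectors per copy."""
    per_copy = 1.1 * (4 * d + 8 * m) * num_vectors
    return per_copy * (1 + replicas)


estimate = hnsw_memory_bytes(d=128, m=16, num_vectors=1_000_000_000, replicas=1)
print(round(estimate / 1e9), "GB")
```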

If we increase the size of vectors to 512, for example, and the m to 100, which is recommended for vectors with high intrinsic dimensionality, some use cases can require a total memory of approximately 4 TB.

With OpenSearch, we can always horizontally scale the cluster to handle this memory requirement. However, this comes at the expense of raising infrastructure costs. For cases where scaling doesn’t make sense, options to reduce the memory footprint of the k-NN system need to be explored. Fortunately, there are algorithms that we can use to do this.

Inverted File System algorithm

Consider a different approach for approximating a nearest neighbor search: separate your index vectors into a set of buckets, then, to reduce your search time, only search through a subset of these buckets. From a high level, this is what the Inverted File System (IVF) ANN algorithm does. In OpenSearch 1.2, the k-NN plugin introduced support for the implementation of IVF by Faiss. Faiss is an open-sourced library from Meta for efficient similarity search and clustering of dense vectors.

However, if we just randomly split up our vectors into different buckets, and only search a subset of them, this will be a poor approximation. The IVF algorithm uses a more elegant approach. First, before indexing begins, it assigns each bucket a representative vector. When a vector is indexed, it gets added to the bucket that has the closest representative vector. This way, vectors that are closer to each other are placed roughly in the same or nearby buckets.

To determine what the representative vectors for the buckets are, the IVF algorithm requires a training step. In this step, k-Means clustering is run on a set of training data, and the centroids it produces become the representative vectors. The following diagram illustrates this process.

Inverted file system indexing concept

IVF has two parameters:

  • nlist – The number of buckets to create. More buckets will result in longer training times, but may improve the granularity of the search.
  • nprobes – The number of buckets to search. This parameter is fairly straightforward. The more buckets that are searched, the longer the search will take, but the better the approximation.
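The bucketing and probing steps can be illustrated with a short Python sketch. It is not the Faiss implementation: the centroids are supplied by hand instead of being trained with k-Means, and the function names and toy data are assumptions made for this example.

```python
import math


def l2(a, b):
    """Euclidean distance between two equal-length coordinate sequences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def build_ivf(vectors, centroids):
    """Place each vector index in the bucket of its closest centroid."""
    buckets = {c: [] for c in range(len(centroids))}
    for idx, vector in enumerate(vectors):
        closest = min(range(len(centroids)), key=lambda c: l2(vector, centroids[c]))
        buckets[closest].append(idx)
    return buckets


def ivf_search(vectors, centroids, buckets, query, k, nprobes):
    """Search only the nprobes buckets whose centroids are closest to query."""
    order = sorted(range(len(centroids)), key=lambda c: l2(query, centroids[c]))
    candidates = [idx for c in order[:nprobes] for idx in buckets[c]]
    return sorted(candidates, key=lambda idx: l2(query, vectors[idx]))[:k]


# Hand-picked centroids stand in for the output of the training step.
centroids = [(0, 0), (10, 10)]
vectors = [(0, 1), (1, 0), (9, 9), (10, 11), (4, 4)]
buckets = build_ivf(vectors, centroids)

print(ivf_search(vectors, centroids, buckets, (6, 6), k=1, nprobes=1))
print(ivf_search(vectors, centroids, buckets, (6, 6), k=1, nprobes=2))
```

With nprobes=1 the search misses the true nearest neighbor (4, 4) because it sits in the other bucket; probing both buckets finds it at the cost of examining every vector.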

Memory consumption

In general, IVF requires less memory than HNSW because IVF doesn’t need to store a set of edges for each indexed vector.

We estimate that IVF will roughly require the following amount of memory:

1.1 * (((4 * dimension) * num_vectors) + (4 * nlist * dimension)) bytes

For the case explored for HNSW where there are 1,000,000,000 128-dimensional vectors with one layer of replication, an IVF algorithm with an nlist of 4096 would take roughly 1.1 * (((4 * 128) * 2,000,000,000) + (4 * 4096 * 128)) bytes = 1126 GB.
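As a quick check of this arithmetic, the following Python helper evaluates the IVF estimate. The function name and the replicas argument are illustrative assumptions. Unlike the expression above, it applies the replica factor to the whole estimate, including the small centroid term, which changes the result by only a few megabytes.

```python
def ivf_memory_bytes(dimension, num_vectors, nlist, replicas=0):
    """Estimate IVF memory: raw float vectors plus nlist centroid vectors."""
    vectors_bytes = 4 * dimension * num_vectors
    centroids_bytes = 4 * nlist * dimension
    return 1.1 * (vectors_bytes + centroids_bytes) * (1 + replicas)


estimate = ivf_memory_bytes(dimension=128, num_vectors=1_000_000_000, nlist=4096, replicas=1)
print(round(estimate / 1e9), "GB")
```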

This savings does come at a cost, however, because HNSW offers a better query latency versus approximation accuracy tradeoff.

Product quantization vector compression

Although you can use HNSW and IVF to speed up nearest neighbor search, they can consume a considerable amount of memory. When we get into the billion-vector scale, we start to require thousands of GBs of memory to support their index structures. As we scale up the number of vectors or the dimension of vectors, this requirement continues to grow. Is there a way to use noticeably less space for our k-NN index?

The answer is yes! In fact, there are a lot of different ways to reduce the amount of memory vectors require. You can change your embedding model to produce smaller vectors, or you can apply techniques like Principal Component Analysis (PCA) to reduce the vector’s dimensionality. Another approach is to use quantization. The general idea of vector quantization is to map a large vector space with continuous values into a smaller space with discrete values. When a vector is mapped into a smaller space, it requires fewer bits to represent. However, this comes at a cost: when mapping to a smaller space, some information about the vector is lost.

Product quantization (PQ) is a very popular quantization technique in the field of nearest neighbor search. It can be used together with ANN algorithms for nearest neighbor search. Along with IVF, the k-NN plugin added support for Faiss’s PQ implementation in OpenSearch 1.2.

The main idea of PQ is to break up a vector into several sub-vectors and encode the sub-vectors independently with a fixed number of bits. The number of sub-vectors that the original vector is broken up into is controlled by a parameter, m, and the number of bits to encode each sub-vector with is controlled by a parameter, code_size. After encoding finishes, a vector is compressed into roughly m * code_size bits. So, assume we have a set of 100,000 1024-dimensional vectors. With m = 8 and code_size = 8, PQ breaks each vector into eight 128-dimensional sub-vectors and encodes each sub-vector with 8 bits.

The values used for encoding are produced during a training step. During training, tables are created with 2^code_size entries for each sub-vector partition. Next, k-Means clustering, with a k value of 2^code_size, is run on the corresponding partition of sub-vectors from the training data. The centroids produced here are added as the entries to the partition’s table.

After all the tables are created, we encode a vector by replacing each sub-vector with the ID of the closest vector in the partition’s table. In the example where code_size = 8, we only need 8 bits to store an ID because there are 2^8 elements in the table. So, with dimension = 1024 and m = 8, the total size of one vector (assuming it uses a 32-bit floating point data type) is reduced from 4,096 bytes to roughly 8 bytes!

Product quantization encoding step

When we want to decode a vector, we can reconstruct an approximated version of it by using the stored IDs to retrieve the vectors from each partition’s table. The distance from the query vector to the reconstructed vector can then be computed and used in a nearest neighbor search. (It’s worth noting that, in practice, further optimization techniques like ADC are used to speed up this process for k-NN search).

Product quantization decoding step
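The encoding and decoding steps can be sketched in a few lines of Python. This is a minimal illustration under simplifying assumptions, not Faiss's implementation: the codebooks are hand-written rather than learned with k-Means, the vector has only 4 dimensions with m = 2 and code_size = 1 (two centroids per partition), and the helper names split, encode, and decode are hypothetical.

```python
from math import dist

# Hypothetical, hand-written codebooks: one table per sub-vector partition.
# With code_size = 1 bit, each table holds 2**1 = 2 centroids.
CODEBOOKS = [
    [(0.0, 0.0), (4.0, 4.0)],  # partition 0 covers dimensions 0-1
    [(1.0, 1.0), (9.0, 9.0)],  # partition 1 covers dimensions 2-3
]

def split(vector, m):
    """Break a vector into m equally sized sub-vectors."""
    step = len(vector) // m
    return [tuple(vector[i * step:(i + 1) * step]) for i in range(m)]

def encode(vector, codebooks):
    """Replace each sub-vector with the ID of its closest centroid."""
    codes = []
    for sub, table in zip(split(vector, len(codebooks)), codebooks):
        codes.append(min(range(len(table)), key=lambda i: dist(sub, table[i])))
    return codes

def decode(codes, codebooks):
    """Rebuild an approximate vector by looking up each stored ID."""
    approx = []
    for code, table in zip(codes, codebooks):
        approx.extend(table[code])
    return approx

original = [3.5, 4.5, 0.5, 1.5]
codes = encode(original, CODEBOOKS)
reconstructed = decode(codes, CODEBOOKS)
print(codes, reconstructed)  # [1, 0] [4.0, 4.0, 1.0, 1.0]
```

Only the two IDs are stored for the vector. The reconstruction is lossy: the decoded vector is built from centroids, so it is close to the original but not identical.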

Memory consumption

As we mentioned earlier, PQ will encode each vector into roughly m * code_size bits plus some overhead for each vector.

When combining it with IVF, we can estimate the index size as follows:

1.1 * ((((code_size / 8) * m + overhead_per_vector) * num_vectors) + (4 * nlist * dimension) + (2^code_size * 4 * dimension)) bytes

Using 1 billion vectors, dimension = 128, m = 8, code_size = 8, and nlist = 4096, we get an estimated total memory consumption of 70 GB: 1.1 * ((((8 / 8) * 8 + 24) * 1,000,000,000) + (4 * 4096 * 128) + (2^8 * 4 * 128)) * 2 = 70 GB.
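To make the arithmetic easy to repeat for other configurations, the estimate can be wrapped in a small Python function. This is a sketch, not part of the k-NN plugin: the function name is hypothetical, the 24-byte per-vector overhead is taken from the worked example, and the copies parameter stands in for the trailing factor of 2, which we assume accounts for the primary copy plus one replica.

```python
def ivfpq_memory_bytes(num_vectors, dimension, m, code_size, nlist,
                       overhead_per_vector=24, copies=2):
    """Estimate IVFPQ native memory in bytes using the formula above."""
    # PQ codes plus per-vector overhead, for every vector.
    codes = ((code_size / 8) * m + overhead_per_vector) * num_vectors
    # IVF coarse centroids: nlist vectors of 4-byte floats.
    coarse_centroids = 4 * nlist * dimension
    # PQ tables: 2^code_size centroids of 4-byte floats.
    pq_tables = (2 ** code_size) * 4 * dimension
    return 1.1 * (codes + coarse_centroids + pq_tables) * copies

estimate = ivfpq_memory_bytes(1_000_000_000, 128, m=8, code_size=8, nlist=4096)
print(f"{estimate / 1e9:.1f} GB")  # 70.4 GB
```

The per-vector term dominates: the centroid tables contribute only a few megabytes, so memory grows almost linearly with m and the number of vectors.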

Running k-NN with OpenSearch

First make sure you have an OpenSearch cluster up and running. For instructions, refer to Cluster formation. For a more managed solution, you can use Amazon OpenSearch Service.

Before getting into the experiments, let’s go over how to run k-NN workloads in OpenSearch. First, we need to create an index. An index stores a set of documents in a way that they can be easily searched. For k-NN, the index’s mapping tells OpenSearch what algorithms to use and what parameters to use with them. We start by creating an index that uses HNSW as its search algorithm:

PUT my-hnsw-index
{
  "settings": {
    "index": {
      "knn": true,
      "number_of_shards": 10,
      "number_of_replicas": 1
    }
  },
  "mappings": {
    "properties": {
      "my_vector": {
        "type": "knn_vector",
        "dimension": 4,
        "method": {
          "name": "hnsw",
          "space_type": "l2",
          "engine": "nmslib",
          "parameters": {
            "ef_construction": 128,
            "m": 24
          }
        }
      }
    }
  }
}

In the settings, we need to enable knn so that the index can be searched with the knn query type (more on this later). We also set the number of shards, and the number of replicas each shard will have. An index is made up of a collection of shards. Sharding is how OpenSearch distributes an index across multiple nodes in a cluster. For more information about shards, refer to Sizing Amazon OpenSearch Service domains.

In the mappings, we configure a field called my_vector of type knn_vector to store the vector data. We also pass nmslib as the engine to let OpenSearch know it should use nmslib’s implementation of HNSW. Additionally, we pass l2 as the space_type. The space_type determines the function used to compute the distance between two vectors. l2 refers to the Euclidean distance. OpenSearch also supports cosine similarity and the inner product distance functions.

After the index is created, we can ingest some fake data:

POST _bulk
{ "index": { "_index": "my-hnsw-index", "_id": "1" } }
{ "my_vector": [1.5, 2.5, 3.5, 4.5], "price": 12.2 }
{ "index": { "_index": "my-hnsw-index", "_id": "2" } }
{ "my_vector": [2.5, 3.5, 4.5, 5.5], "price": 7.1 }
{ "index": { "_index": "my-hnsw-index", "_id": "3" } }
{ "my_vector": [3.5, 4.5, 5.5, 6.5], "price": 12.9 }
{ "index": { "_index": "my-hnsw-index", "_id": "4" } }
{ "my_vector": [5.5, 6.5, 7.5, 8.5], "price": 1.2 }
{ "index": { "_index": "my-hnsw-index", "_id": "5" } }
{ "my_vector": [4.5, 5.5, 6.5, 7.5], "price": 3.7 }
{ "index": { "_index": "my-hnsw-index", "_id": "6" } }
{ "my_vector": [1.5, 5.5, 4.5, 6.4], "price": 10.3 }
{ "index": { "_index": "my-hnsw-index", "_id": "7" } }
{ "my_vector": [2.5, 3.5, 5.6, 6.7], "price": 5.5 }
{ "index": { "_index": "my-hnsw-index", "_id": "8" } }
{ "my_vector": [4.5, 5.5, 6.7, 3.7], "price": 4.4 }
{ "index": { "_index": "my-hnsw-index", "_id": "9" } }
{ "my_vector": [1.5, 5.5, 4.5, 6.4], "price": 8.9 }

After adding some documents to the index, we can search it:

GET my-hnsw-index/_search
{
  "size": 2,
  "query": {
    "knn": {
      "my_vector": {
        "vector": [2, 3, 5, 6],
        "k": 2
      }
    }
  }
}
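For intuition about what the knn query approximates, the exact answer can be computed by brute force. The sketch below is plain Python rather than a call to OpenSearch. It scores a subset of the sample documents (IDs 6 to 9) against the query vector using squared Euclidean distance, which ranks results the same way as the l2 space type, and keeps the k closest. The function names are hypothetical.

```python
import heapq

# A subset of the sample documents from the bulk request (IDs 6 to 9).
DOCS = {
    "6": [1.5, 5.5, 4.5, 6.4],
    "7": [2.5, 3.5, 5.6, 6.7],
    "8": [4.5, 5.5, 6.7, 3.7],
    "9": [1.5, 5.5, 4.5, 6.4],
}

def squared_l2(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def exact_knn(query, docs, k):
    """Return the IDs of the k documents closest to the query."""
    scored = ((squared_l2(query, vec), doc_id) for doc_id, vec in docs.items())
    # Ties on distance are broken by document ID.
    return [doc_id for _, doc_id in heapq.nsmallest(k, scored)]

nearest = exact_knn([2, 3, 5, 6], DOCS, k=2)
print(nearest)  # ['7', '6']
```

An exact scan like this touches every vector, which is what HNSW and IVF avoid at the cost of occasionally missing a true neighbor.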

Creating an index that uses IVF or PQ is a little bit different because these algorithms require training. Before creating the index, we need to create a model using the training API:

POST /_plugins/_knn/models/my_ivfpq_model/_train
{
  "training_index": "train-index",
  "training_field": "train-field",
  "dimension": 128,
  "description": "My model description",
  "method": {
      "name":"ivf",
      "engine":"faiss",
      "parameters":{
        "encoder":{
            "name":"pq",
            "parameters":{
                "code_size": 8,
                "m": 8
            }
        }
      }
  }
}

The training_index and training_field specify where the training data is stored. The only requirement for the training data index is that it has a knn_vector field that has the same dimension as you want your model to have. The method defines the algorithm that should be used for search.

After the training request is submitted, it will run in the background. To check if the training is complete, you can use the GET model API:

GET /_plugins/_knn/models/my_ivfpq_model?filter_path=model_id,state
{
  "model_id" : "my_ivfpq_model",
  "state" : "created"
}

After the model is created, you can create an index that uses this model:

PUT /my-ivfpq-index
{
  "settings" : {
    "index.knn": true,
    "number_of_shards" : 10,
    "number_of_replicas" : 1
  },
  "mappings": {
    "properties": {
      "my_vector": {
        "type": "knn_vector",
        "model_id": "my_ivfpq_model"
      }
    }
  }
}

After the index is created, we can add documents to it and search it just like we did for HNSW.

Experiments

Let’s run a few experiments to see how these algorithms perform in practice and what tradeoffs are made. We look at an HNSW versus an IVF index using PQ. For these experiments, we’re interested in search accuracy, query latency, and memory consumption. Because these trade-offs are mainly observed at scale, we use the BIGANN dataset containing 1 billion vectors of 128 dimensions. The dataset also contains 10,000 queries of test data mapping a query to the ground truth closest 100 vectors based on the Euclidean distance.

Specifically, we compute the following search metrics:

  • Latency p99 (ms), Latency p90 (ms), Latency p50 (ms) – Query latency at various quantiles in milliseconds
  • Recall@10 – The fraction of the top 10 ground truth neighbors found in the 10 results returned by the plugin
  • Native memory consumption (GB) – The amount of memory used by the plugin during querying
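The first two metrics are straightforward to compute from raw query results. The following Python sketch shows one way to do it. It is not the benchmarking code used for this post: the function names and sample values are made up for illustration, and the nearest-rank percentile definition is an assumption (benchmarking tools may interpolate instead).

```python
import math

def recall_at_k(returned_ids, ground_truth_ids, k=10):
    """Fraction of the top-k ground truth neighbors present in the top-k results."""
    truth = set(ground_truth_ids[:k])
    found = truth.intersection(returned_ids[:k])
    return len(found) / k

def percentile(latencies_ms, p):
    """Nearest-rank percentile: smallest sample with at least p% of samples at or below it."""
    ordered = sorted(latencies_ms)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]

# Made-up results for a single query and a handful of latency samples.
returned = [3, 7, 1, 9, 4, 12, 5, 8, 20, 2]
truth = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
latencies = [9.0, 9.5, 10.0, 11.0, 12.0, 12.5, 13.0, 15.0, 17.0, 40.0]

print(recall_at_k(returned, truth))  # 0.8
print(percentile(latencies, 50), percentile(latencies, 90), percentile(latencies, 99))  # 12.0 17.0 40.0
```

In a full run, recall is averaged over all test queries, and the latency percentiles are taken over every query execution.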

One thing to note is that the BIGANN dataset uses an unsigned integer as the data type. Because the knn_vector field doesn’t support unsigned integers, the data is automatically converted to floats.

To run the experiments, we complete the following steps:

  1. Ingest the dataset into the cluster using the OpenSearch Benchmarks framework (the code can be found on GitHub).
  2. When ingestion is complete, use the warmup API to prepare the cluster for the search workload.
  3. Run the 10,000 test queries against the cluster 10 times and collect the aggregated results.

The queries return the document ID only, and not the vector, to improve performance (code for this can be found on GitHub).

Parameter selection

One tricky aspect of running experiments is selecting the parameters. There are too many different combinations of parameters to test them all. Instead, we defined three optimization strategies for HNSW and IVFPQ:

  • Optimize for search latency and memory
  • Optimize for recall
  • Fall somewhere in the middle

For each optimization strategy, we chose two configurations.

For HNSW, we can tune the m, ef_construction, and ef_search parameters to achieve our desired trade-off:

  • m – Controls the maximum number of edges a node in a graph can have. Because each node has to store all of its edges, increasing this value will increase the memory footprint, but also increase the connectivity of the graph, which will improve recall.
  • ef_construction – Controls the size of the candidate queue for edges when adding a node to the graph. Increasing this value will increase the number of candidates to consider, which will increase the index latency. However, because more candidates will be considered, the quality of the graph will be better, leading to better recall during search.
  • ef_search – Similar to ef_construction, it controls the size of the candidate queue for graph traversal during search. Increasing this value will increase the search latency, but will also improve the recall.

In general, we chose configurations that gradually increased the parameters, as detailed in the following table.

Config ID | Optimization Strategy | m | ef_construction | ef_search
hnsw1 | Optimize for memory and search latency | 8 | 32 | 32
hnsw2 | Optimize for memory and search latency | 16 | 32 | 32
hnsw3 | Balance between latency, memory, and recall | 16 | 128 | 128
hnsw4 | Balance between latency, memory, and recall | 32 | 256 | 256
hnsw5 | Optimize for recall | 32 | 512 | 512
hnsw6 | Optimize for recall | 64 | 512 | 512

For IVF, we can tune two parameters:

  • nlist – Controls the granularity of the partitioning. The recommended value for this parameter is a function of the number of vectors in the index. One thing to keep in mind is that there are Faiss indexes that map to Lucene segments. There are several Lucene segments per shard and several shards per OpenSearch index. For our estimates, we assumed that there would be 100 segments per shard and 24 shards, so about 420,000 vectors per Faiss index. With this value, we estimated a good value to be 4096 and kept this constant for the experiments.
  • nprobes – Controls the number of nlist buckets we search. Higher values generally lead to improved recalls at the expense of increased search latencies.
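The interaction between nlist and nprobes can be seen in a toy inverted file index. This Python sketch is illustrative only and is not how Faiss is implemented: the three centroids are hand-picked instead of being produced by k-Means, the data is two-dimensional, and all names are hypothetical.

```python
from math import dist

def build_inverted_lists(vectors, centroids):
    """Assign every vector to the bucket of its nearest centroid."""
    buckets = {i: [] for i in range(len(centroids))}
    for vec_id, vec in vectors.items():
        nearest = min(range(len(centroids)), key=lambda i: dist(vec, centroids[i]))
        buckets[nearest].append(vec_id)
    return buckets

def ivf_search(query, vectors, centroids, buckets, k, nprobes):
    """Scan only the nprobes buckets whose centroids are closest to the query."""
    probe_order = sorted(range(len(centroids)), key=lambda i: dist(query, centroids[i]))
    candidates = [vid for b in probe_order[:nprobes] for vid in buckets[b]]
    candidates.sort(key=lambda vid: dist(query, vectors[vid]))
    return candidates[:k]

# Hypothetical toy data: nlist = 3 hand-picked centroids.
CENTROIDS = [(0.0, 0.0), (10.0, 0.0), (0.0, 10.0)]
VECTORS = {"a": (1.0, 1.0), "b": (4.0, 0.0), "c": (6.0, 0.0), "d": (9.0, 1.0), "e": (1.0, 9.0)}
BUCKETS = build_inverted_lists(VECTORS, CENTROIDS)

query = (4.9, 0.0)
print(ivf_search(query, VECTORS, CENTROIDS, BUCKETS, k=2, nprobes=1))  # ['b', 'a']
print(ivf_search(query, VECTORS, CENTROIDS, BUCKETS, k=2, nprobes=2))  # ['b', 'c']
```

The true two nearest neighbors of the query are b and c, but c sits just across a bucket boundary. With nprobes = 1 it is missed, and with nprobes = 2 it is found at the cost of scanning twice as many candidates.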

For PQ, we can tune two parameters:

  • m – Controls the number of partitions to break the vector into. The larger this value is, the better the encoding will approximate the original, at the expense of raising memory consumption.
  • code_size – Controls the number of bits to encode a sub-vector with. The larger this value is, the better the encoding approximates the original, at the expense of raising memory consumption. The max value is 8, so we kept it constant at 8 for all experiments.

The following table summarizes our strategies.

Config ID | Optimization Strategy | nprobes | m (num_sub_vectors)
ivfpq1 | Optimize for memory and search latency | 8 | 8
ivfpq2 | Optimize for memory and search latency | 16 | 8
ivfpq3 | Balance between latency, memory, and recall | 32 | 16
ivfpq4 | Balance between latency, memory, and recall | 64 | 32
ivfpq5 | Optimize for recall | 128 | 16
ivfpq6 | Optimize for recall | 128 | 32

Additionally, we need to figure out how much training data to use for IVFPQ. In general, Faiss recommends between 30 * k and 256 * k training vectors for components involving k-Means training, where k is the number of centroids. For our configurations, the maximum k for k-Means is 4096 from the nlist parameter. With this formula, the recommended training set size is between 122,880 and 1,048,576 vectors, so we settled on 1 million vectors. The training data comes from the index vector dataset.
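The recommended range quoted above corresponds to 30 and 256 training vectors per k-Means centroid. As a quick check, the bounds can be reproduced in Python (the function name is hypothetical):

```python
def training_set_bounds(k, low_factor=30, high_factor=256):
    """Recommended range of training vectors for k-Means with k centroids."""
    return low_factor * k, high_factor * k

nlist = 4096           # k for the IVF coarse quantizer
pq_centroids = 2 ** 8  # k for each PQ partition when code_size = 8
low, high = training_set_bounds(max(nlist, pq_centroids))
print(low, high)  # 122880 1048576
```

Because nlist is far larger than the 256 centroids per PQ partition, the IVF component determines how much training data is needed.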

Lastly, for the index configurations, we need to select the shard count. It is recommended to keep the shard size between 10 and 50 GB for OpenSearch. Experimentally, we determined that for HNSW, a good number would be 64 shards and for IVFPQ, 42. Both index configurations were configured with one replica.

Cluster configuration

To run these experiments, we used Amazon OpenSearch Service using version 1.3 of OpenSearch to create the clusters. We decided to use the r5 instance family, which provides a good trade-off between memory size and cost.

The number of nodes will depend on the amount of memory that can be used for the algorithm per node and the total amount of memory required by the algorithm. Having more nodes and more memory will generally improve performance, but for these experiments, we want to minimize cost. The amount of memory available per node is computed as memory_available = (node_memory - jvm_size) * circuit_breaker_limit, with the following parameters:

  • node_memory – The total memory of the instance.
  • jvm_size – The OpenSearch JVM heap size. Set to 32 GB.
  • circuit_breaker_limit – The native memory usage threshold for the circuit breaker. Set to 0.5.

Because HNSW and IVFPQ have different memory requirements, we estimate how much memory is needed for each algorithm and determine the required number of nodes accordingly.

For HNSW, with m = 64, the total memory required using the formula from the previous sections is approximately 2,252 GB. Therefore, with r5.12xlarge (384 GB of memory), memory_available is 176 GB and the total number of nodes required is about 13, which we round up to 16 for stability purposes.

Because the IVFPQ algorithm requires less memory, we can use a smaller instance type, the r5.4xlarge instance, which has 128 GB of memory. Therefore, the memory_available for the algorithm is 48 GB. The estimated algorithm memory consumption where m = 64 is a total of 193 GB and the total number of nodes required is four, which we round up to six for stability purposes.
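The sizing arithmetic for both clusters can be reproduced with a short Python sketch. The function names are hypothetical, and the defaults are the jvm_size and circuit_breaker_limit values listed earlier.

```python
def memory_available_gb(node_memory_gb, jvm_size_gb=32, circuit_breaker_limit=0.5):
    """Native memory the k-NN plugin can use on one node."""
    return (node_memory_gb - jvm_size_gb) * circuit_breaker_limit

def nodes_needed(required_gb, node_memory_gb):
    """Ratio of required native memory to usable memory per node (not rounded)."""
    return required_gb / memory_available_gb(node_memory_gb)

hnsw_available = memory_available_gb(384)   # r5.12xlarge
ivfpq_available = memory_available_gb(128)  # r5.4xlarge
print(hnsw_available, round(nodes_needed(2252, 384), 1))  # 176.0 12.8
print(ivfpq_available, round(nodes_needed(193, 128), 1))  # 48.0 4.0
```

These ratios are lower bounds. The 16-node and 6-node clusters used in the experiments leave headroom on top of them.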

For both clusters, we use c5.2xlarge instance types as dedicated leader nodes. This will provide more stability for the cluster.

According to the AWS Pricing Calculator, for this particular use case, the HNSW cluster costs around $75 an hour and the IVFPQ cluster costs around $11 an hour. This is important to remember when comparing the results.

Also, keep in mind that these benchmarks can be run on your own infrastructure using Amazon Elastic Compute Cloud (Amazon EC2), as long as the instance types and their memory sizes are equivalent.

Results

The following tables summarize the results from the experiments.

Test ID | p50 Query latency (ms) | p90 Query latency (ms) | p99 Query latency (ms) | Recall@10 | Native memory consumption (GB)
hnsw1 | 9.1 | 11 | 16.9 | 0.84 | 1182
hnsw2 | 11 | 12.1 | 17.8 | 0.93 | 1305
hnsw3 | 23.1 | 27.1 | 32.2 | 0.99 | 1306
hnsw4 | 54.1 | 68.3 | 80.2 | 0.99 | 1555
hnsw5 | 83.4 | 100.6 | 114.7 | 0.99 | 1555
hnsw6 | 103.7 | 131.8 | 151.7 | 0.99 | 2055

Test ID | p50 Query latency (ms) | p90 Query latency (ms) | p99 Query latency (ms) | Recall@10 | Native memory consumption (GB)
ivfpq1 | 74.9 | 100.5 | 106.4 | 0.17 | 68
ivfpq2 | 78.5 | 104.6 | 110.2 | 0.18 | 68
ivfpq3 | 87.8 | 107 | 122 | 0.39 | 83
ivfpq4 | 117.2 | 131.1 | 151.8 | 0.61 | 114
ivfpq5 | 128.3 | 174.1 | 195.7 | 0.40 | 83
ivfpq6 | 163 | 196.5 | 228.9 | 0.61 | 114

As you might expect, given how many more resources it uses, the HNSW cluster has lower query latencies and better recall. However, the IVFPQ indexes use significantly less memory.

For HNSW, increasing the parameters does in fact lead to better recall at the expense of latency. For IVFPQ, increasing m has the most significant impact on improving recall. Increasing nprobes improves the recall marginally, but at the expense of significant increases in latencies.

Conclusion

In this post, we covered different algorithms and techniques used to perform approximate k-NN search at scale (over 1 billion data points) within OpenSearch. As we saw in the benchmark results, there isn’t one algorithm or approach that optimizes for all the metrics at once. HNSW, IVF, and PQ each allow you to optimize for different metrics in your k-NN workload. When choosing the k-NN algorithm to use, first understand the requirements of your use case (How accurate does my approximate nearest neighbor search need to be? How fast should it be? What’s my budget?) and then tailor the algorithm configuration to meet them.

You can take a look at the benchmarking code base we used on GitHub. You can also get started with approximate k-NN search today following the instructions in Approximate k-NN search. If you’re looking for a managed solution for your OpenSearch cluster, check out Amazon OpenSearch Service.


About the Authors

Jack Mazanec is a software engineer working on OpenSearch plugins. His primary interests include machine learning and search engines. Outside of work, he enjoys skiing and watching sports.

Othmane Hamzaoui is a Data Scientist working at AWS. He is passionate about solving customer challenges using Machine Learning, with a focus on bridging the gap between research and business to achieve impactful outcomes. In his spare time, he enjoys running and discovering new coffee shops in the beautiful city of Paris.

Fine-grained entitlements in Amazon Redshift: A case study from TrustLogix

Post Syndicated from Srikanth Sallaka original https://aws.amazon.com/blogs/big-data/fine-grained-entitlements-in-amazon-redshift-a-case-study-from-trustlogix/

This post is co-written with Srikanth Sallaka from TrustLogix as the lead author.

TrustLogix is a cloud data access governance platform that monitors data usage to discover patterns, provide insights on least privileged access controls, and manage fine-grained data entitlements across data lake storage solutions like Amazon Simple Storage Service (Amazon S3), data warehouses like Amazon Redshift, and transactional databases like Amazon Relational Database Service (Amazon RDS) and Amazon Aurora.

In this post, we discuss how TrustLogix integrates with Amazon Redshift row-level security (RLS) to help data owners express granular data entitlements in business terms and consistently enforce them.

The challenge: Dynamic data authorization

In this post, we discuss two customer use cases:

  • Data access based on enterprise territory assignments – Sales representatives should only be able to access data in the opportunities dataset for their assigned territories. This customer wants to grant access based on a criterion that is an attribute of the dataset, such as geographic area, industry, or revenue. The challenge is that this access control policy should be applied by Amazon Redshift regardless of the platform from which the data is accessed.
  • Entitlement-based data access – One of TrustLogix’s customers is a Fortune 500 financial services firm. They use Amazon Redshift to store and perform analysis on a wide range of datasets, like advertising research, pricing to customers, and equity markets. They share this data with traders, quants, and risk managers. This internal data is also consumed by various users across the firm, but not every user is entitled to see all the data. To track this data and access requests, this firm spent a great deal of resources in building a comprehensive list of permissions that define which business user is entitled to what data. A simple scenario is that this entitlement table contains the customer_id and book_id values assigned to specific user_id values. Any queries on the trade data table, which is tagged as sensitive data, should enforce this policy. The challenge is that these data entitlements should be enforced centrally in Amazon Redshift regardless of the tool from which the data is accessed. Data owners should be able to manage this policy with a simple access control policy management interface and shouldn’t be required to know the internals of Amazon Redshift to implement complex procedures.

User-defined function (UDF) and secure view-based implementation

At present, to define fine-grained access controls in Amazon Redshift, TrustLogix uses custom Amazon Redshift user-defined functions (UDFs) and views: policies are authored in the TrustLogix policy management console, and users are granted access to the resulting view.

TrustLogix Policy UDF

This process involves four steps:

  1. Create a user-defined function that returns a Boolean whenever the conditions of the policy match.
  2. Create a view by joining the UDF and base table.
  3. Grant access to the new view to the appropriate users or groups.
  4. Block direct table access to all users.

Native row-level security (RLS) policies in Amazon Redshift

The row-level security (RLS) feature in Amazon Redshift simplifies design and implementation of fine-grained access to the rows in tables. With RLS, you can restrict access to a subset of rows within a table based on the user’s job role or permissions and level of data sensitivity with SQL commands. By combining column-level access control and RLS, you can provide comprehensive protection by enforcing granular access to your data. TrustLogix integrates with this feature to let their customers specify custom SQL queries and dictate what sets of data are accessible by which users.

TrustLogix is now using the RLS feature to address both use cases mentioned earlier. This reduces the complexity of managing additional UDFs, secure views, and grants.

“We’re excited about this deeper level of integration with Amazon Redshift. Our joint customers in security-forward and highly regulated sectors including financial services, healthcare, and pharmaceutical need to have incredibly fine-grained control over which users are allowed to access what data, and under which specific contexts. The new row-level security capabilities will allow our customers to precisely dictate data access controls based on their business entitlements while abstracting them away from the technical complexities. The new Amazon Redshift RLS capability will enable our joint customers to model policies at the business level, deploy and enforce them via a security-as-code model, ensuring secure and consistent access to their sensitive data.”

– Ganesh Kirti, Founder & CEO, TrustLogix Inc.

TrustLogix integration with RLS

Let’s look at our two use cases and how to implement TrustLogix integration with RLS.

Data access based on territories

The data owner logs in to the TrustLogix control plane and authors a data access policy using the business-friendly UI.

TrustLogix login page

TrustLogix auto-generates the following Amazon Redshift RLS policy, attaches it to the appropriate table, and turns on the RLS on this table.

Create RLS POLICY OPPORTUNITIES_BY_REGION 
WITH (region VARCHAR(256))
USING (region IN (SELECT region FROM Territories_Mgmt WHERE user_id = current_user));

Then you can use the following grant statement on the table:

Grant Select on table Sales.opportunities to role SalesRepresentative;

After this policy is deployed into the Amazon Redshift data warehouse, any user who queries this table automatically gets only authorized data.

Entitlement-based data access

Similar to the first use case, TrustLogix creates two separate RLS policies, one on the book_id and another with customer_id, attaching both the policies on the trade details table.

Create RLS POLICY entitlement_book_id_rls with (book_id integer) using (book_id in (select book_id from entitlements));
Create RLS POLICY entitlement_customer_id_rls with (customer_id integer) using (customer_id in (select customer_id from customer_details where user_id = current_user));
Attach RLS POLICY entitlement_book_id_rls on trade_details to Role Trader;
Attach RLS POLICY entitlement_customer_id_rls on trade_details to Role Trader;

In this case, Amazon Redshift evaluates both attached policies using the AND operator, with the effect that users with the Trader role get view-only access for only those customers and books that the Trader role is granted.
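The combined effect of two policies joined with AND can be emulated outside Amazon Redshift. The following Python sketch uses an in-memory SQLite database purely for illustration. SQLite has no row-level security, so both predicates are written directly into the query. The sample rows are made up, and as a simplifying assumption both filters read from a single entitlements table and are scoped to the user, which differs slightly from the policies shown above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE trade_details (trade_id INTEGER, book_id INTEGER, customer_id INTEGER);
    CREATE TABLE entitlements (user_id TEXT, book_id INTEGER, customer_id INTEGER);
    INSERT INTO trade_details VALUES (1, 10, 100), (2, 10, 200), (3, 20, 100), (4, 30, 300);
    INSERT INTO entitlements VALUES ('alice', 10, 100), ('alice', 20, 300), ('bob', 30, 300);
""")

def visible_trades(user_id):
    """Return the trade IDs that pass both filters for the given user."""
    rows = conn.execute(
        """
        SELECT trade_id FROM trade_details
        WHERE book_id IN (SELECT book_id FROM entitlements WHERE user_id = :u)
          AND customer_id IN (SELECT customer_id FROM entitlements WHERE user_id = :u)
        ORDER BY trade_id
        """,
        {"u": user_id},
    ).fetchall()
    return [trade_id for (trade_id,) in rows]

print(visible_trades("alice"))  # [1, 3]
print(visible_trades("bob"))    # [4]
```

A row is returned only when it satisfies both predicates, which mirrors how the two attached policies narrow the result. With native RLS, the same filtering happens inside Amazon Redshift without the querying tool adding any conditions.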

Additional TrustLogix and Amazon Redshift integration benefits

The following diagram illustrates how TrustLogix integrates with Amazon Redshift.

TrustLogix and RLS diagram

This robust new integration offers many powerful security, productivity, and collaboration benefits to joint Amazon Redshift and TrustLogix customers:

  • A single pane of glass to monitor and manage fine-grained data entitlements across multiple Amazon Redshift data warehouses, AWS data stores including Amazon S3 and Aurora, and other cloud data repositories such as Snowflake and Databricks
  • Monitoring of data access down to the user and tool level to prevent shadow IT, identify overly granted access permissions, discover dark data, and ensure compliance with legislative mandates like GDPR, HIPAA, SOX, and PCI
  • A no-code model that enables security as code, ensures consistency, reduces work, and eliminates errors

Summary

The RLS capability in Amazon Redshift delivers granular controls for restricting data. TrustLogix has delivered an integration that reduces the effort, complexity, and dependency of creating and managing complex user-defined functions to fully take advantage of this capability.

Furthermore, TrustLogix doesn’t need to create additional views, which reduces management of user grants on other derived objects. By using the RLS policies, TrustLogix has simplified creating authorization policies for fine-grained data entitlements in Amazon Redshift. You can now provision both coarse-grained and granular access controls within minutes to enable businesses to deliver faster access to analytics while simultaneously tightening your data access controls.


About the authors

Srikanth Sallaka is Head of Product at TrustLogix. Prior to this, he built multiple SaaS and on-premises data security and identity management solutions. He has honed his product management and technical skills working at large enterprises like Oracle and SAP and at multiple startups.

Yanzhu Ji is a Product Manager on the Amazon Redshift team. She worked on the Amazon Redshift team as a Software Engineer before becoming a Product Manager. She has rich experience of how the customer-facing Amazon Redshift features are built from planning to launching, and always treats customers’ requirements as first priority. In her personal life, Yanzhu likes painting, photography, and playing tennis.

Amazon migrates financial reporting to Amazon QuickSight

Post Syndicated from Chitradeep Barman original https://aws.amazon.com/blogs/big-data/amazon-migrates-financial-reporting-to-amazon-quicksight/

This is a guest post by Chitradeep Barman and Yaniv Ackerman from Amazon Finance Technology (FinTech).

Amazon Finance Technology (FinTech) is responsible for financial reporting on Earth’s largest transaction dataset, as the central organization supporting accounting and tax operations across Amazon. Amazon FinTech’s accounting, tax, and business finance teams close books and file taxes in different regions.

Amazon FinTech had been using a legacy business intelligence (BI) tool for over 10 years, and with its dataset growing at 20% year over year, it was beginning to face operational and performance challenges.

In 2019, Amazon FinTech decided to migrate its data visualization and BI layer to AWS to improve data analysis capabilities, reduce costs, and improve its use of AWS Cloud–native services, which reduces risk and technical complexity. By the end of 2021, Amazon FinTech had migrated to Amazon QuickSight, which organizations use to understand data by asking questions in natural language, exploring through interactive dashboards, or automatically looking for patterns and outliers powered by machine learning (ML).

In this post, we share the challenges and benefits of this migration.

Improving reporting and BI capabilities on AWS

Amazon FinTech’s customers are in accounting, tax, and business finance teams across Amazon Finance and Global Business Services, AWS, and Amazon subsidiaries. It provides these teams with authoritative data to do financial reporting and close Amazon’s books, as well as file taxes in jurisdictions and countries around the world. Amazon FinTech also provides data and tools for analysis and BI.

“Over time, with data growth, we started facing operational and maintenance challenges with the legacy BI tool, resulting in a multifold increase in engineering overhead,” said Chitradeep Barman, a senior technical program manager with Amazon FinTech who drove the technical implementation of the migration to QuickSight.

To improve security, increase scalability, and reduce costs, Amazon FinTech decided to migrate to QuickSight on AWS. This transition aligned with the organization’s goal to rely on native AWS technology and reduce dependency on other third-party tools.

Amazon FinTech was already using Amazon Redshift as its cloud data warehouse. Amazon Redshift can analyze exabytes of data, run complex analytical queries, and scale analytics in seconds without the need to manage the data warehouse infrastructure. As an AWS-native data visualization and BI tool, QuickSight seamlessly connects with AWS services, including Amazon Redshift. The migration was sizable: after consolidating existing reports, there were about 2,000 financial reports in the legacy tool that were used by over 2,500 users. The reports pulled data from millions of records.

Innovating while migrating

Amazon FinTech migrated complex reports and simultaneously started multiple training sessions. Additional training modules were built to complement existing QuickSight trainings and calibrated to meet the specific needs of Amazon FinTech’s customers.

Amazon FinTech deals with petabytes of data and had built up a repository of 10,000 reports used by 2,500 employees across Amazon. Collaborating with the QuickSight team, they consolidated their reports to reduce redundancy and focus on what their finance customers needed. Amazon FinTech built 450 canned and over 1,800 ad hoc reports in QuickSight, developing a reusable solution with the QuickSight API. As shown in the following figure, on average per month, Amazon FinTech has over 1,300 unique QuickSight users run almost 2,500 unique QuickSight reports, with more than 4,600 total runs.

Amazon FinTech has been able to scale to meet customer requirements using QuickSight.

“AWS services come along with scalability. The whole point of migrating to AWS is that we do not need to think about scaling our infrastructure, and we can focus on the functional part of it,” says Barman.

QuickSight is cloud based, fully managed, and serverless, meaning you don’t have to build your own infrastructure to handle peak usage. It auto scales across tens of thousands of users who work independently and simultaneously.

As of May 2022, more than 2,500 Amazon Finance employees are using QuickSight for financial and operational reporting and to prepare Amazon’s tax statements.

“The advantage of Amazon QuickSight is that it empowers nontechnical users, including accountants and tax and financial analysts. It gives them more capability to run their reporting and build their own analyses,” says Keith Weiss, principal program manager at Amazon FinTech. According to Weiss, “QuickSight has much richer data visualization than competing BI tools.”

QuickSight is constantly innovating for customers, adding new features, and recently released the AI/ML service Amazon QuickSight Q, which lets users ask questions in natural language and receive accurate answers with relevant visualizations to help gain insights from the underlying data. Barman, Weiss, and the rest of the Amazon FinTech team are excited to implement Q in the near future.

By switching to QuickSight, which uses pay-as-you-go pricing, Amazon FinTech saved 40% without sacrificing the security, governance, and compliance capabilities its account needed to satisfy internal and external auditors. The AWS pricing structure makes QuickSight much more cost-effective than other BI tools on the market.

Overall, Amazon FinTech saw the following benefits:

  • Performance improvements – Latency of consumer-facing reports was reduced by 30%
  • Cost reduction – FinTech reduced licensing, database, and support costs by over 40%, and with the AWS pay-as-you-go model, it’s much more cost-effective to be on QuickSight
  • Controllership – FinTech reports are global, and controlled accessibility to reporting data is a key aspect to ensure only relevant data is visible to specific teams
  • Improved governance – QuickSight APIs to track and promote changes within different environments reduced manual overhead and improved change trackability

Seamless and reliable

At the end of each month, Amazon FinTech teams must close books in 5 days, and since implementing QuickSight for this purpose, Barman says that “reports have run seamlessly, and there have been no critical situations.”

Amazon FinTech’s account on QuickSight is now the source of truth for Amazon’s financial reporting, including tax filings and preparing financial statements. It enables Amazon’s own finance team to close its books and file taxes at the unparalleled scale at which Amazon operates, with all its complexity. Most importantly, despite initial skepticism, according to Weiss, “Our finance users love it.”

Learn more about Amazon QuickSight and get started diving deeper into your data today!


About the authors

Chitradeep Barman is a Sr. Technical Program Manager at Amazon Finance Technology (FinTech). He led the Amazon-wide migration of BI reporting from Oracle BI (OBIEE) to Amazon QuickSight. Chitradeep started his career as a data engineer and over time grew into a data architect. Before joining Amazon, he led the design and implementation to launch the BI analytics and reporting platform for Cisco Capital (a fully owned subsidiary of Cisco Systems).

Yaniv Ackerman is a senior software development manager in the FinTech org. He has over 20 years of experience building business-critical, scalable, and high-performance software. Yaniv’s team builds data lakes, analytics, and automation solutions for financial usage.

New additions to line charts in Amazon QuickSight

Post Syndicated from Bhupinder Chadha original https://aws.amazon.com/blogs/big-data/new-additions-to-line-charts-in-amazon-quicksight/

Amazon QuickSight is a fully-managed, cloud-native business intelligence (BI) service that makes it easy to create and deliver insights to everyone in your organization or even with your customers and partners. You can make your data come to life with rich interactive charts and create beautiful dashboards to be shared with thousands of users, either directly within the QuickSight application, or embedded in web apps and portals.

Line charts are ubiquitous in the world of data visualization and are used to visualize change in data over a dimension. They are a great way to analyze trends and patterns, because data points are connected with straight lines that reveal the overall trend. In this post, we look at some of the new improvements to our line charts:

  • Support for missing data points for line and area charts
  • Improved performance and increased data limit to 10,000 data points

Missing data points

Line charts in QuickSight expect you to have data for each X axis item. If data is missing for any X axis item, it can lead to broken lines (default behavior) because there is no line drawn connecting the missing data points.

Drawing lines across points of missing data could be misleading because it would represent data that doesn’t exist, but there are valid use cases for doing so. For example, imagine a retail sales report for a given time period where data is recorded only during days of operation (Monday through Saturday). In such cases, instead of displaying a broken line chart that skips Sunday, you may want to show a continuous trend by directly connecting Saturday to Monday, hiding the fact that the store isn’t open on Sunday. Alternatively, you may want to view store traffic for Sunday as 0 instead of displaying a broken line.

Previously, line charts only supported treating missing data for date/time fields. Now, we have added support for categorical data for both line and area charts. The following are the different line treatment options:

  1. Continuous line – Display continuous lines by directly connecting the line to the next available data point in the series
  2. Show as zero – Interpolate the missing values with zero and display a continuous line
  3. Broken line – Retain the default experience to display disjointed lines over missing values

The following diagram illustrates a line chart using each option.

This new feature also applies to both categorical and time series data on area charts, as shown in the following graphs.

Authors can also configure different data treatments for the left and right Y axis for dual axis charts, as shown in the following example.
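The three treatment options described above can be sketched in a few lines of Python. This is only an illustration of the concepts, not how QuickSight implements them; the function names and the sample sales figures are hypothetical.

```python
from typing import List, Optional, Tuple

Series = List[Optional[float]]
Point = Tuple[str, float]


def show_as_zero(values: Series) -> List[float]:
    """'Show as zero': replace every missing value with 0."""
    return [0.0 if v is None else v for v in values]


def continuous_line(labels: List[str], values: Series) -> List[Point]:
    """'Continuous line': drop missing points so their neighbors connect directly."""
    return [(x, v) for x, v in zip(labels, values) if v is not None]


def broken_line(labels: List[str], values: Series) -> List[List[Point]]:
    """'Broken line': split the series into separate segments at each gap."""
    segments: List[List[Point]] = []
    current: List[Point] = []
    for x, v in zip(labels, values):
        if v is None:
            if current:
                segments.append(current)
            current = []
        else:
            current.append((x, v))
    if current:
        segments.append(current)
    return segments


# Hypothetical retail data: the store is closed on Sunday.
days = ["Fri", "Sat", "Sun", "Mon"]
sales = [120.0, 150.0, None, 90.0]

print(show_as_zero(sales))
print(continuous_line(days, sales))
print(broken_line(days, sales))
```

With the sample data, the continuous option connects Saturday directly to Monday, the zero option adds a Sunday point at 0, and the broken option yields two separate segments.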

Increased data limit for line charts

With the recent update, we have improved line chart performance to support a maximum of 10,000 data points instead of the previous 2,500 data point limit. This also raises the number of line series that the Color by field can create, because that number is bound by the total data limit. For example, if the line chart has 1,000 data points for each series, you could display up to 10 uniquely colored series.

This update enables use cases where authors want to show a higher number of data points, such as hourly trends or daily trends for a year (365 data points) for multiple groups. To stay compatible with existing dashboards and analyses, this update doesn’t change the current default limits for the Color by field (25) and X axis data points (100); those defaults apply until authors choose to customize the limits.
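The relationship between the total data limit and the number of series is simple integer division. The following sketch works through it using the limits quoted in this post; the helper name `max_series` is hypothetical.

```python
TOTAL_POINT_LIMIT = 10_000  # new line chart limit quoted in this post
PREVIOUS_POINT_LIMIT = 2_500  # previous limit quoted in this post


def max_series(points_per_series: int, total_limit: int = TOTAL_POINT_LIMIT) -> int:
    """Return how many series fit when every series has the same number of points."""
    if points_per_series <= 0:
        raise ValueError("points_per_series must be positive")
    return total_limit // points_per_series


print(max_series(1_000))  # the example from the post: 10 series
print(max_series(365))  # daily points for a year: 27 series
print(max_series(365, PREVIOUS_POINT_LIMIT))  # same chart under the old limit: 6 series
```

Note that 27 series is above the default Color by limit of 25, so an author would need to customize that limit to display all of them.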

Summary

In this post, we looked at how to treat missing data for line charts, where instead of viewing broken lines, you can display continuous lines. This helps you customize how you want to visualize overall trends and variations depending on the business context. Additionally, we looked at the new data handling limit for line charts, which supports 10,000 data points, four times as much data as before. To learn more, refer to Customizing missing data control.

Try out the new feature and share your feedback and questions in the comments section.


About the author

Bhupinder Chadha is a senior product manager for Amazon QuickSight focused on visualization and front-end experiences. He is passionate about BI, data visualization, and low-code/no-code experiences. Prior to QuickSight, he was the lead product manager for Inforiver, responsible for building an enterprise BI product from the ground up. Bhupinder started his career in presales, followed by a small gig in consulting and then PM for xViz, an add-on visualization product.

Integrate AWS IAM Identity Center (successor to AWS Single Sign-On) with AWS Lake Formation fine-grained access controls

Post Syndicated from Benon Boyadjian original https://aws.amazon.com/blogs/big-data/integrate-aws-iam-identity-center-successor-to-aws-single-sign-on-with-aws-lake-formation-fine-grained-access-controls/

A data lake is a centralized repository for storing structured and unstructured data at scale. Data lakes enable you to create dashboards, perform big data processing and real-time analytics, and create machine learning (ML) models on your data to drive business decisions.

Many customers are choosing AWS Lake Formation as their data lake management solution. Lake Formation is an integrated data lake service that makes it simple for you to ingest, clean, catalog, transform, and secure your data and make it available for analysis and ML.

However, some companies require account authentication and authorization to be managed through AWS IAM Identity Center (successor to AWS Single Sign-On), which doesn’t have a built-in integration with Lake Formation.

Integrating Lake Formation with IAM Identity Center can help you manage data access at the organization level, consolidating AWS account and data lake authentication and authorization.

In this post, we walk through the steps to integrate IAM Identity Center with Lake Formation.

Solution overview

In this post, we configure IAM Identity Center with permission sets for your data lake personas. These are the permissions that allow your data lake users to access Lake Formation. When the permission sets are assigned to your data lake account, IAM Identity Center creates AWS Identity and Access Management (IAM) roles in that account. The IAM role names are prefixed with AWSReservedSSO_<Permission Set Name>.

In Lake Formation, you can grant data resource permissions to IAM users and roles. To integrate with IAM Identity Center, you will grant data resource access to the IAM roles created by IAM Identity Center.

Now, when users access the data lake account through the IAM Identity Center portal, they assume an IAM role that has access to Lake Formation resources.
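Because the generated role names share a predictable prefix, you can pick them out of a list of role names programmatically. The following is a minimal sketch, assuming the role name is either exactly the prefix plus the permission set name or that name followed by an underscore and a generated suffix; the sample role names are hypothetical placeholders.

```python
from typing import Iterable, List

SSO_ROLE_PREFIX = "AWSReservedSSO_"


def roles_for_permission_set(role_names: Iterable[str], permission_set: str) -> List[str]:
    """Return the role names that IAM Identity Center generated for one permission set."""
    base = f"{SSO_ROLE_PREFIX}{permission_set}"
    # Match the bare name or the name plus "_<suffix>", so that a permission set
    # called "DataLakeAdmin" doesn't also match "DataLakeAdminReadOnly".
    return [name for name in role_names if name == base or name.startswith(base + "_")]


# Hypothetical role names, as they might appear in the data lake account.
roles = [
    "AWSReservedSSO_DataLakeAdmin_0123456789abcdef",
    "AWSReservedSSO_LakeFormationDataAnalyst_fedcba9876543210",
    "AWSReservedSSO_DataLakeAdminReadOnly_aaaabbbbccccdddd",
    "SomeOtherRole",
]

print(roles_for_permission_set(roles, "DataLakeAdmin"))
```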

The following diagram illustrates this solution architecture.

To implement the solution, complete the following high-level steps:

  1. Create a permission set within IAM Identity Center
  2. Grant users or groups access to the data lake account in IAM Identity Center
  3. Assign an IAM Identity Center role as a Data Lake Administrator
  4. Grant IAM Identity Center generated IAM role data lake permissions in Lake Formation
  5. Grant IAM Identity Center generated IAM role data location permissions in Lake Formation

Prerequisites

For this walkthrough, you should have the following prerequisites: 

Create a permission set with IAM Identity Center

To create your permission set, complete the following steps:

  1. Sign in to the AWS Management Console with your management account and go to the Region where IAM Identity Center is configured.
  2. On the IAM Identity Center console, choose Permission sets in the navigation pane.
  3. Choose Create permission set.

  4. Select Custom permission set, then choose Next.

  5. Next, you must specify policies. The first permission set you create should have data lake admin privileges.
    AWS recommends granting data lake admins the following AWS managed policies: AWSGlueConsoleFullAccess, AWSLakeFormationCrossAccountManager, AWSLakeFormationDataAdmin, AmazonAthenaFullAccess, and CloudWatchLogsReadOnlyAccess. However, if these permissions are too permissive or not permissive enough, you may prefer using customer managed policies.
  6. Choose Next.
  7. Specify permission set details, then choose Next.

  8. Review your settings, then choose Create.

Repeat the steps to create a data analyst role to grant Lake Formation access. For this post, we created the role LakeFormationDataAnalyst with the policy AmazonAthenaFullAccess.
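If you script the permission sets instead of using the console, managed policies are typically referenced by ARN rather than by name. The sketch below builds those ARNs for the two personas in this post. It assumes that each of these AWS managed policies lives at the default policy path; verify the ARNs in the IAM console before relying on them. The names `managed_policy_arn`, `ADMIN_POLICIES`, and `ANALYST_POLICIES` are hypothetical.

```python
from typing import List

# AWS managed policies recommended in this post for data lake admins.
ADMIN_POLICIES: List[str] = [
    "AWSGlueConsoleFullAccess",
    "AWSLakeFormationCrossAccountManager",
    "AWSLakeFormationDataAdmin",
    "AmazonAthenaFullAccess",
    "CloudWatchLogsReadOnlyAccess",
]

# Policy used in this post for the LakeFormationDataAnalyst permission set.
ANALYST_POLICIES: List[str] = ["AmazonAthenaFullAccess"]


def managed_policy_arn(policy_name: str) -> str:
    """Build the ARN of an AWS managed policy (assumes the default '/' path)."""
    return f"arn:aws:iam::aws:policy/{policy_name}"


for name in ADMIN_POLICIES:
    print(managed_policy_arn(name))
```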

Grant users or groups access to the data lake account in IAM Identity Center

To grant access to users and groups, complete the following steps:

  1. On the IAM Identity Center console, choose AWS accounts in the navigation pane.
  2. Choose Assign users or groups.

  3. Select the user or group you want to assign the data lake account permissions to (DataLakeAdmin).
  4. Choose Next.

  5. Select the permission set you created earlier.
  6. Choose Next.

  7. Review your settings, then choose Submit.

Verify your IAM Identity Center permissions have been successfully granted by visiting your IAM Identity Center Portal, choosing the data lake admin, and signing in to the console.

Assign an IAM Identity Center role as a data lake administrator

The following steps set up a data lake administrator with the IAM role created by IAM Identity Center. Administrators have full access to the Lake Formation console, and control the initial data configuration and access permissions. For all users and groups that don’t need to be data lake administrators, skip to the next series of steps.

  1. Sign in to the console as the data lake account with admin access.
  2. Open the Lake Formation console. A pop-up window appears, prompting you to define your administrators.
  3. Select Add other AWS users or roles.
  4. Choose the permission set you created earlier (starting with AWSReservedSSO_DataLakeAdmin).
  5. Choose Get started.
  6. On the Administrative roles and tasks page, under Database creators, choose Grant.
  7. Choose your data lake admin role.
  8. Select Create database under Catalog permissions and Grantable permissions.
  9. Choose Grant.

You now have an IAM Identity Center-generated IAM principal that is assigned as the data lake administrator and database creator.

Grant the IAM Identity Center role data lake permissions in Lake Formation

You now manage data lake permissions. For more information, refer to Managing Lake Formation permissions. 

Whether you’re managing permissions with LF-tags or named resources, the steps for granting access remain the same:

  1. On the Lake Formation console, under Permissions in the navigation pane, choose Data lake permissions.
  2. Choose Grant.
  3. Select IAM users and roles.
  4. Choose the AWSReservedSSO_LakeFormationDataAnalyst role.
  5. Grant access to database and table permissions as applicable, then choose Grant.

You now have an IAM Identity Center-generated IAM principal with data permissions.

Grant the IAM Identity Center role data location permissions in Lake Formation

When granting access to data locations, the process remains the same.

  1. On the Lake Formation console, under Permissions in the navigation pane, choose Data locations.
  2. Choose Grant.
  3. Choose the AWSReservedSSO_LakeFormationDataAnalyst role.
  4. Complete the remaining fields and choose Grant.

You now have an IAM Identity Center-generated IAM principal with data location access.
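The same grants can also be made through the Lake Formation API instead of the console. The following sketch only builds the request payloads, so it runs without AWS credentials; the keys follow the parameters of the Lake Formation GrantPermissions operation as exposed by boto3. The role ARN, database, table, and bucket names are hypothetical placeholders; copy the real role ARN from the IAM console.

```python
from typing import Any, Dict, Iterable


def table_grant(role_arn: str, database: str, table: str,
                permissions: Iterable[str]) -> Dict[str, Any]:
    """Build a GrantPermissions request for table-level access."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": list(permissions),
    }


def data_location_grant(role_arn: str, s3_arn: str) -> Dict[str, Any]:
    """Build a GrantPermissions request for data location access."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {"DataLocation": {"ResourceArn": s3_arn}},
        "Permissions": ["DATA_LOCATION_ACCESS"],
    }


# Hypothetical placeholder values.
analyst_role = "arn:aws:iam::111122223333:role/AWSReservedSSO_LakeFormationDataAnalyst_EXAMPLE"
table_request = table_grant(analyst_role, "sales_db", "orders", ["SELECT", "DESCRIBE"])
location_request = data_location_grant(analyst_role, "arn:aws:s3:::example-data-lake-bucket")

# With credentials configured, each request could be sent like this:
#   boto3.client("lakeformation").grant_permissions(**table_request)
print(table_request)
print(location_request)
```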

Validate data access

We now validate data access for the IAM Identity Center principal.

  1. Sign in to the console through IAM Identity Center as the principal you granted access to. For this post, we’re logging in as the LakeFormationDataAnalyst role.

    To test data access, we run some queries in Amazon Athena.
  2. On the Athena console, choose Query editor.
  3. On the Settings tab, confirm that a query result location is set up.
  4. If you don’t have a query result location, choose Manage and configure your query result location and encryption.
  5. In the Athena query editor, on the Editor tab, choose the database that you granted access to. If the principal doesn’t have access to the Lake Formation table and data location, you won’t be able to view data in Athena.
  6. Choose the menu icon next to your table and choose Generate table DDL.

Confirm that the data appears on the Query results tab.

Conclusion

In this post, we demonstrated how to integrate IAM Identity Center with Lake Formation permissions. You can now grant IAM Identity Center identities administrator, database creation, database and table, and data location access in Lake Formation. Managing data lake permissions through IAM Identity Center allows you to control data access from your management account, helping to improve your scalability and security.

If you’re wondering how to adapt this solution to tag-based access control, read Easily manage your data lake at scale using AWS Lake Formation Tag-based access control and apply the techniques you learned from this post.


About the authors

Benon Boyadjian is a Private Equity Solutions Architect at AWS. He is passionate about helping customers understand the impact AWS can have on their businesses and guiding their AWS implementations. In his free time, he enjoys swimming, snowboarding, and playing with his cat Dirt.

Janakiraman Shanmugam is a Senior Data Architect at Amazon Web Services. He focuses on data and analytics and enjoys helping customers solve big data and machine learning problems. Outside of the office, he loves to be with his friends and family and spend time outdoors.

Implement a highly available key distribution center for Amazon EMR

Post Syndicated from Lorenzo Ripani original https://aws.amazon.com/blogs/big-data/implement-a-highly-available-key-distribution-center-for-amazon-emr/

High availability (HA) is the property of a system or service to operate continuously without failing for a designated period of time. Implementing HA properties over a system allows you to eliminate single points of failure that usually translate to service disruptions, which can then lead to a business loss or the inability to use a service.

The core idea behind fault tolerance and high availability is very straightforward in terms of definition. You usually use multiple machines to give you redundancy for a specific service. This guarantees that if a host goes down, other machines are able to take over the traffic. Although this might be easy to say, it’s difficult to obtain such a property, especially when working with distributed technologies.

When focusing on Hadoop technologies, the concept of availability multiplies in different layers depending on the frameworks we’re using. To achieve a fault-tolerant system, we need to consider the following layers:

  • Data layer
  • Processing layer
  • Authentication layer

The first two layers are typically handled using native capabilities of the Hadoop framework (such as HDFS High Availability or ResourceManager High Availability) or with the help of features available in the specific framework used (for example, HBase table replication to achieve highly available reads).

The authentication layer is typically managed through the utilization of the Kerberos protocol. Although multiple implementations of Kerberos exist, Amazon EMR uses a free implementation of the Kerberos protocol, which is directly provided by the Massachusetts Institute of Technology (MIT), also referred to as MIT Kerberos.

When looking at the native setup for a key distribution center (KDC), we can see that the tool comes with a typical primary/secondary configuration, where you can configure a primary KDC with one or more additional replicas to provide some features of a highly available system.

However, this configuration doesn’t provide an automatic failover mechanism to elect a new primary KDC in the event of a system interruption. As a result, the failover has to be performed manually or by implementing an automated process, which can be complex to set up.

With AWS native services, we can improve the MIT KDC capabilities to increase the resilience to failures of our system.

Highly available MIT KDC

Amazon EMR provides different architecture options to enable Kerberos authentication, where each of them tries to solve a specific need or use case. Kerberos authentication can be enabled by defining an Amazon EMR security configuration, which is a set of information stored within Amazon EMR itself. This enables you to reuse this configuration across multiple clusters.

When creating an Amazon EMR security configuration, you’re asked to choose between a cluster-dedicated KDC or an external KDC, so it’s important to understand the benefits and limits of each solution.

When you enable the cluster-dedicated KDC, Amazon EMR configures and installs an MIT KDC on the EMR primary node of the cluster that you’re launching. In contrast, when you use an external KDC, the cluster launched relies on a KDC external to the cluster. In this case, the KDC can be a cluster-dedicated KDC of a different EMR cluster that you reference as an external KDC, or a KDC installed on an Amazon Elastic Compute Cloud (Amazon EC2) instance or a container that you own.
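To make the external KDC option more concrete, the following sketch builds the JSON body of a security configuration that points at an external KDC. Treat it as an assumption to verify against the current Amazon EMR documentation: the field names follow the security configuration format as we understand it, the host name is a hypothetical placeholder, and ports 88 and 749 are the standard Kerberos authentication and admin ports.

```python
import json


def external_kdc_security_configuration(kdc_host: str) -> str:
    """Return a security configuration JSON document that references an external KDC."""
    config = {
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ExternalKdc",
                "ExternalKdcConfiguration": {
                    "KdcServerType": "Single",
                    "AdminServer": f"{kdc_host}:749",  # Kerberos admin port
                    "KdcServer": f"{kdc_host}:88",  # Kerberos authentication port
                },
            }
        }
    }
    return json.dumps(config, indent=2)


# Hypothetical host name for the external KDC endpoint.
document = external_kdc_security_configuration("kdc.example.internal")
print(document)
```

Because the configuration is stored by name within Amazon EMR, the same document can be reused by every cluster that should authenticate against that KDC.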

The cluster-dedicated KDC is an easy configuration option that delegates the installation and configuration of the KDC service to the cluster itself. This option doesn’t require significant knowledge of the Kerberos system and might be a good option for a test environment. Additionally, having a dedicated KDC in a cluster enables you to segregate the Kerberos realm, thereby providing a dedicated authentication system that can be used only to authenticate a specific team or department in your organization.

However, because the KDC is located on the EMR primary node, you have to consider that if you delete the cluster, the KDC is deleted as well. If the KDC is shared with other EMR clusters (defined as an external KDC in their security configuration), the authentication layer for those clusters is compromised and, as a result, all Kerberos-enabled frameworks break. This might be acceptable in test environments, but it’s not recommended for a production one.

Because the KDC lifetime isn’t always bound to a specific EMR cluster, it’s common to use an external KDC located on an EC2 instance or Docker container. This pattern comes with some benefits:

  • You can persist end-user credentials in the Kerberos KDC rather than using an Active Directory (although you can also enable a cross-realm trust)
  • You can enable communication across multiple EMR clusters, so that all the cluster principals join the same Kerberos realm, thereby enabling a common authentication system for all the clusters
  • You can remove the dependency on the EMR primary node, whose deletion would otherwise prevent other systems from authenticating
  • If you require a multi-master EMR cluster, then an external KDC is required

That being said, installing an MIT KDC on a single instance doesn’t address our HA requirements, which typically are crucial in a production environment. In the following section, we discuss how we can implement a highly available MIT KDC using AWS services to improve the resiliency of our authentication system.

Architecture overview

The architecture presented in the following diagrams describes a highly available setup across multiple Availability Zones for our MIT Kerberos KDC that uses AWS services. We propose two versions of the architecture: one based on an Amazon Elastic File System (Amazon EFS) file system, and another based on an Amazon FSx for NetApp ONTAP file system.

Both services can be mounted on EC2 instances and used as local paths. Although Amazon EFS is cheaper compared to Amazon FSx for NetApp ONTAP, the latter provides better performance thanks to the sub-millisecond operation latency it provides.

We performed multiple tests to benchmark the solutions involving the different file systems. The following graph shows the results with Amazon EMR 5.36, in which we measured the time in seconds taken by the cluster to be fully up and running when selecting Hadoop and Spark as frameworks.

Looking at the test results, we can see that the Amazon EFS file system is suitable to handle small clusters (fewer than 100 nodes), because the performance degradation introduced by the latency of lock operations on the NFS protocol increases the delay in launching clusters as we add more nodes in our cluster topology. For example, for clusters with 200 nodes, the delay introduced by the Amazon EFS file system is such that some instances can’t join the cluster in time. As a result, those instances are deleted and then replaced, making the entire cluster provisioning slower. This is the reason why we decided not to publish any metric for Amazon EFS for 200 cluster nodes on the preceding graph.

On the other hand, Amazon FSx for NetApp ONTAP is able to better handle the increasing number of principals created during the cluster provisioning, with less performance degradation than Amazon EFS.

Even with the solution involving Amazon FSx for NetApp ONTAP, for clusters with a higher number of instances it’s still possible to encounter the behavior described earlier for Amazon EFS. Therefore, for big cluster configurations, this solution should be carefully tested and evaluated.

Amazon EFS based solution

The following diagram illustrates the architecture of our Amazon EFS based solution.

The infrastructure relies on different components to improve the fault tolerance of the KDC. The architecture uses the following services:

  • A Network Load Balancer configured to serve Kerberos service ports (port 88 for authentication and port 749 for admin tasks like principal creation and deletion). The purpose of this component is to balance requests across multiple KDC instances located in separate Availability Zones. In addition, it provides a redirection mechanism in case of failures while connecting to an impaired KDC instance.
  • An EC2 Auto Scaling group that helps you maintain KDC availability and allows you to automatically add or remove EC2 instances according to conditions you define. For the purpose of this scenario, we define a minimum number of KDC instances equal to two.
  • The Amazon EFS file system provides a persistent and reliable storage layer for our KDC database. The service comes with built-in HA properties, so we can take advantage of its native features to obtain a persistent and reliable file system.
  • We use AWS Secrets Manager to store and retrieve Kerberos configurations, specifically the password used for the Kadmin service and the Kerberos domain and realm managed by the KDC. With Secrets Manager, we avoid passing any sensitive information as script parameters or passwords while launching KDC instances.

With this configuration, we eliminate the downsides resulting from a single instance installation:

  • The KDC isn’t a single point of failure anymore because failed connections are redirected to healthy KDC hosts
  • Moving Kerberos authentication traffic off the EMR primary node improves the health of the primary node, which might be critical for large Hadoop installations (hundreds of nodes)
  • We can recover from failures, because the surviving instances fulfill both admin and authentication operations
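The redirection behavior described above can be illustrated with a small client-side probe that returns the first endpoint accepting a TCP connection. This is only a sketch of the failover idea; in the architecture itself, the Network Load Balancer performs this role, and the function name `first_reachable` and the host names in the usage note are hypothetical.

```python
import socket
from typing import Iterable, Optional, Tuple

Endpoint = Tuple[str, int]

KERBEROS_AUTH_PORT = 88  # authentication requests
KERBEROS_ADMIN_PORT = 749  # admin tasks such as creating principals


def first_reachable(endpoints: Iterable[Endpoint], timeout: float = 1.0) -> Optional[Endpoint]:
    """Return the first endpoint that accepts a TCP connection, or None if none does."""
    for host, port in endpoints:
        try:
            # The connection is closed immediately; we only test reachability.
            with socket.create_connection((host, port), timeout=timeout):
                return (host, port)
        except OSError:
            # Refused, timed out, or unresolvable: try the next KDC.
            continue
    return None
```

For example, `first_reachable([("kdc-a.example.internal", KERBEROS_AUTH_PORT), ("kdc-b.example.internal", KERBEROS_AUTH_PORT)])` would return the second endpoint if the first KDC is down, and `None` if both are unreachable.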

Amazon FSx for NetApp ONTAP based solution

The following diagram illustrates the solution using Amazon FSx for NetApp ONTAP.

This infrastructure is almost identical to the previous one and provides the same benefits. The only difference is the use of a Multi-AZ Amazon FSx for NetApp ONTAP file system as the persistent and reliable storage layer for our KDC database. In this case too, the service comes with built-in HA properties, so we can take advantage of its native features to obtain a persistent and reliable file system.

Solution resources

We provide an AWS CloudFormation template in this post as a general guide. You should review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

The CloudFormation template contains several nested templates. Together, they create the following:

  • An Amazon VPC with two public and two private subnets where the KDC instances can be deployed
  • An internet gateway attached to the public subnets and a NAT gateway for the private subnets
  • An Amazon Simple Storage Service (Amazon S3) gateway endpoint and a Secrets Manager interface endpoint in each subnet

After the VPC resources are deployed, the KDC nested template is launched and provisions the following components:

  • Two target groups, each connected to a listener for the specific KDC port to monitor (88 for Kerberos authentication and 749 for Kerberos administration).
  • One Network Load Balancer to balance requests across the KDC instances created in different Availability Zones.
  • Depending on the chosen file system, an Amazon EFS or Amazon FSx for NetApp ONTAP file system is created across multiple Availability Zones.
  • Configuration and auto scaling to provision the KDC instances. Specifically, the KDC instances are configured to mount the selected file system on a local folder that is used to store the principals database of the KDC.

At the end of the second template, the EMR cluster is launched with an external KDC set up and, if chosen, a multi-master configuration.

Launch the CloudFormation stack

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:
    This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region. The CloudFormation stack requires a few parameters, as shown in the following screenshot.


    The following tables describe the parameters required in each section of the stack.
  2. In the Core section, provide the following parameters:

    Parameter | Value (Default) | Description
    Project | aws-external-kdc | The name of the project for which the environment is deployed. This is used to create AWS tags associated to each resource created in the stack.
    Artifacts Repository | aws-blogs-artifacts-public/artifacts/BDB-1689 | The Amazon S3 location hosting templates and script required to launch this stack.
  3. In the Networking section, provide the following parameters:

    Parameter | Value (Default) | Description
    VPC Network | 10.0.0.0/16 | Network range for the VPC (for example, 10.0.0.0/16).
    Public Subnet One | 10.0.10.0/24 | Network range for the first public subnet (for example, 10.0.10.0/24).
    Public Subnet Two | 10.0.11.0/24 | Network range for the second public subnet (for example, 10.0.11.0/24).
    Private Subnet One | 10.0.1.0/24 | Network range for the private subnet (for example, 10.0.1.0/24).
    Private Subnet Two | 10.0.2.0/24 | Network range for the private subnet (for example, 10.0.2.0/24).
    Availability Zone One | (user selected) | The Availability Zone chosen to host the first private and public subnets. This should differ from the value used for the Availability Zone Two parameter.
    Availability Zone Two | (user selected) | The Availability Zone chosen to host the second private and public subnets. This should differ from the value used for the Availability Zone One parameter.
  4. In the KDC section, provide the following parameters:

    Parameter | Value (Default) | Description
    Storage Service | Amazon EFS | Specify the KDC shared file system: Amazon EFS or Amazon FSx for NetApp ONTAP.
    Amazon Linux 2 AMI | /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2 | AWS Systems Manager parameter alias to retrieve the latest Amazon Linux 2 AMI.
    Instance Count | 2 | Number of KDC instances launched.
    Instance Type | c5.large | KDC instance type.
    KDC Realm | HADOOP.LAN | The Kerberos realm managed by the external KDC servers.
    KAdmin Password | Password123 | The password to perform admin operations on the KDC.
    Kerberos Secret Name | aws-external-kdc/kerberos.config | Secrets Manager secret name used to store Kerberos configurations.
  5. In the EMR section, provide the following parameters:

    Parameter | Value (Default) | Description
    Multi Master | Disabled | When enabled, the cluster is launched with three primaries configured with Hadoop HA.
    Release Version | emr-5.36.0 | Amazon EMR release version.
    (Workers) Instance Type | m5.xlarge | The EC2 instance type used to provision the cluster.
    (Workers) Node Count | 1 | The number of Amazon EMR CORE nodes provisioned while launching the cluster.
    SSH Key Name | (user selected) | A valid SSH PEM key that will be attached to the cluster and KDC instances to provide SSH remote access.
  6. Choose Next.
  7. Add additional AWS tags if required (the solution already uses some predefined AWS tags).
  8. Choose Next.
  9. Acknowledge the final requirements.
  10. Choose Create stack.

Make sure to select different Availability Zones in the Networking section of the template (Availability Zone One and Availability Zone Two). This prevents failures in the event of an impairment of an entire Availability Zone.
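Before launching the stack with custom values, you can sanity-check the Networking parameters locally. The following sketch uses Python’s standard ipaddress module to confirm that every subnet range falls inside the VPC range, that no two subnets overlap, and that the Availability Zones differ. The helper name `check_network_parameters` and the Availability Zone names are hypothetical; the CIDR ranges are the template defaults listed earlier.

```python
import ipaddress
from itertools import combinations
from typing import Dict, List, Sequence


def check_network_parameters(vpc_cidr: str, subnet_cidrs: Dict[str, str],
                             zones: Sequence[str]) -> List[str]:
    """Return a list of problems found; an empty list means the values look consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    subnets = {name: ipaddress.ip_network(cidr) for name, cidr in subnet_cidrs.items()}
    problems: List[str] = []

    # Every subnet must be contained in the VPC range.
    for name, net in subnets.items():
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({net}) is outside the VPC range {vpc}")

    # No two subnets may share addresses.
    for (name_a, net_a), (name_b, net_b) in combinations(subnets.items(), 2):
        if net_a.overlaps(net_b):
            problems.append(f"{name_a} ({net_a}) overlaps {name_b} ({net_b})")

    # The two Availability Zones must be different.
    if len(set(zones)) != len(zones):
        problems.append("Availability Zones must be different")

    return problems


DEFAULT_SUBNETS = {
    "Public Subnet One": "10.0.10.0/24",
    "Public Subnet Two": "10.0.11.0/24",
    "Private Subnet One": "10.0.1.0/24",
    "Private Subnet Two": "10.0.2.0/24",
}

# Hypothetical Availability Zone names.
print(check_network_parameters("10.0.0.0/16", DEFAULT_SUBNETS, ["us-east-1a", "us-east-1b"]))
```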

Test the infrastructure

After you’ve provisioned the whole infrastructure, it’s time to test and validate our HA setup.

In this test, we simulate an impairment on a KDC instance. As a result, we’ll see how we’re able to keep using remaining healthy KDCs, and we’ll see how the infrastructure self-recovers by adding an additional KDC as a substitution for the failed one.

We performed our tests by launching the CloudFormation stack and specifying two KDC instances and using Amazon EFS as the storage layer for the KDC database. The EMR cluster is launched with 11 CORE nodes.

After we deploy the whole infrastructure, we can connect to the EMR primary node using an SSH connection to perform our tests.

When inside our primary node instance, we can proceed with our test setup.

  1. First, we create 10 principals inside the KDC database. To do so, create a bash script named create_users.sh with the following content:
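    #!/bin/bash
    # Create num_users test principals in the KDC database.
    realm="HADOOP.LAN"
    password="Password123"
    num_users=10
    user_prefix="test_user"  # assumed name prefix for the test principals
    
    for (( i=1; i<=num_users; i++ )); do
      echo "Creating principal ${user_prefix}${i}@${realm}"
      # kadmin prompts for the admin password, then twice for the new principal's password
      echo -e "$password\n$password\n$password" | kadmin -p "kadmin/admin@${realm}" addprinc "${user_prefix}${i}@${realm}" > /dev/null 2>&1
    done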

  2. Run the script using the following command:
    bash create_users.sh

  3. We can now verify those 10 principals have been correctly created inside the KDC database. To do so, create another script called list_users.sh and run it as the previous one:
    #!/bin/bash
    realm="HADOOP.LAN"
    password="Password123"
    
    echo -e "$password\n$password\n$password" | kadmin -p kadmin/[email protected]$realm listprincs

    The output of the script shows the principals created by the cluster nodes when they’re provisioned, along with our test users just created.

    We now run multiple kinit requests in parallel and, while they are running, stop the krb5kdc process on one of the two available KDC instances.

    The test is performed through Spark to achieve high parallelization on the kinit requests.

  4. First, create the following script and call it user_kinit.sh:
    #!/bin/bash
    realm="HADOOP.LAN"
    password="Password123"
    num_users="10"
    
    for (( i=1; i<=$num_users; i++ )); do
      echo -e "$password" | kinit [email protected]$realm > /dev/null 2>&1
      echo $?
    done

  5. Open a spark-shell and use the --files parameter to distribute the preceding bash script to all the Spark executors. In addition, we disable the Spark dynamic allocation and launch our application with 10 executors, each using 4 vCores.
    spark-shell --files user_kinit.sh --num-executors 10 --conf spark.dynamicAllocation.enabled=false --conf spark.executor.cores=4

  6. We can now run the following Scala statements to initiate our distributed test:
    val tasks = spark.sparkContext.parallelize(1 to 1600, 1600)
    val scriptPath = "./user_kinit.sh"
    val pipeRDD = tasks.pipe(scriptPath)
    pipeRDD.map(_.toInt).sum

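    This Spark application creates 1,600 tasks, and each task performs 10 kinit requests. The tasks run in parallel in batches of 40 Spark tasks at a time (10 executors with 4 vCores each). Because the script prints the exit code of every kinit call, the final output of our command is the sum of those exit codes, which corresponds to the number of failed kinit requests.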

  7. We now connect to the two available KDC instances. For additional security, our template doesn’t attach any SSH key to the KDC instances, so we connect without SSH keys by using AWS Systems Manager Session Manager. To connect to the KDC instances from the Amazon EC2 console using AWS Systems Manager, see Starting a session (Amazon EC2 console).
  8. On the first KDC, run the following commands to show incoming kinit authentication requests:
    sudo -s
    tail -f /var/log/kerberos/krb5kdc.log

    The following screenshot shows a sample output.

  9. On the second KDC, simulate a failure by running the following commands:

    sudo -s
    killall krb5kdc

  10. We can now open the Amazon EC2 console and check the KDC-related target group to confirm that the instance became unhealthy (after three consecutive health checks failed), and was then deleted and replaced by a new one.
    The target group performed the following specific steps during an impairment in one of the services:

    • The KDC instance enters the unhealthy state
    • The unhealthy KDC instance is de-registered from the target group (draining process)
    • A new KDC instance is launched
    • The new KDC is registered to the target group so that it can start receiving traffic from the load balancer

    You might expect to see output similar to the following screenshot while causing an impairment in one of your KDCs.

  11. If we now connect to the replacement KDC instance, we can see the traffic starting to appear in the krb5kdc logs.

At the end of the tests, we have a total number of failed Kerberos authentications.

As we can see from the output, we didn’t get any failures during this test. However, when repeating the test multiple times, you might still see a few errors (one or two on average), which can occur when the krb5kdc process stops while some requests are still authenticating.
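To put this result in context, the load generated by the test can be worked out from the numbers used in the spark-shell command and the Scala statements above. The following is a minimal sketch that only repeats that arithmetic. The variable names are illustrative, and the batch count is an approximation that assumes all tasks take roughly the same time.

```shell
#!/bin/bash
# Values taken from the test setup described above.
tasks=1600              # Spark tasks created by parallelize(1 to 1600, 1600)
kinit_per_task=10       # kinit calls made by each run of user_kinit.sh
executors=10            # --num-executors
cores_per_executor=4    # spark.executor.cores

# Total number of kinit requests sent to the KDCs during the test.
total_requests=$(( tasks * kinit_per_task ))

# Number of Spark tasks that can run at the same time.
parallel_tasks=$(( executors * cores_per_executor ))

# Approximate number of batches needed to run all tasks (rounded up).
batches=$(( (tasks + parallel_tasks - 1) / parallel_tasks ))

echo "Total kinit requests: $total_requests"
echo "Spark tasks running in parallel: $parallel_tasks"
echo "Approximate number of task batches: $batches"
```

With these values, the test issues 16,000 kinit requests in total, so one or two failures correspond to a very small fraction of the overall requests.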

Note that the kinit tool itself doesn’t have any retry mechanism. Both the Hadoop services running on the cluster and the creation of Kerberos principals during EMR instance provisioning are configured to retry if KDC calls fail.
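If your own scripts call kinit directly, you might want to add a retry around it yourself. The following is a minimal sketch of one way to do that with a generic retry helper that doubles the wait between attempts. The retry function, its arguments, and the do_kinit wrapper mentioned afterward are hypothetical names introduced for illustration; they are not part of the solution's templates or of the Kerberos tooling.

```shell
#!/bin/bash
# retry <max_attempts> <initial_delay_seconds> <command> [args...]
# Hypothetical helper: runs the command until it succeeds or the maximum
# number of attempts is reached, doubling the delay after each failure.
retry() {
  local max_attempts=$1
  local delay=$2
  shift 2
  local attempt=1

  until "$@"; do
    if [ "$attempt" -ge "$max_attempts" ]; then
      # Give up: every attempt failed.
      return 1
    fi
    sleep "$delay"
    delay=$(( delay * 2 ))
    attempt=$(( attempt + 1 ))
  done
  return 0
}
```

Because kinit reads the password from standard input, the piped input has to be supplied again on every attempt. One option is to wrap the whole pipeline in a small function, for example a do_kinit function that pipes the password into kinit, and then call retry 3 1 do_kinit.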

If you want to automate these tests, you might also consider using AWS Fault Injection Simulator, a fully managed service for running fault injection experiments on AWS that makes it easier to improve an application’s performance, observability, and resiliency.

Clean up

To clean up all the resources:

  1. Delete the root stack in AWS CloudFormation.
  2. Some time after the deletion starts, you should see a failure.
  3. Choose the VPC nested CloudFormation stack, then choose Resources. You should see a single DELETE_FAILED entry for the VPC resource. This happens because Amazon EMR automatically creates default security groups, which prevent CloudFormation from deleting the VPC.
  4. Move to the VPC section of the AWS console and delete that VPC manually.
  5. After that, go back to CloudFormation, select the root stack again, and choose Delete. This time the deletion should complete.

File system backups

Both Amazon EFS and Amazon FSx for NetApp ONTAP are natively integrated with AWS Backup.

AWS Backup helps you automate and centrally manage your backups. After you create policy-driven plans, you can monitor the status of ongoing backups, verify compliance, and find and restore backups, all from a central console.

To get more information, refer to Using AWS Backup to back up and restore Amazon EFS file systems and Using AWS Backup with Amazon FSx.

Additional considerations

In this section, we share some additional considerations when using this solution.

Shared file system latency impacts

Using a shared file system comes with a performance cost. In particular, the more Kerberos principals that have to be created at the same time, the more latency we see in the overall principal creation process and in the cluster startup time.

This performance degradation is proportional to the number of parallel KDC requests made at the same time. For example, consider the scenario in which we have to launch 10 clusters, each with 20 nodes connected to the same KDC. If we launch all 10 clusters at the same time, we can potentially have 10×20 = 200 parallel connections to the KDC during the initial instance provisioning for the creation of the framework-related Kerberos principals. In addition, because the duration of Kerberos tickets for services is 10 hours by default, and because all the cluster services are launched more or less at the same time, we could also have the same level of parallelism for service ticket renewal. If, instead, we launch these 10 clusters with a time gap between them, we potentially have only 20 parallel connections at a time, and as a result the latency introduced by the shared file system has little impact.

As discussed earlier in this post, multiple clusters can share the same KDC if they need to communicate with each other without having to set up a cross-realm trust between the related KDCs. Before attaching multiple clusters to the same KDC, you should evaluate whether there is a real need for that, because you might also consider segregating Kerberos realms on different KDC instances to obtain better performance and reduce the blast radius in case of issues.

Single-AZ high availability consideration

Although the solutions presented in this post provide a highly available MIT KDC across multiple Availability Zones, you might only be interested in an HA setup within a single Availability Zone. In this case, for better performance, you might also consider using Amazon FSx for Lustre, or attaching an io2 EBS volume to multiple KDC instances in the same Availability Zone. In both cases, you can still use the same KDC script used in this post by just modifying the mount command that attaches the shared file system to the KDC instances.

If you want to use an io2 EBS volume as your shared file system, you have to set up a clustered file system to ensure data resiliency and reliability of your KDC database, because standard file systems such as XFS or EXT4 aren’t designed for such use cases. For example, you can use a GFS2 file system to access the KDC database simultaneously across KDC instances. For more details on how to set up a GFS2 file system on EC2 instances, refer to Clustered storage simplified: GFS2 on Amazon EBS Multi-Attach enabled volumes.

Summary

High availability and fault tolerance are key requirements for EMR clusters that can’t tolerate downtime. Analytics workloads that run within those clusters can deal with sensitive data, so operating in a secured environment is also essential. As a result, we need a secure, highly available, and fault-tolerant setup.

In this post, we showed one possible way of achieving high availability and fault tolerance for the authentication layer of our big data workloads in Amazon EMR. We demonstrated how, by using AWS native services, multiple Kerberos KDCs can operate in parallel and be automatically replaced in case of failures. This, in combination with the framework-specific high availability and fault tolerance capabilities, allows us to operate in a secure, highly available, and fault-tolerant environment.


About the authors

Lorenzo Ripani is a Big Data Solution Architect at AWS. He is passionate about distributed systems, open source technologies and security. He spends most of his time working with customers around the world to design, evaluate and optimize scalable and secure data pipelines with Amazon EMR.

Stefano Sandona is an Analytics Specialist Solution Architect with AWS. He loves data, distributed systems and security. He helps customers around the world architect their data platforms. He has a strong focus on Amazon EMR and all the security aspects around it.

Create single output files for recipe jobs using AWS Glue DataBrew

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/create-single-output-files-for-recipe-jobs-using-aws-glue-databrew/

AWS Glue DataBrew offers over 350 pre-built transformations to automate data preparation tasks (such as filtering anomalies, standardizing formats, and correcting invalid values) that would otherwise require days or weeks of writing hand-coded transformations.

You can now choose single or multiple output files instead of autogenerated files for your DataBrew recipe jobs. You can generate a single output file when the output is small or when downstream systems, such as visualization tools, need to consume it more easily. Alternatively, you can specify your desired number of output files when configuring a recipe job. This gives you the flexibility to manage recipe job output for visualization, data analysis, and reporting, while helping prevent you from generating too many files. In some cases, you may also want to customize the output file partitions for efficient storage and transfer.

In this post, we walk you through how to connect and transform data from an Amazon Simple Storage Service (Amazon S3) data lake and configure the output as a single file via the DataBrew console.

Solution overview

The following diagram illustrates our solution architecture.

DataBrew queries sales order data from the S3 data lake and performs data transformation. Then the DataBrew job writes the final output back to the data lake in a single file.

To implement the solution, you complete the following high-level steps:

  1. Create a dataset.
  2. Create a DataBrew project using the dataset.
  3. Build a transformation recipe.
  4. Create and run a DataBrew recipe job on the full data.

Prerequisites

To complete this solution, you should have an AWS account and the appropriate permissions to create the resources required as part of the solution.

You also need a dataset in Amazon S3. For our use case, we use a mock dataset. You can download the data files from GitHub. On the Amazon S3 console, upload all three CSV files to an S3 bucket.

Create a dataset

To create your dataset in DataBrew, complete the following steps:

  1. On the Datasets page of the DataBrew console, choose Connect new dataset.
  2. For Dataset name, enter a name (for example, order).
  3. Enter the S3 bucket path where you uploaded the data files as part of the prerequisite steps.
  4. Choose Select the entire folder.
  5. For File type, select CSV and choose Comma (,) for CSV delimiter.
  6. For Column header values, select Treat first row as header.
  7. Choose Create dataset.

Create a DataBrew project using the dataset

To create your DataBrew project, complete the following steps:

  1. On the DataBrew console, on the Projects page, choose Create project.
  2. For Project Name, enter valid-order.
  3. For Attached recipe, choose Create new recipe.
    The recipe name is populated automatically (valid-order-recipe).
  4. For Select a dataset, select My datasets.
  5. Select the order dataset.
  6. For Role name, choose the AWS Identity and Access Management (IAM) role to be used with DataBrew.
  7. Choose Create project.

You can see a success message along with our Amazon S3 order table with 500 rows.

After the project is opened, a DataBrew interactive session is created. DataBrew retrieves sample data based on your sampling configuration selection.

Build a transformation recipe

In a DataBrew interactive session, you can cleanse and normalize your data using over 350 pre-built transformations. In this post, we use DataBrew to perform a few transforms and filter only valid orders with order amounts greater than $0.

To do this, you perform the following steps:

  1. Choose Column and choose Delete.
  2. For Source columns, choose the columns order_id, timestamp, and transaction_date.
  3. Choose Apply.
  4. We filter the rows based on an amount value greater than $0 and add the condition as a recipe step.
  5. To create a custom sort based on state, choose SORT and choose Ascending.
  6. For Source, choose the column state_name.
  7. Select Sort by custom values.
  8. Enter a list of state names separated by commas.
  9. Choose Apply.

The following screenshot shows the full recipe that we applied to our dataset.

Create and run a DataBrew recipe job on the full data

Now that we have built the recipe, we can create and run a DataBrew recipe job.

  1. On the project details page, choose Create job.
  2. For Job name, enter valid-order.
  3. For Output to, choose Amazon S3.
  4. Enter the S3 path to store the output file.
  5. Choose Settings.

For File output options, you have multiple options:

    • Autogenerate files – This is the default file output setting, which generates multiple files and usually results in the fastest job runtime
    • Single file output – This option generates a single output file
    • Multiple file output – With this option, you specify the maximum number of files you want to split your data into
  6. For this post, select Single file output.
  7. Choose Save.
  8. For Role name, choose the IAM role to be used with DataBrew.
  9. Choose Create and run job.
  10. Navigate to the Jobs page and wait for the valid-order job to complete.
  11. Navigate to the output S3 bucket to confirm that a single output file is stored there.
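If you prefer to script the job instead of using the console, the same configuration can be sketched with the AWS CLI. The following example is a rough sketch under several assumptions: the bucket name, key prefix, and IAM role ARN are hypothetical placeholders, and it assumes that setting MaxOutputFiles to 1 in the job output corresponds to the console's Single file output option. By default the script only prints the commands; it calls the AWS CLI only when RUN_AWS=1 is set.

```shell
#!/bin/bash
# Hypothetical placeholders: replace with your own bucket, prefix, and role ARN.
bucket="my-output-bucket"
prefix="valid-order-output/"
role_arn="arn:aws:iam::111122223333:role/my-databrew-role"

# Output definition for the recipe job. MaxOutputFiles=1 asks for a single
# output file (assumed to match the console's "Single file output" option).
outputs_json=$(cat <<EOF
[{"Location": {"Bucket": "$bucket", "Key": "$prefix"}, "Format": "CSV", "MaxOutputFiles": 1}]
EOF
)

echo "$outputs_json"

if [ "${RUN_AWS:-0}" = "1" ]; then
  # Create the recipe job from the valid-order project, then start a run.
  aws databrew create-recipe-job \
    --name valid-order \
    --project-name valid-order \
    --role-arn "$role_arn" \
    --outputs "$outputs_json"
  aws databrew start-job-run --name valid-order
else
  # Dry run: show the commands without calling AWS.
  echo "aws databrew create-recipe-job --name valid-order --project-name valid-order --role-arn $role_arn --outputs '<JSON printed above>'"
  echo "aws databrew start-job-run --name valid-order"
fi
```

Changing MaxOutputFiles to a larger value would correspond to the Multiple file output option, where you cap the number of files the job writes.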

Clean up

To avoid incurring future charges, delete all the resources created during this walkthrough:

  1. Delete the recipe job valid-order.
  2. Empty the job output stored in your S3 bucket and delete the bucket.
  3. Delete the IAM roles created as part of your projects and jobs.
  4. Delete the project valid-order and its associated recipe valid-order-recipe.
  5. Delete the DataBrew datasets.

Conclusion

In this post, we showed how to connect and transform data from an S3 data lake and create a DataBrew dataset. We also demonstrated how we can bring data from our data lake into DataBrew, seamlessly apply transformations, and write the prepared data back to the data lake in a single output file.

To learn more, refer to Creating and working with AWS Glue DataBrew recipe jobs.


About the Author

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.