Tag Archives: AWS Big Data

Migrating your Netezza data warehouse to Amazon Redshift

Post Syndicated from John Hwang original https://aws.amazon.com/blogs/big-data/migrating-your-netezza-data-warehouse-to-amazon-redshift/

With IBM announcing Netezza reaching end-of-life, you’re faced with the prospect of having to migrate your data and workloads off your analytics appliance. For some, this presents an opportunity to transition to the cloud.

Enter Amazon Redshift.

Amazon Redshift is a cloud-native data warehouse platform built to handle workloads at scale, and it shares key similarities with Netezza that make it an excellent candidate to replace your on-premises appliance. You can migrate your data and applications to Amazon Redshift in less time and with fewer changes than migrating to other analytics platforms. For developers, this means less time spent retraining on the new database. For stakeholders, it means a lower cost of, and time to, migration. For more information, see How to migrate a large data warehouse from IBM Netezza to Amazon Redshift with no downtime.

This post discusses important similarities and differences between Netezza and Amazon Redshift, and how they could impact your migration timeline.

Similarities

Three significant similarities between Netezza and Amazon Redshift are their compatibility with Postgres, their massively parallel processing (MPP) architecture, and the Amazon Redshift feature Advanced Query Accelerator (AQUA) compared to Netezza’s use of FPGAs.

Postgres compatibility

Both Netezza and Amazon Redshift share some compatibility with Postgres, an open-source database. This means that Netezza SQL and Amazon Redshift SQL have a similar syntax. In particular, both support many features of PL/pgSQL, Postgres’s procedural language. Your Netezza stored procedures can translate to Amazon Redshift with little-to-no rewriting of code.

You can also use the AWS Schema Conversion Tool, which can automatically migrate a large percentage of Netezza stored procedures to Amazon Redshift syntax with zero user effort. And because both databases are built for analytics rather than transactional workloads, they share similar characteristics. For example, neither Netezza nor Amazon Redshift enforces primary keys, which improves performance, though you can still define primary keys on your tables to help the optimizer create better query plans.

MPP architecture

Both Netezza and Amazon Redshift are MPP databases. This means that a query is sent to a leader node, which then compiles a set of commands that it sends to multiple compute nodes. Each worker node performs its task in parallel and returns the results to the leader node, where the results are aggregated and returned to the user. This means that you can apply similar architectural strategies from Netezza, such as zone maps and distribution styles, to Amazon Redshift. For more information, see Amazon Redshift Engineering’s Advanced Table Design Playbook: Preamble, Prerequisites, and Prioritization.

Amazon Redshift AQUA and Netezza’s FPGA

AWS recently announced the new Amazon Redshift feature AQUA, which is in preview as of this writing. AQUA uses AWS-designed processors to bring certain compute tasks closer to the storage layer. Compression, encryption, filtering, aggregation, and other tasks can now happen in this intermediary layer between storage and compute, which leaves the CPU free to handle more complex tasks. When it's released, AQUA is expected to make queries up to 10 times faster.

Netezza uses FPGAs to perform simple compute tasks before data reaches the CPU. Applications designed to employ these FPGA features on Netezza (for example, queries that rely heavily on certain aggregate functions and data filtering) translate well to Amazon Redshift clusters using AQUA.

Differences

For all the similarities that Amazon Redshift and Netezza share, they also have differences. There are three important differences that could have significant impact on your data and application architecture when migrating from Netezza to Amazon Redshift: column store vs. row store, concurrency scaling, and data lake integration.

Column store vs. row store

Netezza stores each row of data onto disk in data blocks, whereas Amazon Redshift stores each column of data. For many analytics workloads, a column store can have dramatic performance benefits over a row store. Typically, an analytics database has tables with many columns, but only a small handful of those columns are used in any one query. For example, assume that you have a table in a row store database with 100 columns and 100 million rows. If you want to sum the entire value of one column in this table, your query has to suffer the I/O penalty of scanning the entire table to retrieve the data in this single column. In a columnar database with the same table, the query only faces I/O for the single column. In addition to enjoying the improved performance for this type of workload in Amazon Redshift, this gives you options for designing wide tables without having to weigh the increase in I/O for typical analytics workloads the way you do in Netezza.

Concurrency scaling

Although both Netezza and Amazon Redshift offer queue priority and short query acceleration to help reduce concurrency issues, Amazon Redshift also uses the benefits of the cloud to offer additional options to handle concurrency.

One option is to take unrelated workloads from a single Netezza appliance and migrate them to separate Amazon Redshift clusters, each with an instance type and number of nodes sized for the workload it has to support. Netezza’s more traditional license and support pricing model, combined with its limited options for appliance sizes, can make this type of architecture untenable for your organization’s budget.

Additionally, Amazon Redshift offers Concurrency Scaling to scale out (and scale back in) additional compute capacity automatically and on the fly. If you want to add or remove nodes in the cluster, or experiment with other node types, you can do so in just a few minutes using elastic resize (to change the number of nodes) or classic resize (to change instance types).

This type of scaling and resizing isn’t feasible on Netezza because the on-premises appliance has a fixed number of blades. Concurrency scaling in Amazon Redshift can support virtually unlimited concurrent users and concurrent queries, and its ability to automatically add and remove additional capacity means you only pay for the time the concurrency scaling clusters are in use.
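If you want to script a resize rather than use the console, the following is a minimal sketch using boto3; the cluster identifier and node count are hypothetical placeholders, not values from this post.

    import boto3

    redshift = boto3.client("redshift")

    # Elastic resize: change the node count of a running cluster in minutes.
    # Setting Classic=True would request a classic resize instead.
    redshift.resize_cluster(
        ClusterIdentifier="analytics-cluster",   # hypothetical cluster name
        NumberOfNodes=6,
        Classic=False)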

Data lake integration

Netezza offers the ability to create an external table from a data file either on the Netezza host or a remote host. However, querying those tables means migrating the entire dataset internally before running the query. With Amazon Redshift Spectrum, rather than using external tables as a convenient way to migrate entire datasets to and from the database, you can run analytical queries against data in your data lake the same way you query an internal table. As the volume of your data in the lake grows, partitioning can help keep the amount of data scanned for each query (and therefore performance) consistent. You can even join the data from your external tables in Amazon Simple Storage Service (Amazon S3) to your internal tables in Amazon Redshift.

Not only does this mean true integration of your data warehouse and data lake, which can lower your warehouse’s more expensive storage requirements, but because the Amazon Redshift Spectrum layer uses its own dedicated infrastructure outside of your cluster, you can offload many compute-intensive tasks from your cluster and push them down to the Redshift Spectrum layer. For more information, see Amazon Redshift Spectrum overview.
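As a rough illustration of that integration, the following sketch joins an external (S3-backed) Spectrum table with an internal Redshift table using the redshift_connector Python driver; the connection details, schema, and table names are hypothetical placeholders.

    import redshift_connector

    # Hypothetical connection details for an existing Amazon Redshift cluster
    conn = redshift_connector.connect(
        host="my-cluster.abc123xyz789.us-west-2.redshift.amazonaws.com",
        database="dev",
        user="awsuser",
        password="my_password")
    cursor = conn.cursor()

    # Join an external Spectrum table in S3 with an internal Redshift table
    cursor.execute("""
        SELECT c.customer_id, SUM(s.sale_amount) AS total_sales
        FROM spectrum_schema.sales s
        JOIN public.customers c ON s.customer_id = c.customer_id
        GROUP BY c.customer_id
    """)
    rows = cursor.fetchall()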

Conclusion

Amazon Redshift can accelerate migration from Netezza and potentially lower the time and cost of moving your analytics workloads into the cloud. The similarity between the two systems eases the pain of migration, but Amazon Redshift has important differences that can offer additional benefits around performance and cost. The expertise of AWS and its partner network can help with your migration strategy and offer guidance to avoid potential roadblocks and pitfalls.

 


About the Authors

John Hwang is a senior solutions architect at AWS.

Brian McDermott is a senior sales manager at AWS.

Build an end to end, automated inventory forecasting capability with AWS Lake Formation and Amazon Forecast

Post Syndicated from Syed Jaffry original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-automated-inventory-forecasting-capability-with-aws-lake-formation-and-amazon-forecast/

Amazon Forecast is a fully managed service that uses machine learning (ML) to generate highly accurate forecasts without requiring any prior ML experience. Forecast is applicable in a wide variety of use cases, including estimating product demand, inventory planning, and workforce planning.

With Forecast, there are no servers to provision or ML models to build manually. Additionally, you only pay for what you use, and there is no minimum fee or upfront commitment. To use Forecast, you only need to provide historical data for what you want to forecast, and any additional related data that may influence your forecasts. The latter may include both time-varying data, such as price, events, and weather, and categorical data, such as color, genre, or region. The service automatically trains and deploys ML models based on your data and provides you with a custom API to retrieve forecasts.

This post demonstrates how you can automate the data extraction, transformation, and use of Forecast for the use case of a retailer that requires recurring replenishment of inventory. You achieve this by using AWS Lake Formation to build a secure data lake and ingest data into it, orchestrate the data transformation using an AWS Glue workflow, and visualize the forecast results in Amazon QuickSight.

Use case background

Retailers have a recurring need to replenish inventory. For example, consider a clothing retailer that typically sells through its e-commerce and physical store channels. You need to maintain optimum levels of inventory on hand to meet demand while minimizing warehouse costs. Some of the common questions you need to answer for effective inventory management are:

  • What is the optimal quantity of inventory to reorder from my supplier for my next sales cycle?
  • What should the composition of product SKUs be in the purchase order to the supplier?
  • How do I most effectively determine the right mix and quantity of products to stock at individual retail store locations?

You can use Forecast to answer these questions. You extract data from the source systems, apply transformations to make the data ready for use in Forecast, and use Forecast to load, train, and forecast.

The following diagram shows the end-to-end system architecture of the proposed solution using Lake Formation, AWS Glue, and Amazon QuickSight.

You use Lake Formation to manage governance and access control on the data lake. Additionally, you use the following resources:

  1. Lake Formation blueprint to ingest sales data into a data lake
  2. AWS Lambda and Amazon S3 event notification to trigger an AWS Glue workflow
  3. AWS Glue workflow to trigger the execution of the data transform AWS Glue job
  4. AWS Glue workflow to orchestrate the three steps within Forecast (load, train, forecast)
  5. Forecast to export the forecast results into the data lake
  6. AWS Glue to trigger a crawler on the exported forecast results
  7. Amazon Athena and Amazon QuickSight to visualize the exported forecast results

Setting up the required IAM policies

Before you get started, you need to set up the required IAM policies. Complete the following steps:

  1. Sign in to the IAM console as a user with the AdministratorAccess AWS managed policy.
  2. Create an IAM user named report_builder to use when building your Amazon QuickSight analysis report and dashboard for visualization.
  3. Grant the AmazonAthenaFullAccess policy to the user. In the following steps, you create an IAM role for the AWS Glue jobs, crawler, and workflow to assume during their execution.
  4. On the IAM console, choose Roles.
  5. Choose Create role.
  6. On the Create role page, choose AWS service, and then choose Glue.
  7. Choose Next: Permissions.
  8. From the list of available policies, search for the AWSGlueServiceRole policy and select it.
  9. Name the role GLUE_WORKFLOW_ROLE.
  10. Choose Create role.
  11. On the Roles page, search for and choose GLUE_WORKFLOW_ROLE.
  12. On the Trust relationships tab, choose Edit trust relationship.
  13. Add the following statement to the trust policy to allow Forecast to assume the role:
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "forecast.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }

  14. Choose Update Trust Policy.
  15. On the role Summary page, on the Permissions tab, choose Add inline policy.
  16. Add the following inline policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:GetDataAccess",
                    "lakeformation:GrantPermissions"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::forecast-blog-processed/*",
                    "arn:aws:s3:::forecast-blog-published/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
      "forecast:CreateDataset",
      "forecast:CreateDatasetGroup",
      "forecast:CreateDatasetImportJob",
      "forecast:CreateForecast",
      "forecast:CreateForecastExportJob",
      "forecast:CreatePredictor",
      "forecast:DescribeDataset",
      "forecast:DescribeDatasetGroup",
      "forecast:DescribeDatasetImportJob",
      "forecast:DescribeForecast",
      "forecast:DescribeForecastExportJob",
      "forecast:DescribePredictor",
      "forecast:ListDatasetGroups",
      "forecast:ListDatasetImportJobs",
      "forecast:ListDatasets",
      "forecast:ListForecastExportJobs",
      "forecast:ListForecasts",
      "forecast:ListPredictors",
      "forecast:UpdateDatasetGroup"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": [
                    "arn:aws:iam::*:role/AWSGlueServiceRole*",
                    "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE"
                ],
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": [
                            "glue.amazonaws.com"
                        ]
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE",
                "Condition": {
                    "StringEquals": {
                        "iam:PassedToService": "forecast.amazonaws.com"
                    }
                }
            }
        ]
    }
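If you prefer to script this setup instead of using the console, the following is a minimal sketch using boto3; it assumes the inline policy above has been saved locally as glue_workflow_inline_policy.json (a hypothetical file name) and combines both trusted services in one trust policy.

    import json
    import boto3

    iam = boto3.client("iam")

    # Trust policy allowing both AWS Glue and Amazon Forecast to assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": ["glue.amazonaws.com", "forecast.amazonaws.com"]},
            "Action": "sts:AssumeRole"}]}

    iam.create_role(
        RoleName="GLUE_WORKFLOW_ROLE",
        AssumeRolePolicyDocument=json.dumps(trust_policy))

    # Attach the AWS managed policy for the Glue service role
    iam.attach_role_policy(
        RoleName="GLUE_WORKFLOW_ROLE",
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole")

    # Add the inline policy shown above (saved locally as a JSON file)
    with open("glue_workflow_inline_policy.json") as f:
        iam.put_role_policy(
            RoleName="GLUE_WORKFLOW_ROLE",
            PolicyName="GlueWorkflowInlinePolicy",
            PolicyDocument=f.read())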

Setting up your data lake storage

Next, you need a data lake to use in this automation. You create S3 buckets for data storage and apply appropriate security and governance. If you already have a data lake on Amazon S3, you can continue to use those S3 buckets with Lake Formation.

On the Amazon S3 console, create the following buckets:

  • forecast-blog-landing (for raw data ingest)
  • forecast-blog-processed (for the transformed data)
  • forecast-blog-published (for end consumers to access results)

This post includes walkthrough instructions for the landing bucket, which you can repeat for the other two buckets.
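The following is a minimal boto3 sketch of the same bucket creation; because bucket names are globally unique, you may need to add your own suffix to these names, and the Region shown is an assumption.

    import boto3

    region = "us-west-2"  # assumed Region; match the Region you use elsewhere in this post
    s3 = boto3.client("s3", region_name=region)

    for bucket in ["forecast-blog-landing", "forecast-blog-processed", "forecast-blog-published"]:
        # Bucket names are globally unique, so you may need to append a suffix
        s3.create_bucket(
            Bucket=bucket,
            CreateBucketConfiguration={"LocationConstraint": region})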

Enabling centralized access control on your data lake

You use Lake Formation’s centralized access control to enable access to the underlying S3 buckets for users and roles. With this model, you don’t need to create any additional IAM access policies or S3 bucket policies for your users and roles.

  1. Sign in to the AWS Lake Formation console as a data lake administrator IAM user. For instructions on setting up a data lake administrator user, see Create a Data Lake Administrator. To manage access control from Lake Formation, you register the three S3 buckets you created earlier as data lake locations with Lake Formation.
  2. On the Lake Formation dashboard, choose Register Location.
  3. For Amazon S3 path, enter your S3 bucket location (s3://forecast-blog-landing).
  4. Choose Register location.
  5. Repeat these steps for the processed and published buckets.
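Registration can also be scripted; here is a minimal sketch with boto3 that registers the three buckets using the Lake Formation service-linked role.

    import boto3

    lakeformation = boto3.client("lakeformation")

    for bucket in ["forecast-blog-landing", "forecast-blog-processed", "forecast-blog-published"]:
        lakeformation.register_resource(
            ResourceArn=f"arn:aws:s3:::{bucket}",
            UseServiceLinkedRole=True)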

Setting up the Lake Formation data catalog for your data lake

You create three databases in the Lake Formation data catalog, one for each S3 bucket you created earlier. All transformations done via AWS Glue operate on the databases in this catalog.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database.
  3. In the Database details section, for Name, enter the name for the database (forecast-blog-landing-db).
  4. For Location, enter the location to the corresponding S3 bucket.
  5. For Description, add an optional description.
  6. Deselect Use only IAM access control for new tables in this database.
  7. Choose Create database.
  8. Repeat the above steps for the processed and published databases.
  9. On the Databases page, select the database you just created.
  10. From the Actions drop-down menu, choose Grant.
  11. For IAM users and roles, choose the GLUE_WORKFLOW_ROLE you created earlier.
  12. For Database permissions, select Create table.
  13. Choose Grant. At this stage, you have set up your data lake with Lake Formation. The following diagram illustrates your resources so far.

You’re now ready to ingest sales data into your data lake.
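If you want to script the database creation and the grant, the following sketch creates the three databases and grants the workflow role the Create table permission on each; the account ID in the role ARN is a placeholder.

    import boto3

    glue = boto3.client("glue")
    lakeformation = boto3.client("lakeformation")
    role_arn = "arn:aws:iam::123456789012:role/GLUE_WORKFLOW_ROLE"  # replace the account ID

    for name in ["landing", "processed", "published"]:
        # Create the catalog database pointing at the matching S3 bucket
        glue.create_database(DatabaseInput={
            "Name": f"forecast-blog-{name}-db",
            "LocationUri": f"s3://forecast-blog-{name}"})

        # Allow the workflow role to create tables in the database
        lakeformation.grant_permissions(
            Principal={"DataLakePrincipalIdentifier": role_arn},
            Resource={"Database": {"Name": f"forecast-blog-{name}-db"}},
            Permissions=["CREATE_TABLE"])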

Ingesting data

This post uses an example MySQL table called sales, which contains 2 years of sales history of a single product. The schema of the table is as follows:

Column Name | Column Type
InvoiceNo | int
StockCode | int
Description | varchar(200)
Quantity | int
InvoiceDate | varchar(50)
UnitPrice | float
CustomerID | int
StoreLocation | varchar(100)
CustomerName | varchar(200)
  1. To begin the ingestion, create the source database. The following CloudFormation template creates a free-tier RDS MySQL instance with a database named sales_schema within a new VPC, along with the required sample data for this post. The template deploys in the us-west-2 Region.

  2. Create a new Glue connection for the source database.
  3. On the Lake Formation console, choose Blueprints.
  4. Choose Use a blueprint.
  5. Select Incremental database as the blueprint type for a regular ingest of sales data from the source relational database.
  6. Follow the prompts to complete the incremental blueprint setup.
  7. For Database connection, choose forecast-blog-db.
  8. For Source data path, enter sales_schema/sales.
  9. For Target database, choose forecast-blog-landing-db.
  10. For Target storage location, enter s3://forecast-blog-landing.
  11. For Data format, choose Parquet.
  12. For Workflow name, enter forecast-blog-wf.
  13. For IAM role, choose GLUE_WORKFLOW_ROLE.
  14. For Table prefix, enter blog.

The target location must be the landing S3 bucket that you created earlier, and table prefix must be set to blog. This is the raw data that subsequent AWS Glue jobs transform and process. For information about using an incremental database blueprint, see Importing Data Using Workflows.

Start the blueprint after you create it.

Orchestrating data transformation and forecast generation

When you have the raw sales data ingested into the data lake landing bucket, you execute a custom AWS Glue workflow to orchestrate the automation of the data transformation, the Forecast load/train/forecast steps, and making the forecasts available for business dashboards. Complete the following steps:

  1. Create an AWS Glue crawler to crawl the published S3 bucket that you created earlier.
    1. Use the GLUE_WORKFLOW_ROLE that you created earlier.
  2. Create the following AWS Glue jobs to use in the forecasting automation.
    1. Create the data transformation job as a Spark job, and create the remaining jobs as Python shell (Python 3) jobs.
    2. For IAM role, use GLUE_WORKFLOW_ROLE.
  3. On the Connections page, choose Save job and edit script without selecting any connections. The following jobs are available on GitHub:

Next, you create a new AWS Glue workflow to orchestrate the entire automation. The workflow lets you build and orchestrate a sequence of AWS Glue jobs and crawlers via triggers to complete a complex process.

  1. On the AWS Glue console, choose Workflows.
  2. Choose Add workflow.
  3. For Workflow name, enter AmazonForecastWorkflow.
  4. For Description, add an optional description.
  5. For Default run properties, enter the keys and values in the following table.
Key | Value
landingDB | forecast-blog-landing-db
landingDBTable | blog_sales_schema_sales
processedBucket | forecast-blog-processed
publishedBucket | forecast-blog-published
  6. Choose Add workflow.
    After you create the workflow, you add triggers, jobs, and a crawler in your workflow.
  7. Choose the workflow you created earlier.
  8. Choose Add trigger.
  9. For Name, enter StartWorkflow.
  10. For Trigger type, select On demand.
  11. Choose Add.
  12. On the Jobs tab, choose the job you created earlier.
  13. Choose Add.
  14. Choose Add node.
  15. Create a new trigger to watch the end of the transform job and start the data import job.
  16. For Name, enter StartDataImport.
  17. For Trigger type, select Event.
  18. For Trigger logic, leave as Start after ANY watched event.
  19. Choose Add.
  20. Choose Add node to the left of the trigger and choose the data transformation job you created earlier.
  21. Choose Add node to the right of the trigger and choose the import data job you created earlier.
  22. Repeat these steps to create the following triggers:
Trigger | Glue job to complete | Glue job to start
CheckImportTrigger | Importing data into Forecast | Checking the load data job status
StartPredictorTrigger | Checking the load data job status | Training the Forecast predictor
CheckPredictorTrigger | Training the Forecast predictor | Checking the train predictor job status
GenerateForecastTrigger | Checking the train predictor job status | Generating forecast
CheckForecastTrigger | Generating forecast | Checking the forecast job status
ExportForecastTrigger | Checking the forecast job status | Exporting forecast to data lake published bucket
CheckExportTrigger | Exporting forecast to data lake published bucket | Checking the export forecast job status
StartCrawlerTrigger | Checking the export forecast job status | Crawler you created to crawl the published S3 bucket
  23. From the Actions drop-down menu, choose Run.

This starts the end-to-end forecasting process.
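Each Python shell job that the workflow runs follows a similar pattern: read the default run properties you defined on the workflow, then call Forecast with boto3. The following is a minimal sketch of that pattern, not the actual job from the GitHub repo; the dataset and role ARNs are hypothetical placeholders.

    import sys
    import boto3
    from awsglue.utils import getResolvedOptions

    # AWS Glue passes these arguments to jobs started by a workflow
    args = getResolvedOptions(sys.argv, ["WORKFLOW_NAME", "WORKFLOW_RUN_ID"])

    glue = boto3.client("glue")
    run_props = glue.get_workflow_run_properties(
        Name=args["WORKFLOW_NAME"], RunId=args["WORKFLOW_RUN_ID"])["RunProperties"]
    processed_bucket = run_props["processedBucket"]  # defined as a default run property

    # Start the Forecast import from the processed bucket (ARNs are placeholders)
    forecast = boto3.client("forecast")
    forecast.create_dataset_import_job(
        DatasetImportJobName="forecast_blog_import",
        DatasetArn="arn:aws:forecast:us-west-2:123456789012:dataset/forecast_blog_dataset",
        DataSource={"S3Config": {
            "Path": f"s3://{processed_bucket}/",
            "RoleArn": "arn:aws:iam::123456789012:role/GLUE_WORKFLOW_ROLE"}},
        TimestampFormat="yyyy-MM-dd")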

Visualizing your forecasts

To provide your users with a dashboard that refreshes regularly with new forecasts, set up an Amazon QuickSight report and a dashboard and connection to the data lake via Athena. You can use Amazon QuickSight and the Athena data source to access the forecast data and make visualizations.

First, you need to grant access to the forecast data to QuickSight. Identify the Amazon QuickSight service role in your account. Amazon QuickSight assumes the service role (aws-quicksight-service-role-v0) to interact with other AWS services. The service role is automatically created when you start using Amazon QuickSight.

  1. As the data lake administrator user, on the Lake Formation console, locate the table created by the AWS Glue workflow under your published-db. This is the table that has the exported forecast data to visualize.
  2. Grant Select access on this table to the Amazon QuickSight service role using the Grant action within Lake Formation. For more information, see Granting Data Catalog Permissions. You now create a dashboard to visualize the forecast data in Amazon QuickSight.
  3. On the Amazon QuickSight console, choose Manage data.
  4. Create a data source for Athena.
  5. For Data source name, enter forecast-blog-published-db.
  6. Choose Create data source.
  7. For Database, choose forecast-blog-published-db.
  8. Select the table that your crawler created for the exported forecast (forecast_blog_published).
  9. Choose Select.
  10. Choose Visualize.
  11. Create a new analysis on the dataset.
  12. Publish a visualization dashboard. The following screenshot is a QuickSight dashboard displaying your exported forecast. The dashboard was created from a line chart analysis with the p10, p50, and p90 columns selected for the y-axis and date selected for the x-axis.

Forecast generates probabilistic forecasts so you can generate forecasts at different percentiles depending on your specific use case (for example, if under-stocking or over-stocking is critical to the business). The preceding graph represents the upper and lower bands of forecasted inventory values for the product in your sample data and a selected location (applied as a filter) for a 10-day period. Using this, you can decide on the optimum levels of stock to hold or order for that week.

The p10 is the lower boundary, meaning that there’s only a 10% chance that the actual value is below this line. Conversely, the p90 is the upper boundary, meaning that there’s a 90% chance that the actual value is below this line. As your training data becomes more comprehensive, the p10 and p90 start to converge. You can also generate forecasts on custom quantiles of your choosing.

For conservative planning, choose a value closer to the p90, which means you’re willing to purchase more inventory than what you actually sell. For aggressive planning, choose a value closer to the p10, which means that you’re willing to accept the risk of running out of inventory.
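For example, the following sketch requests specific quantiles when creating a forecast with boto3; the predictor ARN is a hypothetical placeholder.

    import boto3

    forecast = boto3.client("forecast")

    # Request the p10, p50, and p90 quantiles explicitly
    forecast.create_forecast(
        ForecastName="inventory_forecast_quantiles",
        PredictorArn="arn:aws:forecast:us-west-2:123456789012:predictor/forecast_blog_predictor",
        ForecastTypes=["0.10", "0.50", "0.90"])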

Conclusion

In this post, you learned how to build an automated inventory forecasting capability for your business on AWS using AI through Forecast and Lake Formation. You learned how to set up a data lake on AWS with the required security governance using Lake Formation. You also learned how to automate the end-to-end process of ingesting sales data into your data lake and automating the data transformation; loading, training, and generating forecasts with Forecast; and making the forecasts accessible to your end-users via Amazon QuickSight visualizations.

 


About the Author

Syed Jaffry is a solutions architect with Amazon Web Services. He works with financial services customers to help them deploy secure, resilient, scalable, and high-performance applications in the cloud.

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Optimize memory management in AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/

AWS Glue provides a serverless environment to prepare and process datasets for analytics using the power of Apache Spark. In the third post of the series, we discussed how AWS Glue can automatically generate code to perform common data transformations. We also looked at how you can use AWS Glue Workflows to build data pipelines that enable you to easily ingest, transform and load data for analytics.

Apache Spark provides several knobs to control how memory is managed for different workloads. However, this is not an exact science, and applications may still run into a variety of out of memory (OOM) exceptions because of inefficient transformation logic, unoptimized data partitioning, or other quirks in the underlying Spark engine. In this post of the series, we go deeper into the inner workings of an AWS Glue Spark ETL job and discuss how we can combine AWS Glue capabilities with Spark best practices to scale our jobs to efficiently handle the variety and volume of our data.

Scaling the Apache Spark driver

The Apache Spark driver is responsible for analyzing the job, coordinating, and distributing work to tasks to complete the job in the most efficient way possible. In the majority of ETL jobs, the driver is typically involved in listing table partitions and the data files in Amazon S3 before it computes file splits and work for individual tasks. The driver then coordinates the tasks running the transformations that process each file split. In addition, the driver needs to keep track of the progress each task is making and collect the results at the end. The Spark driver may become a bottleneck when a job needs to process a large number of files and partitions. AWS Glue offers five different mechanisms to efficiently manage memory on the Spark driver when dealing with a large number of files.

  1. Push down predicates: Glue jobs allow the use of push down predicates to prune the unnecessary partitions from the table before the underlying data is read. This is useful when you have a large number of partitions in a table and you only want to process a subset of them in your Glue ETL job. Pruning catalog partitions reduces both the memory footprint of the driver and the time required to list the files in the pruned partitions. Push down predicates are applied first to ignore unnecessary partitions before the job bookmark and other exclusions can further filter the list of files to be read from each partition. The following example shows how to use push down predicates to process only data for events logged on weekends.
    partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
    
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate)

  2. Glue S3 Lister: AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame. The Glue S3 Lister can be enabled by setting the DynamicFrame’s additional_options parameter useS3ListImplementation to True. The Glue S3 Lister offers an advantage over the default S3 list implementation by strictly iterating over the final list of filtered files to be read.
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate,
        additional_options = {"useS3ListImplementation":True}
    )  

  3. Grouping: AWS Glue allows you to consolidate multiple files per Spark task using the file grouping feature. Grouping files together reduces the memory footprint on the Spark driver and simplifies file split orchestration. Without grouping, a Spark application must process each file using a different Spark task. Each task must then send a mapStatus object containing the location information to the Spark driver. In our testing using the AWS Glue standard worker type, we found that Spark applications processing more than roughly 650,000 files often cause the Spark driver to crash with an out of memory exception, as shown by the following error message:
    # java.lang.OutOfMemoryError: Java heap space
    # -XX:OnOutOfMemoryError="kill -9 %p"
    # Executing /bin/sh -c "kill -9 12039"...

    • groupFiles allows you to group files within a Hive-style S3 partition (inPartition) and across S3 partitions (acrossPartition). groupSize is an optional field that allows you to configure the amount of data to be read from each file and processed by individual Spark tasks.
      dyf = glueContext.create_dynamic_frame_from_options("s3",
          {'paths': ["s3://input-s3-path/"],
          'recurse':True,
          'groupFiles': 'inPartition',
          'groupSize': '1048576'}, 
          format="json")

  4. Exclusions for S3 Paths: To further aid in filtering out files that are not required by the job, AWS Glue introduced a mechanism for users to provide a glob expression for S3 paths to be excluded. This speeds up job processing while reducing the memory footprint on the Spark driver. The following code snippet shows how to exclude all objects ending with _metadata in the selected S3 path.
    dyf = glueContext.create_dynamic_frame_from_options("s3",
        {'paths': ["s3://input-s3-path/"],
        'exclusions': "[\"input-s3-path/**_metadata\"]"}, 
        format="json")

    Exclusions for S3 Storage Classes: AWS Glue offers the ability to exclude objects based on their underlying S3 storage class. As the lifecycle of data evolves, hot data becomes cold and automatically moves to lower-cost storage based on the configured S3 lifecycle policy, so it’s important to make sure ETL jobs process the correct data. This is particularly useful when working with large datasets that span multiple S3 storage classes and use the Apache Parquet file format, where Spark tries to read the schema from the file footers in these storage classes. Amazon S3 offers several storage classes: STANDARD, INTELLIGENT_TIERING, STANDARD_IA, ONEZONE_IA, GLACIER, DEEP_ARCHIVE, and REDUCED_REDUNDANCY. When reading data using DynamicFrames, you can specify a list of S3 storage classes you want to exclude. This feature leverages the optimized AWS Glue S3 Lister. The following example shows how to exclude files stored in the GLACIER and DEEP_ARCHIVE storage classes.

    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        redshift_tmp_dir = "",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"]
        }
    )

    GLACIER and DEEP_ARCHIVE storage classes only allow listing files and require an asynchronous S3 restore process to read the actual data. The following is the exception you will see when trying to access Glacier and Deep Archive storage classes from your Glue ETL job:

    java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
    The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; 
    Error Code: InvalidObjectState; Request ID: ), S3 Extended Request ID: (1)

  5. Optimize Spark queries: Inefficient queries or transformations can have a significant impact on Apache Spark driver memory utilization.Common examples include:
    • collect is a Spark action that collects the results from workers and returns them to the driver. In some cases, the results may be very large, overwhelming the driver. It is recommended to be careful while using collect, as it can frequently cause Spark driver OOM exceptions, as shown below (a safer pattern is sketched after this list):
      An error occurred while calling 
      z:org.apache.spark.api.python.PythonRDD.collectAndServe.
      Job aborted due to stage failure:
      Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

    • Shared Variables: Apache Spark offers two different ways to share variables between Spark driver and executors: broadcast variables and accumulators. Broadcast variables are useful to provide a read-only copy of data or fact tables shared across Spark workers to improve map-side joins. Accumulators are useful to provide a writeable copy to implement distributed counters across Spark executors. Both should be used carefully and destroyed when no longer needed as they can frequently result in Spark driver OOM exceptions.
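As a rough sketch of the safer pattern mentioned above, the following PySpark snippet brings back only a bounded preview to the driver and writes the full result set to Amazon S3 instead of calling collect on it; the output path is a hypothetical placeholder.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("avoid-collect-sketch").getOrCreate()
    result_df = spark.range(1_000_000).withColumnRenamed("id", "value")  # stand-in for a large result

    # Bring back only a bounded preview instead of collect()-ing every row to the driver
    preview = result_df.limit(100).collect()

    # Persist the full result set to S3 (hypothetical path); it never passes through the driver
    result_df.write.mode("overwrite").parquet("s3://my-output-bucket/results/")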

Scaling Apache Spark executors

Apache Spark executors process data in parallel. However, unoptimized reads from JDBC sources, unbalanced shuffles, buffering of rows with PySpark UDFs, exceeding off-heap memory on each Spark worker, and skew in the size of partitions can all result in Spark executor OOM exceptions. We list below some of the best practices with AWS Glue and Apache Spark for avoiding the conditions that result in OOM exceptions.

  1. JDBC Optimizations: Apache Spark uses JDBC drivers to fetch data from JDBC sources such as MySQL, PostgreSQL, and Oracle.
    • Fetchsize: By default, the Spark JDBC drivers configure the fetch size to zero. This means that the JDBC driver on the Spark executor tries to fetch all the rows from the database in one network round trip and cache them in memory, even though a Spark transformation only streams through the rows one at a time. This may result in the Spark executor running out of memory with the following exception:
      WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      ERROR YarnClusterScheduler: Lost executor 4 on ip-10-1-2-96.ec2.internal: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3, ip-10-1-2-96.ec2.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

      In Spark, you can avoid this scenario by explicitly setting the fetch size parameter to a non-zero default value. With AWS Glue, Dynamic Frames automatically use a fetch size of 1,000 rows that bounds the size of cached rows in JDBC driver and also amortizes the overhead of network round-trip latencies between the Spark executor and database instance. The example below shows how to read from a JDBC source using Glue dynamic frames.

      // Placeholders for the JDBC connection details
      val (url, database, tableName) = ("jdbc_url", "db_name", "table_name")
      // "format" is the connection type (for example, "mysql") and "sourceJson"
      // holds the JDBC connection options built from these values
      val source = glueContext.getSource(format, sourceJson)
      val dyf = source.getDynamicFrame

  • Spark’s Read Partitioning: Apache Spark by default uses only one executor to open up a JDBC connection with the database and read the entire table into a Spark dataframe. This can result in an unbalanced distribution of data processed across different executors. As a result, it is usually recommended to use a partitionColumn, lowerBound, upperBound, and numPartitions to enable reading in parallel from different executors. This allows for more balanced partitioning if there exists a column that has a uniform value distribution. However, Apache Spark restricts the partitionColumn to be one of numeric, date, or timestamp data types. For example:
    // "emp_no" is the partition column; the fetch size is set via connection properties
    connectionProperties.put("fetchsize", "1000")
    val df = spark.read.jdbc(url = jdbcUrl,
        table = "employees", columnName = "emp_no",
        lowerBound = 1L, upperBound = 100000L, numPartitions = 100,
        connectionProperties = connectionProperties)

  • Glue’s Read Partitioning: AWS Glue enables partitioning JDBC tables based on columns with generic types, such as string. This enables you to read from JDBC sources using non-overlapping parallel SQL queries executed against logical partitions of your table from different Spark executors. You can control partitioning by setting a hashfield or hashexpression. You can also control the number of parallel reads that are used to access your data by specifying hashpartitions. For best results, this column should have an even distribution of values to spread the data between partitions. For example, if your data is evenly distributed by month, you can use the month column to read each month of data in parallel. Based on the database instance type, you may like to tune the number of parallel connections by adjusting the hashpartitions. For example:
    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            'hashfield': 'month',
            'hashpartitions': '5'
        }
    )

  • Bulk Inserts: AWS Glue offers parallel inserts for speeding up bulk loads into JDBC targets. The following example uses a bulk size of two, which allows two inserts to happen in parallel. This is helpful for improving the performance of writes into databases such as Amazon Aurora.
    val optionsMap = Map(
      "user" -> user,
      "password" -> pwd,
      "url" -> postgresEndpoint,
      "dbtable" -> table,
      "bulkSize" -> "2")
    val options = JsonOptions(optionsMap)
    // "dyf" is the DynamicFrame to write to the JDBC target
    glueContext.getSink("postgresql", options).writeDynamicFrame(dyf)

  2. Join Optimizations: One common reason for Apache Spark applications running out of memory is the use of un-optimized joins across two or more tables. This is typically a result of data skew in the distribution of join columns or an inefficient choice of join transforms. Additionally, the ordering of transforms and filters in the user script may limit the Spark query planner’s ability to optimize. There are three popular approaches to optimize joins on AWS Glue.
    • Filter tables before Join: You should pre-filter your tables as much as possible before joining. This helps to minimize the data shuffled between the executors over the network. You can use AWS Glue push down predicates for filtering based on partition columns, AWS Glue exclusions for filtering based on file names, AWS Glue storage class exclusions for filtering based on S3 storage classes, and use columnar storage formats such as Parquet and ORC that support discarding row groups based on column statistics such as min/max of column values.
    • Broadcast Small Tables: Joining tables can result in large amounts of data being shuffled or moved over the network between executors running on different workers. Because of this, Spark may run out of memory and spill the data to physical disk on the worker. This behavior can be observed in the following log message:
      INFO [UnsafeExternalSorter] — Thread 168 spilling sort data of 3.1 GB to disk (0 time so far)

      In cases where one of the tables in the join is small (a few tens of MBs), we can instruct Spark to handle it differently to reduce the overhead of shuffling data. You do this by hinting to Apache Spark that the smaller table should be broadcast instead of partitioned and shuffled across the network. The Spark parameter spark.sql.autoBroadcastJoinThreshold configures the maximum size, in bytes, for a table that will be broadcast to all worker nodes when performing a join. Apache Spark automatically broadcasts a table when it is smaller than 10 MB. You can also explicitly tell Spark which table you want to broadcast as shown in the following example:

      val employeesDF = employeesRDD.toDF
      val departmentsDF = departmentsRDD.toDF
      
      // materializing the department data
      val tmpDepartments = broadcast(departmentsDF.as("departments"))
      
      val joinedDF = employeesDF.join(broadcast(tmpDepartments), 
         $"depId" === $"id",  // join by employees.depID == departments.id 
         "inner")
      
      // Show the explain plan and confirm the table is marked for broadcast
      joinedDF.explain()
      
      == Physical Plan ==
      *BroadcastHashJoin [depId#14L], [id#18L], Inner, BuildRight
      :- *Range (0, 100, step=1, splits=8)
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
         +- *Range (0, 100, step=1, splits=8)

  3. PySpark User Defined Functions (UDFs): Using PySpark UDFs can turn out to be costly for executor memory. This is because data must be serialized/deserialized when it is exchanged between the Spark executor JVM and the Python interpreter. The Python interpreter needs to process the serialized data in the Spark executor’s off-heap memory. For datasets with large or nested records, or when using complex UDFs, this processing can consume large amounts of off-heap memory and can lead to OOM exceptions resulting from exceeding the YARN memoryOverhead. Here is what the error message looks like:
    ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
    Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
    Consider boosting spark.yarn.executor.memoryOverhead

    Similarly, data serialization can be slow and often leads to longer job execution times. To avoid such OOM exceptions, it is a best practice to write the UDFs in Scala or Java instead of Python. They can be imported by providing the S3 path of the dependent JARs in the Glue job configuration. Another optimization to avoid buffering of large records in off-heap memory with PySpark UDFs is to move selects and filters upstream to earlier execution stages of an AWS Glue script (a minimal sketch appears after this list).

  4. Incremental processing: Processing large datasets in S3 can result in costly network shuffles, spilling data from memory to disk, and OOM exceptions. To avoid these scenarios, it is a best practice to incrementally process large datasets using AWS Glue Job Bookmarks, push-down predicates, and exclusions. Concurrent job runs can process separate S3 partitions and also minimize the possibility of OOMs caused by large Spark partitions or unbalanced shuffles resulting from data skew. Vertical scaling with higher-memory instances can also mitigate the chances of OOM exceptions caused by insufficient off-heap memory or Apache Spark applications that cannot be readily optimized.
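The following is a minimal sketch of the filter/select pushdown mentioned in the PySpark UDF item above: rows and columns are pruned with built-in functions before any Python UDF runs, so less data is serialized to the Python interpreter. The data and column names are made up for illustration.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("udf-pushdown-sketch").getOrCreate()
    df = spark.createDataFrame([("a", 5), ("b", 50), ("c", 500)], ["key", "amount"])

    # Filter and project with built-in functions before any Python UDF runs
    slim = df.filter(F.col("amount") > 10).select("key", "amount")

    # If a UDF is still required, apply it only to the reduced DataFrame
    double_it = F.udf(lambda x: x * 2, "int")
    out = slim.withColumn("amount_x2", double_it(F.col("amount")))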

You can also use Glue’s G.1X and G.2X worker types that provide more memory and disk space to vertically scale your Glue jobs that need high memory or disk space to store intermediate shuffle output. Vertical scaling for Glue jobs is discussed in our first blog post of this series.
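Following up on the incremental-processing item above, here is a minimal sketch of a bookmark-aware read in a Glue job; it assumes job bookmarks are enabled on the job (--job-bookmark-option job-bookmark-enable), and the database, table, and predicate are hypothetical.

    import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    glue_context = GlueContext(SparkContext.getOrCreate())
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # With bookmarks enabled, transformation_ctx lets Glue track which files were
    # already processed, so each run reads only new data; the predicate further
    # prunes partitions before any files are listed.
    dyf = glue_context.create_dynamic_frame.from_catalog(
        database="my_database",
        table_name="my_table_name",
        transformation_ctx="incremental_read",
        push_down_predicate="year == '2020'")

    job.commit()  # records the bookmark state for the next run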

Conclusion

In this post, we discussed a number of techniques to enable efficient memory management for Apache Spark applications when reading data from Amazon S3 and compatible databases using a JDBC connector. We described how Glue ETL jobs can use the partitioning information available in the AWS Glue Data Catalog to prune large datasets, manage a large number of small files, and use JDBC optimizations for partitioned reads and batch record fetches from databases. You can use some or all of these techniques to help ensure your ETL jobs perform well.

In the next post, we will describe how you can develop Apache Spark applications and ETL scripts locally on your laptop with the Glue Spark Runtime containing these optimizations. You can build against the Glue Spark Runtime available from Maven or use a Docker container for cross-platform support. You can develop using Jupyter/Zeppelin notebooks or your favorite IDE, such as PyCharm. Next, you can deploy those Spark applications on AWS Glue’s serverless Spark platform.

 


About the Author

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data in the cloud. He also enjoys watching movies and reading about the latest technology.

Build an automatic data profiling and reporting solution with Amazon EMR, AWS Glue, and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-an-automatic-data-profiling-and-reporting-solution-with-amazon-emr-aws-glue-and-amazon-quicksight/

In typical analytics pipelines, one of the first tasks that you perform after importing data into your data lake is data profiling and high-level data quality analysis to check the content of the datasets. In this way, you can enrich the basic metadata that contains information such as table and column names and their types.

The results of data profiling help you determine whether the datasets contain the expected information and how to use them downstream in your analytics pipeline. Moreover, you can use these results as one of the inputs to an optional data semantics analysis stage.

The great quantity and variety of data in modern data lakes make unstructured manual data profiling and data semantics analysis impractical and time-consuming. This post shows how to implement a process for the automatic creation of a data profiling repository, as an extension of AWS Glue Data Catalog metadata, and a reporting system that can help you in your analytics pipeline design process and by providing a reliable tool for further analysis.

This post describes in detail the application Data Profiler for AWS Glue Data Catalog and provides step-by-step instructions of an example implementation.

Overview and architecture

The following diagram illustrates the architecture of this solution.

Data Profiler for AWS Glue Data Catalog is an Apache Spark Scala application that profiles all the tables defined in a database in the Data Catalog using the profiling capabilities of the Amazon Deequ library and saves the results in the Data Catalog and an Amazon S3 bucket in a partitioned Parquet format. You can use other analytics services such as Amazon Athena and Amazon QuickSight to query and visualize the data.

For more information about the Amazon Deequ data library, see Test data quality at scale with Deequ or the source code on the GitHub repo.

Metadata can be defined as data about data. Metadata for a table contains information like the table name and other attributes, column names and types, and the physical location of the files that contain the data. The Data Catalog is the metadata repository in AWS, and you can use it with other AWS services like Athena, Amazon EMR, and Amazon Redshift.

After you create or update the metadata for tables in a database (for example, adding new data to the table), either with an AWS Glue crawler or manually, you can run the application to profile each table. The results are stored as new versions of the tables’ metadata in the Data Catalog, which you can view interactively via the AWS Lake Formation console or query programmatically via the AWS CLI for AWS Glue.
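For example, the following boto3 sketch lists the metadata versions of a profiled table and pulls out the statistics that the application stores with the default DQP__ prefix; the table name is a hypothetical placeholder.

    import boto3

    glue = boto3.client("glue")

    versions = glue.get_table_versions(
        DatabaseName="nyctlcdb",          # database used later in this post
        TableName="my_profiled_table")    # hypothetical table name

    for version in versions["TableVersions"]:
        params = version["Table"].get("Parameters", {})
        # Profiling metrics carry the statsPrefix plus two underscores ("DQP__" by default)
        metrics = {k: v for k, v in params.items() if k.startswith("DQP__")}
        print(version["VersionId"], metrics)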

For more information about the Data Profiler, see the GitHub repo.

The Deequ library does not support tables with nested data (such as JSON). If you want to run the application on a table with nested data, this must be un-nested/flattened or relationalized before profiling. For more information about useful transforms for this task, see AWS Glue Scala DynamicFrame APIs or AWS Glue PySpark Transforms Reference.

The following table shows the profiling metrics the application computes for column data types Text and Numeric. The computation of some profiling metrics for Text columns can be costly and is disabled by default. You can enable it by setting the compExp input parameter to true (see next section).

Metric | Description | Data Type
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | Text / Numeric
Completeness | Fraction of non-null values in a column. | Text / Numeric
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least one time. For example, [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Text / Numeric
MaxLength | Maximum length of the column. | Text
MinLength | Minimum length of the column. | Text
CountDistinct | Exact number of distinct values. | Text
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). For example, [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Text
Histogram | The summary of values in a column of a table. Groups the given column’s values and calculates the number of rows with that specific value and the fraction of this value. | Text
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly one time; distinct values occur at least one time. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | Text
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly one time. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Text
ApproxQuantiles | Approximate quantiles of a distribution. | Numeric
Maximum | Maximum value of the column. | Numeric
Mean | Mean value of the column. | Numeric
Minimum | Minimum value of the column. | Numeric
StandardDeviation | Standard deviation value of the column. | Numeric
Sum | Sum of the column. | Numeric

Application description

You can run the application via spark-submit on a transient or permanent EMR cluster (see the “Creating an EMR cluster” section in this post for minimum version specification) with Spark installed and configured with the Data Catalog settings option Use for Spark table metadata enabled.

The following example code executes the application:

 $ spark-submit \
  --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
  --master yarn \
  --deploy-mode cluster \
  --name data-profiler-for-aws-glue-data-catalog \
  /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
  --dbName nyctlcdb \
  --region eu-west-1 \
  --compExp true \
  --statsPrefix DQP \
  --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
  --profileUnsupportedTypes true \
  --noOfBins 30 \
  --quantiles 10

The following table summarizes the input parameters that the application accepts.

Name | Type | Required | Default | Description
--dbName (-d) | String | Yes | N/A | Data Catalog database name. The database must be defined in the Catalog owned by the same account where the application is executed.
--region (-r) | String | Yes | N/A | AWS Region endpoint where the Data Catalog database is defined, for example us-west-1 or us-east-1. For more information, see Regional Endpoints.
--compExp (-c) | Boolean | No | false | If true, the application also executes “expensive” profiling analyzers on Text columns. These are CountDistinct, Entropy, Histogram, UniqueValueRatio, and Uniqueness. If false, only the following default analyzers are executed: ApproxCountDistinct, Completeness, Distinctness, MaxLength, MinLength. All analyzers for Numeric columns are always executed.
--statsPrefix (-p) | String | No | DQP | String prepended to the statistics names in the Data Catalog. The application also adds two underscores (__). This is useful to identify metrics calculated by the application.
--s3BucketPrefix (-s) | String | No | blank | Format must be s3BucketName/prefix. If specified, the application writes Parquet files with metrics in the prefixes db_name=…/table_name=….
--profileUnsupportedTypes (-u) | Boolean | No | false | By default, the Amazon Deequ library only supports Text and Numeric columns. If this parameter is set to true, the application also profiles columns of type Boolean and Date.
--noOfBins (-b) | Integer | No | 10 | When --compExp (-c) is true, sets the maximum number of values to create for the Histogram analyzer for String columns.
--quantiles (-q) | Integer | No | 10 | Sets the number of quantiles to calculate for the ApproxQuantiles analyzer for numeric columns.

Setting up your environment

The following walkthrough demonstrates how to create and populate a Data Catalog database with three tables, which simulates a process with monthly updates. For this post, you simulate three monthly runs: February 2, 2019, March 2, 2019, and April 2, 2019.

After table creation and after each monthly table update, you run the application to generate and update the profiling information for each table in the Data Catalog and Amazon S3 repository. The post also provides examples of how to query the data using the AWS CLI and Athena, and build a simple Amazon QuickSight dashboard.

This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS. In particular, you use the tables yellow_tripdata, fhv_tripdata, and green_tripdata.

The following steps explain how to set up the environment.

Creating an EMR cluster

The first step is to create an EMR cluster. You connect to the cluster master node and execute the code via spark-submit. Make sure that the cluster version is at least 5.28.0, that at least Hadoop and Spark are installed, and that the cluster is configured to use the Data Catalog for Spark table metadata.

The master node should also be accessible via SSH. For instructions, see Connect to the Master Node Using SSH.
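
If you prefer to create the cluster programmatically instead of through the console, the following boto3 sketch shows one possible configuration; the instance types, key pair, and role names are placeholders, and the spark-hive-site classification enables the Data Catalog as the Spark metastore:

import boto3

emr = boto3.client("emr", region_name="eu-west-1")

response = emr.run_job_flow(
    Name="deequ-profiler-cluster",
    ReleaseLabel="emr-5.28.0",
    Applications=[{"Name": "Hadoop"}, {"Name": "Spark"}],
    Configurations=[
        {
            # Use the AWS Glue Data Catalog as the Spark table metastore.
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            },
        }
    ],
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "Ec2KeyName": "my-key-pair",
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])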

Downloading the application

You can download the source code from the application GitHub repo and build it as an uber jar with all dependencies using the Scala Build Tool (sbt) with the following commands (adjust the memory values according to your needs):

$ export SBT_OPTS="-Xms1G -Xmx3G -Xss2M -XX:MaxMetaspaceSize=3G" && sbt assembly

By default, the .jar file is created in the following path, relative to the project root directory:

./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar

When the .jar file is available, copy it to the master node of the EMR cluster. One way to copy the file to the master node is to upload it to Amazon S3 from the client where it was built and then download it from Amazon S3 to the master node.

For this post, copy the file in the /home/hadoop directory of the master node. The full path is /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar.

Setting up S3 buckets and copying initial data

You use an S3 bucket to store the data that you profile. For this post, the bucket name is

s3://aws-big-data-blog-samples/

You need to create a bucket with a unique name in your account and store the data there. When the bucket is created and available, use the AWS CLI to copy the first set of files for January 2019 (therefore simulating the February 2, 2019, run) from the s3://nyc-tlc/ bucket. See the following code:

$ DEST_BUCKET=aws-big-data-blog-samples
$ MONTH=2019-01
 
$ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

After you copy the data to your destination bucket, create a second bucket to store the files with the profiling metrics created by the application. This step is optional because you can write the metrics to a prefix in an existing bucket. See the following code:

$ aws s3 mb s3://deequ-profiler/

You are now ready to create the database and tables metadata in the Data Catalog.

Creating metadata in the Data Catalog

The first step is to create a database in AWS Glue. You can create the database using the AWS CLI. See the following code:

$ aws glue create-database \
    --database-input '{"Name": "nyctlcdb"}'

Alternatively, on the AWS Glue console, choose Databases, Add database.

After you create the database, create a new AWS Glue Crawler to infer the schema of the data in the files you copied in the previous step. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter nyc-tlc-db-raw.
  4. Choose Next.
  5. For Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the S3 bucket and prefix where you copied the data earlier.
  8. Choose Next.
  9. In the Choose an IAM role section, select Choose an existing IAM role.
  10. Choose an IAM role that provides access to the S3 bucket and allows writing to the Data Catalog, or create a new one while creating this crawler.
  11. Choose Next.
  12. Choose the database you created earlier to store the tables’ metadata.
  13. Review the crawler properties and choose Finish.
    You can run the crawler when it’s ready. It creates three new tables in the database. The following screenshot shows the notification you receive when the crawler is complete.
  14. You can now use the Lake Formation console to check that the tables are correct. See the following screenshot of the Tables page. If you select one of the tables, the table version is now 0. See the following screenshot. You can also perform the same check using the AWS CLI. See the following code:
    $ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId' 

    [
        "0"
    ]

  15. Check the parameters in the table metadata to verify which values the crawler generated. See the following code:
    $ aws glue get-table \
    	--database-name nyctlcdb \
    	--name trip_data_yellow \
       	--query 'Table.Parameters' 

    {
        "CrawlerSchemaDeserializerVersion": "1.0",
        "CrawlerSchemaSerializerVersion": "1.0",
        "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
        "areColumnsQuoted": "false",
        "averageRecordSize": "144",
        "classification": "csv",
        "columnsOrdered": "true",
        "compressionType": "none",
        "delimiter": ",",
        "objectCount": "1",
        "recordCount": "4771445",
        "sizeKey": "687088084",
        "skip.header.line.count": "1",
        "typeOfData": "file"
    }

  16. Check the metadata attributes for three columns in the same table. This post chooses the following columns because they have different data types, though any other column is suitable for this check. In the following code, the only attributes currently available for the columns are "Name" and "Type":
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'

    [
        {
            "Name": "fare_amount",
            "Type": "double"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'

    [
        {
            "Name": "passenger_count",
            "Type": "bigint"
        }
    ]

You can display the same information via the Lake Formation console. See the following screenshot.

You are now ready to execute the application.

First application execution

Connect to the EMR cluster master node via SSH and run the application with the following code (change the input parameters as needed, especially the value for the s3BucketPrefix parameter):

$ spark-submit \
    --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
    --master yarn \
    --deploy-mode cluster \
    --name data-profiler-for-aws-glue-data-catalog \
    /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    --dbName nyctlcdb \
    --region eu-west-1 \
    --compExp true \
    --statsPrefix DQP \
    --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
    --profileUnsupportedTypes true \
    --noOfBins 30 \
    --quantiles 10

Profiling information in the metadata in the Data Catalog

When the application is complete, you can recheck the metadata via the Lake Formation console for the tables and verify that a new table version was created. See the following screenshot.

You can verify the same information via the AWS CLI. See the following code:

$ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId'
[
    "1",
    "0"
]

Check the metadata for the table and verify that the profiling information the application generated was successfully stored. In the following code, the parameter “DQP__Size” was generated, which contains the number of records in the table as calculated by the Deequ library:

$ aws glue get-table \ 
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters'
{
    "CrawlerSchemaDeserializerVersion": "1.0-",
    "CrawlerSchemaSerializerVersion": "1.0",
    "DQP__Size": "7667793.0",
    "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
    "areColumnsQuoted": "false",
    "averageRecordSize": "144",
    "classification": "csv",
    "columnsOrdered": "true",
    "compressionType": "none",
    "delimiter": ",",
    "objectCount": "1",
    "recordCount": "4771445",
    "sizeKey": "687088084",
    "skip.header.line.count": "1",
    "typeOfData": "file"
}

Similarly, you can verify that the metadata for the columns you checked previously contains the profiling information the application generated. This is stored in the “Parameters” object for each column. Each new attribute starts with the string “DQP” as specified in the statsPrefix input parameter. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 
[
    {
        "Name": "store_and_fwd_flag",
        "Type": "string",
        "Parameters": {
            "DQP__ApproxCountDistinct": "3.0",
            "DQP__Completeness": "1.0",
            "DQP__CountDistinct": "3.0",
            "DQP__Distinctness": "3.912468685578758E-7",
            "DQP__Entropy": "0.03100483390393341",
            "DQP__Histogram.abs.N": "7630142.0",
            "DQP__Histogram.abs.Y": "37650.0",
            "DQP__Histogram.abs.store_and_fwd_flag": "1.0",
            "DQP__Histogram.bins": "3.0",
            "DQP__Histogram.ratio.N": "0.9950897213839758",
            "DQP__Histogram.ratio.Y": "0.004910148200401341",
            "DQP__Histogram.ratio.store_and_fwd_flag": "1.3041562285262527E-7",
            "DQP__MaxLength": "18.0",
            "DQP__MinLength": "1.0",
            "DQP__UniqueValueRatio": "0.3333333333333333",
            "DQP__Uniqueness": "1.3041562285262527E-7"
        }
    }
]
$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'
[
    {
        "Name": "fare_amount",
        "Type": "double",
        "Parameters": {
            "DQP__ApproxCountDistinct": "6125.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "8.187492802687814E-4",
            "DQP__Maximum": "623259.86",
            "DQP__Mean": "12.40940884025023",
            "DQP__Minimum": "-362.0",
            "DQP__StandardDeviation": "262.0720412055651",
            "DQP__Sum": "9.515276582999998E7",
            "DQP__name-0.1": "5.0",
            "DQP__name-0.2": "6.0",
            "DQP__name-0.3": "7.0",
            "DQP__name-0.4": "8.0",
            "DQP__name-0.5": "9.0",
            "DQP__name-0.6": "10.5",
            "DQP__name-0.7": "12.5",
            "DQP__name-0.8": "15.5",
            "DQP__name-0.9": "23.5",
            "DQP__name-1.0": "623259.86"
        }
    }
]

The parameters named “DQP__name-x.x” are the results of the ApproxQuantiles Deequ analyzer for numeric columns; the number of quantiles is set via the --quantiles (-q) input parameter of the application. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'
[
    {
        "Name": "passenger_count",
        "Type": "bigint",
        "Parameters": {
            "DQP__ApproxCountDistinct": "10.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "1.3041562285262527E-6",
            "DQP__Maximum": "9.0",
            "DQP__Mean": "1.5670782410373156",
            "DQP__Minimum": "0.0",
            "DQP__StandardDeviation": "1.2244305354114957",
            "DQP__Sum": "1.201603E7",
            "DQP__name-0.1": "1.0",
            "DQP__name-0.2": "1.0",
            "DQP__name-0.3": "1.0",
            "DQP__name-0.4": "1.0",
            "DQP__name-0.5": "1.0",
            "DQP__name-0.6": "1.0",
            "DQP__name-0.7": "1.0",
            "DQP__name-0.8": "2.0",
            "DQP__name-0.9": "3.0",
            "DQP__name-1.0": "9.0"
        }
    }
]

Profiling information in Amazon S3

You can now also verify that the profiling information was saved in Parquet format in the S3 bucket you specified in the s3BucketPrefix application input parameter. The following screenshot shows the buckets via the Amazon S3 console.

The data is stored using prefixes that are compatible with Apache Hive partitions. This is useful to optimize performance and costs when you use analytics services like Athena. The partitions are defined on db_name and table_name. The following screenshot shows the details of table_name=trip_data_yellow.

Each execution of the application generates one Parquet file per profiled table, appending new metrics for that table.

Second execution after monthly table updates

To run the application after monthly table updates, complete the following steps:

  1. Copy the new files for February 2019 to simulate the March 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-02
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. The following screenshot shows that the three tables were updated successfully.
  3. Check that the crawler created a third version of the table. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "2",
        "1",
        "0"
    ]

  4. Rerun the application to generate the new profiling metadata, entering the same code as before. To keep the information clean, before storing new profiling information in the metadata, the application removes all custom attributes starting with the string specified in the statsPrefix parameter. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

    Following a successful execution, a new version of the table was created. See the following code:

    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "3",
        "2",
        "1",
        "0"
    ]

  5. Check the value of the DQP__Size attribute; its value has changed. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "1.4687169E7"
    }

  6. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "2.042599223853147E-7",
                "DQP__Entropy": "0.0317381414905775",
                "DQP__Histogram.abs.N": "1.4613018E7",
                "DQP__Histogram.abs.Y": "74149.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "2.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9949513074984022",
                "DQP__Histogram.ratio.Y": "0.005048556328316233",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.361732815902098E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

Third execution after monthly tables updates

To run the application a third time, complete the following steps:

  1. Copy the new files for March 2019 to simulate the April 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-03
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. You now have five versions of the table metadata. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "4",
        "3",
        "2",
        "1",
        "0"
    ]

  3. Rerun the application to update the profiling information. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

  4. Check the DQP__Size parameter to see its new updated value. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "2.2519715E7"
    }

  5. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "1.3321660598280218E-7",
                "DQP__Entropy": "0.030948463301702846",
                "DQP__Histogram.abs.N": "2.2409376E7",
                "DQP__Histogram.abs.Y": "110336.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "3.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9951003376374878",
                "DQP__Histogram.ratio.Y": "0.004899529145906154",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.3321660598280218E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

You can view and manage the same values via the Lake Formation console. See the following screenshot of the Edit column section.

Data profiling reporting with Athena and Amazon QuickSight

As demonstrated earlier, the application saves profiling information in Parquet format to the S3 bucket and prefix you specify, partitioned by db_name and table_name. See the following code:

$ aws s3 ls s3://deequ-profiler/deequ-profiler-metrics/ --recursive
2020-01-28 09:30:12          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/_SUCCESS
2020-01-28 09:17:15       6506 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-760dafb1-fc37-4700-a506-a9dc71b7a745-c000.snappy.parquet
2020-01-28 09:01:19       6498 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-78dd2c4a-83c2-44c4-aa71-30e7a9fb0089-c000.snappy.parquet
2020-01-28 09:30:11       6505 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-cff4f2de-64b4-4338-a0f6-a50ed34a378f-c000.snappy.parquet
2020-01-28 09:30:08          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/_SUCCESS
2020-01-28 09:01:15       6355 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-0d5969c9-70a7-4cd4-ac64-8f16e35e23b5-c000.snappy.parquet
2020-01-28 09:17:11       6353 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-12a7b0b0-6a2a-45d5-a241-645148af41d7-c000.snappy.parquet
2020-01-28 09:30:08       6415 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-adecccd6-a884-403f-aa80-c574647a10f9-c000.snappy.parquet
2020-01-28 09:29:56          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/_SUCCESS
2020-01-28 09:16:59       6408 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-2e5e3280-29db-41b9-be67-a68ef8cb9777-c000.snappy.parquet
2020-01-28 09:01:02       6424 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-c4972037-7d3c-4279-8b77-361741133816-c000.snappy.parquet
2020-01-28 09:29:55       6398 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-f2d6076e-7019-4b03-97ba-a6aab8a677b8-c000.snappy.parquet

The application generates one Parquet file per execution.
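
Besides Athena and Amazon QuickSight (described next), you can also explore these files directly with Spark. The following sketch assumes the bucket and prefix used in this post and that the dataset-level row count metric is stored under the name Size (without the Data Catalog prefix):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the partitioned metrics written by the profiler.
metrics = spark.read.parquet("s3://deequ-profiler/deequ-profiler-metrics/")

# Track how the row count of one table evolves across profiler runs.
(
    metrics.filter(
        (metrics.db_name == "nyctlcdb")
        & (metrics.table_name == "trip_data_yellow")
        & (metrics.entity == "Dataset")
        & (metrics.name == "Size")
    )
    .select("profiler_run_ts", "value")
    .orderBy("profiler_run_ts")
    .show(truncate=False)
)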

Preparing metadata for profiler metrics data

To prepare the metadata for profiler metrics data, complete the following steps:

  1. On the Lake Formation console, create a new database with the name deequprofilerdb to contain the metadata.
  2. On the AWS Glue console, create a new crawler with the name deequ-profiler-metrics to infer the schema of the profiling information stored in Amazon S3.

The following screenshot shows the properties of the new crawler.

After you run the crawler, a table named deequ_profiler_metrics is created in the database. The table has the following columns.

Name | Data Type | Partition | Description
instance | string | | Column name the statistic in column “name” refers to. Set to “*” if entity is “Dataset”.
entity | string | | Entity the statistic refers to. Valid values are “Column” and “Dataset”.
name | string | | Metric name, derived from the Deequ analyzer used for the calculation.
value | double | | Value of the metric.
type | string | | Data type of the column if entity is “Column”, blank otherwise.
db_name_embed | string | | Database name, same values as in partition “db_name”.
table_name_embed | string | | Table name, same values as in partition “table_name”.
profiler_run_dt | date | | Date the profiler application was run.
profiler_run_ts | timestamp | | Date/time the profiler application was run; it can also be used as an execution identifier.
db_name | string | 1 | Database name.
table_name | string | 2 | Table name.

Reporting with Athena

You can use Athena to run a query that checks the statistics for a column in the database for the run simulated on March 2, 2019. See the following code:

SELECT db_name, 
    profiler_run_dt,
    profiler_run_ts,
    table_name, 
    entity,
    instance,
    name,
    value
FROM "deequprofilerdb"."deequ_profiler_metrics" 
WHERE db_name = 'nyctlcdb' AND
    table_name = 'trip_data_yellow' AND 
    entity = 'Column' AND
    instance = 'extra' AND
    profiler_run_dt = date_parse('2019-03-02','%Y-%m-%d')
ORDER BY name
;

The following screenshot shows the query results.
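
You can also run the same kind of query programmatically, for example from a scheduled job. The following boto3 sketch is an illustration; the output location is an assumption you must replace with a bucket you own:

import boto3

athena = boto3.client("athena", region_name="eu-west-1")

response = athena.start_query_execution(
    QueryString="""
        SELECT name, value
        FROM deequprofilerdb.deequ_profiler_metrics
        WHERE db_name = 'nyctlcdb'
          AND table_name = 'trip_data_yellow'
          AND entity = 'Column'
          AND instance = 'extra'
        ORDER BY name
    """,
    # Assumed results bucket; replace with your own.
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/deequ/"},
)
print(response["QueryExecutionId"])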

Reporting with Amazon QuickSight

To create a dashboard in Amazon QuickSight based on the profiling metrics data the application generated, complete the following steps:

  1. Create a new QuickSight dataset called deequ_profiler_metrics with Athena as the data source.
  2. In the Choose your table section, select the profiling metrics table that you created earlier.
  3. Import the data into SPICE.

After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.

You are now ready to build visualizations and dashboards.

The following images in this section show a simple analysis with controls that allow for the selection of the Database, Table profiled, Entity, Column, and Profiling Metric.

Control Name | Mapped Column
Database | db_name
Table | table_name
Entity | entity
Column | instance
Profiling Metric | name

For more information about adding controls, see Create Amazon QuickSight dashboards that have impact with parameters, on-screen controls, and URL actions.

For example, you can select the Size metric of a specific table to see how many records are available in the table after each monthly load. See the following screenshot.

Similarly, you can use the same analysis to see how a specific metric changes over time for a column. The following screenshot shows that the mean of the fare_amount column changes after each monthly load.

You can select any metric calculated on any column, which makes for a very flexible profiling data reporting system.

Conclusion

This post demonstrated how to extend the metadata contained in the Data Catalog with profiling information calculated with an Apache Spark application based on the Amazon Deequ library running on an EMR cluster.

You can query the Data Catalog using the AWS CLI. You can also build a reporting system with Athena and Amazon QuickSight to query and visualize the data stored in Amazon S3.

Special thanks go to Sebastian Schelter at Amazon Search and Sven Hansen and Vincent Gromakowski at AWS for their help and support.

 


About the Author

Francesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

Monitor and optimize queries on the new Amazon Redshift console

Post Syndicated from Debu Panda original https://aws.amazon.com/blogs/big-data/monitor-and-optimize-queries-on-the-new-amazon-redshift-console/

Tens of thousands of customers use Amazon Redshift to power their workloads to enable modern analytics use cases, such as Business Intelligence, predictive analytics, and real-time streaming analytics. As an administrator or data engineer, it’s important that your users, such as data analysts and BI professionals, get optimal performance. You can use the Amazon Redshift console to monitor and diagnose query performance issues.

The Amazon Redshift console features a monitoring dashboard and updated flows to create, manage, and monitor Amazon Redshift clusters. For more information, see Simplify management of Amazon Redshift clusters with the Redshift console.

This post discusses how you can use the new Amazon Redshift console to monitor your user queries, identify slow queries, and terminate runaway queries. The post also reviews details such as query plans, execution details for your queries, in-place recommendations to optimize slow queries, and how to use the Advisor recommendations to improve your query performance.

User query vs. rewritten query

Any query that users submit to Amazon Redshift is a user query. Analysts either author a user query or a BI tool such as Amazon QuickSight or Tableau generates the query. Amazon Redshift typically rewrites queries for optimization purposes. It can rewrite a user query into a single query or break it down into multiple queries. These queries are rewritten queries.

The following steps are performed by Amazon Redshift for each query:

  1. The leader node receives and parses the query.
  2. The parser produces an initial query tree, which is a logical representation of the original query. Amazon Redshift inputs this query tree into the query optimizer.
  3. The optimizer evaluates and, if necessary, rewrites the query to maximize its efficiency. This process sometimes results in creating multiple queries to replace a single query.

The query rewrite is done automatically and is transparent to the user.

Query monitoring with the original Amazon Redshift console and system tables

Previously, you could monitor the performance of rewritten queries in the original Amazon Redshift console or system tables. However, it was often challenging to find the SQL your users submitted.

The following table compares query monitoring in the original Amazon Redshift console, the system tables, and the new console.

Original Console | System Tables | New Console
User query not supported | User query not available | Supports user queries
Monitor only rewritten queries | Rewritten queries | Shows all queries available in system tables
Shows only top 100 queries | All rewritten queries | Allows you to correlate rewritten queries with user queries

The new console simplifies monitoring user queries and provides visibility into all query monitoring information available in the system.

Monitoring and diagnosing queries

The Amazon Redshift console provides information about the performance of queries that run in the cluster. You can use this information to identify and diagnose queries that take a long time to process and create bottlenecks that prevent other queries from executing efficiently.

The following table shows some of the common questions you may have when monitoring, isolating, and diagnosing query performance issues.

Monitor | Isolate and Diagnose | Optimize
How is my cluster doing in terms of query performance and resource utilization? | A user complained about performance issues at a specific time. How do I identify that SQL and diagnose problems? | How can I optimize the SQL that our end-users author?
How is my cluster throughput, concurrency, and latency looking? | Which other queries were running when my query was slow? Were all queries slow? | Is there any optimization required in my schema design?
Are queries being queued in my cluster? | Is my database overloaded with queries from other users? Is my queue depth increasing or decreasing? | Is there any tuning required for my WLM queues?
Which queries or loads are running now? | How do I identify queries that a specific user runs? | Can I get any benefit if I enable concurrency scaling?
Which queries or loads are taking longer than usual? | The resources of my cluster are running very high. How do I find out which queries are running? |
What are my top queries by duration in the last hour or last 24 hours? | |
Which queries have failed? | |
Is the average query latency for my cluster increasing or decreasing over time? | |

You can answer these questions by either using the Amazon Redshift console or developing scripts using the system catalog.
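
As an example of the scripting route, the following sketch uses the Amazon Redshift Data API through boto3 to list the most recent queries from the STL_QUERY system table; the cluster, database, and user names are placeholders:

import time
import boto3

client = boto3.client("redshift-data", region_name="us-east-1")

resp = client.execute_statement(
    ClusterIdentifier="my-redshift-cluster",
    Database="dev",
    DbUser="awsuser",
    Sql="""
        SELECT query, starttime, endtime, substring(querytxt, 1, 80) AS sql_snippet
        FROM stl_query
        WHERE starttime > dateadd(hour, -1, getdate())
        ORDER BY starttime DESC
        LIMIT 20;
    """,
)

# The Data API is asynchronous; wait for the statement to finish.
while client.describe_statement(Id=resp["Id"])["Status"] not in ("FINISHED", "FAILED", "ABORTED"):
    time.sleep(1)

for row in client.get_statement_result(Id=resp["Id"])["Records"]:
    print(row)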

Monitoring queries

You can monitor your queries on the Amazon Redshift console on the Queries and loads page or on the Query monitoring tab on the Clusters page. While both options are similar for query monitoring, you can quickly get to your queries for all your clusters on the Queries and loads page. You have to select your cluster and period for viewing your queries.

You can view the queries using List view on the Query monitoring tab on the Clusters page. The query monitoring page visually shows the queries in a Gantt chart.

Each bar represents a user query, and the length of the bar represents runtime for a query. The X-axis shows the selected period, and the location of the bar indicates when a query started and ended. The queries include both standard SQL statements such as SELECT, INSERT, and DELETE, and loads such as COPY commands.

Monitoring top queries

By default, the Query monitoring page shows the top 100 longest queries by runtime or duration for the selected time window. You can change the time window to view the top queries for that period. The top queries also include completed queries and running queries. The completed queries are sorted by descending order of query runtime or duration.

Identifying running queries

You can find out your running queries by choosing Running queries from the drop-down menu.

To see the query’s details such as SQL text, runtime details, related rewritten queries, and execution details, choose the query ID.

The Duration column shows the estimated duration and runtime for a query. You can terminate a query by selecting the query and choosing Terminate query. You need to have the redshift:CancelQuerySession action added to your IAM policy to cancel a query.

Viewing loads

As a data engineer or Redshift administrator, ensuring that your load jobs complete correctly and meet required performance SLAs is a major priority.

You can view all your load jobs by choosing Loads from the drop-down menu on the Query monitoring page. You can then zoom in on the desired time window.

The preceding Gantt chart shows all loads completed successfully. The query status indicates if the load failed or if an administrator terminated it.

Identifying failed queries

You can identify failed queries by choosing Failed or stopped queries from the drop-down menu on the Query monitoring page and then zooming in on the desired time.

The query page shows 50 queries by default, and you have to paginate to view more results. You can change the page size by choosing the settings gear icon.

In the Preferences section, you can customize what fields you want to see on the Queries and loads list. For example, you can see the PID and not the transaction ID. These changes persist across browser sessions.

Monitoring long-running queries

Amazon Redshift categorizes a query or load as long-running if it runs for more than 10 minutes. You can filter long-running queries by choosing Long queries from the drop-down menu. Similarly, you can also filter medium and short queries.

Isolating problematic queries

The following section looks at some use cases in which you use the console to diagnose query performance issues.

Query performance issues

For this use case, a user complains that their queries as part of the dashboards are slow, and you want to identify the associated queries. These queries might not be part of the top queries. To isolate these queries, you can either choose Completed queries or All queries from the drop-down menu and specify the time window by choosing Custom.

You can also drill down to view the queries in a specific period, or filter for queries from one particular user by searching their user name.

You can also filter your queries by searching SQL query text.

As with the earlier charts, the size of a bar represents a relative duration of the runtime for a query. In this period, the highlighted query is the slowest. If you mouse over a bar in the Gantt chart, it provides helpful information about the query such as query ID, part of the query text, and runtime. To view details about a specific query, choose Query ID.

Identifying systemic query performance problems

For this use case, many of your users are complaining about longer-than-normal query runtimes. You want to diagnose what is happening in your cluster. You can customize your time and switch to the graph view, which helps you to correlate longer runtimes with what is happening in the cluster. As the following Gantt chart and CPU utilization graph shows, many queries were running at that time, and CPU utilization almost reached 100%.

The concurrency scaling feature of Amazon Redshift could have helped maintain consistent performance throughout the workload spike.

High CPU utilization

You can correlate query performance with cluster performance and highlight on a given metric such as CPU utilization, which shows you which queries were running at that time.

Monitoring workload performance

You can get a detailed view of your workload’s performance by looking at the Workload execution breakdown chart. You can find out how long it took to plan, wait, and execute your workload. You can also view time spent in operations such as INSERT, UPDATE, DELETE, COPY, UNLOAD, or CTAS. The chosen time in the query history is stored when you navigate between pages.

In the preceding screenshot, you can see several waits in the workload breakdown graph. You can take advantage of concurrency scaling to process a burst of queries.

Correlating query throughput with query duration

You can view the trend of the performance of your queries, such as duration or execution time for your long, medium, and short queries, and correlate with the query throughput.

This information can offer insight into how well the cluster serves each query category with its current configuration.

Monitoring workload for your WLM queues

You can correlate query performance with cluster performance and highlight a given metric such as CPU utilization to see which queries were running at that time. You can view the average throughput, average duration, and average queue time by different WLM queues. Insight from this graph might help you tune your queries; for example, by assigning the right priority for your WLM queue or enabling concurrency scaling for your WLM queue.

Diagnosing and optimizing query performance

After you isolate a slow query, you can drill down to the execution details of the query by choosing Query ID. The following screenshot shows multiple query IDs for a query that has been rewritten to multiple queries.

The Query details page shows you the parent query and all rewritten queries.

You can also find out whether any of the rewritten queries ran on a concurrency scaling cluster.

You can view the query plans, execution statistics such as the cost of each step of the plan, and data scanned for the query.

You can also view the cluster metrics at the time the query ran on the cluster. The following screenshot shows the problematic steps for your query plan. Choosing a problematic step reveals in-place recommendations to improve this query.

Implementing Advisor recommendations

Amazon Redshift Advisor provides recommendations that could improve workload performance. Amazon Redshift uses machine learning to look at your workload and provide customized recommendations. Amazon Redshift monitors and offers guidance for improved performance on the following crucial areas:

  • Short query acceleration (SQA) – Checks for query patterns and reports the number of recent queries in which you can reduce latency and the daily queue time for SQA-eligible queries by enabling SQA, thus improving your query performance
  • Sort key for tables – Analyzes the workload for your data warehouse over several days to identify a beneficial sort key for your tables and makes sort key recommendations
  • Distribution key for tables – Analyzes your workload to identify the most appropriate distribution key for tables that can significantly benefit from a key distribution style

The following screenshot shows a recommendation to alter the distribution key for the table.
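
Applying such a recommendation is a regular DDL statement. The following sketch issues it through the same Data API used earlier; the table and column names are hypothetical and should come from your own Advisor recommendation:

import boto3

client = boto3.client("redshift-data", region_name="us-east-1")

# Hypothetical table and distribution key taken from an Advisor recommendation.
client.execute_statement(
    ClusterIdentifier="my-redshift-cluster",
    Database="dev",
    DbUser="awsuser",
    Sql="ALTER TABLE public.sales ALTER DISTKEY customer_id;",
)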

Enabling concurrency scaling

To deliver optimal performance for your users, you can monitor user workloads and take action if you diagnose a problem. You can drill down to the query history for that specific time, and see several queries running at that time.

If you aren’t using concurrency scaling, your queries might be getting queued. You can also see that on the Workload concurrency tab. In the following screenshot, you can see that many queries are queued during that time because you didn’t enable concurrency scaling.

You can monitor all submitted queries and enable concurrency scaling when queued queries are increasing.

View a demo of Query Monitoring to learn more about the feature:

Conclusion

This post showed you the new features in the Amazon Redshift console that allow you to monitor user queries and help you diagnose performance issues in your user workload. The console also allows you to view your top queries by duration, filter failed and long-running queries, and drill down to view related rewritten queries and their execution details, which you can use to tune your queries. Start using the query monitoring features of the new Amazon Redshift console to monitor your user workload today!

 


About the Authors

Debu Panda, a senior product manager at AWS, is an industry leader in analytics, application platform, and database technologies. He has more than 20 years of experience in the IT industry, has published numerous articles on analytics, enterprise Java, and databases, and has presented at multiple conferences. He is the lead author of EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

Apurva Gupta is a user experience designer at AWS. She specializes in databases, analytics and AI solutions. Previously, she has worked with companies both big and small leading end-to-end design and helping teams set-up design-first product development processes, design systems and accessibility programs. 

Chao Duan is a software development manager at Amazon Redshift, where he leads the development team focusing on enabling self-maintenance and self-tuning with comprehensive monitoring for Redshift. Chao is passionate about building high-availability, high-performance, and cost-effective database to empower customers with data-driven decision making.

Sudhakar Reddy is a full stack software development engineer with Amazon Redshift. He is specialized in building cloud services and applications for Big data, Databases and Analytics. 

Zayd Simjee is a software development engineer with Amazon Redshift.

Deploy an Amazon EMR edge node with RStudio using AWS Systems Manager

Post Syndicated from Randy DeFauw original https://aws.amazon.com/blogs/big-data/deploy-an-amazon-emr-edge-node-with-rstudio-using-aws-systems-manager/

RStudio is an integrated development environment (IDE) for R, a language and environment for statistical computing and graphics. As a data scientist, you may integrate R and Spark (a big data processing framework) to analyze large datasets. You can use an R package called sparklyr to offload filtering and aggregation of large datasets from your R script to Spark and use R’s native strength to further analyze and visualize the results from Spark.

An R script running in RStudio uses sparklyr to submit Spark jobs to the cluster. Typically, an R script (along with sparklyr) runs in an RStudio environment that is installed on a machine that’s separate from the cluster of machines (in Amazon EMR) that runs Spark. To enable sparklyr to submit Spark jobs, you need to establish network connectivity between the RStudio machine and the cluster running Spark. One way to do that is to run RStudio on an edge node, which is a machine that is part of the cluster’s private network and runs client applications like RStudio. Edge nodes let you run client applications separately from the nodes that run the core Hadoop services. Edge nodes also offer convenient access to local Spark and Hive shells.

However, edge nodes are not easy to deploy. They must have the same versions of Hadoop, Spark, Java, and other tools as the Hadoop cluster, and require the same Hadoop configuration as nodes in the cluster.

This post demonstrates an automated way to create an edge node with RStudio installed using AWS Systems Manager.

Deploying an edge node for an EMR cluster

One method to deploy an edge node involves creating an Amazon EC2 AMI directly from the EMR master node. For more information, see Launch an edge node for Amazon EMR to run RStudio. This post offers an SSM automation document that simplifies on-demand edge node deployment. Systems Manager gives you visibility and control of your AWS infrastructure, and Systems Manager Automation lets you safely automate common and repetitive tasks, like creating edge nodes on demand.

This post walks you through the process of installing the SSM document and how to use the document to create an edge node. For more information about the code, see the GitHub repo.

Creating the automation document

First, you use Terraform to create the automation document. You can download Terraform from the Terraform website. Alternatively, AWS CloudFormation works equally well.

After you install Terraform, go to the directory where you cloned the repo and edit the file vars.tf. For more information, see the GitHub repo. This file defines several input parameters, and the comments in the file should be self-explanatory. You can provide default values in vars.tf or override using one of the other supported techniques. For more information, see Input Variables on the Terraform website.

Next, enter the following code:

terraform init # one time only
terraform apply

The code runs a Terraform plan to create the document. Your environment should already be configured to access your AWS account with privileges to do the following:

To make updates to the Terraform plan going forward, use Terraform’s shared state feature. For more information, see Remote State on the Terraform website.

The Terraform plan loads your automation document from a local template file and registers it with Systems Manager. See the following code:

# Load our document from a template and substitute some variables.
data "template_file" "ssm_doc_edge_node" {
  template = "${file("${path.module}/ssm_doc_edge_node.tpl")}"

  vars = {
    SSMRoleArn = "${aws_iam_role.ssm_automation_role.arn}"
    InstanceProfileArn = "${aws_iam_instance_profile.edge_node_profile.arn}"
    PlaybookUrl = "s3://${var.bucket}/init.yaml"
    Environment = "${var.environment}"
    Project = "${var.ProjectTag}"
    region = "${var.region}"
  }
}

# Register the document content with SSM
resource "aws_ssm_document" "create_edge_node" {
  name          = "create_edge_node"
  document_type = "Automation"
  document_format = "YAML"
  tags = {
    Name = "create_edge_node"
    Project = "${var.ProjectTag}"
    Environment = "${var.environment}"
  }

  content = "${data.template_file.ssm_doc_edge_node.rendered}"
}

The rest of the Terraform plan does the following:

  • Uploads an Ansible template for the SSM document to use
  • Sets up IAM roles and policies that let Systems Manager and a new edge node assume the correct privileges

What’s in the automation document?

The automation document has three main steps. First, it creates a new AMI from the existing EMR master node and launches an instance from it. See the following code:

- name: create_ami
  action: aws:createImage
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    InstanceId: "{{MasterNodeId}}"
    ImageName: AMI Created on{{global:DATE_TIME}}
    NoReboot: true
    
- name: launch_ami
  action: aws:runInstances
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    ImageId: "{{create_ami.ImageId}}"
  ...

Next, it updates the SSM agent and runs an Ansible playbook to install RStudio. You can examine the Ansible playbook in GitHub; it installs RStudio and dependencies and handles some initial configuration. See the following code:

- name: updateSSMAgent
  action: aws:runCommand
  inputs:
    DocumentName: AWS-UpdateSSMAgent
    InstanceIds:
    - "{{launch_ami.iid}}"
- name: installPip
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunShellScript
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
        commands: 
        - pip install ansible boto3 botocore
- name: runPlaybook
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunAnsiblePlaybook
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
      playbookurl: "${PlaybookUrl}"

Finally, it adds an Amazon CloudWatch alarm to trigger EC2 instance recovery if the edge node fails. See the following code:

- name: add_recovery
  action: aws:executeAwsApi
  inputs:
    Service: cloudwatch
    Api: PutMetricAlarm
    AlarmName: "Recovery for edge node {{ launch_ami.iid }}"
    ActionsEnabled: true
    AlarmActions: 
    - "arn:aws:automate:${region}:ec2:recover"

Using the automation document

To start using the automation document, complete the following steps:

  1. On the Systems Manager console, choose Automation.
  2. Choose Execute automation.
  3. On the Owned by me tab, choose the document create_edge_node.
  4. Choose Next.

    On the next page, you need to fill in three pieces of information. You may want to get some advice from your cloud operations team, or whoever manages your EMR clusters. For instructions on creating a cluster with the latest EMR version and Spark, see Launch Your Sample Amazon EMR Cluster.
  5. In the Input parameters section, provide the following information:
    • For MasterNodeId, enter the EC2 instance ID of the master node of the EMR cluster you want to connect to. In most cases, your operations team can provide this information, but you can also find the instance ID by going to the Hardware tab of your EMR cluster and drilling into the master node group. Your EMR cluster must have Spark installed because you want to use sparklyr with RStudio. The following screenshot shows where to find your EC2 instance ID on the Hardware tab.
    • For SubnetId, enter the subnet that the edge node should live in. Your operations team should provide this information, or you can see it on the Summary tab of the EMR cluster. The edge node must live in the same VPC as the cluster. It does not need to be in a public subnet because you connect via Session Manager. The following screenshot shows where to find your subnet ID on the Summary tab of your cluster.
    • For QuickIdentifier, enter a user-friendly name to help you remember this edge node; for example, Edge Node with RStudio.

When the execution is finished, you will see the completed steps, as in the following screenshot.

If you choose the last step in the list (step 8), you see the DNS name and EC2 instance ID for your new edge node. See the following screenshot.
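
If you want to launch the automation outside the console, for example from a pipeline, the following boto3 sketch starts the same document; the instance ID, subnet ID, and label values are placeholders:

import boto3

ssm = boto3.client("ssm", region_name="us-east-1")

# Parameter names match the inputs of the create_edge_node document.
response = ssm.start_automation_execution(
    DocumentName="create_edge_node",
    Parameters={
        "MasterNodeId": ["i-0123456789abcdef0"],
        "SubnetId": ["subnet-0123456789abcdef0"],
        "QuickIdentifier": ["Edge Node with RStudio"],
    },
)

execution_id = response["AutomationExecutionId"]
print(f"Started automation execution {execution_id}")

# Poll the execution status instead of watching the console.
status = ssm.get_automation_execution(AutomationExecutionId=execution_id)
print(status["AutomationExecution"]["AutomationExecutionStatus"])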

You can now connect to this edge node by using another feature of Systems Manager: Session Manager. Session Manager lets you open an SSH tunnel for port forwarding without having to use SSH keys or expose the SSH port to the internet. For instructions on opening a port forwarding session, see Starting a Session (Port Forwarding). You need the Session Manager plugin installed locally. See the following code:

# Use the instance ID from the output of the automation document as the target
aws ssm start-session \
  --target instance-id \
  --document-name AWS-StartPortForwardingSession \
  --parameters '{"portNumber":["8787"], "localPortNumber":["8787"]}'

For more information, see Install the Session Manager Plugin for the AWS CLI.

You can now access RStudio at http://localhost:8787. See the following screenshot.

You can also access the node directly and use the local Hive and Spark shells through the Session Manager console.

This post sets up the SSM document to create single-user edge nodes. The default user name to log in to RStudio is ruser. You must set the password by changing the password for the ruser account directly in the operating system, because RStudio uses PAM authentication by default. For more information, see What is my username on my RStudio Server? To change the password, open another Session Manager session and enter the following code:

aws ssm start-session \
  --target instance-id # Get this from the output of the automation document

$ sudo passwd ruser # Enter a password of your choice and confirm it

You should keep any valuable files like R scripts in a GitHub repo and store any output data in an S3 bucket for long-term persistence.
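
If you want to script that persistence step, the following is a minimal sketch using boto3 from the edge node; the local file path and bucket name are placeholders you would replace with your own.

import boto3

# Upload a local output file to S3 for long-term persistence.
# Both the file path and the bucket name below are hypothetical.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="/home/ruser/output/results.csv",
    Bucket="my-analytics-output-bucket",
    Key="rstudio-edge-node/results.csv",
)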

Configuring security

The Terraform plan sets up three important IAM roles:

  • A role that Systems Manager assumes when running the automation document. This role needs permissions to perform actions in Amazon EC2 (such as creating new AMIs), CloudWatch, and Systems Manager.
  • An EC2 instance profile for the edge nodes. The profile has the permissions necessary for the SSM agent to run and for the edge node to perform tasks typical of an EMR node.
  • A role for CloudWatch to perform instance recovery.

In your environment, you may want to review the IAM roles and policies and tighten their scope based on tags or other conditions.

Conclusion

This post described an automated way to deploy an EMR edge node with RStudio using an SSM document. EMR edge nodes with RStudio give you a familiar working environment with access to large datasets via Spark and sparklyr. For information about deploying a new edge node and installing the necessary Hadoop libraries with an AWS CloudFormation template, see Launch an edge node for Amazon EMR to run RStudio.

 


About the Author

Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.

 

 

Simplify data pipelines with AWS Glue automatic code generation and Workflows

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/simplify-data-pipelines-with-aws-glue-automatic-code-generation-and-workflows/

In the previous post of the series, we discussed how AWS Glue job bookmarks help you to incrementally load data from Amazon S3 and relational databases. We also saw how using the AWS Glue optimized Apache Parquet writer can help improve performance and manage schema evolution.

In the third post of the series, we’ll discuss three topics. First, we’ll look at how AWS Glue can automatically generate code to help transform data in common use cases such as selecting specific columns, flattening deeply nested records, efficiently parsing nested fields, and handling column data type evolution.

Second, we’ll outline how to use AWS Glue Workflows to build and orchestrate data pipelines using different Glue components such as crawlers, Apache Spark ETL jobs, and Python shell ETL jobs.

Third, we’ll see how to leverage SparkSQL in your ETL jobs to perform SQL based transformations on datasets stored in Amazon S3 and relational databases.

Automatic Code Generation & Transformations: ApplyMapping, Relationalize, Unbox, ResolveChoice

AWS Glue can automatically generate code to help perform a variety of useful data transformation tasks. These transformations provide a simple-to-use interface for working with complex and deeply nested datasets. For example, some relational databases or data warehouses do not natively support nested data structures. AWS Glue can automatically generate the code necessary to flatten those nested data structures before loading them into the target database, saving time and enabling non-technical users to work with data.

The following is a list of the popular transformations AWS Glue provides to simplify data processing:

  1. ApplyMapping is a transformation used to perform column projection and convert between data types. In this example, we use it to unnest several fields, such as actor.id, which we map to the top-level actor.id field. We also cast the id column to a long.
    medicare_output = medicare_src.apply_mapping(
        [('id', 'string', 'id', 'long'),
        ('type', 'string', 'type', 'string'),
        ('actor.id', 'int', 'actor.id', 'int'),
        ('actor.login', 'string', 'actor.login', 'string'),
        ('actor.display_login', 'string', 'actor.display_login', 'string'),
        ('actor.gravatar_id', 'long', 'actor.gravatar_id', 'long'),
        ('actor.url', 'string', 'actor.url', 'string'),
        ('actor.avatar_url', 'string', 'actor.avatar_url', 'string')]
    )

  2. Relationalize converts a nested dataset stored in a DynamicFrame to a relational (rows and columns) format. Nested structures are unnested into top-level columns and arrays are decomposed into different tables with appropriate primary and foreign keys inserted. The result is a collection of DynamicFrames representing a set of tables that can be directly inserted into a relational database. More detail about relationalize can be found here.
    ## An example relationalizing and writing to Redshift
    dfc = history.relationalize("hist_root", redshift_temp_dir)
    ## Cycle through results and write to Redshift.
    for df_name in dfc.keys():
        df = dfc.select(df_name)
        print("Writing to Redshift table: ", df_name, " ...")
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = df, 
            catalog_connection = "redshift3", 
            connection_options = {"dbtable": df_name, "database": "testdb"}, 
            redshift_tmp_dir = redshift_temp_dir)

  3. Unbox parses a string field of a certain type, such as JSON, into individual fields with their corresponding data types and stores the result in a DynamicFrame. For example, you may have a CSV file with one field that is in JSON format {"a": 3, "b": "foo", "c": 1.2}. Unbox reformats the JSON string into three distinct fields: an int, a string, and a double. The Unbox transformation is commonly used to replace costly Python User Defined Functions required to reformat data that may result in Apache Spark out of memory exceptions. The following example shows how to use Unbox:
    df_result = df_json.unbox('json', "json")

  4. ResolveChoice: AWS Glue Dynamic Frames support data where a column can have fields with different types. These columns are represented with Dynamic Frame’s choice type. For example, the Dynamic Frame schema for the medicare dataset shows up as follows:
    root
     |-- drg definition: string
     |-- provider id: choice
     |    |-- long
     |    |-- string
     |-- provider name: string
     |-- provider street address: string

    This is because the “provider id” column could either be a long or string type. The Apache Spark Dataframe considers the whole dataset and is forced to cast it to the most general type, namely string. Dynamic Frames allow you to cast the type using the ResolveChoice transform. For example, you can cast the column to long type as follows.

    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
    
    medicare_res.printSchema()
     
    root
     |-- drg definition: string
     |-- provider id: long
     |-- provider name: string
     |-- provider street address: string

    This transform would also insert a null where the value was a string that could not be cast. As a result, you can now identify the records whose string values were cast to null. Alternatively, the choice type can also be cast to struct, which keeps values of both types.
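
    The following is a minimal sketch of that struct alternative, assuming the same medicare_dynamicframe as above; make_struct is one of the documented ResolveChoice actions.

    # Keep both the long and string variants of "provider id" in a nested struct
    # instead of casting one of them away.
    medicare_struct = medicare_dynamicframe.resolveChoice(specs = [('provider id', 'make_struct')])
    medicare_struct.printSchema()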

Build and orchestrate data pipelines using AWS Glue Workflows

AWS Glue Workflows provide a visual tool to author data pipelines by combining Glue crawlers for schema discovery with Glue Spark and Python jobs to transform the data. Relationships can be defined and parameters passed between task nodes to enable users to build pipelines of varying complexity. Workflows can run on a schedule or be triggered programmatically. You can track the progress of each node independently or of the entire workflow, making it easier to troubleshoot your pipelines.
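
For example, a workflow can be started programmatically with a few lines of boto3; the workflow name and run property below are hypothetical.

import boto3

glue = boto3.client("glue")

# Start a run of an existing workflow (the name is a placeholder).
run = glue.start_workflow_run(Name="cloudtrail-daily-pipeline")

# Optionally pass parameters that jobs in the workflow can read at run time.
glue.put_workflow_run_properties(
    Name="cloudtrail-daily-pipeline",
    RunId=run["RunId"],
    RunProperties={"partition_prefix": "2020/03/01"},
)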

A typical workflow for ETL workloads is organized as follows:

  1. A Glue Python command is triggered manually, on a schedule, or on an external CloudWatch event. It pre-processes or lists the partitions in Amazon S3 for a table under a base location. For example, a CloudTrail logs partition to process could be: s3://AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/DAY/HOUR/. The Python command can list all the regions and schedule crawlers to create different Glue Data Catalog tables on each region.
  2. Glue crawlers are triggered next to populate new partitions for every hour in the Glue Data Catalog for data recently ingested into Amazon S3.
  3. Concurrent Glue ETL jobs are triggered to separately filter and process each partition or a group of partitions. For example, CloudTrail events corresponding to the last week can be read by a Glue ETL job by passing in the partition prefix as Glue job parameters and using Glue ETL push down predicates to read only the partitions in that prefix (see the sketch that follows this list). Partitioning and orchestrating concurrent Glue ETL jobs allows you to scale and reliably execute individual Apache Spark applications by processing only a subset of partitions in the Glue Data Catalog table. The transformed data can then be concurrently written back by all individual Glue ETL jobs to a common target table in an Amazon S3 data lake, Amazon Redshift, or other databases.

Finally, a Glue Python command can be triggered to capture the completion status of the different Glue entities, including Glue crawlers and parallel Glue ETL jobs, and post-process or retry any failed components.
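
As a rough sketch of step 3, a Glue ETL job can restrict its read to a partition prefix passed in as a job argument; the catalog database, table, and argument names here are hypothetical.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['partition_predicate'])
glueContext = GlueContext(SparkContext.getOrCreate())

# Only the partitions matching the predicate are listed and read,
# for example "year='2020' and month='03'".
events = glueContext.create_dynamic_frame.from_catalog(
    database="cloudtrail_db",
    table_name="cloudtrail_logs_us_east_1",
    push_down_predicate=args['partition_predicate'])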

Executing SQL using SparkSQL in AWS Glue

AWS Glue Data Catalog as Hive Compatible Metastore

The AWS Glue Data Catalog is a managed metadata repository compatible with the Apache Hive Metastore API. You can follow the detailed instructions here to configure your AWS Glue ETL jobs and development endpoints to use the Glue Data Catalog. You also need to add the Hive SerDes to the class path of AWS Glue Jobs to serialize/deserialize data for the corresponding formats. You can then natively run Apache Spark SQL queries against your tables stored in the Data Catalog.

The following example assumes that you have crawled the US legislators dataset available at s3://awsglue-datasets/examples/us-legislators. We’ll use the Spark shell running on AWS Glue developer endpoint to execute SparkSQL queries directly on the legislators’ tables cataloged in the AWS Glue Data Catalog.

>>> spark.sql("use legislators")
DataFrame[]
>>> spark.sql("show tables").show()
+-----------+------------------+-----------+
|   database|         tableName|isTemporary|
+-----------+------------------+-----------+
|legislators|        areas_json|      false|
|legislators|    countries_json|      false|
|legislators|       events_json|      false|
|legislators|  memberships_json|      false|
|legislators|organizations_json|      false|
|legislators|      persons_json|      false|

>>> spark.sql("select distinct organization_id from memberships_json").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

A similar approach to the preceding code is to use the AWS Glue DynamicFrame API to read the data from S3. The DynamicFrame is then converted to a Spark DataFrame using the toDF method. Next, a temporary view can be registered for the DataFrame, which can be queried using SparkSQL. The key difference between the two approaches is the use of Hive SerDes for the first approach, and native Glue/Spark readers for the second approach. The use of native Glue/Spark provides performance and flexibility benefits such as computation of the schema at runtime, schema evolution, and job bookmarks support for Glue Dynamic Frames.

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
>>> memberships.toDF().createOrReplaceTempView("memberships")
>>> spark.sql("select distinct organization_id from memberships").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

Workflows and S3 Consistency

If you have a workflow of external processes ingesting data into S3, or upstream AWS Glue jobs generating input for a table used by downstream jobs in a workflow, you can encounter the following Apache Spark errors.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 16.0 failed 4 times, most recent failure: Lost task 10.3 in stage 16.0 (TID 761, ip-<>.ec2.internal, executor 1): 
java.io.FileNotFoundException: No such file or directory 's3://<bucket>/fileprefix-c000.snappy.parquet'
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 
'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

These errors happen when upstream jobs overwrite the same S3 objects that the downstream jobs are concurrently listing or reading. They can also happen due to the eventual consistency of S3, which can result in overwritten or deleted objects being updated at a later time while the downstream jobs are reading them. A common manifestation of this error occurs when you create a SparkSQL view and execute SQL queries in the downstream job. To avoid these errors, the best practice is to set up a workflow with upstream and downstream jobs scheduled at different times, and to read/write to different S3 partitions based on time.

You can also enable the S3-optimized output committer for your Glue jobs by passing in a special job parameter: --enable-s3-parquet-optimized-committer set to true. This committer improves application performance by avoiding list and rename operations in Amazon S3 during job and task commit phases. It also avoids issues that can occur with Amazon S3’s eventual consistency during job and task commit phases, and helps to minimize task failures.
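
As a minimal sketch, the parameter can be set in a job's default arguments when you create the job with boto3; the job name, IAM role, and script location below are placeholders.

import boto3

glue = boto3.client("glue")

# Create a Spark ETL job with the S3-optimized committer enabled by default.
glue.create_job(
    Name="downstream-parquet-writer",
    Role="GlueETLServiceRole",
    Command={"Name": "glueetl",
             "ScriptLocation": "s3://my-bucket/scripts/write_parquet.py"},
    DefaultArguments={"--enable-s3-parquet-optimized-committer": "true"},
)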

Conclusion

In this post, we discussed how to leverage the automatic code generation process in AWS Glue ETL to simplify common data manipulation tasks such as data type conversion and flattening complex structures. We also explored using AWS Glue Workflows to build and orchestrate data pipelines of varying complexity. Lastly, we looked at how you can leverage the power of SQL, with the use of AWS Glue ETL and Glue Data Catalog, to query and transform your data.

In the final post, we will explore specific capabilities in AWS Glue and best practices to help you better manage the performance, scalability and operation of AWS Glue Apache Spark jobs.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on the cloud. He also enjoys watching movies and reading about the latest technology.

 

 

Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-into-amazon-elasticsearch-service-within-the-privacy-of-your-vpc-with-amazon-kinesis-data-firehose/

Today we are adding a new Amazon Kinesis Data Firehose feature to set up VPC delivery to your Amazon Elasticsearch Service domain from Kinesis Data Firehose. If you have been managing a custom application on Amazon Kinesis Data Streams to keep traffic private, you can now use Kinesis Data Firehose and load your data into an Amazon Elasticsearch Service endpoint in a VPC without having to build, operate, and scale ingestion and delivery infrastructure. You can start using this new feature from the Kinesis Data Firehose console, AWS CLI, and API by selecting Amazon Elasticsearch Service as the destination, the specific domain with VPC access, and setting the VPC configuration with subnets and the optional security groups.
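
For example, a minimal boto3 sketch of creating such a delivery stream through the API could look like the following; every name, ARN, subnet ID, and security group ID here is a placeholder.

import boto3

firehose = boto3.client("firehose")

# Create a Direct PUT delivery stream that delivers to an Amazon Elasticsearch
# Service domain through its VPC endpoint. All identifiers are hypothetical.
firehose.create_delivery_stream(
    DeliveryStreamName="es-vpc-delivery-stream",
    DeliveryStreamType="DirectPut",
    ElasticsearchDestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",
        "DomainARN": "arn:aws:es:us-east-1:123456789012:domain/my-vpc-domain",
        "IndexName": "stockdata",
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",
            "BucketARN": "arn:aws:s3:::my-firehose-backup-bucket",
        },
        "VpcConfiguration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",
            "SubnetIds": ["subnet-0abc1234def567890"],
            "SecurityGroupIds": ["sg-0abc1234def567890"],
        },
    },
)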

Before this feature

Amazon Elasticsearch Service domains can have public or private endpoints. Public endpoints are backed by IP addresses on the public internet. Private endpoints are backed by IP addresses within the IP space of your VPC.

If you have been using an Amazon Elasticsearch Service VPC endpoint, you most likely use Kinesis Data Streams or a similar solution to ingest streaming data. This means running a custom application on the stream that delivers the data to the Amazon Elasticsearch Service VPC domain. You likely had to perform the following actions:

  • Implement buffering
  • Convert data formats
  • Compress data
  • Apply transformations
  • Manage backups
  • Handle transient delivery failures

Additionally, you have to build, scale, monitor, update, and maintain this custom application.

Kinesis Data Firehose delivery to Amazon Elasticsearch Service VPC endpoint

Kinesis Data Firehose can now deliver data into an Amazon Elasticsearch Service VPC endpoint. This provides a secure and easy way to ingest, transform, and deliver streaming data. You don’t need to worry about managing your data ingestion and delivery infrastructure. With this new feature, Kinesis Data Firehose enables additional secure communication to Amazon Elasticsearch Service VPC endpoints. Amazon Elasticsearch Service endpoints that live within a VPC give you an extra layer of security.

How it works

When you create a Kinesis Data Firehose delivery stream that delivers data to an Amazon Elasticsearch Service VPC endpoint, Kinesis Data Firehose creates an Elastic Network Interface (ENI) in each subnet you select. If you only use one Availability Zone, Kinesis Data Firehose places an endpoint into only one subnet. Similarly, when you create an Amazon Elasticsearch Service VPC endpoint, it creates endpoints in the subnets you choose. Kinesis Data Firehose uses the ENI to deliver the data to your Amazon Elasticsearch Service ENI, all inside your VPC. The following diagram outlines the resulting architecture with a single subnet.

For this walkthrough, you have two security groups:

  • kdf-sec-grp for your Kinesis Data Firehose endpoint
  • es-sec-grp for your Amazon Elasticsearch Service endpoint

To let Kinesis Data Firehose access your Amazon Elasticsearch Service VPC endpoint, security group es-sec-grp needs to allow the ENI that Kinesis Data Firehose created to make HTTPS calls. Kinesis Data Firehose scales the ENIs automatically to meet the throughput requirements. As Kinesis Data Firehose scales ENIs, the outbound rules of the enclosing security group kdf-sec-grp control the data stream. You should configure the Amazon Elasticsearch Service security group (es-sec-grp) to allow HTTPS traffic from the Kinesis Data Firehose security group (kdf-sec-grp). The Kinesis Data Firehose security group needs to allow outbound HTTPS traffic, and its destination is the Amazon Elasticsearch Service security group. With Kinesis Data Firehose VPC delivery, you do not need to make the Firehose security group open to outside traffic.

You can also use the same security group for Kinesis Data Firehose and Amazon Elasticsearch Service endpoints. If you use the same security group for both, make sure the security group inbound rule allows HTTPS traffic.

For your existing delivery streams, you can change the destination endpoint. The new destination must be accessible within the same VPC, subnets, and security groups. Changing the VPC, subnets, or security groups requires you to recreate the delivery stream.

All existing Kinesis Data Firehose limits apply to this capability. For example, you can increase the default 50 delivery streams per account by submitting a quota increase request. Also, Kinesis Data Firehose creates one or more ENIs per VPC destination subnet per delivery stream. Kinesis Data Firehose automatically scales the number of ENIs as needed based on the actual throughput. The default throughput limit per delivery stream is 5 MB/second (dependent on Region). You can request an increase to this limit by submitting a support case.

You need to make sure you have enough ENIs available. By default, VPC has a quota of 5000 ENIs per Region. For more information, see Amazon VPC Quotas.

The advantage of using a managed service like Kinesis Data Firehose is that you can focus on the value of your data and not the underlying plumbing. You can configure the frequency of data delivery from your delivery stream to your Amazon Elasticsearch Service domain. Kinesis Data Firehose buffers incoming data before delivering it to Amazon ES. You can configure the values for Amazon Elasticsearch Service buffer size (1 MB–100 MB) or buffer interval (60–900 seconds), and the condition satisfied first triggers data delivery to Amazon Elasticsearch Service. In case data delivery fails for an Amazon Elasticsearch Service destination, you can specify a retry duration between 0 and 7,200 seconds when you create the delivery stream. If data delivery to your Amazon Elasticsearch Service endpoint fails, Kinesis Data Firehose retries data delivery for the specified time duration. After the retry period, Kinesis Data Firehose skips the current batch of data and moves on to the next batch. Skipped documents go to the elasticsearch_failed folder in your Amazon S3 bucket, which you can use for manual backfill.

For more information about sizing, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

Solution overview

To show you how to use this new feature, this post uses stock demo data available on the Kinesis Data Firehose console to deliver to an Amazon Elasticsearch Service endpoint in VPC. The following diagram illustrates the workflow.

This use case simulates a producer sending stock ticker data to the delivery stream (A). You use an AWS Lambda function (B) to add a timestamp to the stock records so that you can create Kibana visualization. Kinesis Data Firehose streams the stock records to the Amazon Elasticsearch Service endpoint (C) in your VPC. Finally, you can visualize the data using Kibana (D).

This post uses the AWS Management Console to implement this solution, but you can also use the AWS CLI.

Creating security groups

Start by creating two security groups: one for the Amazon Elasticsearch Service VPC endpoint (es-sec-grp) and another for the delivery stream (kdf-sec-grp). Create security groups without any rules first. After you have created them, set the inbound and outbound rules. The following table summarizes these rules.
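
If you prefer to script those rules, the following boto3 sketch captures the same intent; the security group IDs are placeholders.

import boto3

ec2 = boto3.client("ec2")

# es-sec-grp: allow inbound HTTPS from the Kinesis Data Firehose security group.
ec2.authorize_security_group_ingress(
    GroupId="sg-0esexample000000000",   # es-sec-grp (placeholder ID)
    IpPermissions=[{
        "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
        "UserIdGroupPairs": [{"GroupId": "sg-0kdfexample00000000"}],  # kdf-sec-grp
    }],
)

# kdf-sec-grp: allow outbound HTTPS to the Amazon Elasticsearch Service security group.
ec2.authorize_security_group_egress(
    GroupId="sg-0kdfexample00000000",
    IpPermissions=[{
        "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
        "UserIdGroupPairs": [{"GroupId": "sg-0esexample000000000"}],
    }],
)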

Creating an Amazon Elasticsearch Service VPC endpoint

To create an Amazon Elasticsearch Service endpoint in VPC, complete the following steps:

  1. On the Amazon Elasticsearch Service console, choose Create a new Domain.
  2. For Deployment Type and Latest Version, choose Development and Testing.
  3. Choose Next.
  4. Give your Amazon Elasticsearch Service endpoint a name.
  5. Select your instance type.

This post uses m5.xlarge.elasticsearch. For production environments, select the appropriately sized instance type. For this post, leave the number of nodes at 1, though best practice is to set it to 2.

  6. Set EBS storage size per node to 100 GiB.
  7. Leave the rest of the settings at their defaults and choose Next.
  8. Select the VPC and private subnet for your Amazon Elasticsearch Service endpoint and the security group for Amazon Elasticsearch Service that you created previously (es-sec-grp).
  9. To access Kibana, choose fine-grained access.
  10. Choose Create Master User.

In this post, we are using the internal user database enabled with HTTP basic authentication. For production environments, use IAM roles and configure the appropriate fine-grained access. For more information, see Fine-Grained Access Control in Amazon Elasticsearch Service.

  11. Choose Allow open access to Domain.

Security groups already enforce IP-based access policies. This step opens access to your Amazon Elasticsearch Service endpoint to resources in your VPC, and your Amazon Elasticsearch Service endpoint is not accessible to the internet. For an additional layer of security in your Amazon Elasticsearch Service endpoint, use access policies that specify IAM users or roles. For more information about controlling access to your domains, see Identity and Access Management in Amazon Elasticsearch Service.

  12. Choose Next.
  13. Review your settings and choose Confirm.

The following screenshot shows an example of what your Amazon Elasticsearch Service endpoint VPC settings should look like.

Creating a Lambda function for record transformation

Create a Lambda function to add a timestamp to the data feed. Complete the following steps:

  1. On the Lambda console, choose Create Function.
  2. Choose Author from scratch.
  3. Name your function; for example, tmakAddTSToStream.
  4. Choose Python 3.7 as your runtime.
  5. Choose Create.

The following code is for your Lambda function (under the basic settings section, change the timeout from 3 sec to 45 sec):

import base64
import json
from datetime import datetime

def lambda_handler(event, context):
    
    send_back = []
    now = datetime.utcnow().isoformat()

    for record in event['records']:
        stock_rec = json.loads(base64.b64decode(record['data']))
        stock_rec["timestamp"] = now
       
        record_w_ts = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(stock_rec).encode('utf-8') + b'\n').decode('utf-8')
        }
        send_back.append(record_w_ts)

    return {'records': send_back} 
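
To sanity-check the transformation locally, you can feed the handler a record shaped like the Kinesis Data Firehose transformation event; the module name and sample payload below are hypothetical.

import base64
import json

from lambda_function import lambda_handler  # hypothetical module name for the code above

sample_event = {
    "records": [{
        "recordId": "49570000000000000000000000000000000000000000000000000001",
        "data": base64.b64encode(
            json.dumps({"ticker_symbol": "AMZN", "sector": "TECHNOLOGY", "price": 1987.5}).encode("utf-8")
        ).decode("utf-8"),
    }]
}

print(lambda_handler(sample_event, None))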

Creating a Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, under Data Firehose, choose Create Delivery Stream.
  2. Enter a name for your stream; for example, tmak-kdf-stock-delivery-stream.
  3. For source, choose Direct PUT or other sources.
  4. Choose Next.
  5. For Data transformation, choose Enabled.
  6. Choose the Lambda function you created.
  7. Choose Next.
  8. Choose Amazon Elasticsearch Service as the destination for your delivery stream.
  9. For Index, enter stockdata.

The VPC section populates automatically. Make sure you use the security group you created for Kinesis Data Firehose (kdf-sec-grp).

  10. For Backup Mode, choose Failed records only.

You can select an existing S3 bucket or create a new one. The following screenshot shows an example of your delivery stream settings.

  11. Choose Next.
  12. Review the buffering settings and set any tags to identify your stream.

A delivery stream that delivers to VPC destinations needs permissions to manage ENIs, list VPCs, and subnets. The console gives you the option to create a new role based on a template that includes all the needed permissions. You can also use an existing role if you already created one.

  13. Choose Next.
  14. Review the settings and choose Create Stream.

It may take up to a few minutes to see the stream status show as Active. See the following screenshot.

On the Amazon EC2 console, under Network and Security, you can see the endpoints created in your VPC by Kinesis Data Firehose and Amazon ES. See the following screenshot.

Configuring Kibana fine-grained access for Kinesis Data Firehose

You need to give Kinesis Data Firehose permissions to deliver stock data to your Amazon Elasticsearch Service endpoint. You can accomplish this via the Kibana console or API. For more information, see API on the Open Distro for Elasticsearch website.

For more information about controlling access to your Amazon Elasticsearch Service endpoint, see How to Control Access to Your Amazon Elasticsearch Service Domain.

Because your Amazon Elasticsearch Service endpoint is in the VPC, to access Kibana you must first connect to the VPC. This process varies by network configuration, but likely involves connecting to a VPN or corporate network. For this post, create a remote desktop EC2 instance in a public subnet of your VPC. The newly created security group (rdp-sec-grp) protects the instance. You can modify the es-sec-grp security group to allow inbound HTTPS traffic from rdp-sec-grp so you can access the Kibana URL from that instance. The following diagram illustrates this architecture.

Kinesis Data Firehose uses the delivery role to sign HTTP (Signature Version 4) requests before sending the data to the Amazon Elasticsearch Service endpoint. You manage Amazon Elasticsearch Service fine-grained access control permissions using roles, users, and mappings. This section describes how to create roles and set permissions for Kinesis Data Firehose.

The roles you create in this section are different from IAM roles. For more information, see Key Concepts.

Complete the following steps:

  1. Navigate to Kibana (you can find the URL on the Amazon Elasticsearch Service console).
  2. Enter the master user and password that you set up when you created the Amazon Elasticsearch Service endpoint.
  3. Under Security, choose Roles.
  4. Choose Add New Role.
  5. Name your role; for example, firehose-role.
  6. For cluster permissions, add cluster_composite_ops and cluster_monitor.
  7. Under Index permissions, choose Index Patterns and enter stockdata*.
  8. Under Permissions, add three action groups: crud, create_index, and manage.
  9. Choose Save Role Definition.

In the next step, you map the IAM role that Kinesis Data Firehose uses to the role you just created.

  10. Under Security, choose Role Mappings.
  11. Choose the role you just created (firehose-role).
  12. For Backend Roles, choose Add Backend Role.
  13. Enter the IAM ARN of the role Kinesis Data Firehose uses: arn:aws:iam::123456789012:role/firehose_stream_role_name.

You can find the IAM role that Kinesis Data Firehose uses on the Kinesis Data Firehose console, in your delivery stream’s details.

Streaming stock data through Kinesis Data Firehose

To stream your stock data, complete the following steps:

  1. On the Kinesis Data Firehose console, choose the stream you created.
  2. Choose Test with demo data.
  3. Choose Start sending demo data.

If everything is working, you see the message Demo data is being sent to your delivery stream. Wait a few minutes before you choose Stop sending demo data.

Analyzing and visualizing data

To analyze and visualize your data, complete the following steps:

  1. On the Kibana console, choose Management.
  2. Choose Index patterns.
  3. For Index pattern, enter stockdata*.
  4. Choose Next.
  5. For the Time filter field, choose timestamp.
  6. Choose Visualize.
  7. Create a new visualization and choose Line.
  8. For Index pattern, choose stockdata*.
  9. For Y-Axis, choose Aggregation=Average and Field=price.
  10. For X-Axis, choose Aggregation=Date Histogram, Field=timestamp, and Interval=seconds.
  11. Under X-Axis, choose Add Sub-buckets.
  12. Choose Split Series.
  13. Set Sub-Aggregation=Terms and Field=ticker_symbol.keyword.
  14. Choose Apply Changes.

The following screenshot shows an example visualization.

You can see the raw data by choosing Discover on the Kibana dashboard. See the following screenshot.

Summary

This post demonstrated how you can use Kinesis Data Firehose to deliver streaming data to an Amazon Elasticsearch Service endpoint inside your VPC. Additionally, you do not need to enable and secure public access to your Amazon Elasticsearch Service endpoint. If you have been reluctant to expose your Amazon Elasticsearch Service endpoint to the internet but want to stream data, you can now do so with Kinesis Data Firehose.

 


About the Authors

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

Achieve finer-grained data security with column-level access control in Amazon Redshift

Post Syndicated from BP Yau original https://aws.amazon.com/blogs/big-data/achieve-finer-grained-data-security-with-column-level-access-control-in-amazon-redshift/

Amazon Redshift is the most popular cloud data warehouse because it provides fast insights at a low cost. Customers can confidently run mission-critical workloads, even in highly regulated industries, because Amazon Redshift comes with out-of-the-box security and compliance. The security features, combined with the ability to easily analyze data in place and in open formats, along with compute and storage elasticity and ease of use, are what make tens of thousands of customers choose Amazon Redshift.

Many organizations store sensitive data, commonly classified as personally identifiable information (PII) or sensitive personal information (SPI), in Amazon Redshift, and access to this data is restricted for different personas in the organization. For example, your human resources, finance, sales, data science, and marketing departments may all have the required access privileges to view customer data, whereas only the finance department should have access to sensitive data like personally identifiable information (PII) or payment card industry (PCI) data.

Previously, views or AWS Lake Formation on Amazon Redshift Spectrum were used to manage such scenarios; however, this adds the extra overhead of creating and maintaining views or Amazon Redshift Spectrum. The view-based approach is also difficult to scale and can lead to a lack of security controls. Amazon Redshift column-level access control is a new feature that supports access control at a column level for data in Amazon Redshift. You can use column-level GRANT and REVOKE statements to help meet your security and compliance needs, similar to managing any database object.

This post shows you how to set up Amazon Redshift column-level access control on tables, views, and materialized views.

Use Case

There are two tables that store customer demographic and account balance data. The finance department can see all customer data, while the sales department can only view and update market segment and account balance data, because the rest of the customer demographic data, like customer name, phone, and nation, is considered PII data and should have restricted access. This is a good use case for column-level access control to secure the PII data. Below is a simple entity relationship diagram for the two tables.

Prerequisites

Before trying out the illustration in this blog, note the following prerequisites:

  1. An Amazon Redshift cluster.
  2. A database user with permission to create tables, or a superuser.

Setting up the environment

To setup the environment and implement the use case, complete the following steps:

  1. Connect to your Amazon Redshift cluster using any SQL client of your choice with a user that has permission to create tables, or with a superuser.
  2. Create two tables with the following code:

    CREATE TABLE customer 
    (
      customerid       INT8 NOT NULL,
      customername     VARCHAR(25) NOT NULL,
      phone            CHAR(15) NOT NULL,
      nationid        INT4 NOT NULL,
      marketsegment    CHAR(10) NOT NULL,
      accountbalance   NUMERIC(12,2) NOT NULL
    );
    CREATE TABLE nation 
    (
      nationid    INT4 NOT NULL,
      nationname   CHAR(25) NOT NULL
    );

  3. Populate some sample data into the two tables with the following code:

    INSERT INTO customer VALUES
    (1, 'Customer#000000001', '33-687-542-7601', 3, 'HOUSEHOLD', 2788.52),
    (2, 'Customer#000000002', '13-806-545-9701', 1, 'MACHINERY', 591.98),
    (3, 'Customer#000000003', '13-312-472-8245', 1, 'HOUSEHOLD', 3332.02),
    (4, 'Customer#000000004', '23-127-851-8031', 2, 'MACHINERY', 9255.67),
    (5, 'Customer#000000005', '13-137-193-2709', 1, 'BUILDING', 5679.84)
    ;
    INSERT INTO nation VALUES
    (1, 'UNITED STATES'),
    (2, 'AUSTRALIA'),
    (3, 'UNITED KINGDOM')
    ;

  4. Create a view and a materialized view with the following code:

    CREATE OR REPLACE VIEW customer_vw AS SELECT customername, phone, marketsegment, accountbalance, CASE WHEN accountbalance < 1000 THEN 'low' WHEN accountbalance >= 1000 AND accountbalance < 5000 THEN 'middle' ELSE 'high' END AS incomegroup FROM customer;
    
    CREATE MATERIALIZED VIEW customernation_mv AS SELECT customername, phone, nationname, marketsegment, sum(accountbalance) AS accountbalance FROM customer c INNER JOIN nation n ON c.nationid = n.nationid GROUP BY customername, phone, nationname, marketsegment;

  5. The purpose of the view customer_vw is to implement the business rule of customer income group categorization based on the customer dataset.
  6. Analytical dashboards frequently access this dataset by joining and aggregating the tables customer and nation, so the materialized view customernation_mv is created to speed up the performance of such queries significantly.
  7. Create and grant table-level permissions to user finance, which represents finance department users. Note that the users below are created only for illustration purposes. We recommend you use AWS IAM federation to bring in your corporate users without creating them manually in Amazon Redshift. For more information, please refer to https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-authentication-access-control.html#authentication.
    CREATE USER finance WITH password 'Abcd1234!';
    CREATE USER sales WITH password 'Abcd1234!'; 
    GRANT SELECT, UPDATE ON customer TO finance;
    GRANT SELECT ON customer_vw TO finance;
    GRANT SELECT ON customernation_mv TO finance;

  8. Note that user finance has SELECT and UPDATE permissions on all columns of the customer table.
  9. You need to test and validate that user finance can view all data from the customer table, the customer_vw view, and the customernation_mv materialized view, and can update data in the customer table.
  10. Enter the following code:
    SET SESSION AUTHORIZATION 'finance';
    SELECT CURRENT_USER;
    SELECT * FROM customer;
    SELECT * FROM customer_vw;
    SELECT * FROM customernation_mv;
    UPDATE customer SET accountbalance = 1000 WHERE marketsegment = 'BUILDING';
    RESET SESSION AUTHORIZATION;

    Note that the SQL statement SET SESSION AUTHORIZATION 'finance' is used to impersonate user finance in the above code.

    Each select statement should return five rows and the update statement should return one row updated. See the following code:

    dev=# SET SESSION AUTHORIZATION 'finance';
    SET
    dev=> SELECT CURRENT_USER;
     current_user 
    --------------
     finance
    (1 row)
     
    dev=> SELECT * FROM customer;
     customerid |    customername    |      phone      | nationid | marketsegment | accountbalance 
    ------------+--------------------+-----------------+----------+---------------+----------------
              1 | Customer#000000001 | 33-687-542-7601 |        3 | HOUSEHOLD     |        2788.52
              2 | Customer#000000002 | 13-806-545-9701 |        1 | MACHINERY     |         591.98
              3 | Customer#000000003 | 13-312-472-8245 |        1 | HOUSEHOLD     |        3332.02
              4 | Customer#000000004 | 23-127-851-8031 |        2 | MACHINERY     |        9255.67
              5 | Customer#000000005 | 13-137-193-2709 |        1 | BUILDING      |        5679.84
    (5 rows)
     
    dev=> SELECT * FROM customer_vw;
        customername    |      phone      | marketsegment | accountbalance | incomegroup 
    --------------------+-----------------+---------------+----------------+-------------
     Customer#000000001 | 33-687-542-7601 | HOUSEHOLD     |        2788.52 | middle
     Customer#000000002 | 13-806-545-9701 | MACHINERY     |         591.98 | low
     Customer#000000003 | 13-312-472-8245 | HOUSEHOLD     |        3332.02 | middle
     Customer#000000004 | 23-127-851-8031 | MACHINERY     |        9255.67 | high
     Customer#000000005 | 13-137-193-2709 | BUILDING      |        5679.84 | high
    (5 rows)
     
    dev=> SELECT * FROM customernation_mv;
        customername    |      phone      |        nationname         | marketsegment | accountbalance 
    --------------------+-----------------+---------------------------+---------------+----------------
     Customer#000000005 | 13-137-193-2709 | UNITED STATES             | BUILDING      |        5679.84
     Customer#000000004 | 23-127-851-8031 | AUSTRALIA                 | MACHINERY     |        9255.67
     Customer#000000003 | 13-312-472-8245 | UNITED STATES             | HOUSEHOLD     |        3332.02
     Customer#000000002 | 13-806-545-9701 | UNITED STATES             | MACHINERY     |         591.98
     Customer#000000001 | 33-687-542-7601 | UNITED KINGDOM            | HOUSEHOLD     |        2788.52
    (5 rows)
     
    dev=> UPDATE customer SET accountbalance = 1000 WHERE marketsegment = 'BUILDING';
    UPDATE 1
    dev=>  
    dev=> RESET SESSION AUTHORIZATION;
    RESET

You have now successfully setup table level permissions for user finance to view and update all customer data.

Setting up Amazon Redshift column-level access control

Column-level access control can be enabled and disabled by using GRANT and REVOKE statements with the following syntax:

GRANT { { SELECT | UPDATE } ( column_name [, ...] ) [, ...] | ALL [ PRIVILEGES ] ( column_name [,...] ) }
ON { [ TABLE ] table_name [, ...] }
TO { username | GROUP group_name | PUBLIC } [, ...]
 
REVOKE { { SELECT | UPDATE } ( column_name [, ...] ) [, ...] | ALL [ PRIVILEGES ] ( column_name [,...] ) }
ON { [ TABLE ] table_name [, ...] }
FROM { username | GROUP group_name | PUBLIC } [, ...]
[ CASCADE | RESTRICT ]

To set up column-level privileges, complete the following steps:

  1. To determine which users have column-level access control, you can query the PG_ATTRIBUTE_INFO system view. Enter the following code:
    SELECT b.attacl, b.attname, c.relname FROM pg_catalog.pg_attribute_info b JOIN pg_class c ON c.oid=b.attrelid WHERE c.relname in ('customer','customer_vw','customernation_mv') AND b.attacl IS NOT NULL ORDER BY c.relname, b.attname;

  2. The query should return zero records as we have not implemented column-level access control yet.
  3. Grant user sales SELECT permission on columns marketsegment and accountbalance on the table customer, the view customer_vw, and the materialized view customernation_mv. Also grant UPDATE permission on columns marketsegment and accountbalance on the table customer by entering the following code:
    RESET SESSION AUTHORIZATION;
    GRANT SELECT (marketsegment, accountbalance) ON customer TO sales WITH GRANT OPTION;
    GRANT SELECT (marketsegment, accountbalance),UPDATE (marketsegment, accountbalance) ON customer TO sales;
    GRANT SELECT (marketsegment, accountbalance) ON customer_vw TO sales;
    GRANT SELECT (marketsegment, accountbalance) ON customernation_mv TO sales;

  4. The error message “Grant options are not supported for column privileges” should be returned for the first statement. This is because, to maintain a simple security model, only a table’s owner or a superuser can grant column-level privileges.
  5. Validate if above permissions have been granted with the following code:
    SELECT b.attacl, b.attname, c.relname FROM pg_catalog.pg_attribute_info b  JOIN pg_class c ON c.oid=b.attrelid WHERE c.relname in ('customer','customer_vw','customernation_mv') AND b.attacl IS NOT NULL ORDER BY c.relname, b.attname;

  6. The query should return six rows. See the following code:
    dev=# SELECT b.attacl, b.attname, c.relname FROM pg_catalog.pg_attribute_info b  JOIN pg_class c ON c.oid=b.attrelid WHERE c.relname in ('customer','customer_vw','customernation_mv') AND b.attacl IS NOT NULL ORDER BY c.relname, b.attname;
          attacl       |    attname     |      relname      
    -------------------+----------------+-------------------
     {sales=rw/fqdemo} | accountbalance | customer
     {sales=rw/fqdemo} | marketsegment  | customer
     {sales=r/fqdemo}  | accountbalance | customer_vw
     {sales=r/fqdemo}  | marketsegment  | customer_vw
     {sales=r/fqdemo}  | accountbalance | customernation_mv
     {sales=r/fqdemo}  | marketsegment  | customernation_mv
    (6 rows)

    The output above shows:
    Users: sales (attacl column)
    Permissions: read/write (attacl column value “rw”)
    On Column: accountbalance, marketsegment (attname column)
    Of table: customer (relname column)
    Granted by: fqdemo (attacl column)

    Users: sales (attacl column)
    Permissions: read (attacl column value “r”)
    On Column: accountbalance, marketsegment (attname column)
    Of table: customer_vw, customernation_mv (relname column)
    Granted by: fqdemo (attacl column)

  7. After you confirm that the column-level access controls are correct, run as user sales to query the table customer, the view customer_vw, and the materialized view customernation_mv using the following code:
    SET SESSION AUTHORIZATION 'sales';
    SELECT CURRENT_USER;
    SELECT * FROM customer;
    SELECT * FROM customer_vw;
    SELECT * FROM customernation_mv;

  8. Each select statement should return a permission denied error because the user does not have permissions on all columns of the objects being queried. See the following code:
    dev=# SET SESSION AUTHORIZATION 'sales';
    SET
    dev=> SELECT CURRENT_USER;
     current_user 
    --------------
     sales
    (1 row)
     
    dev=> SELECT * FROM customer;
    ERROR:  permission denied for relation customer
    dev=> SELECT * FROM customer_vw;
    ERROR:  permission denied for relation customer_vw
    dev=> SELECT * FROM customernation_mv;
    ERROR:  permission denied for relation customernation_mv

  9. Query only the columns marketsegment and accountbalance from table customer, view customer_vw and materialized view customernation_mv with the following code:
    SELECT marketsegment, accountbalance FROM customer;
    SELECT marketsegment, accountbalance FROM customer_vw;
    SELECT marketsegment, accountbalance FROM customernation_mv;

  10. Each select statement should return five rows as user sales has permission to query columns marketsegment and accountbalance. See the following code:
    dev=> SELECT marketsegment, accountbalance FROM customer;
     marketsegment | accountbalance 
    ---------------+----------------
     HOUSEHOLD     |        2788.52
     MACHINERY     |         591.98
     HOUSEHOLD     |        3332.02
     MACHINERY     |        9255.67
     BUILDING      |        1000.00
    (5 rows)
     
    dev=> SELECT marketsegment, accountbalance FROM customer_vw;
     marketsegment | accountbalance 
    ---------------+----------------
     HOUSEHOLD     |        2788.52
     MACHINERY     |         591.98
     HOUSEHOLD     |        3332.02
     MACHINERY     |        9255.67
     BUILDING      |        1000.00
    (5 rows)
     
    dev=> SELECT marketsegment, accountbalance FROM customernation_mv;
     marketsegment | accountbalance 
    ---------------+----------------
     MACHINERY     |        9255.67
     BUILDING      |        5679.84
     MACHINERY     |         591.98
     HOUSEHOLD     |        3332.02
     HOUSEHOLD     |        2788.52
    (5 rows)

  11. Update the accountbalance column with the following code:
    UPDATE customer SET accountbalance = 2000 WHERE marketsegment = 'BUILDING';
    SELECT accountbalance FROM customer WHERE marketsegment = 'BUILDING';

  12. The select statement should return one row that shows value 2000. See the following code:
    dev=> UPDATE customer SET accountbalance = 2000 WHERE marketsegment = 'BUILDING';
    UPDATE 1
    dev=> SELECT accountbalance FROM customer WHERE marketsegment = 'BUILDING';
     accountbalance 
    ----------------
            2000.00
    (1 row)

  13. Update the accountbalance column with condition nationid=1 by using the following code:
    UPDATE customer SET accountbalance = 3000 WHERE nationid = 1;

  14. The update statement should return a permission denied error because user sales does not have column-level privileges on column nationid in the WHERE clause.
  15. Query the count of records grouped by nationid with the following code:
    SELECT COUNT(*) FROM customer GROUP BY nationid;

  16. The select statement should return a permission denied error because user sales doesn’t have column-level privileges on column nationid in the GROUP BY clause.
  17. Note that column-level privileges are checked not only for columns in the select list but also for columns in the WHERE clause, ORDER BY clause, GROUP BY clause, HAVING clause, and other clauses of a query that require SELECT or UPDATE privileges on a column.
  18. Remove column marketsegment from column-level access control for user sales using REVOKE command and see what happens. Enter the following code:
    RESET SESSION AUTHORIZATION;
    REVOKE SELECT (marketsegment) ON customer FROM sales;
    SET SESSION AUTHORIZATION 'sales';
    SELECT CURRENT_USER;
    SELECT marketsegment, accountbalance FROM customer;
    SELECT accountbalance FROM customer;

  19. As you can see, user sales is no longer able to view marketsegment from table customer.
    dev=> SELECT marketsegment, accountbalance FROM customer;
    ERROR:  permission denied for relation customer
    dev=> SELECT accountbalance FROM customer;
     accountbalance 
    ----------------
            2788.52
             591.98
            3332.02
            9255.67
            2000.00
    (5 rows)

  20. Enter the following code to query column marketsegment from view customer_vw:
    SELECT marketsegment FROM customer_vw;

  21. The statement should return five rows because user sales still has access to column marketsegment on the view, even though column-level privileges have been revoked from the table customer. Views execute with the permissions of the view owner, so the view continues to work as long as the view’s owner still has column-level or table-level privileges on the base tables used by the view. To prevent unauthorized access to the sensitive data, the column-level privileges for user sales should be revoked from the view as well.
  22. Revoke all permissions for user sales with the following code:
    RESET SESSION AUTHORIZATION;
    SELECT CURRENT_USER;
    REVOKE SELECT ON customernation_mv FROM sales;
    REVOKE SELECT ON customer_vw FROM sales;
    REVOKE SELECT ON customer FROM sales;
    REVOKE UPDATE ON customer FROM sales;

  23. Query the table, view and materialized view again with user sales using the following code:
    SET SESSION AUTHORIZATION 'sales';
    SELECT CURRENT_USER;
    SELECT marketsegment, accountbalance FROM customer;
    SELECT marketsegment, accountbalance FROM customer_vw;
    SELECT marketsegment, accountbalance FROM customernation_mv;

  24. A permission denied error should be returned, which shows that REVOKE removed all permissions.

In summary, a simple GRANT statement enables column-level access control on Amazon Redshift tables, views, and materialized views, and a REVOKE statement removes the permission. This eliminates the complexity of legacy view-based access control used to achieve fine-grained read and write access control.

Clean up

Once you are done with the above testing, you can remove the objects and users with the following code:

RESET SESSION AUTHORIZATION;
REVOKE SELECT ON customernation_mv FROM finance;
REVOKE SELECT ON customer_vw FROM finance;
REVOKE SELECT ON customer FROM finance;
REVOKE UPDATE ON customer FROM finance;
DROP VIEW customer_vw;
DROP MATERIALIZED VIEW customernation_mv;
DROP TABLE nation;
DROP TABLE customer;
DROP USER IF EXISTS sales;
DROP USER IF EXISTS finance;

Summary

Amazon Redshift is secure by default and security doesn’t cost extra. It provides authentication (Active Directory, Okta, PingFederate, and Azure AD) and federation, and comes pre-integrated with AWS IAM and AWS KMS. It has supported table-based access control for data in Amazon Redshift and column-level access control for data in Amazon S3 through Amazon Redshift Spectrum since September 2019. Amazon Redshift now supports access control at a column level for local tables, eliminating the need to implement view-based access control or use another system.

This post showed you how easy it is to set up Amazon Redshift column-level access control. The use case in this post demonstrated how to confirm that you have fine-grained access on the table, view, and materialized view. You can adopt this feature to support your business needs.

If you have any questions or suggestions, please leave a comment below.

 


About the Authors

BP Yau is a Data Warehouse Specialist Solutions Architect at AWS. His role is to help customers architect big data solutions to process data at scale. Before AWS, he helped Amazon.com Supply Chain Optimization Technologies migrate the Oracle Data Warehouse to Amazon Redshift and built the next generation big data analytics platform using AWS technologies.

 

 

 

Srikanth Sopirala is a Sr. Specialist Solutions Architect focused on Analytics at AWS. He is passionate about helping customers build scalable data and analytics solutions in the cloud.

Speed up your ELT and BI queries with Amazon Redshift materialized views

Post Syndicated from Juan Yu original https://aws.amazon.com/blogs/big-data/speed-up-your-elt-and-bi-queries-with-amazon-redshift-materialized-views/

The Amazon Redshift materialized views function helps you achieve significantly faster query performance on repeated or predictable workloads, such as dashboard queries from business intelligence (BI) tools like Amazon QuickSight. It also speeds up and simplifies extract, load, and transform (ELT) data processing. You can use materialized views to store frequently used precomputations and seamlessly use them to achieve lower latency on subsequent analytical queries.

This post demonstrates how to create a materialized view, refresh it after data ingestion, and speed up your BI workload.

Setting up your sample datasets

This walkthrough uses the Amazon Customer Reviews Dataset. It is a public dataset stored in the us-east-1 Region. You will create the following three tables:

  • product_reviews – Contains customer reviews for a specific product
  • customer – Contains customer profile data
  • customer_address – Contains customer address information

The following diagram shows the relationship of the three tables.

To download the script and set up the tables, choose mv_blog.sql.

Creating and using materialized views

For this use case, your marketing team wants to build a report that shows how many customers per state like your products. You also want to drill down to each product category when needed.

In this first step, you create a regular view. See the following code:

CREATE VIEW v_reviews_byprod_and_state AS
SELECT PR.product_category,
       A.ca_state AS customer_state,
       count(PR.star_rating) AS cnt
FROM product_reviews PR,
     customer C,
     customer_address A
WHERE PR.customer_id = C.c_customer_sk
  AND C.c_current_addr_sk = A.ca_address_sk
  AND PR.marketplace = 'US'
GROUP BY 1,
         2;

The following code is a report to analyze the product review count per state:

SELECT customer_state,
       sum(cnt)
FROM v_reviews_byprod_and_state
GROUP BY 1
ORDER BY 2;

The following code is a report to analyze the product review count per state for specific categories:

SELECT customer_state,
       sum(cnt)
FROM v_reviews_byprod_and_state
WHERE product_category IN ('Home',
                           'Grocery')
GROUP BY 1
ORDER BY 2;

The preceding reports take approximately 4 seconds to run. As you sell more products and get more reviews, this elapsed time gradually gets longer. To speed up those reports, you can create a materialized view to precompute the count of reviews per product category and per state. See the following code:

CREATE MATERIALIZED VIEW mv_reviews_byprod_and_state AS
SELECT PR.product_category,
       A.ca_state AS customer_state,
       count(PR.star_rating) AS cnt
FROM product_reviews PR,
     customer C,
     customer_address A
WHERE PR.customer_id = C.c_customer_sk
  AND C.c_current_addr_sk = A.ca_address_sk
  AND PR.marketplace = 'US'
GROUP BY 1,
         2;

The following code shows the reports that analyze the product reviews against the materialized view:

SELECT customer_state,
       sum(cnt)
FROM mv_reviews_byprod_and_state
GROUP BY 1
ORDER BY 2;

SELECT customer_state,
       sum(cnt)
FROM mv_reviews_byprod_and_state
WHERE product_category IN ('Home',
                           'Grocery')
GROUP BY 1
ORDER BY 2;

The same reports against materialized views take less than 200 milliseconds because the new queries access precomputed joins, filters, grouping, and partial sums instead of the multiple, larger base tables.

Speeding up and simplifying ELT data processing

To achieve similar performance without the use of materialized views, many users use the CREATE TABLE AS (CTAS) command. However, as you update base tables with new data inserts, updates, or deletes, the CTAS tables become stale; you must recreate them to keep them up-to-date with the latest changes from the base tables. Now with Amazon Redshift materialized views, you can overcome this problem by efficiently and incrementally refreshing the materialized views with supported SQL. For example, the following code ingests another 10,000 reviews:

INSERT INTO product_reviews
SELECT   marketplace, 
  cast(customer_id as bigint) customer_id, 
  review_id, 
  product_id, 
  cast(product_parent as bigint) product_parent, 
  product_title, 
  star_rating, 
  helpful_votes, 
  total_votes, 
  vine, 
  verified_purchase, 
  review_headline, 
  review_body, 
  review_date, 
  year,
  product_category
FROM demo.products_reviews
WHERE review_date = '2015-07-01' LIMIT 10000;

Now the materialized view is out-of-date. To refresh the materialized view, enter the following code:

REFRESH MATERIALIZED VIEW mv_reviews_byprod_and_state;

Within 200 milliseconds, the materialized view is up-to-date again. Your report queries have the same consistent, fast performance.

The following screenshot is the query log that shows query performance. The log shows newer statements at the top.

The materialized views refresh is much faster because it’s incremental: Amazon Redshift only uses the new data to update the materialized view instead of recomputing the entire materialized view again from the base tables.  For more information, see REFRESH MATERIALIZED VIEW.
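To check whether a particular refresh was incremental or a full recompute, you can inspect the refresh history in the SVL_MV_REFRESH_STATUS system view. The following is a minimal sketch; the exact column set of the view may differ on your cluster, so adjust the select list as needed.

-- Show the most recent refreshes of the materialized view;
-- the status text indicates whether the refresh was incremental.
SELECT mv_name,
       status,
       starttime,
       endtime
FROM svl_mv_refresh_status
WHERE mv_name = 'mv_reviews_byprod_and_state'
ORDER BY starttime DESC
LIMIT 5;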

Materialized views also simplify and make ELT easier and more efficient. Without materialized views, you might create an ELT job and use CTAS to precompute the product analysis data. The ELT job recomputes this data after new data is ingested and stores the data in the precomputed product analysis table to meet the dashboard latency requirement.

In particular, the ELT job drops and recreates the precomputed product analysis table after each ingestion. See the following code:

BEGIN;
    DROP TABLE IF EXISTS latest_product_analysis;
    CREATE TABLE latest_product_analysis as SELECT ...;
END;

With materialized views, you create the materialized view one time and refresh it to keep it up to date. To refresh materialized views after ingesting new data, add REFRESH MATERIALIZED VIEW to your ELT data ingestion scripts, and Amazon Redshift automatically and incrementally brings the materialized view up to date.
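For example, a nightly ingestion script reduces to an insert followed by a refresh. The following is a minimal sketch that reuses the tables from this walkthrough; the review_date filter is illustrative.

-- Sketch of an ELT ingestion step: load new reviews, then refresh the
-- materialized view so dashboard queries see the latest data.
BEGIN;

INSERT INTO product_reviews
SELECT marketplace,
       cast(customer_id AS bigint),
       review_id,
       product_id,
       cast(product_parent AS bigint),
       product_title,
       star_rating,
       helpful_votes,
       total_votes,
       vine,
       verified_purchase,
       review_headline,
       review_body,
       review_date,
       year,
       product_category
FROM demo.products_reviews
WHERE review_date = '2015-07-02';

COMMIT;

REFRESH MATERIALIZED VIEW mv_reviews_byprod_and_state;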

Achieving faster performance for BI dashboards

You can use materialized views to help your BI team build a dashboard to analyze product trends.

For example, to create a materialized view to join customer and customer_address dimension tables and precompute reviews and ratings, enter the following code:

CREATE MATERIALIZED VIEW mv_product_analysis 
    sortkey(product_category, Customer_State, review_date) 
AS
SELECT PR.product_category,
       A.ca_state AS Customer_State,
       PR.review_date AS Review_Date,
       PR.product_title,
       COUNT(1) AS review_total,
       SUM(PR.star_rating) AS rating
FROM product_reviews PR,
     customer C,
     customer_address A
WHERE PR.customer_id = C.c_customer_sk
  AND C.c_current_addr_sk = A.ca_address_sk
  AND marketplace = 'US'
GROUP BY 1,2,3,4;

You access a materialized view the same way you access a regular table. For this walkthrough, choose the materialized view as the source for an Amazon QuickSight dataset, as shown in the following screenshot.

You can preview data of the materialized view in Amazon QuickSight to understand what information can be used to build the dashboard. The following screenshot shows the sample data of mv_product_analysis.

To track how many reviews customers post over time, use review_date as the X-axis and Sum(review_total) as the Y-axis. The following graph shows this visualization.

The following screenshot shows a complete dashboard “Product trend” that analyzes the top product category, product popularity by state, and more.

Because you are using materialized views, the product trend dashboard loads in seconds and is always up-to-date. You can gain the latest insights, understand customer purchase behavior, and identify business opportunities and optimizations.
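The dashboard visuals translate into simple aggregations over the materialized view. The following is a hypothetical example of the kind of query behind a product popularity by state visual; the column names come from the mv_product_analysis definition shown earlier.

SELECT customer_state,
       product_category,
       SUM(review_total) AS total_reviews,
       SUM(rating)::float / NULLIF(SUM(review_total), 0) AS avg_star_rating
FROM mv_product_analysis
GROUP BY 1, 2
ORDER BY total_reviews DESC
LIMIT 20;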

You can compare the performance of materialized views with other possible alternatives, such as using regular views and using CTAS. The following graph shows the overall query execution for the product trend dashboard. Materialized views not only improve query performance by more than an order of magnitude compared to using a regular view, but also have low maintenance costs compared to using CTAS, because the incremental refresh time is proportional to the delta of changes in the base tables. In contrast, the CTAS recreate approach needs to process all the data in the base tables.

The following animated gif shows the actual response time for the product trend dashboard built using Amazon QuickSight in direct query mode.

Conclusion

This post showed how to create Amazon Redshift materialized views with one or more base tables to speed up both BI queries and ELT. You can easily build and maintain efficient data processing pipelines and seamlessly extend the low latency query execution benefits of materialized views to data analysis.

 


About the Authors

Juan Yu is a Data Warehouse Specialist Solutions Architect at AWS.

Jose Kunnackal John is principal product manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service. Jose started his career with Motorola, writing software for telecom and first responder systems. Later he was Director of Engineering at Trilibis Mobile, where he built a SaaS mobile web platform using AWS services. Jose is excited by the potential of cloud technologies and looks forward to helping customers with their transition to the cloud.

Accelerate Amazon Redshift Federated Query adoption with AWS CloudFormation

Post Syndicated from BP Yau original https://aws.amazon.com/blogs/big-data/accelerate-amazon-redshift-federated-query-adoption-with-aws-cloudformation/

Amazon Redshift Federated Query allows you to combine the data from one or more Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL databases with data already in Amazon Redshift. You can also combine such data with data in an Amazon S3 data lake.

This post shows you how to set up Aurora PostgreSQL and Amazon Redshift with a 10 GB TPC-H dataset, and Amazon Redshift Federated Query using AWS CloudFormation. For more information about using Federated Query, see Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query. You can use the environment you set up in this post to experiment with various use cases in the preceding post.

Benefits of using CloudFormation templates

The standard workflow of setting up Amazon Redshift Federated Query involves six steps. For more information, see Querying Data with Federated Query in Amazon Redshift. With a CloudFormation template, you can condense these manual procedures into a few steps listed in a text file. The declarative code in the file captures the intended state of the resources to create and allows you to automate the creation of AWS resources to support Amazon Redshift Federated Query. You can further enhance this template to become the single source of truth for your infrastructure.

A CloudFormation template acts as an accelerator. It helps you automate the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and multiple accounts with the least amount of effort and time.

Architecture overview

The following diagram illustrates the solution architecture.

The CloudFormation templates provision the following components in the architecture:

  • VPC
  • Subnets
  • Route tables
  • Internet gateway
  • Amazon Linux Bastion host
  • Secrets
  • Aurora PostgreSQL cluster with TPC-H dataset preloaded
  • Amazon Redshift cluster with TPC-H dataset preloaded
  • Amazon Redshift IAM role with required permissions

Prerequisites

Before you create your resources in AWS CloudFormation, you must complete the following prerequisites:

  • Have an IAM user with sufficient permissions to interact with the AWS Management Console and related AWS services. Your IAM permissions must also include access to create IAM roles and policies via the CloudFormation template.
  • Create an Amazon EC2 key pair in the us-east-1 Region. Make sure that you save the private key; this is the only time you can do so. You use this key pair as an input parameter when you set up the CloudFormation stack.

Setting up the resources with AWS CloudFormation

This post provides a CloudFormation template as a general guide. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

To create these resources, complete the following steps:

  1. Sign in to the console.
  2. Choose the us-east-1 Region in which to create the stack.
  3. Choose Launch Stack:
  4. Choose Next. This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template from within the console.
  5. For Stack name, enter a stack name.
  6. For Session, leave as the default.
  7. For ec2KeyPair, choose the key pair you created earlier.
  8. Choose Next.
  9. On the next screen, choose Next.
  10. Review the details on the final screen and select I acknowledge that AWS CloudFormation might create IAM resources.
  11. Choose Create. Stack creation can take up to 45 minutes.
  12. After the stack creation is complete, in the Outputs section, record the values of the following keys, which you use in a later step:
  • AuroraClusterEndpoint
  • AuroraSecretArn
  • RedshiftClusterEndpoint
  • RedshiftClusterRoleArn

You are now ready to log in to both the Aurora PostgreSQL and Amazon Redshift cluster and run some basic commands to test them.

Logging in to the clusters using the Amazon Linux Bastion host

The following steps assume that you use a computer with an SSH client to connect to the Bastion host. For more information about connecting using various clients, see Connect to Your Linux Instance.

  1. Move the private key of the EC2 key pair (that you saved previously) to a location on your SSH client, where you are connecting to the Amazon Linux Bastion host.
  2. Change the permission of the private key using the following code, so that it’s not publicly viewable: chmod 400 <private key file name; for example, bastion-key.pem>
  3. On the Amazon EC2 console, choose Instances.
  4. Choose the Amazon Linux Bastion host that the CloudFormation stack created.
  5. Choose Connect.
  6. Copy the value for SSHCommand.
  7. On the SSH client, change the directory to the location where you saved the EC2 private key, and paste the SSHCommand value.
  8. On the console, open the Secrets Manager dashboard.
  9. Choose the secret secretAuroraMasterUser-*.
  10. Choose Retrieve secret value.
  11. Record the password under Secret key/value, which you use to log in to the Aurora PostgreSQL cluster.
  12. Choose the secret SecretRedshiftMasterUser.
  13. Choose Retrieve secret value.
  14. Record the password under Secret key/value, which you use to log in to the Amazon Redshift cluster.
  15. Log in to both the Aurora PostgreSQL and Amazon Redshift databases using the psql client. The CloudFormation template has already set up the psql client binaries on the Amazon Linux Bastion host.
  16. Enter the following code in the command prompt of the Bastion host (substitute <RedshiftClusterEndpoint> with the value from the AWS CloudFormation output): psql -h <RedshiftClusterEndpoint> -d dev -p 5439 -U fqdemo
  17. When prompted, enter the database user password you recorded earlier.
  18. Enter the following SQL command:
    select "table" from svv_table_info where schema='public';

    You should see the following eight tables as the output:

    dev=# select "table" from svv_table_info where schema='public';
     table   
    ----------
     orders
     customer
     region
     nation
     supplier
     part
     lineitem
     partsupp
    (8 rows)

  19. Launch another command prompt session of the Bastion host and enter the following code (substitute <AuroraClusterEndpoint> with the value from the AWS CloudFormation output): psql -h <AuroraClusterEndpoint> -d dev -p 5432 -U awsuser
  20. When prompted, enter the database user password you recorded earlier.
  21. Enter the following SQL command:
    select tablename from pg_tables where schemaname='public';

    You should see the following eight tables as the output:

    dev=# select tablename from pg_tables where schemaname='public';
     tablename 
    -----------
     region
     nation
     lineitem
     orders
     part
     supplier
     partsupp
     customer
    (8 rows)

Completing Federated Query setup

The final step is to create an external schema to connect to the Aurora PostgreSQL instance. The following is the CREATE EXTERNAL SCHEMA statement that you need to run on your Amazon Redshift cluster to complete this step:

CREATE EXTERNAL SCHEMA IF NOT EXISTS pg 
FROM POSTGRES 
DATABASE 'dev' 
SCHEMA 'public' 
URI '<AuroraClusterEndpoint>' 
PORT 5432 
IAM_ROLE '<IAMRole>' 
SECRET_ARN '<SecretARN>'

Use the following parameters:

  • URI – AuroraClusterEndpoint value from the CloudFormation stack outputs. The value is in the format <stackname>-cluster.<randomcharacter>.us-east-1.rds.amazonaws.com
  • IAM_ROLE – RedshiftClusterRoleArn value from the CloudFormation stack outputs. The value is in the format arn:aws:iam::<accountnumber>:role/<stackname>-RedshiftClusterRole-<randomcharacter>
  • SECRET_ARN – AuroraSecretArn value from the CloudFormation stack outputs. The value is in the format arn:aws:secretsmanager:us-east-1:<accountnumber>:secret:secretAuroraMasterUser-<randomcharacter>

Testing Federated Query

Now that you have set up Federated Query, you can start testing the feature using the TPC-H dataset that was preloaded into both Aurora PostgreSQL and Amazon Redshift.

The following query shows the parts and supplier relationship. Tables PARTSUPP and PART are stored in Amazon Redshift, and the SUPPLIER table in the subquery is from Aurora PostgreSQL:

SELECT TOP 10 P_BRAND,
       P_TYPE,
       P_SIZE,
       COUNT(DISTINCT PS_SUPPKEY) AS SUPPLIER_CNT
FROM PARTSUPP,
     PART
WHERE P_PARTKEY = PS_PARTKEY
AND   P_BRAND <> 'Brand#23'
AND   P_TYPE NOT LIKE 'MEDIUM ANODIZED%'
AND   P_SIZE IN (1,32,33,46,7,42,21,40)
AND   PS_SUPPKEY NOT IN (SELECT S_SUPPKEY
                         FROM pg.SUPPLIER
                         WHERE S_COMMENT LIKE '%Customer%Complaints%')
GROUP	BY P_BRAND,
         P_TYPE,
         P_SIZE
ORDER	BY SUPPLIER_CNT DESC,
         P_BRAND,
         P_TYPE,
         P_SIZE;

The following query shows the order priority by combining ORDERS table data from Amazon Redshift and Aurora PostgreSQL. This demonstrates the use case of live data query from an OLTP source federated with historical data on a data warehouse:

SELECT O_ORDERPRIORITY,
       COUNT(*) AS ORDER_COUNT
FROM (SELECT O_ORDERPRIORITY
      FROM ORDERS o
      WHERE O_ORDERDATE < '1997-07-01'
      AND   O_ORDERDATE >= CAST(DATE '1997-07-01' - INTERVAL '3 months' AS DATE)
      UNION ALL
      SELECT O_ORDERPRIORITY
      FROM pg.ORDERS o
      WHERE O_ORDERDATE >= '1997-07-01'
      AND   O_ORDERDATE < CAST(DATE '1997-07-01' + INTERVAL '1 day' AS DATE)) combined_orders
GROUP	BY O_ORDERPRIORITY
ORDER	BY O_ORDERPRIORITY;

You can continue to experiment with the dataset and explore the three main use cases from the post, Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query.
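As one more example beyond those use cases, the following query joins the local CUSTOMER dimension in Amazon Redshift with the live ORDERS table in Aurora PostgreSQL to count open orders by market segment. This is an illustrative query that assumes the standard TPC-H column names.

SELECT C_MKTSEGMENT,
       COUNT(*) AS open_order_count
FROM CUSTOMER C
JOIN pg.ORDERS O ON O.O_CUSTKEY = C.C_CUSTKEY
WHERE O.O_ORDERSTATUS = 'O'
GROUP BY C_MKTSEGMENT
ORDER BY open_order_count DESC;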

Deleting the CloudFormation stack

When you are finished, delete the CloudFormation stack; some of the AWS resources in this walkthrough incur a cost if you continue to use them. Complete the following steps:

  1. On the AWS CloudFormation console, choose Stacks.
  2. Choose the stack you launched in this walkthrough. The stack must be currently running.
  3. In the stack details pane, choose Delete.
  4. Choose Delete stack.

Summary

This post showed you how to automate the creation of an Aurora PostgreSQL and Amazon Redshift cluster preloaded with the TPC-H dataset, the prerequisites of the new Amazon Redshift Federated Query feature using AWS CloudFormation, and a single manual step to complete the setup. The post also provided some example federated queries using the TPC-H dataset, which you can use to accelerate your learning and adoption of the new features. You can continue to modify the CloudFormation templates from this post to support your business needs.

If you have any questions or suggestions, please leave a comment.

 


About the Authors

BP Yau is a Data Warehouse Specialist Solutions Architect at AWS. His role is to help customers architect big data solutions to process data at scale. Before AWS, he helped Amazon.com Supply Chain Optimization Technologies migrate the Oracle Data Warehouse to Amazon Redshift and built the next generation big data analytics platform using AWS technologies.

Srikanth Sopirala is a Sr. Specialist Solutions Architect focused on Analytics at AWS. He is passionate about helping customers build scalable data and analytics solutions in the cloud.

Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query

Post Syndicated from Tito Mijares original https://aws.amazon.com/blogs/big-data/build-a-simplified-etl-and-live-data-query-solution-using-redshift-federated-query/

You may have heard the saying that the best ETL is no ETL. Amazon Redshift now makes this possible with Federated Query. In its initial release, this feature lets you query data in Amazon Aurora PostgreSQL or Amazon RDS for PostgreSQL using Amazon Redshift external schemas. Federated Query also exposes the metadata from these source databases through system views and driver APIs, which allows business intelligence tools like Tableau and Amazon QuickSight to connect to Amazon Redshift and query data in PostgreSQL without having to make local copies. This enables a new data warehouse pattern—live data query—in which you can seamlessly retrieve data from PostgreSQL databases, or build data into a late binding view, which combines operational PostgreSQL data, analytical Amazon Redshift local data, and historical Amazon Redshift Spectrum data in an Amazon S3 data lake.

Simplified ETL use case

For this ETL use case, you can simplify the familiar upsert pattern with a federated query. You can bypass the need for incremental extracts in Amazon S3 and the subsequent load via COPY by querying the data in place within its source database. This change can be a single line of code that replaces the COPY command with a query to an external table. See the following code:

BEGIN;
CREATE TEMP TABLE staging (LIKE ods.store_sales);
-- replace the following COPY from S3 
COPY staging FROM 's3://yourETLbucket/daily_store_sales/' 
     IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>' DELIMITER '|' COMPUPDATE OFF;
-- with this federated query to load staging data from PostgreSQL source
INSERT INTO staging SELECT * FROM pg.store_sales p
    WHERE p.last_updated_date > (SELECT MAX(last_updated_date) FROM ods.store_sales);
DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id;
INSERT INTO ods.store_sales SELECT * FROM staging;
DROP TABLE staging;
COMMIT;

In the preceding example, the table pg.store_sales resides in PostgreSQL, and you use a federated query to retrieve fresh data to load into a staging table in Amazon Redshift, keeping the actual delete and insert operations unchanged. This pattern is likely the most common application of federated queries.
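To confirm that the filter on pg.store_sales runs in PostgreSQL rather than in Amazon Redshift, you can inspect the query plan; as shown later in this post, a Remote PG Seq Scan node with a Filter line indicates the predicate was pushed down. The following is a minimal sketch with an illustrative date literal.

-- Check the plan for the federated portion of the load;
-- look for "Remote PG Seq Scan" with the last_updated_date filter attached.
EXPLAIN
SELECT *
FROM pg.store_sales p
WHERE p.last_updated_date > '2020-01-01';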

Setting up an external schema

The external schema pg in the preceding example was set up as follows:

CREATE EXTERNAL SCHEMA IF NOT EXISTS pg                                                                         
FROM POSTGRES                                                                                                           
DATABASE 'dev' 
SCHEMA 'retail'                                                                                     
URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'                                                    
PORT 5432                                                                                                               
IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'                                                           
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB'

If you’re familiar with the CREATE EXTERNAL SCHEMA command from using it in Spectrum, note some new parameter options to enable federated queries.

FROM POSTGRES                                                                                                           
DATABASE 'dev' 
SCHEMA 'retail'

Whereas Amazon Redshift Spectrum references an external data catalog that resides within AWS Glue, Amazon Athena, or Hive, this code points to a Postgres catalog. Also, expect more keywords used with FROM, as Amazon Redshift supports more source databases for federated querying. By default, if you do not specify SCHEMA, it defaults to public.

Within the target database, you identify DATABASE ‘dev’ and SCHEMA ‘retail’, so any queries to the Amazon Redshift table pg.<some_table> get issued to PostgreSQL as a request for retail.<some_table> in the dev database. For Amazon Redshift, query predicates are pushed down and run entirely in PostgreSQL, which reduces the result set returned to Amazon Redshift for subsequent operations. Going further, the query planner derives cardinality estimates for external tables to optimize joins between Amazon Redshift and PostgreSQL. From the preceding example:

URI 'database-1.cluster-ro-samplecluster.us-east-1.rds.amazonaws.com'                                                    
PORT 5432

The URI and PORT parameters that reference both the PostgreSQL endpoint and port are self-explanatory, but there are a few things to consider in your configuration:

  • Use a read replica endpoint in Aurora or Amazon RDS for PostgreSQL to reduce load on the primary instance.
  • Set up your Amazon RDS for PostgreSQL instance, Aurora serverless or provisioned instances, and Amazon Redshift clusters to use the same VPC and subnet groups. That way, you can add the security group for the cluster to the inbound rules of the security group for the Aurora or Amazon RDS for PostgreSQL instance.
  • If both Amazon Redshift and Aurora or Amazon RDS for PostgreSQL are on different VPCs, set up VPC peering. For more information, see What is VPC Peering?

Configuring AWS Secrets Manager for remote database credentials

To retrieve AWS Secrets Manager remote database credentials, our example uses the following code:

IAM_ROLE 'arn:aws:iam::555566667777:role/myFederatedQueryRDS'                                                           
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB'

These two parameters are interrelated because the SECRET_ARN is also embedded in the IAM policy for the role.

If a service like Secrets Manager didn’t exist and you wanted to issue a federated query from Amazon Redshift to PostgreSQL, you would need to supply the database credentials to the CREATE EXTERNAL SCHEMA command via a parameter like CREDENTIALS, which you also use with the COPY command. However, this hardcoded approach doesn’t take into account that the PostgreSQL credentials could expire.

You avoid this problem by keeping PostgreSQL database credentials within Secrets Manager, which provides a centralized service to manage secrets. Because Amazon Redshift retrieves and uses these credentials, they are transient and not stored in any generated code and are discarded after query execution.

Storing credentials in Secrets Manager takes up to a few minutes. To store a new secret, complete the following steps:

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. In the Store a new secret section, complete the following:
     • Supply your PostgreSQL database credentials
     • Name the secret; for example, MyRDSCredentials
     • Configure rotation (you can enable this at a later time)
     • Optionally, copy programmatic code for accessing your secret using your preferred programming languages (not needed for this post)
  4. Choose Next.

You can also retrieve the credentials easily.

  1. On the Secrets Manager console, choose your secret.
  2. Choose Retrieve secret value.

The following screenshot shows you the secret value details.

This secret is now an AWS resource referenced via a secret ARN. See the following screenshot.

Setting up an IAM role

You can now pull everything together by embedding the secret ARN into an IAM policy, naming the policy, and attaching it to an IAM role. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:555566667777:secret:MyRDSCredentials-TfzFSB"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}

The following screenshot shows the details of the IAM role called myFederatedQueryRDS, which contains the MyRDSSecretPolicy policy. It’s the same role that’s supplied in the IAM_ROLE parameter of the CREATE EXTERNAL SCHEMA DDL.

Finally, attach the same IAM role to your Amazon Redshift cluster.

  1. On the Amazon Redshift console, choose your cluster.
  2. From the Actions drop-down menu, choose Manage IAM roles.
  3. Choose and add the IAM role you just created.

You have now completed the following steps:

  1. Create an IAM policy and role
  2. Store your PostgreSQL database credentials in Secrets Manager
  3. Create an Amazon Redshift external schema definition that uses the secret and IAM role to authenticate with a PostgreSQL endpoint
  4. Apply a mapping between an Amazon Redshift database and schema to a PostgreSQL database and schema so Amazon Redshift may issue queries to PostgreSQL tables.

You only need to complete this configuration one time.

Querying live operational data

This section explores another use case: querying operational data across multiple source databases. In this use case, a global online retailer has databases deployed by different teams across distinct geographies:

  • Region us-east-1 runs serverless Aurora PostgreSQL.
  • Region us-west-1 runs provisioned Aurora PostgreSQL, which is also configured as a global database with a read replica in us-east-1.
  • Region eu-west-1 runs an Amazon RDS for PostgreSQL instance with a read replica in us-east-1.

Serverless and provisioned Aurora PostgreSQL and Amazon RDS for PostgreSQL are visible in the Amazon RDS console in Region us-east-1. See the following screenshot:

For this use case, assume that you configured the read replicas for Aurora and Amazon RDS to share the same VPC and subnets in us-east-1 with the local serverless Aurora PostgreSQL. Furthermore, you have already created secrets for each of these instances’ credentials, and an IAM role with an attached policy, MyCombinedRDSSecretPolicy, which is more permissive and allows Amazon Redshift to retrieve the value of any Amazon RDS secret within any Region. Be mindful of security in actual production use, however, and explicitly specify the resource ARNs for each secret in separate statements in your IAM policy. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:*:555566667777:secret:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetRandomPassword",
                "secretsmanager:ListSecrets"
            ],
            "Resource": "*"
        }
    ]
}

External schema DDLs in Amazon Redshift can then reference the combined IAM role and individual secret ARNs. See the following code:

CREATE EXTERNAL SCHEMA IF NOT EXISTS useast
FROM POSTGRES
DATABASE 'dev'
URI 'us-east-1-aurora-pg-serverless.cluster-samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:555566667777:secret:MyEastUSAuroraServerlessCredentials-dXOlEq'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS uswest
FROM POSTGRES
DATABASE 'dev'
URI 'global-aurora-pg-west-coast-stores-instance-1.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:us-west-1:555566667777:secret:MyWestUSAuroraGlobalDBCredentials-p3sV9m'
;

CREATE EXTERNAL SCHEMA IF NOT EXISTS europe
FROM POSTGRES
DATABASE 'dev'
URI 'eu-west-1-postgres-read-replica.samplecluster.us-east-1.rds.amazonaws.com'
PORT 5432
IAM_ROLE 'arn:aws:iam::555566667777:role/MyCombinedRDSFederatedQuery'
SECRET_ARN 'arn:aws:secretsmanager:eu-west-1:555566667777:secret:MyEuropeRDSPostgresCredentials-mz2u9L'
;

This late binding view abstracts the underlying queries to TPC-H lineitem test data within all PostgreSQL instances. See the following code:

CREATE VIEW global_lineitem AS
SELECT 'useast' AS region, * from useast.lineitem
UNION ALL
SELECT 'uswest', * from uswest.lineitem
UNION ALL
SELECT 'europe', * from europe.lineitem
WITH NO SCHEMA BINDING
;

Amazon Redshift can query live operational data across multiple distributed databases and aggregate results into a unified view with this feature. See the following code:

dev=# SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
      FROM global_lineitem
      WHERE l_shipdate >= '1997-01-01'
      AND l_shipdate < '1998-01-01'
      AND month < 4
      GROUP BY 1, 2
      ORDER BY 1, 2
;
 region | month |      sales
--------+-------+------------------
 europe |     1 | 16036160823.3700
 europe |     2 | 15089300790.7200
 europe |     3 | 16579123912.6700
 useast |     1 | 16176034865.7100
 useast |     2 | 14624520114.6700
 useast |     3 | 16645469098.8600
 uswest |     1 | 16800599170.4600
 uswest |     2 | 14547930407.7000
 uswest |     3 | 16595334825.9200
(9 rows)

If you examine Remote PG Seq Scan in the following query plan, you see that predicates are pushed down for execution in all three PostgreSQL databases. Unlike your initial simplified ETL use case, no ETL is performed because data is queried and filtered in place. See the following code:

dev=# EXPLAIN SELECT region, extract(month from l_shipdate) as month,
      sum(l_extendedprice * l_quantity) - sum(l_discount) as sales
FROM global_lineitem
WHERE l_shipdate >= '1997-01-01'
AND l_shipdate < '1998-01-01'
AND month < 4
GROUP BY 1, 2
ORDER BY 1, 2
;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
   Merge Key: derived_col1, derived_col2
   ->  XN Network  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
         Send to leader
         ->  XN Sort  (cost=1000000060145.67..1000000060146.17 rows=200 width=100)
               Sort Key: derived_col1, derived_col2
               ->  XN HashAggregate  (cost=60136.52..60138.02 rows=200 width=100)
                     ->  XN Subquery Scan global_lineitem  (cost=20037.51..60130.52 rows=600 width=100)
                           ->  XN Append  (cost=20037.51..60124.52 rows=600 width=52)
                                 ->  XN Subquery Scan "*SELECT* 1"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan useast.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 2"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan uswest.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
                                 ->  XN Subquery Scan "*SELECT* 3"  (cost=20037.51..20041.51 rows=200 width=52)
                                       ->  XN HashAggregate  (cost=20037.51..20039.51 rows=200 width=52)
                                             ->  XN PG Query Scan lineitem  (cost=0.00..20020.84 rows=1667 width=52)
                                                   ->  Remote PG Seq Scan europe.lineitem  (cost=0.00..20000.00 rows=1667 width=52)
                                                         Filter: ((l_shipdate < '1998-01-01'::date) AND (l_shipdate >= '1997-01-01'::date) AND ("date_part"('month'::text, l_shipdate) < 4))
(24 rows)

Combining the data lake, data warehouse, and live operational data

In this next use case, you join Amazon Redshift Spectrum historical data with current data in Amazon Redshift and live data in PostgreSQL. You use a 3TB TPC-DS dataset and unload data from 1998 through 2001 from the store_sales table in Amazon Redshift to Amazon S3. The unloaded files are stored in Parquet format with ss_sold_date_sk as partitioning key.

To access this historical data via Amazon Redshift Spectrum, create an external table. See the following code:

CREATE EXTERNAL TABLE spectrum.store_sales_historical
(
  ss_sold_time_sk int ,
  ss_item_sk int ,
  ss_customer_sk int ,
  ss_cdemo_sk int ,
  ss_hdemo_sk int ,
  ss_addr_sk int ,
  ss_store_sk int ,
  ss_promo_sk int ,
  ss_ticket_number bigint,
  ss_quantity int ,
  ss_wholesale_cost numeric(7,2) ,
  ss_list_price numeric(7,2) ,
  ss_sales_price numeric(7,2) ,
  ss_ext_discount_amt numeric(7,2) ,
  ss_ext_sales_price numeric(7,2) ,
  ss_ext_wholesale_cost numeric(7,2) ,
  ss_ext_list_price numeric(7,2) ,
  ss_ext_tax numeric(7,2) ,
  ss_coupon_amt numeric(7,2) ,
  ss_net_paid numeric(7,2) ,
  ss_net_paid_inc_tax numeric(7,2) ,
  ss_net_profit numeric(7,2)
)
PARTITIONED BY (ss_sold_date_sk int)
STORED AS PARQUET
LOCATION 's3://mysamplebucket/historical_store_sales/';   

The external spectrum schema is defined as the following:

CREATE EXTERNAL SCHEMA spectrum
FROM data catalog DATABASE 'spectrumdb'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

Instead of an Amazon S3 read-only policy, the IAM role mySpectrumRole contains both AmazonS3FullAccess and AWSGlueConsoleFullAccess policies, in which the former allows Amazon Redshift writes to Amazon S3. See the following code:

UNLOAD ('SELECT * FROM tpcds.store_sales WHERE ss_sold_date_sk < 2452276')
TO 's3://mysamplebucket/historical_store_sales/'
IAM_ROLE 'arn:aws:iam::555566667777:role/mySpectrumRole'
FORMAT AS PARQUET
PARTITION BY (ss_sold_date_sk) ALLOWOVERWRITE;

To make partitioned data visible, the ALTER TABLE ... ADD PARTITION command needs to specify all partition values. For this use case, 2450816 through 2452275 correspond to dates 1998-01-02 through 2001-12-31, respectively. To generate these DDLs quickly, use the following code:

WITH partitions AS (SELECT * FROM generate_series(2450816, 2452275))
SELECT 'ALTER TABLE spectrum.store_sales_historical ADD PARTITION (ss_sold_date_sk='|| generate_series || ') '
    || 'LOCATION \'s3://mysamplebucket/historical_store_sales/ss_sold_date_sk=' || generate_series || '/\';'
FROM partitions;

You can run the generated ALTER TABLE statements individually or as a batch to make partition data visible. See the following code:

ALTER TABLE spectrum.store_sales_historical 
ADD PARTITION (ss_sold_date_sk=2450816)
LOCATION 's3://mysamplebucket/historical_store_sales/ss_sold_date_sk=2450816/';
-- repeated for all partition values

The three combined sources in the following view consist of historical data in Amazon S3 for 1998 through 2001, current data local to Amazon Redshift for 2002, and live data for two months of 2003 in PostgreSQL. When you create this late binding view, you have to re-order Amazon Redshift Spectrum external table columns because the previous UNLOAD operation specifying ss_sold_date_sk as partition key shifted that column’s order to last. See the following code:

CREATE VIEW store_sales_integrated AS
SELECT * FROM uswest.store_sales_live
UNION ALL
SELECT * FROM tpcds.store_sales_current
UNION ALL
SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
       ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, 
       ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, 
       ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, 
       ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, 
       ss_net_paid_inc_tax, ss_net_profit
FROM spectrum.store_sales_historical
WITH NO SCHEMA BINDING;

You can now run a query on the view to aggregate date and join tables across the three sources. See the following code:

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1
;
 date_part |   count
-----------+------------
      1998 | 1632403114
      1999 | 1650163390
      2000 | 1659168880
      2001 | 1641184375
      2002 | 1650209644
      2003 |   17994540
(6 rows)

Time: 77624.926 ms (01:17.625)

The preceding federated query ran on a two-node DC2.8XL cluster and took 1 minute and 17 seconds to join store sales in Amazon S3, PostgreSQL, and Amazon Redshift with the date dimension table in Amazon Redshift, aggregating and sorting row counts by year. The following code shows its query plan:

dev=# EXPLAIN SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
GROUP BY 1
ORDER BY 1;

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1461036320912.29..1461036321094.91 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=461036314645.93..461036315011.18 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=913.11..428113374829.91 rows=6584587963204 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=4)
                                 ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=4)
                                       ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=4)
                                             ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=4)
                                                   ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=4)
                                       ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=4)
                                             ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=4)
                                       ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=4)
                                             ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=4)
                                                   ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                   ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=0)
                                                         ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=0)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(23 rows)

The query plan shows full sequential scans running on the three source tables, with the returned row counts totaling approximately 8.2 billion. Because Amazon Redshift Spectrum does not generate statistics for external tables, you manually set the numRows property to the row count for historical data in Amazon S3. See the following code:

ALTER TABLE spectrum.store_sales_historical SET TABLE PROPERTIES ('numRows' = '6582919759');

You can join with another dimension table local to Amazon Redshift, this time the 30 million row customer table, and filter by column c_birth_country. See the following code:

dev=# SELECT extract(year from b.d_date), count(a.ss_sold_date_sk)
FROM store_sales_integrated a
JOIN tpcds.date_dim b on (a.ss_sold_date_sk = b.d_date_sk)
JOIN tpcds.customer c on (a.ss_customer_sk = c.c_customer_sk)
AND c.c_birth_country = 'UNITED STATES'
GROUP BY 1
ORDER BY 1
;
 date_part |  count
-----------+---------
      1998 | 7299277
      1999 | 7392156
      2000 | 7416905
      2001 | 7347920
      2002 | 7390590
      2003 |   81627
(6 rows)

Time: 77878.586 ms (01:17.879)

QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 XN Merge  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
   Merge Key: "date_part"('year'::text, b.d_date)
   ->  XN Network  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
         Send to leader
         ->  XN Sort  (cost=1363288861214.20..1363288861396.83 rows=73049 width=8)
               Sort Key: "date_part"('year'::text, b.d_date)
               ->  XN HashAggregate  (cost=363288854947.85..363288855313.09 rows=73049 width=8)
                     ->  XN Hash Join DS_DIST_ALL_NONE  (cost=376252.50..363139873158.03 rows=29796357965 width=8)
                           Hash Cond: ("outer".ss_sold_date_sk = "inner".d_date_sk)
                           ->  XN Hash Join DS_BCAST_INNER  (cost=375339.39..362394963295.79 rows=29796357965 width=4)
                                 Hash Cond: ("outer".ss_customer_sk = "inner".c_customer_sk)
                                 ->  XN Subquery Scan a  (cost=0.00..263498674836.70 rows=6584587963204 width=8)
                                       ->  XN Append  (cost=0.00..197652795204.66 rows=6584587963204 width=8)
                                             ->  XN Subquery Scan "*SELECT* 1"  (cost=0.00..539836.20 rows=17994540 width=8)
                                                   ->  XN PG Query Scan store_sales_live  (cost=0.00..359890.80 rows=17994540 width=8)
                                                         ->  Remote PG Seq Scan uswest.store_sales_live  (cost=0.00..179945.40 rows=17994540 width=8)
                                             ->  XN Subquery Scan "*SELECT* 2"  (cost=0.00..33004193.28 rows=1650209664 width=8)
                                                   ->  XN Seq Scan on store_sales_current  (cost=0.00..16502096.64 rows=1650209664 width=8)
                                             ->  XN Subquery Scan "*SELECT* 3"  (cost=0.00..197619251175.18 rows=6582919759000 width=8)
                                                   ->  XN Partition Loop  (cost=0.00..131790053585.18 rows=6582919759000 width=8)
                                                         ->  XN Seq Scan PartitionInfo of spectrum.store_sales_historical  (cost=0.00..10.00 rows=1000 width=4)
                                                         ->  XN S3 Query Scan store_sales_historical  (cost=0.00..131658395.18 rows=6582919759 width=4)
                                                               ->  S3 Seq Scan spectrum.store_sales_historical location:"s3://mysamplebucket/historical_store_sales" format:PARQUET (cost=0.00..65829197.59 rows=6582919759 width=4)
                                 ->  XN Hash  (cost=375000.00..375000.00 rows=135755 width=4)
                                       ->  XN Seq Scan on customer c  (cost=0.00..375000.00 rows=135755 width=4)
                                             Filter: ((c_birth_country)::text = 'UNITED STATES'::text)
                           ->  XN Hash  (cost=730.49..730.49 rows=73049 width=8)
                                 ->  XN Seq Scan on date_dim b  (cost=0.00..730.49 rows=73049 width=8)
(28 rows)

Query performance hardly changed from the previous query. Because the query only scanned one column (ss_sold_date_sk), it benefits from Parquet’s columnar structure for the historical data subquery. To put it another way, if the historical data is stored as CSV, all the data is scanned, which degrades performance significantly.

Additionally, the TPC-DS model does not store date values in the store_sales fact table. Instead, a foreign key references the date_dim table. If you plan on implementing something similar but frequently filter by a date column, consider adding that column to the fact table and making it a sort key, and also adding a matching partitioning column in Amazon Redshift Spectrum. That way, Amazon Redshift can more efficiently skip blocks for local data, prune partitions for Amazon S3 data, and push filtering criteria down to Amazon Redshift Spectrum.
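The following is a minimal sketch of that layout, assuming you add an ss_sold_date column. The table names are hypothetical and the column lists are abridged; the full definitions would mirror the store_sales_historical DDL shown earlier.

-- Local Amazon Redshift fact table: sort on the date column so zone maps
-- can skip blocks outside the filtered date range.
CREATE TABLE tpcds.store_sales_current_by_date
(
  ss_sold_date     date,
  ss_sold_date_sk  int,
  ss_item_sk       int,
  ss_customer_sk   int
  -- remaining store_sales columns omitted for brevity
)
SORTKEY (ss_sold_date);

-- Amazon Redshift Spectrum external table: partition on the same date column
-- so queries that filter on it prune Amazon S3 partitions.
CREATE EXTERNAL TABLE spectrum.store_sales_historical_by_date
(
  ss_sold_date_sk  int,
  ss_item_sk       int,
  ss_customer_sk   int
  -- remaining store_sales columns omitted for brevity
)
PARTITIONED BY (ss_sold_date date)
STORED AS PARQUET
LOCATION 's3://mysamplebucket/historical_store_sales_by_date/';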

Conclusion

Applications of live data integration in real-world scenarios include data discovery, data preparation for machine learning, operational analytics, IoT telemetry analytics, fraud detection, and compliance and security audits. Whereas Amazon Redshift Spectrum extends the reach of Amazon Redshift into the AWS data lake, Federated Query extends its reach into operational databases and beyond.

For more information about data type differences between these databases, see Data Type Differences Between Amazon Redshift and Supported RDS PostgreSQL or Aurora PostgreSQL Databases. For more information about accessing federated data with Amazon Redshift, see Limitations and Considerations When Accessing Federated Data with Amazon Redshift.

 


About the Authors

Tito Mijares is a Data Warehouse Specialist Solutions Architect at AWS. He helps AWS customers adopt and optimize their use of Amazon Redshift. Outside of AWS, he enjoys playing jazz guitar and working on audio recording and playback projects.

Entong Shen is a Senior Software Development Engineer for AWS Redshift. He has been working on MPP databases for over 8 years and has focused on query optimization, statistics and SQL language features such as stored procedures and federated query. In his spare time, he enjoys listening to music of all genres and working in his succulent garden.

Niranjan Kamat is a software engineer on the Amazon Redshift query processing team. His focus of PhD research was in interactive querying over large databases. In Redshift, he has worked in different query processing areas such as query optimization, analyze command and statistics, and federated querying. In his spare time, he enjoys playing with his three year old daughter, practicing table tennis (was ranked in top 10 in Ohio, USATT rating 2143), and chess.

Sriram Krishnamurthy is a Software Development Manager for AWS Redshift Query Processing team. He is passionate about Databases and has been working on Semi Structured Data Processing and SQL Compilation & Execution for over 15 years. In his free time, you can find him on the tennis court, often with his two young daughters in tow.

A public data lake for analysis of COVID-19 data

Post Syndicated from AWS Data Lake Team original https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/

As the COVID-19 pandemic continues to threaten and take lives around the world, we must work together across organizations and scientific disciplines to fight this disease. Innumerable healthcare workers, medical researchers, scientists, and public health officials are already on the front lines caring for patients, searching for therapies, educating the public, and helping to set policy. At AWS, we believe that one way we can help is to provide these experts with the data and tools needed to better understand, track, plan for, and eventually contain and neutralize the virus that causes COVID-19.

Today, we are making a public AWS COVID-19 data lake available – a centralized repository of up-to-date and curated datasets on or related to the spread and characteristics of the novel coronavirus (SARS-CoV-2) and its associated illness, COVID-19. Globally, there are several efforts underway to gather this data, and we are working with partners to make this crucial data freely available and keep it up-to-date. Hosted on the AWS cloud, we have seeded our curated data lake with COVID-19 case tracking data from Johns Hopkins and The New York Times, hospital bed availability from Definitive Healthcare, and over 45,000 research articles about COVID-19 and related coronaviruses from the Allen Institute for AI. We will regularly add to this data lake as other reliable sources make their data publicly available.

The breakthroughs that can win the battle against this disease arrive faster when it’s easy for everyone to access and experiment with this vital information. The AWS COVID-19 data lake allows experimenters to quickly run analyses on the data in place without wasting time extracting and wrangling data from all the available data sources. They can use AWS or third-party tools to perform trend analysis, do keyword search, perform question/answer analysis, build and run machine learning models, or run custom analyses to meet their specific needs. Since every stakeholder in this battle brings their own perspective, users can choose to work with the public data lake, combine it with their own data, or subscribe to the source datasets directly through AWS Data Exchange.

We imagine local health authorities could build dashboards to track infections and collaborate to efficiently deploy vital resources like hospital beds and ventilators. Or epidemiologists could complement their own models and datasets to generate better forecasts of hotspots and trends.

For example, at Chan Zuckerberg Biohub, a nonprofit where leaders in science and technology collaborate to cure, prevent, or manage disease, scientists are using the AWS COVID-19 data lake for new epidemiological insights. “Our team of researchers is now analyzing trends in disease spread, its geography, and time evolution by leveraging datasets from the AWS COVID-19 data lake, combined with our own data, in order to better predict COVID epidemiology,” said Jim Karkanias, Vice President of Data Science and Information Technology at Chan Zuckerberg Biohub.

This post walks you through examples of how to use the AWS COVID-19 data lake for analysis. This data lake consists of data in a publicly readable Amazon S3 bucket (s3://covid19-lake). The post shows how to set up the definitions for that data in an AWS Glue Data Catalog to expose it to analytics engines. You can then query the AWS COVID-19 data lake with Amazon Athena, a serverless SQL query engine.

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • Permissions to create an AWS CloudFormation stack
  • Permissions to create AWS Glue resources (catalog databases and tables)

Configuring access to the data using a CloudFormation template

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. If you are signed in to your AWS account, the following link fills out most of the stack creation form for you. All you need to do is choose Create stack. For instructions on creating a CloudFormation stack, see Get Started in the AWS CloudFormation documentation.

This template creates a covid-19 database in your Data Catalog and tables that point to the public AWS COVID-19 data lake. You do not need to host the data in your account, and you can rely on AWS to refresh the data as datasets are updated through AWS Data Exchange.

Exploring the data through the Data Catalog in your AWS account

When the CloudFormation stack shows a status of CREATE_COMPLETE, access the Glue Data Catalog to see the tables that the template created. You should see the following tables:

  • Global Coronavirus (COVID-19) Data – Tracks confirmed COVID-19 cases in provinces, states, and countries across the world with a breakdown to the county level in the US.

 

Table Name | Description | Source | Provider
enigma_jhu | Confirmed COVID-19 cases | Johns Hopkins | Enigma
nytimes_states | Data on COVID-19 cases at US state level | NY Times | Rearc
nytimes_counties | Data on COVID-19 cases at US county level | NY Times | Rearc
covid_testing_states_daily | USA total test daily trend by state | COVID Tracking Project | Rearc
covid_testing_us_daily | USA total test daily trend | COVID Tracking Project | Rearc
covid_testing_us_total | USA total tests | COVID Tracking Project | Rearc
hospital_beds | Hospital beds and their utilization in the US | Definitive Healthcare | Rearc

Table Name | Description | Source/Provider
alleninstitute_metadata | Metadata on papers pulled from the CORD-19 dataset. The sha column indicates the paper ID, which is the file name of the paper in the data lake. | Allen Institute for AI
alleninstitute_comprehend_medical | Results from Amazon Comprehend Medical run against the CORD-19 dataset. | Allen Institute for AI

 

  • Lookup tables to support visualizations.

 

Table Name | Description
country_codes | Lookup table for country codes
county_populations | Lookup table for the population for each county based on recent census data
us_state_abbreviations | Lookup table for US state abbreviations

In addition, you can see descriptions of the columns in these tables. For example, the following screenshot shows the metadata of the table containing COVID-19 cases from Johns Hopkins.

Querying data via Amazon Athena

This section demonstrates how to query these tables using Athena. Athena is a serverless interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.

If this is the first time you are using Athena, you must specify a query result location on Amazon S3.

  2. From the drop-down menu, choose the covid-19 database.
  3. Enter your query.
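Before running the full report, you might preview one of the tables to confirm the catalog is set up correctly. This is just a quick sanity check; any table from the list above works.

SELECT *
FROM "covid-19"."enigma_jhu"
LIMIT 10;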

The following query returns the growth of confirmed cases for the past 7 days joined side-by-side with hospital bed availability, broken down by US county:

SELECT 
  cases.fips, 
  admin2 as county, 
  province_state, 
  confirmed,
  growth_count, 
  sum(num_licensed_beds) as num_licensed_beds, 
  sum(num_staffed_beds) as num_staffed_beds, 
  sum(num_icu_beds) as num_icu_beds
FROM 
  "covid-19"."hospital_beds" beds, 
  ( SELECT 
      fips, 
      admin2, 
      province_state, 
      confirmed, 
      last_value(confirmed) over (partition by fips order by last_update) - first_value(confirmed) over (partition by fips order by last_update) as growth_count,
      first_value(last_update) over (partition by fips order by last_update desc) as most_recent,
      last_update
    FROM  
      "covid-19"."enigma_jhu" 
    WHERE 
      from_iso8601_timestamp(last_update) > now() - interval '7' day AND country_region = 'US') cases
WHERE 
  beds.fips = cases.fips AND last_update = most_recent
GROUP BY cases.fips, confirmed, growth_count, admin2, province_state
ORDER BY growth_count desc

The following screenshot shows the results of this query.

Athena also allows you to run these queries through REST APIs, for example, for building your own visualizations. Moreover, Athena is just one of the many engines that you can use on the data lake. For example, you can use Amazon Redshift Spectrum to join lake data with other datasets in your Redshift data warehouse, or use Amazon QuickSight to visualize your datasets.
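For example, the following boto3 sketch submits a query and polls for completion; the query results bucket and Region are placeholders you would replace:

import time
import boto3

# Region and the query results bucket are placeholders; replace them with your own.
athena = boto3.client("athena", region_name="us-east-1")

query = 'SELECT fips, admin2, province_state, confirmed FROM "covid-19"."enigma_jhu" LIMIT 10'
execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "covid-19"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},
)
query_id = execution["QueryExecutionId"]

# Poll until the query reaches a terminal state, then print the outcome.
while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
    if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)
print(status["State"], "- results written to s3://your-athena-results-bucket/")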

We have also created a public Amazon QuickSight dashboard from the COVID-19 case tracking data, testing data, and hospital bed data. You can track daily updates with this dashboard. You can also drill down to see breakdowns by country, province, and county without having to write a line of SQL. The following is a recent screenshot of the dashboard.

CORD-19 research articles

The CORD-19 dataset is a collection of metadata and the full text of research articles about COVID-19, SARS-CoV-2, and related coronaviruses. You can index this data with Amazon Kendra for question/answer exploration, or enrich the data with Amazon Comprehend Medical. We have already done the latter and put the results in the table called alleninstitute_comprehend_medical.
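If you want to run a similar enrichment on your own text, a minimal Amazon Comprehend Medical sketch looks like the following; the sample sentence is made up for illustration, and the Region is an assumption:

import boto3

cm = boto3.client("comprehendmedical", region_name="us-east-1")  # Region is an assumption

# Detect medical entities (conditions, medications, dosages, and so on) in a snippet of text.
text = "The patient received 8 mg of tocilizumab, an IL-6 inhibitor, twice daily."
response = cm.detect_entities_v2(Text=text)

for entity in response["Entities"]:
    print(entity["Category"], entity["Type"], entity["Text"], round(entity["Score"], 2))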

The alleninstitute_metadata table provides detailed fields for each paper, such as the title, authors, journal, and URL. The alleninstitute_comprehend_medical table contains key medical concepts such as medical condition, medication, dosage, strength, and frequency. With this metadata, you can quickly query over concepts, analyze or aggregate over authors and journals, and locate papers.

Aggregating over journals

IL-6 inhibitors are a possible therapy for COVID-19, and clinical trials are underway. To demonstrate how to use these tables, this post presents a use case in which you want to understand which journals discuss IL-6 the most by counting the papers they published. You can do this by running the following query:

SELECT m.journal,
       count(distinct(cm.paper_id)) as paper_count
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
GROUP BY  m.journal
ORDER BY paper_count desc

The following screenshot shows an example of the results. The data provider updates this dataset over time, so your results may look different (here, we notice that the second highest count has no journal information).

Drilling down into papers

To see the URLs and the titles of the papers in one of these journals, you simply query both these tables again. For example, to drill into IL-6 related papers in the Crit Care journal, enter the following query:

SELECT distinct m.url, m.title
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
      AND m.journal = 'Crit Care'

The following screenshot shows an example of the results.

These examples are a few of the many analyses you can run on the public data lake. You incur no additional cost for accessing the AWS COVID-19 data lake beyond the standard charges for the AWS services that you use. For example, if you use Athena, you incur costs for running queries and for the data stored in the S3 query result location, but no costs for accessing the data lake itself. In addition, if you want this data in raw form, you can subscribe to, download, and stay up-to-date through AWS Data Exchange. We encourage you to try using the public AWS COVID-19 data lake yourself.

Conclusion

Combining our efforts across organizations and scientific disciplines can help us win the fight against the COVID-19 pandemic. With the AWS COVID-19 data lake, anyone can experiment with and analyze curated data related to the disease, as well as share their own data and results. We believe that through an open and collaborative effort that combines data, technology, and science, we can inspire insights and foster breakthroughs necessary to contain, curtail, and ultimately cure COVID-19.

For daily updates on how AWS is addressing the crisis, see Amazon’s COVID-19 blog.

 


About the Authors

The AWS Data Lake Team members are Roy Ben-Alta, Jason Berkowitz, Chris Casey, Patrick Combes, Lucy Friedmann, Fred Lee, Megan Maxwell, Rourke McNamara, Herain Oberoi, Stephen Orban, Brian Ross, Nikki Rouda, Noah Schwartz, Noritaka Sekiyama, Mehul A. Shah, Ben Snively, and Ying Wang.

Simplify your Spark dependency management with Docker in EMR 6.0.0

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/simplify-your-spark-dependency-management-with-docker-in-emr-6-0-0/

Apache Spark is a powerful data processing engine that gives data analyst and engineering teams easy-to-use APIs and tools to analyze their data, but it can be challenging for teams to manage their Python and R library dependencies. Installing every dependency that a job may need before it runs and dealing with library version conflicts is time-consuming and complicated. Amazon EMR 6.0.0 simplifies this by allowing you to use Docker images from Docker Hub and Amazon ECR to package your dependencies. This lets you package and manage dependencies for individual Spark jobs or notebooks, without having to manage a spiderweb of dependencies across your cluster.

This post shows you how to use Docker to manage notebook dependencies with Amazon EMR 6.0.0 and EMR Notebooks. You will launch an EMR 6.0.0 cluster and use notebook-specific Docker images from Amazon ECR with your EMR Notebook.

Creating a Docker image

The first step is to create a Docker image that contains Python 3 and the latest version of the numpy Python package. You create Docker images by using a Dockerfile, which defines the packages and configuration to include in the image. Docker images used with Amazon EMR 6.0.0 must contain a Java Development Kit (JDK); the following Dockerfile uses Amazon Linux 2 and the Amazon Corretto JDK 8:

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy 

RUN python3 -c "import numpy as np"

You will use this Dockerfile to create a Docker image, and then tag and upload it to Amazon ECR. After you upload it, you will launch an EMR 6.0.0 cluster that is configured to use this Docker image as the default image for Spark jobs. Complete the following steps to build, tag, and upload your Docker image:

  1. Create a directory and a new file named Dockerfile using the following commands:
    $ mkdir pyspark-latest
    $ vi pyspark-latest/Dockerfile

  2. Copy and paste the contents of the Dockerfile, save it, and run the following command to build a Docker image:
    $ docker build -t 'local/pyspark-latest' pyspark-latest/

  3. Create the emr-docker-examples Amazon ECR repository for this walkthrough using the following command:
    $ aws ecr create-repository --repository-name emr-docker-examples

  4. Tag the locally built image and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint, using the following command:
    $ docker tag local/pyspark-latest 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest

    Before you can push the Docker image to Amazon ECR, you need to log in.

  5. To get the login line for your Amazon ECR account, use the following command:
    $ aws ecr get-login --region us-east-1 --no-include-email

  6. Enter and run the output from the get-login command:
    $ docker login -u AWS -p <password> https://123456789123.dkr.ecr.us-east-1.amazonaws.com 

  7. Upload the locally built image to Amazon ECR and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint. See the following command:
    $ docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest

After this push is complete, the Docker image is available to use with your EMR cluster.

Launching an EMR 6.0.0 cluster with Docker enabled

To use Docker with Amazon EMR, you must launch your EMR cluster with Docker runtime support enabled and have the right configuration in place to connect to your Amazon ECR account. To allow your cluster to download images from Amazon ECR, make sure the instance profile for your cluster has the permissions from the AmazonEC2ContainerRegistryReadOnly managed policy associated with it. The configuration in the first step below configures your EMR 6.0.0 cluster to use Amazon ECR to download Docker images, and configures Apache Livy and Apache Spark to use the pyspark-latest Docker image as the default Docker image for all Spark jobs. Complete the following steps to launch your cluster:

  1. Create a file named emr-configuration.json in the local directory with the following configuration (replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint):
    [
       {
          "Classification":"container-executor",
          "Properties":{
    
          },
          "Configurations":[
             {
                "Classification":"docker",
                "Properties":{
                   "docker.privileged-containers.registries":"local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                   "docker.trusted.registries":"local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com"
               }
             }
          ]
       },
       {
          "Classification":"livy-conf",
          "Properties":{
             "livy.spark.master":"yarn",
             "livy.spark.deploy-mode":"cluster",
             "livy.server.session.timeout":"16h"
          }
       },
       {
          "Classification":"hive-site",
          "Properties":{
             "hive.execution.mode":"container"
          }
       },
       {
          "Classification":"spark-defaults",
          "Properties":{
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
             "spark.yarn.am.waitTime":"300s",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG":"hdfs:///user/hadoop/config.json",
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest",
             "spark.executor.instances":"2",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG":"hdfs:///user/hadoop/config.json",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest"
          }
       }
    ]

    You will use that configuration to launch your EMR 6.0.0 cluster using the AWS CLI.

  2. Enter the following commands (replace myKey with the name of the EC2 key pair you use to access the cluster using SSH, and subnet-1234567 with the subnet ID the cluster should be launched in):
    aws emr create-cluster \
    --name 'EMR 6.0.0 with Docker' \
    --release-label emr-6.0.0 \
    --applications Name=Livy Name=Spark \
    --ec2-attributes "KeyName=myKey,SubnetId=subnet-1234567" \
    --instance-type m5.xlarge --instance-count 3 \
    --use-default-roles \
    --configurations file://./emr-configuration.json

    After the cluster launches and is in the Waiting state, make sure that the cluster hosts can authenticate themselves to Amazon ECR and download Docker images.

  3. Use your EC2 key pair to SSH into one of the core nodes of the cluster.
  4. To generate the Docker CLI command to create the credentials (valid for 12 hours) the cluster uses to download Docker images from Amazon ECR, enter the following command:
    $ aws ecr get-login --region us-east-1 --no-include-email

  5. Enter and run the output from the get-login command:
    $ sudo docker login -u AWS -p <password> https://123456789123.dkr.ecr.us-east-1.amazonaws.com 

    This command generates a config.json file in the /root/.docker folder.

  6. Place the generated config.json file in the HDFS location /user/hadoop/ using the following command:
    $ sudo hdfs dfs -put /root/.docker/config.json /user/hadoop/

Now that you have an EMR cluster that is configured with Docker and an image in Amazon ECR, you can use EMR Notebooks to create and run your notebook.

Creating an EMR Notebook

EMR Notebooks are serverless Jupyter notebooks available directly through the Amazon EMR console. They allow you to separate your notebook environment from your underlying cluster infrastructure, and access your notebook without spending time setting up SSH access or configuring your browser for port-forwarding. You can find EMR Notebooks in the left-hand navigation of the EMR console.

To create your notebook, complete the following steps:

  1. Click Notebooks in the EMR console.
  2. Choose a name for your notebook.
  3. Click Choose an existing cluster and select the cluster you just created.
  4. Click Create notebook.
  5. Once your notebook is in a Ready status, click the Open in JupyterLab button to open it in a new browser tab. A notebook with the name of your EMR Notebook is created by default. When you open that notebook, you are asked to choose a kernel; choose PySpark.
  6. Enter the following configuration into the first cell in your notebook and click ▸(Run):
    %%configure -f
    {"conf": { "spark.pyspark.virtualenv.enabled": "false" }}

  7. Enter the following PySpark code into your notebook and click ▸(Run):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
    sc = spark.sparkContext
    
    import numpy as np
    a = np.arange(15).reshape(3, 5)
    print(a)
    print(np.__version__)

The output should look like the following screenshot; the numpy version in use is the latest (at the time of this writing, 1.18.2).

This PySpark code was run on your EMR 6.0.0 cluster using YARN, Docker, and the pyspark-latest image that you created. EMR Notebooks connect to EMR clusters using Apache Livy. The configuration specified in emr-configuration.json configured your EMR cluster’s Spark and Livy instances to use Docker and the pyspark-latest Docker image as the default Docker image for all Spark jobs submitted to this cluster. This allows you to use numpy without having to install it on any cluster nodes. The following section looks at how you can create and use different Docker images for specific notebooks.

Using a custom Docker image for a specific notebook

Individual workloads often require specific versions of library dependencies. To allow individual notebooks to use their own Docker images, you first create a new Docker image and push it to Amazon ECR. You then configure your notebook to use this Docker image instead of the default pyspark-latest image.

Complete the following steps:

  1. Create a new Dockerfile with a specific version of numpy: 1.17.5.
    FROM amazoncorretto:8
    
    RUN yum -y update
    RUN yum -y install yum-utils
    RUN yum -y groupinstall development
    
    RUN yum list python3*
    RUN yum -y install python3 python3-dev python3-pip python3-virtualenv
    
    RUN python -V
    RUN python3 -V
    
    ENV PYSPARK_DRIVER_PYTHON python3
    ENV PYSPARK_PYTHON python3
    
    RUN pip3 install --upgrade pip
    RUN pip3 install 'numpy==1.17.5'
    
    RUN python3 -c "import numpy as np"

  2. Create a directory and a new file named Dockerfile using the following commands:
    $ mkdir numpy-1-17
    $ vi numpy-1-17/Dockerfile

  3. Enter the contents of your new Dockerfile, save it, and run the following command to build a Docker image:
    $ docker build -t 'local/numpy-1-17' numpy-1-17/

  4. Tag and upload the locally built image to Amazon ECR and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint using the following commands:
    $ docker tag local/numpy-1-17 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17 
    $ docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17

    Now that the numpy-1-17 Docker image is available in Amazon ECR, you can use it with a new notebook.

  5. Create a new notebook by returning to your EMR Notebook, choosing File, New, Notebook, and then choosing the PySpark kernel. To tell your EMR Notebook to use this specific Docker image instead of the default, you need to use the following configuration parameters.
  6. Enter the following code in your notebook (replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint) and choose ▸(Run):
     %%configure -f
    {"conf": 
     { 
      "spark.pyspark.virtualenv.enabled": "false",
      "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17",
      "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17"
     }
    }

    To check if your PySpark code is using version 1.17.5, enter the same PySpark code as before to use numpy and output the version.

  7. Enter the following code into your notebook and choose ▸(Run):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
    sc = spark.sparkContext
    
    import numpy as np
    a = np.arange(15).reshape(3, 5)
    print(a)
    print(np.__version__)

The output should look like the following screenshot; the numpy version in use is the version you installed in your numpy-1-17 Docker image: 1.17.5.

Summary

This post showed you how to simplify your Spark dependency management using Amazon EMR 6.0.0 and Docker. You created a Docker image to package your Python dependencies, created a cluster configured to use Docker, and used that Docker image with an EMR Notebook to run PySpark jobs. To find out more about using Docker images with EMR, refer to the EMR documentation on how to Run Spark Applications with Docker Using Amazon EMR 6.0.0. Stay tuned for additional updates on new features and further improvements with Apache Spark on Amazon EMR.

 


About the Author

Paul Codding is a senior product manager for EMR at Amazon Web Services.

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Apache Hive is 2x faster with Hive LLAP on EMR 6.0.0

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/apache-hive-is-2x-faster-with-hive-llap-on-emr-6-0-0/

Customers use Apache Hive with Amazon EMR to provide SQL-based access to petabytes of data stored on Amazon S3. Amazon EMR 6.0.0 adds support for Hive LLAP, providing an average performance speedup of 2x over EMR 5.29, with up to 10x improvement on individual Hive TPC-DS queries. This post shows you how to enable Hive LLAP, and outlines the performance gains we’ve observed using queries from the TPC-DS benchmark.

Twice as fast compared to Amazon EMR 5.29.0

To evaluate the performance benefits of running Hive with Amazon EMR release 6.0.0, we're using 70 TPC-DS queries with a 3 TB Apache Parquet dataset on a six-node c4.8xlarge EMR cluster to compare the total runtime and geometric mean with results from EMR release 5.29.0.

The results show that the TPC-DS queries run twice as fast in Amazon EMR 6.0.0 (Hive 3.1.2) compared to Amazon EMR 5.29.0 (Hive 2.3.6) with the default Amazon EMR Hive configuration.

The following graph shows performance improvements measured as total runtime for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) geometric mean.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Hive Live Long and Process (LLAP)

Hive LLAP enhances the execution model of Hive, using persistent daemons with dynamic in-memory caching for low-latency queries. These daemons run on the core and task nodes in EMR clusters, caching data and metadata, and because they are long-lived processes they avoid the container startup overhead of traditional Hive queries.

By default, LLAP daemons do not start as part of EMR cluster start-up. You can enable LLAP in Amazon EMR 6.0.0 with the following hive configuration:

[
  {
    "Classification": "hive",
    "Properties": {
      "hive.llap.enabled": "true"
    }
  }
]

Since Hive LLAP uses persistent daemons that run on YARN, a percentage of the EMR cluster’s YARN resources will be reserved for Hive LLAP daemons when LLAP is enabled. You can override the following properties, which are predefined/calculated by EMR, using the hive configuration when launching an EMR cluster.

Property | Description | Default
hive.llap.num-instances | Defines the number of LLAP instances to run on the EMR cluster. There can be at most one LLAP instance per NodeManager node (core/task). | Number of core/task nodes in the cluster
hive.llap.percent-allocation | Defines the percentage of YARN NodeManager resources allocated to LLAP instances. This only applies to nodes running an LLAP instance defined by hive.llap.num-instances. | 0.6 (60%)

For example, to define 80% of YARN NodeManager resources to LLAP, use the following configuration:

 [
      {
        "classification": "hive",
        "properties": {
          "hive.llap.percent-allocation": "0.8",
          "hive.llap.enabled": "true"
        },
        "configurations": []
      }
    ]

You can override the following properties, which are predefined/calculated by EMR, using the hive-site classification when launching an EMR cluster.

Property | Description
hive.llap.daemon.yarn.container.mb | YARN container size for each LLAP daemon
hive.llap.daemon.memory.per.instance.mb | LLAP Xmx (daemon heap size)
hive.llap.io.memory.size | Size of the LLAP IO cache in an LLAP daemon
hive.llap.daemon.num.executors | Number of executors (tasks that can execute in parallel) per LLAP daemon

For more details on configuring Hive LLAP in Amazon EMR 6.0.0, please refer to Using Hive LLAP.
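If you launch clusters programmatically, a boto3 sketch like the following passes the same hive classification at cluster creation; the key pair, subnet, instance types, and default roles are placeholders, not recommendations:

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # Region is an assumption

# Launch an EMR 6.0.0 cluster with Hive LLAP enabled via the hive classification.
response = emr.run_job_flow(
    Name="EMR 6.0.0 with Hive LLAP",
    ReleaseLabel="emr-6.0.0",
    Applications=[{"Name": "Hive"}, {"Name": "Tez"}],
    Configurations=[
        {"Classification": "hive", "Properties": {"hive.llap.enabled": "true"}}
    ],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "c4.8xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "c4.8xlarge", "InstanceCount": 5},
        ],
        "Ec2KeyName": "myKey",            # placeholder key pair
        "Ec2SubnetId": "subnet-1234567",  # placeholder subnet
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])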

Resizing LLAP Daemons

You can modify the number of LLAP instances using the YARN CLI. Here’s a sample command:

$ yarn app -flex <Application Name> -component llap -1

  • Application Name – llap0 (Default)

You can check the status of the Hive LLAP daemons with the following command:

$ hive --service llapstatus

Hive LLAP Web Services

The Hive LLAP Monitor listens on port 15002 in each core and task node running the LLAP daemon.

The following table provides an overview of the Web Services available in LLAP.

URI | Description
http://coretask-public-dns-name:15002 | Shows the overview of heap, cache, executor, and system metrics
http://coretask-public-dns-name:15002/conf | Shows the current LLAP configuration
http://coretask-public-dns-name:15002/peers | Shows the details of LLAP nodes in the cluster, extracted from the ZooKeeper server
http://coretask-public-dns-name:15002/iomem | Shows details about the cache contents and usage
http://coretask-public-dns-name:15002/jmx | Shows the LLAP daemon's JVM metrics
http://coretask-public-dns-name:15002/stacks | Shows JVM stack traces of all threads
http://coretask-public-dns-name:15002/conflog | Shows the current log levels
http://coretask-public-dns-name:15002/status | Shows the status of LLAP
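For a quick scripted check of these endpoints, you can fetch them over HTTP from a host that can reach the core/task nodes; the node DNS name below is a placeholder:

import urllib.request

# Replace with the DNS name of a core/task node that runs an LLAP daemon.
node = "ip-10-0-0-1.ec2.internal"

# Fetch the LLAP status and configuration pages from the LLAP Monitor on port 15002.
for path in ("/status", "/conf"):
    url = f"http://{node}:15002{path}"
    with urllib.request.urlopen(url, timeout=10) as response:
        body = response.read().decode("utf-8", errors="replace")
    print(url, "->", body[:200])  # print the first 200 characters of each response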

Hive Performance (LLAP vs. Container)

In EMR 6.0.0, Hive LLAP is optional, and all Hive queries are executed using dynamically allocated containers when Hive LLAP is disabled. To illustrate the differences of running Hive queries with persistent Hive LLAP daemons versus dynamically allocated containers, we've used a subset of the TPC-DS benchmark. The results show an overall performance improvement of 27%, with some queries sped up by up to 4x.

The following graph shows performance improvements measured as total runtime for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) runtime.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Summary

This post demonstrated the performance improvement of Hive on Amazon EMR 6.0.0 in comparison to the previous Amazon EMR 5.29 release. This improvement in performance helps to reduce query runtime and cost. You also learned how to use Hive LLAP with Amazon EMR 6.0.0, how to configure it, and how to view its status and metrics using the LLAP Monitor, and you saw the performance gains when Hive LLAP is enabled. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.

 


About the Author

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

 

 

Expertise validation in AWS data analytics with AWS Certification

Post Syndicated from Beth Shepherd original https://aws.amazon.com/blogs/big-data/expertise-validation-in-aws-data-analytics-with-aws-certification/

AWS Training and Certification now has a new exam version for the AWS Certified Data Analytics – Specialty certification, which validates expertise in designing, building, and maintaining analytics solutions that are efficient, cost-effective, and secure.

The new exam version includes updated content across all domains: collection, storage and data management, processing, analysis and visualization, and security. Earning AWS Certified Data Analytics – Specialty shows that you meet the standard set by AWS data analytics experts.

This certification was previously named AWS Certified Big Data – Specialty. The new name highlights the breadth of the data and analytics technical skills and experience that this certification validates.

This credential helps your organization innovate faster by finding and developing validated expertise, so you can be confident that your team has both the breadth and depth to deliver insights from data. Rob Koch, lead architect at S&P Global, says, “Unlocking the value behind data is crucial to any organization. It is important to strategize our abilities to access and analyze data for a specific purpose, be it for a new product, enhancing an existing product, or discovering a new connection between two disparate datasets.” Koch is also an AWS Data Hero, which is a worldwide group of IT leaders and educators with a shared passion for analytics, database, and blockchain technologies.

AWS Certifications highlight your role-based and specific technical skills to help you stand out and get noticed by colleagues and organizational leaders. AWS Certified Data Analytics – Specialty shows your ability to define AWS data analytics services and understand how they work together throughout the data life cycle. Koch, who has earned Foundational, Associate-level, and Specialty AWS Certifications, encourages professionals with expertise in data analytics services to pursue this credential. Koch says, “This certification will give your peers the confidence of working with someone who can help provide ideal architectural scenarios leveraging the right cloud technologies, while meeting business needs.”

Who should take the exam?

Candidates should have 2 years of hands-on experience working with AWS to design, build, secure, and maintain analytics solutions, and 5 years of general experience working with data analytics technology. For a comprehensive list of recommended knowledge and experience, see the AWS Certified Data Analytics – Specialty exam guide.

How should I prepare for the exam?

If you’re new to data analytics on AWS or looking to add to your skills, AWS offers resources to help you build expertise and prepare for AWS Certification exams, such as recommended AWS whitepapers and FAQs. For more information, see Prepare for Your AWS Certification Exam. You can also follow the AWS Data Analytics Learning Path, which includes the 3-day classroom course Big Data on AWS and the digital courses Data Analytics Fundamentals and Exam Readiness: AWS Certified Data Analytics – Specialty.

Resources for organizations

Are you a leader within your organization looking to challenge your team to meet the standard set by AWS experts? AWS Training and Certification offers the resources you need to develop your team, innovate in the cloud, and transform your organization. You can take advantage of comprehensive training plans and AWS certification exam vouchers for your team.

Getting AWS certified

As of this writing, you can schedule the AWS Certified Data Analytics – Specialty exam for $300 and take it from your home or office, or at available testing centers worldwide. A practice exam is also available for $40.

Learn more about this certification, or schedule your exam today.


About the Authors

Beth Shepherd is the Product Marketing Manager for AWS Certification. She joined Amazon in 2019 and is based in Boston.

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS.

Speeding up Etleap models at AXS with Amazon Redshift materialized views

Post Syndicated from Christian Romming original https://aws.amazon.com/blogs/big-data/speeding-up-etleap-models-at-axs-with-amazon-redshift-materialized-views/

The materialized views feature in Amazon Redshift is now generally available and has been benefiting customers and partners in preview since December 2019. One customer, AXS, is a leading ticketing, data, and marketing solutions provider for live entertainment venues in the US, UK, Europe, and Japan. Etleap, an Amazon Redshift partner, is an extract, transform, load, and transform (ETLT) service built for AWS. AXS uses Etleap to ingest data into Amazon Redshift from a variety of sources, including file servers, Amazon S3, relational databases, and applications. These ingestion pipelines parse, structure, and load data into Amazon Redshift tables with appropriate column types and sort and distribution keys.

Improving dashboard performance with Etleap models

To analyze data, AXS typically runs queries against large tables that originate from multiple sources. One of the ways that AXS uses Amazon Redshift is to power interactive dashboards. To achieve fast dashboard load times, AXS pre-computes partial answers to the queries the dashboards use. These partial answers are orders of magnitude smaller, in terms of the number of rows, than the tables on which they are based. By querying Amazon Redshift tables that hold these pre-computed partial answers, dashboards can load much faster than they would if they queried the base tables directly.

Etleap supports creating and managing such pre-computations through a feature called models. A model consists of a SELECT query and triggers for when it should be updated. An example of a trigger is a change to a base table, that is, a table used by the SELECT statement that defines the model. This way, the model can remain consistent with its base tables.

The following screenshot shows an Etleap model with two base table dependencies.

Etleap represents their models as tables in Amazon Redshift. To create the model table, Etleap wraps the SELECT statement in a CREATE TABLE AS (CTAS) query. When an update is triggered, for example, due to base table inserts, updates, or deletes, Etleap recomputes the model table through the following code:

CREATE TABLE model_temporary AS SELECT …;
DROP TABLE model;
ALTER TABLE model_temporary RENAME TO model;

Analyzing CTAS performance as data grows

AXS manages a large number of Etleap models. For one particular model, the CTAS query takes over 6 minutes, on average. This query performs an aggregation on a join of three different tables, including an event table that is constantly ingesting new data and contains over a billion rows. The following graph shows that the CTAS query time increases as the event table increases in number of rows.

There are two key problems with the query taking longer:

  • There’s a longer delay before the updated model is available to analysts
  • The model update consumes more Amazon Redshift cluster resources

To address this, AXS would have to resort to workarounds that are either inconvenient or costly, such as archiving older data from the event table or expanding the Amazon Redshift cluster to increase available resources.

Comparing CTAS to materialized views

Etleap decided to run an experiment to verify that Amazon Redshift’s materialized views feature is an improvement over the CTAS approach for this AXS model. First, they built the materialized view by wrapping the SELECT statement in a CREATE MATERIALIZED VIEW AS query. For updates, instead of recreating the materialized view every time that data in a base table changes, a REFRESH MATERIALIZED VIEW query is sufficient. The expectation was that using materialized views would be significantly faster than the CTAS-based procedure. The following graph compares query times of CTAS to materialized view refresh.

Running REFRESH MATERIALIZED VIEW was 7.9 times faster than the CTAS approach—it took 49 seconds instead of 371 seconds on average at the current scale. Additionally, the update time was roughly proportional to the number of rows that were added to the base table since the last update, rather than the total size of the base table. In this use case, this number is 3.8 million, which corresponds to the approximate number of events ingested per day.

This is great news. The solution solves the previous problems because the delay caused by the model update stays constant as new data comes in, and so do the resources that Amazon Redshift consumes (assuming the growth of the base table is constant). In other words, using materialized views eliminates the need for workarounds, such as archiving or cluster expansion, as the dataset grows. It also simplifies the refresh procedure for model updates by reducing the number of SQL statements from three (CREATE, DROP, and RENAME) to one (REFRESH).
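To make the pattern concrete, the following is a minimal sketch rather than Etleap's actual code; the connection details, the model_mv name, and the events table in the SELECT are placeholders for illustration, and it assumes the psycopg2 driver:

import os
import psycopg2

# Placeholder connection details for an Amazon Redshift cluster.
conn = psycopg2.connect(
    host="example-cluster.abc123xyz789.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="dev",
    user="awsuser",
    password=os.environ["REDSHIFT_PASSWORD"],
)
conn.autocommit = True

with conn.cursor() as cur:
    # One-time setup: wrap the model's SELECT statement in CREATE MATERIALIZED VIEW.
    cur.execute("""
        CREATE MATERIALIZED VIEW model_mv AS
        SELECT event_date, venue_id, COUNT(*) AS event_count
        FROM events
        GROUP BY event_date, venue_id;
    """)

    # On every subsequent trigger, a single statement brings the view up to date.
    cur.execute("REFRESH MATERIALIZED VIEW model_mv;")

conn.close()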

Achieving fast refresh performance with materialized views

Amazon Redshift can refresh a materialized view efficiently and incrementally. It keeps track of the last transaction in the base tables up to which the materialized view was previously refreshed. During subsequent refreshes, Amazon Redshift processes only the newly inserted, updated, or deleted tuples in the base tables, referred to as a delta, to bring the materialized view up-to-date with its base tables. In other words, Amazon Redshift can incrementally maintain the materialized view by reading only base table deltas, which leads to faster refresh times.

For AXS, Amazon Redshift analyzed their materialized view definitions, which join multiple tables, filters, and aggregates, to figure out how to incrementally maintain their specific materialized view. Each time AXS refreshes the materialized view, Amazon Redshift quickly determines if a refresh is needed, and if so, incrementally maintains the materialized view. As records are ingested into the base table, the materialized view refresh times shown are much faster and grow very slowly because each refresh reads a delta that is small and roughly the same size as the other deltas. In comparison, the refresh times using CTAS are much slower because each refresh reads all the base tables. Moreover, the refresh times using CTAS grow much faster because the amount of data that each refresh reads grows with the ingest rate.

You are in full control of when to refresh your materialized views. For example, AXS refreshes their materialized views based on triggers defined in Etleap. As a result, transactions that are run on base tables do not incur additional cost to maintain dependent materialized views. Decoupling the base tables’ updates from the materialized view’s refresh gives AXS an easy way to insulate their dashboard users and offers them a well-defined snapshot to query, while ingesting new data into base tables. When AXS vets the next batch of base table data via their ETL pipelines, they can refresh their materialized views to offer the next snapshot of dashboard results.

In addition to efficiently maintaining their materialized views, AXS also benefits from the simplicity of Amazon Redshift storing each materialized view as a plain table. Queries on the materialized view perform with the same world-class speed that Amazon Redshift runs any query. You can organize a materialized view like other tables, which means that you can exploit distribution key and sort columns to further improve query performance. Finally, when you need to process many queries at peak times, Amazon Redshift’s concurrency scaling kicks in automatically to elastically scale query processing capacity.

Conclusion

Now that the materialized views feature is generally available, Etleap gives you the option of using materialized views rather than tables when creating models. You can use models more actively as part of your ETLT strategies, and also choose more frequent update schedules for your models, due to the performance benefits of incremental refreshes.

For more information about Amazon Redshift materialized views, see Materialize your Amazon Redshift Views to Speed Up Query Execution and Creating Materialized Views in Amazon Redshift.

 


About the Author

Christian Romming is the founder and CEO of Etleap.  Etleap is a managed ETL solution for AWS that doesn’t require extensive engineering work to set up, maintain, and scale.

Prasad Varakur is a Database, Big Data & Distributed Systems enthusiast, and Product Manager at Amazon Web Services. Prior to this, he has developed Database and Storage engines at SAP/Sybase, Couchbase, Huawei, Novell, EMC, and Veritas. He holds 11 patents in database systems and distributed computing, and his thesis has contributed foundational works of Parametric Query Optimization. He holds Master’s degree in Computer Science from IIT, Kanpur.

 

Vuk Ercegovac is a principal engineer for Redshift at AWS.

AWS Architecture Monthly Magazine: Data Lakes

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-data-lakes/

A data lake is the fastest way to get answers from all your data to all your users. It’s a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning—to guide better decisions.

In This Issue

In this month’s Architecture Monthly, we speak to AWS Analytics Tech Leader Taz Sayed about general architecture trends in data lakes and the questions customers need to ask themselves before considering a data lake, and we get his outlook on the role the cloud will play in future development efforts.

We also introduce you to two companies that are utilizing data lakes for deep analytics, point you to an AWS managed solution, provide some real-world videos, and more.

  • Ask an Expert: Taz Sayed, Tech Leader, AWS Analytics
  • Blog: Kayo Sports builds real-time view of the customer on AWS
  • Case Study: Yulu Uses a Data Lake on AWS to Pedal a Change
  • Solution: Data Lake on AWS
  • Managed Solution: AWS Lake Formation
  • Whitepaper: Building Big Data Storage Solutions (Data Lakes) for Maximum Flexibility

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us a star rating and comment on the Amazon Kindle Newsstand page or contact us anytime at [email protected].

Ingest Excel data automatically into Amazon QuickSight

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/ingest-excel-data-automatically-into-amazon-quicksight/

Amazon QuickSight is a fast, cloud-powered business intelligence (BI) service that makes it easy to deliver insights to everyone in your organization. This post demonstrates how to build a serverless data ingestion pipeline to automatically import frequently changed data into a SPICE (Super-fast, Parallel, In-memory Calculation Engine) dataset of Amazon QuickSight dashboards.

It is sometimes quite difficult to be agile in BI development. For example, end-users that perform self-service analytics may want to add their additional ad hoc data into an existing dataset and have a view of the corresponding updated dashboards and reports in a timely fashion. However, dashboards and reports are usually built on top of a single online analytic processing (OLAP) data warehouse with a rigid schema. Therefore, an end-user (who doesn’t have permission to update the dataset directly) has to go through a complicated and time-consuming procedure to have their data updated in the warehouse. Alternatively, they could open a ticket for you to edit the dataset manually, but it is still a very inconvenient solution that involves a significant amount of repetitive manual effort, especially if they frequently need to update the data.

Therefore, an automated data processing tool that can perform real-time data ingestion is very useful. This post discusses a tool that, when an end-user uploads Excel files into Amazon S3 or any other data file sharing location, performs the following end-to-end process:

  • Cleans the raw data from the Excel files, which might contain a lot of formatting and redundant information.
  • Ingests the cleaned data.
  • Performs a status check to monitor the data cleaning and ingestion process.
  • Sends a notification of the results to the end-user and BI development team.

With the recently launched feature cross data source joins, you can join across all data sources that Amazon QuickSight supports, including file-to-file, file-to-database, and database-to-database joins. For more information, see Joining across data sources on Amazon QuickSight.

In addition to cross data source joins, Amazon QuickSight has also launched new APIs for SPICE ingestion. For more information, see Importing Data into SPICE and Evolve your analytics with Amazon QuickSight’s new APIs and theming capabilities.

This post shows how you can combine these features to build an agile solution that cleans and ingests an Excel file into a SPICE dataset of Amazon QuickSight automatically. In SPICE, the real-time data from Excel joins with the Amazon Redshift OLAP data warehouse, and end-users receive Amazon SNS messages about its status throughout the process.

Solution overview

The following diagram illustrates the workflow of the solution.

The workflow includes the following steps:

  1. An end-user uploads an Excel file into an on-premises shared folder.
  2. The Excel files upload to the Amazon S3 bucket excel-raw-data. Alternatively, the end-user can skip this step and upload the Excel file into this S3 bucket directly.
  3. This upload event triggers the SNS message Excel-Raw-Data-Uploaded.
  4. Both the end-user and the BI team receive a message about the new upload event.
  5. The upload event also triggers the AWS Lambda function DataClean to process the Excel data.
  6. The Lambda function removes the formatting and redundant information from the Excel file, saves the cleaned data as a CSV file in the S3 bucket autoingestionqs, and publishes an SNS message to notify end-users about the data cleaning status (a simplified sketch of such a function follows this list).
  7. This cleaned CSV file is mapped as an Amazon Athena table.
  8. In the Amazon QuickSight SPICE dataset, this table joins with the Amazon Redshift table through the cross data source join functionality.
  9. The CSV file creation event in the S3 bucket autoingestionqs triggers the Lambda function qsAutoIngestion.
  10. This function calls the data ingestion API of Amazon QuickSight and checks the data ingestion status.
  11. When the data ingestion is complete, end-users receive the Ingestion-Finished SNS message.
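The following is a heavily simplified sketch of what a cleaning function like DataClean could look like; it is not the DataClean.py sample from the GitHub repo, and the SNS-wrapped event parsing, the pandas-based cleanup, and the environment variable for the topic ARN are assumptions (pandas and an Excel reader such as openpyxl would need to be packaged with the function):

import json
import os
import urllib.parse

import boto3
import pandas as pd  # assumed to be bundled with the function or provided by a layer

s3 = boto3.client("s3")
sns = boto3.client("sns")
TOPIC_ARN = os.environ["CLEANING_DONE_TOPIC_ARN"]  # ARN of the Cleaning-is-Done topic (assumed env var)


def handler(event, context):
    # The function is subscribed to the Excel-Raw-Data-Uploaded topic, so the S3 event
    # arrives wrapped inside the SNS message body.
    s3_event = json.loads(event["Records"][0]["Sns"]["Message"])
    bucket = s3_event["Records"][0]["s3"]["bucket"]["name"]
    key = urllib.parse.unquote_plus(s3_event["Records"][0]["s3"]["object"]["key"])

    # Download the Excel file, drop fully empty rows and columns, and write a clean CSV.
    s3.download_file(bucket, key, "/tmp/raw.xlsx")
    df = pd.read_excel("/tmp/raw.xlsx")
    df = df.dropna(how="all").dropna(axis=1, how="all")
    df.to_csv("/tmp/clean.csv", index=False)

    # Upload the CSV to the bucket that feeds the Athena table and notify subscribers.
    cleaned_key = key.rsplit(".", 1)[0] + ".csv"
    s3.upload_file("/tmp/clean.csv", "autoingestionqs", cleaned_key)
    sns.publish(TopicArn=TOPIC_ARN,
                Message=f"Cleaned {key}; wrote s3://autoingestionqs/{cleaned_key}")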

Prerequisites

For this walkthrough, you should have the following prerequisites:

Creating resources

Create your resources by launching the following AWS CloudFormation stack:

During the stack creation process, you have to provide a valid email address as the endpoint of Amazon SNS services. After the stack creation is successful, you have three SNS topics, two S3 buckets, and the corresponding IAM policies.

Walkthrough

To implement this solution, complete the following steps:

  1. Enable SNS notification of new object creation events in the S3 bucket excel-raw-data. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket? When an end-user uploads an Excel file into the excel-raw-data S3 bucket, the event triggers an Amazon SNS message. The following screenshot shows the example Excel file that this post uses. The following screenshot shows the SNS message Excel-Raw-Data-Uploaded, which includes details of the upload event.
  2. Download the sample code DataClean.py in Python 3.7 from the GitHub repo.
  3. Create a Lambda function named DataClean.
  4. Configure the function to be a subscriber of the SNS topic Excel-Raw-Data-Uploaded.
  5. Edit the SNS topic Cleaning-is-Done, and add the following code to the access policy:
    "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": "SNS:Publish",
          "Resource": " arn:aws:sns:us-east-1:AWS Account ID: SNS Topic Name",
          "Condition": {
            "ArnLike": {
              "aws:SourceArn": "arn:aws:lambda:us-east-1:AWSAccountID:function:DataClean"
            }
          }

    The policy allows the Lambda function DataClean to trigger the SNS message Cleaning-is-Done.

    The function DataClean saves the CSV file of the cleaned data into the S3 bucket autoingestionqs. You should see the new CSV file in this bucket. See the following screenshot.

    When the Lambda function ends, it triggers the SNS message Cleaning-is-Done. The following screenshot shows the text of the notification message.

  6. Add an event notification into the S3 bucket autoingestionqs to trigger a Lambda function named qsAutoIngestion. This function calls the Amazon QuickSight data API to ingest data into the SPICE dataset. The cleaned CSV file in the S3 bucket autoingestionqs is mapped as an Athena table. The following screenshot shows the sample data in the CSV file.

    In the Amazon QuickSight SPICE dataset, the Athena table joins with the Amazon Redshift table through the cross data source join functionality.

  7. Create the SPICE dataset. For more information, see Joining across data sources on Amazon QuickSight. The following screenshot shows the Data page in Amazon QuickSight, where your dataset details appear. The Athena table joins a Redshift table. The new object creation event in the Amazon S3 bucket autoingestionqs triggers another Lambda function named qsAutoIngestion. This function calls the data ingestion API of Amazon QuickSight and checks the data ingestion status (a minimal sketch of these API calls follows this list). If the data ingestion is completed successfully, end-users receive the SNS message Ingestion-Finished. You can download the sample code of qsAutoIngestion from the GitHub repo.
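The following is a minimal sketch of the SPICE ingestion calls that a function like qsAutoIngestion could make; the account ID and dataset ID are placeholders, and the polling loop is illustrative rather than taken from the sample code:

import time
import uuid
import boto3

qs = boto3.client("quicksight", region_name="us-east-1")  # Region is an assumption
account_id = "123456789012"                                # placeholder AWS account ID
dataset_id = "your-spice-dataset-id"                       # placeholder dataset ID

# Kick off a SPICE ingestion for the dataset.
ingestion_id = str(uuid.uuid4())
qs.create_ingestion(AwsAccountId=account_id, DataSetId=dataset_id, IngestionId=ingestion_id)

# Poll the ingestion until it reaches a terminal state.
while True:
    status = qs.describe_ingestion(
        AwsAccountId=account_id, DataSetId=dataset_id, IngestionId=ingestion_id
    )["Ingestion"]["IngestionStatus"]
    if status in ("COMPLETED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)
print("Ingestion finished with status:", status)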

Cleaning up

To avoid incurring future charges, delete the resources you created: the two Lambda functions, three SNS topics, two S3 buckets, and corresponding IAM policies.

Conclusion

This post discussed how BI developers and architects can use data API, Lambda functions, and other AWS services to complete an end-to-end automation process. End-users can have their real-time data ingested and joined with OLAP data warehouse tables and visualize their data in a timely fashion without the need to wait for nightly or hourly ETL or the need to understand the complex technical development steps. You should now be fully equipped to construct a solution in a development environment and demo it to non-technical business end-users.

 


About the Author

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.