Tag Archives: Amazon Redshift Spectrum

Speed up data ingestion on Amazon Redshift with BryteFlow

Post Syndicated from Pradnya Bhandary original https://aws.amazon.com/blogs/big-data/speed-up-data-ingestion-on-amazon-redshift-with-bryteflow/

This is a guest post by Pradnya Bhandary, Co-Founder and CEO at Bryte Systems.

Data can be transformative for an organization. How and where you store your data for analysis and business intelligence is therefore an especially important decision that each organization needs to make. Should you choose an on-premises data warehouse solution or embrace the cloud?

On-premises data warehouses require servers in your data center, a team to manage them, and a large initial investment that tries to accommodate current and future data needs. Cloud data warehouses, on the other hand, are fully managed and can start with a small investment and grow as your business demands. You don’t need to provision for future needs, only for present ones, with the peace of mind that extra capacity can be added overnight if needed.

One such cloud data warehouse is Amazon Redshift, the most popular cloud data warehouse. It’s a fully managed, petabyte-scale data warehouse designed for large-scale dataset storage and analysis, and it provides agility, flexibility, and cost-effectiveness.

To use Amazon Redshift effectively, getting your data efficiently from various data silos to the data warehouse is critical, because this determines how quickly you can access the data. Do you have to wait until the data is loaded and make do with stale data, or can you access your data across the organization in near-real time to derive fresh and rich data insights?

In this post, we explain how Origin Energy followed some of the best practices for data ingestion for Amazon Redshift and how they achieved faster ingestion rates into Amazon Redshift using the BryteFlow software.

Origin Energy

Origin Energy, a leading energy provider, was finding that its on-premises data warehouse was struggling to support growing data demands. They also needed to unlock siloed data from legacy databases like SAP, Oracle, SQL Server, MySQL, and more. Access to data was fragmented and time consuming.

Moving to a cloud enterprise analytics environment with centralized data access was the only viable option to support their data initiatives. For more information, see the Origin Energy case study.

To speed up data ingestion on Amazon Redshift, they followed data ingestion best practices.

Log-based CDC mechanism to get data to Amazon Redshift

When data is replicated from a source database to a target that could be another database, data warehouse, or cloud data storage object, changes to the data in the source need to be captured and replicated to the destination to keep data consistent and trustworthy. Change data capture (CDC) makes this possible. CDC captures the changes in the source data and updates only the data in the destination that has changed. This does away with the tedious task of bulk load updating and enables real-time data integration. It’s a continual, extremely reliable process and has no impact on source systems.
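
Applying captured changes to the target typically follows an upsert pattern. The following is a minimal sketch, assuming changes have already landed in a hypothetical staging table (orders_cdc_stage) with an op_type column; it is illustrative only, not BryteFlow’s internal implementation:

-- Hypothetical staging table holds one row per changed source row,
-- with op_type = 'I' (insert), 'U' (update), or 'D' (delete).
begin;

-- Remove target rows that were updated or deleted at the source
delete from orders
using orders_cdc_stage s
where orders.order_id = s.order_id;

-- Re-insert the latest version of inserted and updated rows
insert into orders
select order_id, customer_id, order_total, updated_at
from orders_cdc_stage
where op_type in ('I', 'U');

commit;

-- Clear the staging table for the next batch
-- (TRUNCATE commits implicitly on Amazon Redshift, so it runs outside the transaction)
truncate orders_cdc_stage;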

Parallel multi-threaded initial sync

When doing an initial ingest of data, especially of exceptionally large datasets to Amazon Redshift, parallel, multi-thread syncing to replicate data is extremely helpful in cutting down total data ingestion time. Data replication proceeds in parallel on multiple threads with optimized extraction and loading.

Parallel multi-threaded log-based capture and merge for Oracle

When source systems generate a large amount of incremental data and transaction logs are written very frequently, the CDC mechanism, or log mining, can lag behind if it can’t keep up with the source throughput. You can configure BryteFlow to use multiple parallel threads for mining. You can also configure BryteFlow to mine the logs on a completely different server: make the transaction logs available on a shared mount on a remote server you spin up, and mine the logs there. This combination of parallelism and off-host mining speeds up data replication and puts zero load on the source and operational systems, so they aren’t impacted even when mining huge volumes of incremental data.

Best practices for loading data to Amazon Redshift

The following are the three best practices for loading data into Amazon Redshift.

Split large files into multiple files for high-performance loads

Amazon Redshift is a massively parallel processing (MPP) data warehouse, where several compute nodes work in parallel to ingest the data. Each node is further subdivided into slices, with each slice having one or more dedicated cores, equally dividing the processing capacity. The number of slices per node depends on the node type of the cluster. The following table shows the different node types and the number of slices.

Node Type                Default Slices per Node
Dense Compute DC2
    dc2.large            2
    dc2.8xlarge          16
Dense Storage DS2
    ds2.xlarge           2
    ds2.8xlarge          16
RA3 Nodes
    ra3.4xlarge          4
    ra3.16xlarge         16

After you extract data into files, you can compress them and split a single large file into multiple files according to the number of slices, so that the load is distributed evenly across the slices on Amazon Redshift. The number of split files is a configurable parameter in BryteFlow that can be set depending on the Amazon Redshift node type. The COPY command that ingests data into Amazon Redshift is configured optimally for fast data loads.
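
For illustration, a COPY command over a common Amazon S3 prefix of compressed file parts loads them in parallel across the slices. The table, bucket, prefix, and IAM role below are placeholders, and the exact options BryteFlow uses may differ:

-- Loads every file matching the prefix, e.g. part-0001.gz through part-0016.gz.
-- Splitting the data into a multiple of the cluster's slice count keeps all
-- slices busy during the load.
copy orders
from 's3://my-bucket/orders/part-'
iam_role 'arn:aws:iam::123456789012:role/myRedshiftCopyRole'
format as csv
gzip
compupdate off
statupdate off;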

Automatic creation of tables, default distribution keys, and distribution style

Data is distributed among the nodes on the basis of distribution style and distribution key of a particular table in Amazon Redshift. An even distribution of data enables Amazon Redshift to assign the workload evenly to slices and maximizes the benefit of parallel processing. This also helps during data ingestion. When ingesting data, BryteFlow automatically creates tables with the right DDL on Amazon Redshift. It also creates default distribution keys and distribution style, so that table ingestion is highly performant.
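
As a sketch of what such DDL can look like (table and column names are illustrative, not the DDL BryteFlow generates), a distribution key on the column used most often in joins co-locates related rows on the same slice:

-- Distribute rows by the join column to reduce data movement at query time
create table orders (
    order_id     bigint,
    customer_id  bigint,
    order_date   date,
    order_total  decimal(12,2)
)
diststyle key
distkey (customer_id);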

Optimum sort keys used for optimum loads support efficient columnar storage

Data in the Amazon Redshift data warehouse is stored in a columnar fashion, which drastically reduces the number of disk I/O requests and minimizes the amount of data loaded into the memory to run a query. Reduction in I/O speeds up queries, and loading less data means Amazon Redshift can perform more in-memory processing. Using the optimum table sort keys is the best practice for efficient loads.
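
Continuing the illustrative example above, adding a sort key on the column most often used in range filters keeps data ordered on disk so that queries scan fewer blocks:

-- Hypothetical table with a compound sort key on the range-filtered column
create table page_views (
    view_time  timestamp,
    user_id    bigint,
    page_url   varchar(512)
)
distkey (user_id)
compound sortkey (view_time);

-- A range predicate on the sort key reads only the relevant blocks
select count(*)
from page_views
where view_time >= '2020-01-01' and view_time < '2020-02-01';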

Automatic sync on Amazon Redshift

CDC is an important element of syncing data with Amazon Redshift. BryteFlow automatically merges changes on Amazon Redshift, maintaining type 2 history if configured, with high performance. This means that data is ready to use as soon as it is ingested, without running lengthy ETL processes on it. The following diagram illustrates the type 2 history feature.
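
A type 2 merge preserves history by closing out the current version of a changed row and inserting the new version. The following is a minimal sketch with hypothetical table and column names, not BryteFlow’s generated code:

begin;

-- Close the current version of rows that changed at the source
update customer_dim
set    valid_to = s.change_ts,
       is_current = false
from   customer_cdc_stage s
where  customer_dim.customer_id = s.customer_id
and    customer_dim.is_current = true;

-- Insert the new version as the current row
insert into customer_dim (customer_id, name, address, valid_from, valid_to, is_current)
select customer_id, name, address, change_ts, null, true
from   customer_cdc_stage;

commit;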

Metadata for every extract and load can be captured on Amazon Aurora

Details about each extract and load process (for example, table names, the number of records affected, and the start and end times) are critical operational metadata that’s very useful for determining performance, tuning, and triggering other ETL processes. If this operational metadata is maintained on Amazon Redshift with every extract and load, the constant single-row inserts and updates can hamper performance drastically, because Amazon Redshift is a columnar database and not an OLTP system.

At BryteFlow, we found that the best practice is to keep the operational metadata in an Amazon Aurora database, which is OLTP in nature and can store this metadata with constant updates and inserts and low latency.
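
To make this concrete, the following is an illustrative shape for such a metadata table in Aurora (PostgreSQL-compatible syntax); the table and column names are assumptions, not BryteFlow’s actual schema:

-- Operational metadata table; one row per extract/load run
create table load_audit (
    load_id        bigserial primary key,
    table_name     varchar(128) not null,
    records_loaded bigint,
    started_at     timestamp not null,
    finished_at    timestamp,
    status         varchar(32)
);

-- Frequent single-row inserts like this are cheap on an OLTP engine,
-- unlike on a columnar warehouse
insert into load_audit (table_name, records_loaded, started_at, finished_at, status)
values ('orders', 125000, '2020-05-01 02:00:00', '2020-05-01 02:03:10', 'SUCCESS');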

Build your data lake on Amazon S3 and automatically query using Amazon Redshift Spectrum

BryteFlow enables you to build a continually refreshing data lake at scale on Amazon Simple Storage Service (Amazon S3) with continual replication and transformation of data. You can configure BryteFlow to use Amazon EMR to automatically merge and transform incremental data with the data already on Amazon S3, through an intuitive GUI. The EMR cluster can scale up or down depending on your data needs. You can then query the data on Amazon S3 using Amazon Redshift Spectrum.
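
Once the Amazon S3 data is cataloged (for example, in AWS Glue), querying it from Amazon Redshift is a matter of creating an external schema and referencing its tables. The database, role, and table names below are placeholders:

-- Expose a cataloged S3 data lake through an external schema
create external schema datalake
from data catalog
database 'analytics_lake'
iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole'
region 'us-east-1';

-- Join external (S3) data with a local Amazon Redshift table
select d.calendar_month, sum(e.revenue) as revenue
from datalake.sales_events e
join date_dim d on d.calendar_date = e.event_date
group by d.calendar_month;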

Offload data ingestion and data preparation on Amazon S3 and load to Amazon Redshift

BryteFlow Blend helps with real-time preparation of the data ingested by BryteFlow, using Apache Spark on Amazon EMR with an intuitive GUI. You can load data prepared on Amazon S3 to Amazon Redshift via BryteFlow Blend or make it accessible to Amazon Redshift via Amazon Redshift Spectrum. This helps reserve the computational resources of Amazon Redshift for the actual querying (queries run much faster) while the Amazon S3 data lake handles data integration. The following diagram illustrates the distributed data integration architecture.

BryteFlow software supports all of the preceding ingestion best practices. Origin Energy used the BryteFlow software to build their analytics platform on Amazon Redshift. Origin’s data access has improved from several days to mere hours. BryteFlow software has helped achieve accurate data replication on the AWS Cloud with low latency, facilitating faster time-to-market for new and highly personalized customer offerings and a significant reduction in data costs.

Conclusion

Amazon Redshift delivers fast performance at scale for the most demanding workloads. Ingesting and preparing your data for Amazon Redshift using the BryteFlow software makes this an extremely attractive value proposition. You can liberate your data across data silos and quickly unlock its value on Amazon Redshift.

To see how this works for your project, you can get a free trial from our website. We offer complete support on your free trial including screen sharing, online support, and consultation.

Alternatively, you can go to AWS Marketplace for a free trial of BryteFlow Standard Edition or BryteFlow Enterprise Edition. Contact us to let us know so we can assist you through the trial.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Pradnya Bhandary is the Co-Founder and CEO at Bryte Systems.

Manage and control your cost with Amazon Redshift Concurrency Scaling and Spectrum

Post Syndicated from Vince Marchillo original https://aws.amazon.com/blogs/big-data/manage-and-control-your-cost-with-amazon-redshift-concurrency-scaling-and-spectrum/

Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence tools.

This post shares the simple steps you can take to use the new Amazon Redshift usage controls feature to monitor and control your usage and associated cost for Amazon Redshift Spectrum and Concurrency Scaling features. Redshift Spectrum enables you to power a lake house architecture to directly query and join data across your data warehouse and data lake, and Concurrency Scaling enables you to support thousands of concurrent users and queries with consistently fast query performance.

Why this feature is important

With tens of thousands of customers, the Amazon Redshift team has the benefit of observing the workloads and behavior of a large variety of customers, including internal teams at Amazon. We observed that across both internal and external customers, Amazon Redshift runs demanding workloads, such as extract, transform, and load (ETL) pipelines that condense several massive datasets, many of which are hundreds of terabytes in size, into single consumable tables for reporting and analytic purposes. These jobs take advantage of Concurrency Scaling to automatically scale Amazon Redshift query processing to handle burst workloads, and Redshift Spectrum to perform analytics on the transformed datasets by joining them with external data stored in a variety of open data formats in the data lake backed by Amazon Simple Storage Service (Amazon S3). Both features are charged based on usage, which makes cost management important, especially during peak periods when there is more activity in an organization. Although a combination of Amazon CloudWatch alarms and workload management (WLM) query monitoring rules can help track and monitor usage, customers have asked for a way to control their costs at the Amazon Redshift cluster level based on usage.

Usage limits to control Concurrency Scaling and Redshift Spectrum costs

With the new usage controls feature, you can now monitor and control the usage and associated costs for Redshift Spectrum and Concurrency Scaling. You can create daily, weekly, and monthly usage limits, and define actions to take if those limits are reached to maintain predictable spending. Actions include logging usage stats as an event to a system table, generating Amazon Simple Notification Service (Amazon SNS) alerts, and disabling Redshift Spectrum or Concurrency Scaling based on your defined thresholds. The new usage controls feature allows you to continue reaping the benefits provided by both Concurrency Scaling and Redshift Spectrum with the peace of mind that you can stay within budget simply by configuring the appropriate thresholds.

Setting up and managing usage controls

You can configure Amazon Redshift usage control options on the Amazon Redshift console or by using the AWS Command Line Interface (AWS CLI) or API. You can choose to set up to four limits per feature, allowing for multiple levels of logging or notifications before you disable Redshift Spectrum or Concurrency Scaling. The usage limit settings available are largely the same for both Concurrency Scaling and Redshift Spectrum usage—the main difference is that Concurrency Scaling usage limits are based on time spent (hours and minutes), while Redshift Spectrum usage limits are based on terabytes of data scanned. The fields you can adjust and select include the following:

  • Time – The time range for which your usage limits should be applied. You can choose daily, weekly, or monthly.
  • Usage limit – For Concurrency Scaling, this allows you to enter an integer value for hours and minutes to limit the amount of time this feature can be used before the usage limit kicks in. For Redshift Spectrum, you enter an integer value for the total number of terabytes you want to allow to be scanned before the limits apply.
  • Action – The action to take when your usage control limit has been reached. You can choose from Log to system table, Alert, or Disable feature. The Alert and Disable feature actions trigger a CloudWatch metric alarm, which you can optionally configure to send Amazon SNS notifications.

The ability to configure up to four limits per feature, combined with the three available actions that you can take, provides accurate visibility into your current usage and a way to generate metrics of your usage patterns. You can use the Disable feature option to easily prevent going over budget, and the Alert and Log actions can provide valuable insights, such as how you are currently using Redshift Spectrum and Concurrency Scaling. For example, configuring a usage limit with a daily time period and an action to log to a system table allows you to easily generate metrics on which days you had higher utilization of Redshift Spectrum or Concurrency Scaling. These metrics can provide insights into high-traffic days and potential areas where your pipelines can be adjusted to better distribute your traffic. Another option is to configure a weekly alert limit at 25% of your desired monthly usage as a way to ensure that you are within your monthly expected budget.
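
For the Log to system table action, events are recorded in an Amazon Redshift system table (STL_USAGE_CONTROL, according to the AWS documentation at the time of writing), so a query along the following lines could summarize recent limit events. Treat the table name and its columns as assumptions to verify on your own cluster:

-- Review the most recent usage-limit events logged by the Log action.
-- Verify the table name and columns against your cluster's documentation.
select *
from stl_usage_control
order by 1 desc
limit 50;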

Setting usage control limits on the Amazon Redshift console

To set usage limits for Concurrency Scaling and Redshift Spectrum using the new Amazon Redshift console, perform the following steps:

  1. On the Amazon Redshift console, choose Clusters.
  2. Select your desired cluster.
  3. From the Actions drop-down menu, choose Configure usage limit.
  4. To configure usage limits for Concurrency Scaling, choose Configure usage limit in the Concurrency scaling usage limit section.
  5. To configure usage limits for Redshift Spectrum, choose Configure usage limit in the Redshift Spectrum usage limit section.
  6. In the Configure usage limit section, select or deselect Concurrency scaling and Redshift Spectrum.

Selecting one of those options brings up the corresponding configuration windows.

  7. Choose a Time period (Daily, Weekly, or Monthly) from the drop-down menu.
  8. Enter your desired Usage limit.
  9. From the Action drop-down menu, choose an action (Alert, Log to system table, or Disable feature).
  10. To configure additional usage limits, choose Add another limit and action.
  11. When you have configured all your desired usage limits, choose Configure to confirm your usage limit settings.

Your configurations are now visible in the usage limit dashboard.

Managing usage control limits via the Amazon Redshift console

You can edit and delete usage limits on the Amazon Redshift console. The Edit option allows you to add limits or modify existing limit settings, and the Delete option deletes all configured limits for the corresponding service. To manage your configurations, perform the following steps:

  1. On the Amazon Redshift console, choose Clusters.
  2. Select your desired cluster.
  3. From the Actions drop-down menu, choose Configure usage limit.
  4. To edit your existing usage limit configurations, choose Edit in the corresponding service box.

The editing option lets you add a new usage limit, remove a usage limit, or modify an existing usage limit and corresponding action.

To modify the time period of an existing usage limit, remove it and add it back as a new usage limit.

To delete your Concurrency Scaling limits, choose Delete usage limit in the Concurrency scaling usage limit section.

To delete your Redshift Spectrum limits, choose Delete usage limit in the Redshift Spectrum usage limit section. Choosing Delete usage limit removes all limits configured for that service.

Setting usage control limits via the AWS CLI

You can also use the AWS CLI to add, edit, describe, or remove usage control configurations. The following examples outline the required CLI commands for each use case:

  • create-usage-limit – This command adds a new usage limit configuration for your Amazon Redshift cluster. The command should include the following parameters:
    • --cluster-identifier – The name of the cluster on which to apply the usage control.
    • --period – The time range for your usage limit. You can enter daily, weekly, or monthly for this parameter.
    • --feature-type – The service to which you want to apply this usage control. You can enter spectrum or concurrency-scaling for this parameter.
    • --limit-type – For Redshift Spectrum, this parameter should be set to data-scanned. For Concurrency Scaling, this should be set to time.
    • --amount – For Redshift Spectrum, this parameter should equal the total terabytes allowed to be scanned, in increments of 1 TB. For Concurrency Scaling, this parameter should be set to the total minutes allowed before the limit actions are applied (on the console, you can specify this in hh:mm).
    • --breach-action – The action to take when you reach your configured limit. Possible values are log, emit-metric, or disable. emit-metric sends metrics to CloudWatch.

See the following example code:

aws redshift create-usage-limit --cluster-identifier <yourClusterIdentifier> --period <daily|weekly|monthly> --feature-type <spectrum|concurrency-scaling> --limit-type <data-scanned|time> --amount <yourDesiredAmount> --breach-action <log|emit-metric|disable>
  • describe-usage-limits – This command returns a JSON response that lists the configured usage limits for the cluster you choose. The response includes all the configurable fields, such as the limit type and breach actions, and includes a usage limit ID, which is required for the modify and delete commands. The describe command should include the following parameter:
    • --cluster-identifier – The cluster identifier for which you want to obtain the configured usage limits.

See the following example code:

aws redshift describe-usage-limits --cluster-identifier <yourClusterIdentifier>
{
    "UsageLimits": [
        {
            "LimitType": "data-scanned",
            "Period": "daily",
            "BreachAction": "log",
            "FeatureType": "spectrum",
            "UsageLimitId": "4257b96e-5b12-4348-adc2-4922d2ceddd2",
            "Amount": 1,
            "ClusterIdentifier": "cost-controls-demo"
        }
    ]
}
  • modify-usage-limit – This command allows you to modify an existing usage limit configuration on your Amazon Redshift cluster. This command requires the UsageLimitId for the limit you want to modify, which you can obtain by running the describe-usage-limits command. The modify-usage-limit command should include the following parameters:
    • --usage-limit-id – The ID of the usage limit that you want to modify. You can obtain this by running the describe-usage-limits command.
    • --amount – The new value for your limit threshold.
    • --breach-action – The new action to take if you reach your limit threshold.

See the following example code:

aws redshift modify-usage-limit --usage-limit-id "<yourUsageLimitID>" --amount <newAmount> --breach-action <newBreachAction>
  • delete-usage-limit – This command deletes a configured usage limit from your Amazon Redshift cluster. This command requires the UsageLimitId, which you can obtain by running the describe-usage-limits command. This command should have the following parameter:
    • --usage-limit-id – The ID of the limit that you want to delete.

See the following example code:

aws redshift delete-usage-limit --usage-limit-id "<yourUsageLimitID>"

For more information, see Managing usage limits in Amazon Redshift.

Summary

The Amazon Redshift usage controls provide you with an easy way to monitor, alert, and limit the cost you incur when using Concurrency Scaling and Redshift Spectrum features. With up to four limits configurable per feature and options to log events, trigger Amazon SNS notifications, or disable the features altogether from the Redshift console and AWS CLI, you have all the tools needed to make sure you stay within your budget.

 


About the Authors

Vince Marchillo is a Solutions Architect within Amazon’s Business Data Technologies organization. Vince guides customers to leverage scalable, secure, and easy-to-use data lake architecture powered by AWS services and other technologies.

Maor Kleider is a product and database engineering leader for Amazon Redshift. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

Satish Sathiya is a Product Engineer at Amazon Redshift. He is an avid big data enthusiast who collaborates with customers around the globe to achieve success and meet their data warehousing and data lake architecture needs.

Develop an application migration methodology to modernize your data warehouse with Amazon Redshift

Post Syndicated from Po Hong original https://aws.amazon.com/blogs/big-data/develop-an-application-migration-methodology-to-modernize-your-data-warehouse-with-amazon-redshift/

Data in every organization is growing in volume and complexity faster than ever. However, only a fraction of this invaluable asset is available for analysis. Traditional on-premises MPP data warehouses such as Teradata, IBM Netezza, Greenplum, and Vertica have rigid architectures that do not scale for modern big data analytics use cases. These traditional data warehouses are expensive to set up and operate, and require large upfront investments in both software and hardware. They cannot support modern use cases such as real-time or predictive analytics and applications that need advanced machine learning and personalized experiences.

Amazon Redshift is a fast, fully managed, cloud-native and cost-effective data warehouse that liberates your analytics pipeline from these limitations. You can run queries across petabytes of data in your Amazon Redshift cluster, and exabytes of data in-place on your data lake. You can set up a cloud data warehouse in minutes, start small for just $0.25 per hour, and scale to over a petabyte of compressed data for under $1,000 per TB per year – less than one-tenth the cost of competing solutions.

With tens of thousands of current global deployments (and rapid growth), Amazon Redshift has experienced tremendous demand from customers seeking to migrate away from their legacy MPP data warehouses. The AWS Schema Conversion Tool (SCT) makes this type of MPP migration predictable by automatically converting the source database schema and a majority of the database code objects, including views, stored procedures, and functions, to equivalent features in Amazon Redshift. SCT can also help migrate data from a range of data warehouses to Amazon Redshift by using built-in data migration agents.

Large-scale MPP data warehouse migration presents a challenge in terms of project complexity and poses a risk to execution in terms of resources, time, and cost. You can significantly reduce the complexity of migrating your legacy data warehouse and workloads with a subject- and object-level consumption-based data warehouse migration roadmap.

AWS Professional Services designed and developed this methodology based on the many large-scale MPP data warehouse migration projects we have performed in the last few years. The approach is derived from lessons learned while analyzing and dissecting ETL and reporting workloads, which often have intricate dependencies. It breaks a complex data warehouse migration project into multiple logical and systematic waves based on multiple dimensions: business priority, data dependency, workload profiles, and existing service level agreements (SLAs).

Consumption-based migration methodology

An effective and efficient method to migrate an MPP data warehouse is the consumption-based migration model, which moves workloads from the source MPP data warehouse to Amazon Redshift in a series of waves. You should run both the source MPP data warehouse and Amazon Redshift production environments in parallel for a certain amount of time before you can fully retire the source MPP data warehouse. For more information, see How to migrate a large data warehouse from IBM Netezza to Amazon Redshift with no downtime.

A data warehouse has the following two logical components:

  • Subject area – A data source and data domain combination. It is typically associated with a business function, such as sales or payment.
  • Application – An analytic that consumes one or more subject areas to deliver value to customers.

The following diagram illustrates the workflow of data subject areas and information consumption.

Figure 1: Consuming applications (reports/analytics) and subject areas (data source/domain).

This methodology helps facilitate a customer’s journey to build a Data Driven Enterprise (D2E). Its benefits are twofold: it helps you deeply understand the customer’s business context and use cases, and it contributes to shaping an enterprise data migration roadmap.

Affinity mapping between subject area and application

To decide which applications and their associated subject areas go into which wave, you need a detailed mapping between applications and subject areas. The following table shows an example of this type of mapping.

Figure 2: The affinity mapping between applications (analytics) and subject areas.

The basis for this mapping is the query execution metadata often stored in system tables of legacy data warehouses. This mapping is the basis for the creation of each wave — a single-step migration of an application’s objects and associated subject areas. You can similarly derive another potential second step, which results in a more detailed mapping between data sources and subject areas (to the level of individual tables) and helps with detailed project planning.
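
One hedged way to derive this mapping is to aggregate the legacy warehouse’s query log against a manually maintained table-to-subject-area mapping. The sketch below uses generic, hypothetical table and column names; the actual system tables differ by platform (for example, Teradata’s DBQL or Netezza’s query history):

-- Count how often each application touches each subject area
select q.application_name,
       m.subject_area,
       count(*) as query_count
from   query_log q
join   table_subject_map m
  on   q.table_name = m.table_name
group  by q.application_name, m.subject_area
order  by q.application_name, query_count desc;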

The sorting method in the preceding table is important. The right-most column shows the total number of times a subject area appears in applications (from the most common subject area to the least common subject area, top to bottom). The bottom row displays the number of subject areas that appear in an application (from the most dense application to the least dense, from left to right).

Given all other conditions being equal, which applications and analytics should you start with for the first wave? The best practice is to start somewhere in the middle (such as Analytic 8 or 9 in the preceding table). If you start from the left-most column (Analytic 1), the wave includes numerous objects (sources and tables, views, ETL scripts, and data formatting, cleansing, and exposing routines), which makes it unwieldy and inordinately long to execute and complete. Alternatively, if you start from the right-most column (Analytic 19), the wave covers very few subject areas, which increases the number of waves required to complete the entire migration and stretches the overall time frame. This choice also fails to offer good insight into the complexity of the whole project.

Migration wave and subject area integration

The following table illustrates the wave-based migration approach (stair steps) for the preceding affinity map. In each wave (which may include one or more applications or analytics), there are always new subject areas (in green) to onboard and subject areas that were migrated in the previous waves (in blue). An optimal wave-based migration approach is to design migration waves to have fewer new builds with each subsequent wave. In the following example, as earlier waves finish, there are fewer new subject areas to integrate in the subsequent waves — another reason to start in the middle and work to the left on the affinity chart. This ultimately results in accelerated delivery of your migration outcomes.

Figure 3: Stair step model and subject area integration.

Wave 0 typically includes the shared or foundational dimensional data or tables that every application uses (for example, Time and Organization). Each wave should have at least one anchor application, and anchor applications need to include new subject areas or data sources. What factors should you consider when choosing anchor applications in a wave? An anchor application in one wave should have minimal dependencies on other anchor applications in other waves, and it is often considered important from the business perspective. The combination of anchor applications across all waves should cover all subject areas.

In the preceding example, there are six different migration waves. The following table summarizes their anchor applications:

Migration Wave           Anchor Applications
Wave 1                   Analytic 9
Wave 2                   Analytic 8
Wave 3                   Analytic 7
Wave 4                   Analytic 6
Wave 5                   Analytic 4 and Analytic 5
Wave 6                   Analytic 3

All the other applications (analytics) are automatically taken care of, because the subject areas they depend on will have already been built in the preceding waves.

Application onboarding best practices

To determine how many waves there should be and what applications should go into each wave, consider the following factors:

  • Business priorities – An application’s value as part of a customer’s Data Driven Enterprise (D2E) journey
  • Workload profiles – Whether a workload is mostly ETL (write intensive) or query (read only)
  • Data-sharing requirements – Different applications may use data in the same tables
  • Application SLAs – The promised performance metric to end-users for each application
  • Dependencies – Functional dependencies among different applications

The optimal number of waves is determined by the preceding criteria; the best practice is to have up to 10 waves in one migration project so that you can effectively plan, execute, and monitor.

Regarding what applications should go into which wave and why, interactions among applications and their performance impact are typically too complex to understand from first principles. The following are some best practices:

  • Perform experiments and tests to develop an understanding about how applications interact with each other and their performance impact.
  • Group applications based on common data-sharing requirements.
  • Be aware that not all workloads benefit from a large cluster. For example, simple dashboard queries may run faster on small clusters, while complex queries can take advantage of all the slices in a large Amazon Redshift cluster.
  • Consider grouping applications with different workload and access patterns.
  • Consider using a dedicated cluster for different waves of applications.
  • Develop a workload profile for each application.

Amazon Redshift cluster sizing guide

The Amazon Redshift node type determines the CPU, RAM, storage capacity, and storage drive type for each node. The RA3 node type enables you to scale compute and storage independently; you pay separately for the amount of compute and Amazon Redshift Managed Storage (RMS) that you use. DS2 node types are optimized to store large amounts of data and use hard disk drive (HDD) storage. If you currently run on DS2 nodes, you should upgrade to RA3 nodes to get up to 2x better performance and 2x more storage for the same cost. The dense compute (DC) node types are optimized for compute; DC2 node types are optimized for performance-intensive workloads because they use solid state drive (SSD) storage.

Amazon Redshift node types are available in different sizes. Node size and the number of nodes determine the total storage for a cluster. We recommend choosing DC2 node types if you have less than 1 TB of compressed data, and RA3 node types (ra3.4xlarge or ra3.16xlarge) if you have more than 1 TB of compressed data. For more information, see Clusters and Nodes in Amazon Redshift.

The node type that you choose depends on several factors:

  • The compute needs of downstream systems to meet Service Level Agreements (SLAs)
  • The complexity of the queries and concurrent operations that you need to support in the database
  • The trade-off between achieving the best performance for your workload and budget
  • The amount of data you want to store in a cluster

For more detailed information about Amazon Redshift cluster node type and cluster sizing, see Clusters and nodes in Amazon Redshift.

As your data and performance needs change over time, you can easily resize your cluster to make the best use of the compute and storage options that Amazon Redshift provides.  You can use Elastic Resize to scale your Amazon Redshift cluster up and down in a matter of minutes to handle predictable spiky workloads and use the automated Concurrency Scaling feature to improve the performance of ad-hoc query workloads.

Although you may be migrating all the data in your traditional MPP data warehouses into the managed storage of Amazon Redshift, it is also common to send data to different destinations.  You might send cold or historical data to an Amazon S3 data lake to save costs, and send hot or warm data to an Amazon Redshift cluster for optimal performance.  Amazon Redshift Spectrum allows you to easily query and join data across your Amazon Redshift data warehouse and Amazon S3 data lake.  The powerful serverless data lake approach using AWS Glue and AWS Lambda functions enables the lake house architecture that combines data in an Amazon S3 data lake with data warehousing in the cloud using a simplified ETL data pipeline, minimizing the need to load data into an Amazon Redshift cluster.  For more detailed information, see ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1 and Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs.

Conclusion

This post demonstrated how to develop a comprehensive, wave-based application migration methodology for a complex project to modernize a traditional MPP data warehouse with Amazon Redshift. It provided best practices and lessons learned by considering business priority, data dependency, workload profiles and existing service level agreements (SLAs).

We would like to acknowledge AWS colleagues Corina Radovanovich, Jackie Jiang, Hunter Grider, Srinath Madabushi, Matt Scaer, Dilip Kikla, Vinay Shukla, Eugene Kawamoto, Maor Kleider, Himanshu Raja, Britt Johnston and Jason Berkowitz for their valuable feedback and suggestions.

If you have any questions or suggestions, please leave your feedback in the comment section. For more information about modernizing your on-premises data warehouses by migrating to Amazon Redshift and finding a trusted AWS partner who can assist you on this endeavor, see Modernize your Data Warehouse.

 


About the authors

Po Hong, PhD, is a Principal Data Architect of Data & Analytics Global Specialty Practice, AWS Professional Services.

Anand Rajaram is the global head of the Amazon Redshift Professional Services Practice at AWS.

Restrict Amazon Redshift Spectrum external table access to Amazon Redshift IAM users and groups using role chaining

Post Syndicated from Harsha Tadiparthi original https://aws.amazon.com/blogs/big-data/restrict-amazon-redshift-spectrum-external-table-access-to-amazon-redshift-iam-users-and-groups-using-role-chaining/

With Amazon Redshift Spectrum, you can query the data in your Amazon Simple Storage Service (Amazon S3) data lake using a central AWS Glue metastore from your Amazon Redshift cluster. This capability extends your petabyte-scale Amazon Redshift data warehouse to unbounded data storage limits, which allows you to scale to exabytes of data cost-effectively. Like Amazon EMR, you get the benefits of open data formats and inexpensive storage, and you can scale out to thousands of Redshift Spectrum nodes to pull, filter, project, aggregate, group, and sort data. Like Amazon Athena, Redshift Spectrum is serverless and there’s nothing to provision or manage. You pay only $5 for every 1 TB of data scanned. This post discusses how to configure Amazon Redshift security to enable fine-grained access control using role chaining to achieve high-fidelity, user-based permission management.

As you start using the lake house approach, which integrates Amazon Redshift with the Amazon S3 data lake using Redshift Spectrum, you need more flexibility when it comes to granting access to different external schemas on the cluster. For example, in the following use case, you have two Redshift Spectrum schemas, SA and SB, mapped to two databases, A and B, respectively, in an AWS Glue Data Catalog, in which you want to allow access for the following when queried from Amazon Redshift:

  • Select access for schema SA only to IAM user group Grp1
  • Select access for schema SB only to IAM user group Grp2
  • No access for IAM user group Grp3 to schemas SA and SB

By default, the policies defined under the AWS Identity and Access Management (IAM) role assigned to the Amazon Redshift cluster manage Redshift Spectrum table access, which is inherited by all users and groups in the cluster. This IAM role associated with the cluster cannot easily be restricted to different users and groups. This post details the configuration steps necessary to achieve fine-grained authorization policies for different users in an Amazon Redshift cluster and control access to different Redshift Spectrum schemas and tables using IAM role chaining. When using role chaining, you don’t have to modify the cluster; you can make all modifications on the IAM side. Adding new roles doesn’t require any changes in Amazon Redshift. Even when using AWS Lake Formation, as of this writing, you can’t achieve this level of isolated, coarse-grained access control on the Redshift Spectrum schemas and tables. For more information about cross-account queries, see How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3.

Prerequisites

This post uses a TPC-DS 3 TB public dataset from Amazon S3 cataloged in AWS Glue by an AWS Glue crawler and an example retail department dataset. To get started, you must complete the following prerequisites. The first two prerequisites are outside the scope of this post, but you can use your own cluster and dataset in your Amazon S3 data lake.

  1. Create an Amazon Redshift cluster with or without an IAM role assigned to the cluster.
  2. Create an AWS Glue Data Catalog with a database using data from the data lake in Amazon S3, with either an AWS Glue crawler, Amazon EMR, AWS Glue, or Athena. The database should have one or more tables pointing to different Amazon S3 paths. This post uses an industry standard TPC-DS 3 TB dataset, but you can also use your own dataset.
  3. Create IAM users and groups to use later in Amazon Redshift:
    • Create new IAM groups named grpA and grpB without any policies.
    • Create users a1 and b1 and add them to groups grpA and grpB, respectively. Use lower-case usernames.
    • Add the following policy to all the groups you created to allow IAM users temporary credentials when authenticating against Amazon Redshift:
      {
          "Version": "2012-10-17",
          "Statement": {
              "Effect": "Allow",
              "Action": "redshift:GetClusterCredentials",
              "Resource": "arn:aws:redshift:us-east-1:0123456789:dbuser:*/*"
          }
      }

      For additional security, you may want to restrict this policy to specific users and groups in the cluster.

  4. Create the IAM users and groups locally on the Amazon Redshift cluster without any password.
    • To create user a1, enter the following code:
      create user a1 password disable;

    • To create grpA, enter the following code:
      create group grpA with user a1;

    • Repeat these steps for user b1 and add the user to grpB.
  5. Install a JDBC SQL query client such as SqlWorkbenchJ on the client machine.

Use case

In the following use case, you have an AWS Glue Data Catalog with a database named tpcds3tb. Tables in this database point to Amazon S3 under a single bucket, but each table is mapped to a different prefix under the bucket. The following screenshot shows the different table locations.

You use the tpcds3tb database and create a Redshift Spectrum external schema named schemaA. You create groups grpA and grpB with different IAM users mapped to the groups. The goal is to grant different access privileges to grpA and grpB on external tables within schemaA.

This post presents two options for this solution:

  • Use the Amazon Redshift grant usage statement to grant grpA access to external tables in schemaA. The group can access all tables in the data lake defined in that schema, regardless of which Amazon S3 locations the tables are mapped to.
  • Configure role chaining for the external schemas to isolate group access to specific data lake locations and deny access to tables in the schema that point to different Amazon S3 locations.

Isolating user and group access using the grant usage privilege

You can use the Amazon Redshift grant usage privilege on schemaA, which allows grpA access to all objects under that schema. You don’t grant any usage privilege to grpB; users in that group should see access denied when querying.

  1. Create an IAM role named mySpectrum for the Amazon Redshift cluster to allow Redshift Spectrum to read Amazon S3 objects using the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": "*"
            }
        ]
    }

  2. Add a trust relationship to allow users in Amazon Redshift to assume roles assigned to the cluster. See the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "redshift.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  3. Choose your cluster name.
  4. Choose Properties.
  5. Choose Manage IAM roles.
  6. For Available IAM roles, choose your new role. If you don’t see the role listed, select Enter ARN and enter the role’s ARN.
  7. Choose Done, as seen in the following screenshot.
  8. Use the Amazon Redshift JDBC driver that includes the AWS SDK, which you can download from the Amazon Redshift console (see the following screenshot), and connect to the cluster using the IAM connection string from a SQL client such as SqlWorkbenchJ. The screenshots below show the JAR file type to select and the connection configuration in SqlWorkbenchJ.
  9. As an Amazon Redshift admin user, create an external schema named schemaA mapped to the AWS Glue database tpcds3tb, using the IAM role you created earlier to allow Redshift Spectrum access to Amazon S3. See the following code:
    create external schema schemaA from data catalog
    database 'tpcds3tb'
    iam_role 'arn:aws:iam::0123456789:role/mySpectrum' region 'us-east-1';

  10. Verify the schema is in the Amazon Redshift catalog with the following code:
    select * from svv_external_schemas;

  11. Grant usage privilege to grpA. See the following code:
    Grant usage on schema schemaA to group grpA;

  12. Query tables in schemaA as user a1 in grpA using your SQL client. See the following code:
    Select * from schemaA.customer limit 3;
    Select * from schemaA.call_center limit 3;

    The following screenshot shows the successful query results.

  13. Query tables in schemaA as user b1 in grpB. The following screenshot shows the error message you receive.

This option gives great flexibility to isolate user access on Redshift Spectrum schemas, but what if user b1 is authorized to access one or more tables in that schema but not all tables? The second option creates coarse-grained access control policies.

Isolating user and group access using IAM policies and role chaining

You can use IAM policies mapped to IAM roles with a trust relationship to specific users and groups based on Amazon S3 location access and assign it to the cluster. For this use case, grpB is authorized to only access the table catalog_page located at s3://myworkspace009/tpcds3t/catalog_page/, and grpA is authorized to access all tables but catalog_page located at s3://myworkspace009/tpcds3t/*. The following steps help you configure for the given security requirement.

The following diagram depicts how role chaining works.

You first create IAM roles with policies specific to grpA and grpB. The first role is a generic cluster role that allows users to assume this role using a trust relationship defined in the role.

  1. On the IAM console, create a new role. See the following code:
    RoleName: myblog-redshift-assumeRole

  2. Add the following two policies to this role:
    • Add a managed policy named AWSGlueConsoleFullAccess. You might consider adding your own inline policy with least privileges instead of this managed policy for more restricted security needs.
    • Add an inline policy called myblog-redshift-assumerole-inline with the following rules:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "Stmt1487639602000",
                  "Effect": "Allow",
                  "Action": [
                      "sts:AssumeRole"
                  ],
                  "Resource": "arn:aws:iam::0123456789:role/*"
              }
          ]
      }

  3. Add a trust relationship that allows the users in the cluster to assume this role. You can choose to limit this to specific users as necessary. See the following code:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "redshift.amazonaws.com"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringLike": {
              "sts:ExternalId": "arn:aws:redshift:us-east-1:0123456789:dbuser:<YourRedshiftClusterName>/*"
            }
          }
        }
      ]
    }

  4. Create a new Redshift-customizable role specific to grpA with a policy allowing access only to the Amazon S3 locations this group is allowed to access. Make sure you omit the Amazon S3 location for the catalog_page table; you don’t want to authorize this group to view that data.
    • Name the role myblog-grpA-role.
  5. Add the following two policies to this role:
    • Add a managed policy named AWSGlueConsoleFullAccess to the role. (Note: You might consider adding your own inline policy with least privileges instead of this managed policy for more restricted security needs.)
    • Add an inline policy named myblog-grpA-access-policy with the following rules (modify it to fit your security needs and allow minimal permissions):
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "AllowS3",
                  "Effect": "Allow",
                  "Action": [
                      "s3:*"
                  ],
                  "Resource": [
                      "arn:aws:s3:::myworkspace009/tpcds3t/call_center/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/catalog_returns/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/catalog_sales/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/customer/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/customer_address/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/customer_demographics/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/date_dim/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/item/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/promotion/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/store/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/store_sales/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/web_page/*",
                      "arn:aws:s3:::myworkspace009/tpcds3t/web_sales/*"
                  ]
              }
          ]
      }

  6. Add a trust relationship explicitly listing all users in grpA to only allow them to assume this role (choose the Trust relationships tab and edit it to add the following policy, updating the relevant account details). The trust relationship has to be updated for each user added to this role, or you can build a new role for each user. It is fairly easy to script and automate updating this trust relationship for each new user.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": " arn:aws:iam::0123456789:role/myblog-redshift-assumeRole"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "arn:aws:redshift:us-east-1:0123456789:dbuser:<YourRedshiftClusterName>/a1"
            }
          }
        }
      ]
    }

  7. Create another Redshift-customizable role specific to grpB with a policy restricting access to only the Amazon S3 locations this group is allowed to access.
    • Name the role myblog-grpB-role.
  8. Add the following two policies to this role. Create these managed policies reflecting the data access per DB group and attach them to the roles that are assumed on the cluster.
    • Add a managed policy named AWSGlueConsoleFullAccess to the role. You might consider adding your own inline policy with least privileges instead of this managed policy for more restricted security needs.
    • Add an inline policy named myblog-grpB-access-policy with the following rules (modify it to fit your security needs and allow minimal permissions):
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "AllowS3",
                  "Effect": "Allow",
                  "Action": [
                      "s3:*"
                  ],
                  "Resource": [
                      "arn:aws:s3:::myworkspace009/tpcds3t/catalog_page/*"
                  ]
              }
          ]
      }

  9. Add a trust relationship explicitly listing all users in grpB to only allow them to assume this role (choose the Trust relationships tab and edit it to add the following policy, updating the relevant account details). This trust relationship has to be updated for each user for this role, or you can build a role for each user. It is fairly easy to script and automate updating this trust relationship.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": " arn:aws:iam::0123456789:role/myblog-redshift-assumeRole"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "arn:aws:redshift:us-east-1:0123456789:dbuser:<YourRedshiftClusterName>/b1"
            }
          }
        }
      ]
    }

  10. Attach the three roles to the Amazon Redshift cluster and remove any other roles mapped to the cluster. If you don’t find any roles in the drop-down menu, use the role ARN.
  11. As an admin user, create a new external schema for grpA and grpB, respectively, using role chaining with the two roles you created.
    • For grpA, enter the following code:
      create external schema rc_schemaA from data catalog
      database 'tpcds3tb'
      iam_role 'arn:aws:iam::0123456789:role/myblog-redshift-assumeRole,arn:aws:iam::0123456789:role/myblog-grpA-role' region 'us-east-1';
      
      grant usage on schema rc_schemaA to group grpA;

    • For grpB, enter the following code:
      create external schema rc_schemaB from data catalog
      database 'tpcds3tb'
      iam_role 'arn:aws:iam::0123456789:role/myblog-redshift-assumeRole,arn:aws:iam::0123456789:role/myblog-grpB-role' region 'us-east-1';
      
      grant usage on schema rc_schemaB to group grpB;

  12. Query the external schema as users in grpA and grpB.
    • To query the customer table and catalog_page table as user a1 in grpA, enter the following code:
      select * from rc_schemaA.customer limit 3;
      select * from rc_schemaA.catalog_page limit 3;

The following screenshot shows the query results; user a1 can access the customer table successfully.

The following screenshot shows that user a1 can’t access catalog_page.

    • Query the customer table and catalog_page table as user b1 in grpB.

The following screenshot shows that user b1 can access catalog_page.

The following screenshot shows that user b1 can’t access the customer table.

Conclusion

This post demonstrated two different ways to isolate user and group access to external schema and tables. With the first option of using Grant usage statements, the granted group has access to all tables in the schema regardless of which Amazon S3 data lake paths the tables point to. This approach gives great flexibility to grant access at ease, but it doesn’t allow or deny access to specific tables in that schema.

With the second option, you manage user and group access at the grain of Amazon S3 objects, which gives more control of data security and lowers the risk of unauthorized data access. This approach has some additional configuration overhead compared to the first approach, but can yield better data security.

In both approaches, building the right governance model upfront for Amazon S3 paths, external schemas, and table mappings, based on how groups of users access them, is paramount to providing the best security with low operational overhead.

Special acknowledgment goes to AWS colleague Martin Grund for his valuable comments and suggestions.

 


About the Authors

Harsha Tadiparthi is a Specialist Sr. Solutions Architect, AWS Analytics. He enjoys solving complex customer problems in Databases and Analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

 

 

 

Harshida Patel is a Data Warehouse Specialist Solutions Architect with AWS.

 

 

Working with nested data types using Amazon Redshift Spectrum

Post Syndicated from Juan Yu original https://aws.amazon.com/blogs/big-data/working-with-nested-data-types-using-amazon-redshift-spectrum/

Redshift Spectrum is a feature of Amazon Redshift that allows you to query data stored on Amazon S3 directly and supports nested data types. This post discusses which use cases can benefit from nested data types, how to use Amazon Redshift Spectrum with nested data types to achieve excellent performance and storage efficiency, and some of the limitations of nested data types.

This post uses a data set generated with dummy data. You can view its table schema. If you’d like to try the dataset, deploy a Redshift cluster, execute the DDLs there, and use the example queries from this post or build your own.

Data modeling

In many scenarios, data is generated in a hierarchy. For example, assume a customer bought several items. For analytic purposes, there are various data modeling approaches to save storage or speed up data processing. One popular approach to achieve storage efficiency is the dimensional model.

The following table shows dummy customer data.

username | name | sex | address | mail | birthdate
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10
shepherdlisa | Mark Lee | M | 754 Michelle Gateway Port Johnstad, ME 35695 | [email protected] | 11/10/32
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07
brettmcgee | Travis Wilson | M | 535 Lisa Flat East Andrew, ID 43332 | [email protected] | 3/22/10
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60

The following table contains dummy order data, which is linked to the customer table via a foreign key username.

username | transaction_date | shipping_date | items | price
erin15 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | 10/11/19 | 10/12/19 | 7 | 1697
erin15 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | 10/5/19 | 10/10/19 | 7 | 6346

In the dimensional model, each customer’s information is stored only one time. There is no duplicated data, even though a customer could order multiple items at various times.

The dimensional model is optimal for storage. However, it can be challenging to process data efficiently. To get a full picture of your data, you need to join the two tables together to restore the hierarchy.

For example, to find out how many items customer Mark Lee bought and his total spending in the last three months, the query needs to join the customers and orders table. See the following code:

Select c.username, o.transaction_date, o.shipping_date, sum(items), sum(price) 
from customers c inner join orders o on (c.username = o.username) 
where c.name = 'Mark Lee' 
and transaction_date > DATEADD(month, -3, GETDATE())
group by 1,2,3;

When there are millions of customers who might buy multiple items in each transaction, the join can be very expensive. A fast-growing dataset can be so large that you need to store it in a distributed system. To perform the join, you need to shuffle data through the network, and the cost becomes even more significant.

As storage becomes cheaper and cheaper, people are starting to use a flattened model. In this model, data is pre-joined to gain processing efficiency. The following table shows that the customer and order information is stored in one record and ready to be analyzed.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/16/19 | 10/9/19 | 8 | 4824
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/10/19 | 9 | 4392
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/17/19 | 10/9/19 | 3 | 1079
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/25/19 | 10/7/19 | 1 | 208
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/2/19 | 10/5/19 | 10 | 3689
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/5/19 | 10/10/19 | 7 | 6346
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/6/19 | 10/10/19 | 5 | 1744
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/7/19 | 10/9/19 | 2 | 15
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/13/19 | 10 | 4794
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/17/19 | 9/21/19 | 1 | 527
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

This model also works well on a distributed system. Because each row contains complete information, you can process it on any node, and don’t need to shuffle data. You can also use the columnar format to store data, which allows the query engine to read only the needed columns instead of the whole row. This technique improves analytics performance and is storage efficient.

Both models have their pros and cons. The dimensional model trades compute power for storage efficiency, and the flattened model trades storage for processing efficiency.

Some new data types are available that achieve the best of both. Instead of putting child records into another table, you can nest them into the parent record and get the full information without performing a join. It effectively denormalizes the data without duplicating the parent record.

The following diagram illustrates this workflow.

You can apply this model to a schemaful hierarchy dataset. Continuing with the customer and order example, although a customer might buy multiple items, each order item contains the same type of information, such as product ID, price, and vendor.

The hierarchy is clear and consistent. You can map data to a nested structured schema, which you can store and access efficiently via SQL language.

The following table is a nested data presentation of the previous example.

username | name | sex | address | mail | birthdate | transaction_date | shipping_date | items | price
erin15 | Sarah Newman | F | 795 Nancy Shoal Apt. 684 Phillipschester, MI 01979 | [email protected] | 4/24/10 | 9/14/19 | 10/12/19 | 2 | 1237
 | | | | | | 9/16/19 | 10/9/19 | 8 | 4824
 | | | | | | 9/17/19 | 10/10/19 | 9 | 4392
 | | | | | | 9/17/19 | 10/9/19 | 3 | 1079
 | | | | | | 9/25/19 | 10/7/19 | 1 | 208
 | | | | | | 10/2/19 | 10/5/19 | 10 | 3689
 | | | | | | 10/5/19 | 10/10/19 | 7 | 6346
 | | | | | | 10/6/19 | 10/10/19 | 5 | 1744
 | | | | | | 10/7/19 | 10/9/19 | 2 | 15
 | | | | | | 10/11/19 | 10/13/19 | 10 | 4794
 | | | | | | 10/11/19 | 10/12/19 | 7 | 1697
palmerpaul | Jennifer Marshall | F | 869 Harrell Forges Apt. 111 East Monica, MO 01243 | [email protected] | 3/11/07 | 9/14/19 | 9/22/19 | 6 | 4642
 | | | | | | 9/17/19 | 9/21/19 | 1 | 527
 | | | | | | 10/9/19 | 10/12/19 | 5 | 408
torresdiana | Ashley Hoffman | F | 7815 Lauren Ranch Ambertown, FL 93225 | [email protected] | 5/14/60 | 9/17/19 | 9/28/19 | 9 | 5452

The following graph compares the storage usage for the three models (all in parquet format).

The graph shows that the nested structure is as storage efficient as the dimensional model.

Using nested data types

Nested data types are structured data types for some common data patterns. Nested data types support structs, arrays, and maps.

A struct is similar to a relational table. It groups object properties together. For example, if a customer profile contains their name, address, email, and birthdate, it appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string>

The data appears as the following code:

    "customer": {
        "username": "kevin35",
        "name": "Nancy Alvarez",
        "sex": "F",
        "address": "05472 Kathleen Turnpike\nNew Ashley, NV 84430",
        "mail": "[email protected]",
        "birthdate": "1961-02-05"
    }

An array stores one-to-many relationships. For example, a customer may have multiple shipping addresses or phone numbers. If a customer has several phone numbers, it appears as the following schema:

Phonenumbers array<string>

The data appears as the following code:

['555-5555', '555-1234']

A map is a collection of key-value pairs. You can consider it as a list of struct<key, value> elements. For example, if a customer has particular reward preferences, it appears as the following schema:

preference map<string,boolean> 

The data appears as the following code:

{one_day_delivery=true, 
 coupon=false, 
 free_shipping=true}

Nested data could have another nested data type as a member. The most common one is an array of structs. For example, an order containing multiple items could appear as the following schema:

orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > 

You can create a complex object by combining them. For example, a customer’s online transaction appears as the following schema:

customer struct<
              username:string,
              name:string,
              sex:string,
              address:string,
              mail:string,
              birthdate:string> , 
  shipping_address array<
                      struct<
                        name:string,
                        street_address:string,
                        city:string,
                        postcode:string>
                      > , 
  creditcard string , 
  transaction_date string , 
  shipping_date string , 
  membership string , 
  preference map<string,boolean> , 
  orders array<
            struct<
              product_id:string,
              price:int,
              onsale:boolean,
              tax:int,
              weight:int,
              others:int,
              vendor:string>
            > , 
  platform string , 
  comments string 

Popular query engines such as Hive, Spark, Presto, and Redshift Spectrum support nested data types. The SQL syntax those engines support can be different. To make it straightforward and consistent, all query examples in this post use Amazon Redshift Spectrum. For more information, see Tutorial: Querying Nested Data with Amazon Redshift Spectrum.
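
Although this post's example queries assume the nested external table already exists, the following is a minimal sketch of how such a schema could be declared as a Redshift Spectrum external table. The demo external schema, the S3 location, and the varchar lengths are assumptions, and the dataset's preference map column (omitted here) follows the same pattern:

-- Minimal sketch of a nested external table definition (names, lengths, and path are assumptions)
create external table demo.customer_order_nested_parq (
  customer struct<username:varchar(32),
                  name:varchar(64),
                  sex:varchar(8),
                  address:varchar(256),
                  mail:varchar(128),
                  birthdate:varchar(16)>,
  transaction_date varchar(16),
  shipping_date varchar(16),
  orders array<struct<product_id:varchar(32),
                      price:int,
                      onsale:boolean,
                      tax:int,
                      weight:int,
                      others:int,
                      vendor:varchar(64)>>
)
stored as parquet
location 's3://your-bucket/customer_order_nested_parq/';   -- hypothetical path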

Use cases for nested data types

Nested data types have many benefits: they simplify your ETL and data modeling, and they help you achieve good performance. The following are some common use cases that can benefit from nested data types.

Parent-child relationship

Nested data types keep the parent-child (summary-details) relationship by storing them collocated. This often matches how you want to analyze the data. For example, to analyze customers’ purchasing habits, you may need to find the following:

  • Customers who purchase often but buy only a few items each time. They likely want an annual membership that covers the shipping cost.
  • Customers who purchase less frequently but buy many items in one transaction. They likely expect a free shipping benefit or discount.

You need support information from the orders data, such as how many items, on average, a customer buys per transaction.

To find a list of customers who order online at least once per week, with fewer than four items each time, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, co.customer.address as address,
  (select count(*) from co.orders) as total_items, 
  (select sum(case when onsale = true then 1 else 0 end) from co.orders) as items_onsale
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date >= '2019-09-01' 
group by 1 having tran_cnt >= 4;

With the nested order details, per-item information is already grouped by customer per transaction. Aggregating the children is straightforward; you can aggregate order details to categorize a customer. If you use a denormalized table, you have to GROUP BY two times, and the query could also take longer. See the following code:

with purchases as (
select cc_username as customer, transaction_date, cc_address as address,
  count(*) as total_items, 
  sum(case when co_onsale = true then 1 else 0 end) as items_onsale
from demo.customer_order_flatten_parq 
group by 1,2,3)
select customer, count(transaction_date) as tran_cnt, avg(total_items) 
from purchases 
where total_items <= 3 and items_onsale > 0 
      and transaction_date > '2019-09-01'
group by 1 having tran_cnt >= 4;

To find customers who order only once per quarter with at least 10 items and high total spending, use the following code:

with purchases as (
select co.customer.username as customer, co.transaction_date as transaction_date, 
  (select count(*) from co.orders) as total_items, 
  (select sum(price) from co.orders) as total_spending
from demo.customer_order_nested_parq co )
select customer, count(transaction_date) as tran_cnt, avg(total_spending) from purchases 
where total_items >= 10 and total_spending > 5000 and transaction_date > '2019-07-01' and transaction_date < '2019-09-30'
group by 1 having tran_cnt < 2
order by 3 desc;

Another benefit of using nested data types for parent-child data analysis is resource usage reduction. If there are one million customer transactions, there could be over five times the item orders. For example, to find each day how many goods ship to Michigan, use the following code:

select co.shipping_date, sum(coo.weight)
from demo.customer_order_nested_parq co, co.orders coo
where co.customer.address like '%MI 012__'
group by 1
order by 1;

Assuming that 3% of customers ship orders to Michigan, after filtering the customer data, there could be approximately 3% of matching transactions. You only need to process 150 thousand item orders instead of 5 million. This greatly reduces the data to process and the resources to use when compared to a flattened model.

For the parent-child use case, nested data types provide straightforward aggregation on children, more efficient filtering, group by, and windowing, as well as storage savings.

Many-to-many relationship

Customers could buy many items from various vendors, and a vendor could sell a product to many customers. This is a many-to-many relationship.

In a dimensional model, you need three tables: a customers table, an orders table, and a transactions table. To find the top vendors who have the most customers, you need to join the three tables. See the following code:

select vendor, transaction_date, count(distinct cc.username)
from customers cc,
     transactions tt,
     orders oo
where cc.username = tt.username
and oo.transaction_id = tt.transaction_id
and tt.transaction_date >= '2019-01-01' 
group by 1,2
order by 3 desc;

With nested data types, the query is similar to the one using the dimensional model. However, because the orders data is collocated with customer transactions, you can join them on-the-fly without paying the cost. See the following code:

select coo.vendor, co.transaction_date, count(distinct co.customer.username)
from demo.customer_order_nested_parq co, 
co.orders coo
where co.transaction_date > '2019-01-01'
group by 1,2
order by 3 desc;

As another example, your vendor, Smith PLC, had a big sale event on October 10, 2019. You want to find out which customers bought your product during this sale and the top customers who spent the most. To do so, use the following code:

select co.customer.username, count(coo.product_id), sum(coo.price)
from demo.customer_order_nested_parq co, co.orders coo
where co.transaction_date = '2019-10-10'
and (select count(*) from co.orders 
     where vendor = 'Smith PLC' and onsale = true) > 0
group by 1
order by 3 desc;

Compared to the dimensional model query, the nested model is two-to-three times faster. This is on a relatively small dataset with only a few million rows. For a larger dataset, the performance improvement is even greater, and with less resource usage.

Sparse and frequently changed data

Assume that you want to reward customers who order from your online store. For each transaction, the customer can choose one or more rewards, such as free shipping, one-day delivery, a discount, or a coupon. Depending on how effective a reward is, you have to frequently modify the reward types, add new ones, or remove ones that aren’t popular.

If you store the data in a flattened model, there are two common options to track this data. The first method is creating a table with one column for each type of reward. You have to think of all possible rewards at the outset and create those columns. This could lead to a wide table and very sparse data. Alternatively, you can modify your table schema when you want to add or remove a reward type. That adds more maintenance work and you may lose history data. The following table demonstrates this method (the transaction_id values in the table examples below are fabricated).

transaction_id | free_shipping | one_day_delivery | discount | coupon
pklein35966659391853535 | FALSE | TRUE |  | TRUE
rebeccawiliams228880139768961 |  | FALSE |  | TRUE
brooke39180013629693040 | TRUE | FALSE | TRUE | TRUE
jchapman4283556333561927 | FALSE | TRUE | FALSE | FALSE
mariamartin3515336516983566 | FALSE | FALSE | TRUE | 

The second option is storing one reward per row. This avoids the wide-table issue and the burden of constantly updating the schema. The approach is suitable if you only need to analyze a single reward. If you want to see whether there is any correlation between rewards, such as whether more customers prefer free shipping and one-day delivery over a discount and a coupon, this option is more complicated (see the sketch after the following table). This model also needs more storage. The following table demonstrates this method.

transaction_id | reward_type | value
pklein35966659391853535 | free_shipping | FALSE
pklein35966659391853535 | one_day_delivery | TRUE
pklein35966659391853535 | coupon | TRUE
rebeccawiliams228880139768961 | one_day_delivery | FALSE
rebeccawiliams228880139768961 | coupon | TRUE
brooke39180013629693040 | free_shipping | TRUE
brooke39180013629693040 | one_day_delivery | FALSE
brooke39180013629693040 | discount | TRUE
brooke39180013629693040 | coupon | TRUE
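
To illustrate why, checking how many transactions selected both free shipping and one-day delivery requires a self-join in this layout. The following sketch assumes a hypothetical table demo.transaction_rewards with the columns shown in the preceding table:

-- Sketch of a correlation check against the one-reward-per-row model.
-- The table demo.transaction_rewards and its columns are hypothetical.
select count(distinct fs.transaction_id) as both_shipping_prefs
from demo.transaction_rewards fs
join demo.transaction_rewards od
  on fs.transaction_id = od.transaction_id
where fs.reward_type = 'free_shipping' and fs.value = true
  and od.reward_type = 'one_day_delivery' and od.value = true;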

A compromise is to use a JSON string to store selected rewards together in one column, which avoids schema change. See the following code:

preference varchar(65535)	

The following table shows how the data is stored as a JSON string:

transaction_id | preference
pklein35966659391853535 | {"coupon":true, "free_shipping":false, "one_day_delivery":true}
rebeccawiliams228880139768961 | {"coupon":true, "one_day_delivery":false}
brooke39180013629693040 | {"coupon":true, "discount":true, "free_shipping":true, "one_day_delivery":false}
jchapman4283556333561927 | {"coupon":false, "discount":false, "free_shipping":false, "one_day_delivery":true}
mariamartin3515336516983566 | {"discount":true, "free_shipping":false, "one_day_delivery":false}

You can analyze it by using a JSON function to extract the reward data. See the following code:

select correlation, count(username) from (
select username,
(case when 
    (json_extract_path_text(preference,'free_shipping')  = 'true' and  
     json_extract_path_text(preference,'one_day_delivery')  = 'true') 
     then 1
 when 
    (json_extract_path_text(preference,'discount') = 'true' and  
     json_extract_path_text(preference,'coupon')  = 'true') 
     then 2
else 0 
 end) as correlation
from demo.transactions
  )
group by 1;

This solution is acceptable, but you could be more storage efficient and more performant by using the nested data type map. See the following code:

preference map<string, boolean>

The following table shows how the data is stored in a map:

transaction_id | preference
pklein35966659391853535 | {coupon=true, free_shipping=false, one_day_delivery=true}
rebeccawiliams228880139768961 | {coupon=true, one_day_delivery=false}
brooke39180013629693040 | {coupon=true, discount=true, free_shipping=true, one_day_delivery=false}
jchapman4283556333561927 | {coupon=false, discount=false, free_shipping=false, one_day_delivery=true}
mariamartin3515336516983566 | {discount=true, free_shipping=false, one_day_delivery=false}

 

You can analyze a single reward or multiple rewards using SQL. For example, to find how many customers prefer free shipping, use the following code:

select count(distinct co.customer.username)
from demo.customer_order_nested_parq co, co.preference cm
where cm.key = 'free_shipping' and cm.value = true;

To find how many customers prefer free shipping and one-day delivery more than a coupon or discount, use the following code:

with customer_rewards as (
select co.customer.username as customer, 
 (select count(*) from co.preference cm 
where cm.key = 'free_shipping' and cm.value = true) as shipping_pref,
 (select count(*) from co.preference cm 
where cm.key = 'one_day_delivery' and cm.value = true) as delivery_pref,
 (select count(*) from co.preference cm 
where cm.key = 'coupon' and cm.value = true) as coupon_pref,
 (select count(*) from co.preference cm 
where cm.key = 'discount' and cm.value = true) as discount_pref
from demo.customer_order_nested_parq co)
select case when shipping_pref > 0 and delivery_pref > 0 then 1
            when coupon_pref > 0 and discount_pref > 0 then 2
            else 0
       end as correlation, count(customer)
from customer_rewards
group by 1;

The map type allows you to add any key-value pair. You can add a new reward type at any time without a schema change, and you can analyze the new reward right away.
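
For example, if a hypothetical gift_wrap reward starts appearing in the data, you can query it immediately with the same pattern as above, without any DDL change:

-- Sketch: querying a newly introduced reward key.
-- 'gift_wrap' is a hypothetical key that does not exist in the sample dataset.
select count(distinct co.customer.username)
from demo.customer_order_nested_parq co, co.preference cm
where cm.key = 'gift_wrap' and cm.value = true;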

The main advantage of the map type is that it supports flexible schema and eliminates the need to update the schema frequently. However, there is not much performance benefit. If performance is your top priority, a flattened table is recommended. You can also flatten the most-often accessed columns, and use map for the less frequently accessed columns.

Limitations of nested data types

Although nested data types are useful in many use cases, they have the following limitations:

  • There is a hard limit on the size of the children data.
  • You can only append, and updating data is difficult and slow. You need to rewrite the entire nested object even if you want to modify one child attribute.
  • Processing is split at the parent record level. You may run into problems if the children data is heavily skewed.
  • The query engine may not support all types of analytics on nested data.
  • For more information, see Amazon Redshift Spectrum Nested Data Limitations.

Summary

This post discussed the benefits of nested data types and use cases in which nested data types can help improve storage efficiency, performance, or simplify analysis. There are many more use cases in which nested data types can be an ideal solution. Try it out and share your experiences!

 


About the Author

 Juan Yu is a Data Warehouse Specialist Solutions Architect at AWS.

 

 

 

 

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-2/

Part 1 of this multi-post series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1, discussed common customer use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

This post shows you how to get started with a step-by-step walkthrough of a few ETL and ELT design patterns of Amazon Redshift using AWS sample datasets.

Prerequisites

Before getting started, make sure that you meet the following prerequisites:

  1. This post uses two publicly available AWS sample datasets from the US-West-2 (Oregon) Region. Use the US-West-2 (Oregon) Region for your test run to reduce cross-region network latency and cost due to the data movement.
  2. You have an AWS account in the same Region.
  3. You have the AdministratorAccess policy granted to your AWS account (for production, you should restrict this further).
  4. You have an existing Amazon S3 bucket named eltblogpost in your data lake to store unloaded data from Amazon Redshift. Because bucket names are unique across AWS accounts, replace eltblogpost with your unique bucket name as applicable in the sample code provided.
  5. You have AWS CLI installed and configured to use with your AWS account.
  6. You have an IAM policy named redshift-elt-test-s3-policy with the following read and write permissions for the Amazon S3 bucket named eltblogpost:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListMultipartUploadParts",
                    "s3:AbortMultipartUpload",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::eltblogpost",
                    "arn:aws:s3:::eltblogpost/*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  7. You have an IAM policy named redshift-elt-test-sampledata-s3-read-policy with read only permissions for the Amazon S3 bucket named awssampledbuswest2, hosting the sample data used for this walkthrough.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": [
                    "arn:aws:s3:::awssampledbuswest2",
                    "arn:aws:s3:::awssampledbuswest2/*"
                ]
            }
        ]
    }

  8. You have an IAM role named redshift-elt-test-role that has a trust relationship with redshift.amazonaws.com and glue.amazonaws.com and the following IAM policies (for production, you should restrict this further as needed):
    • redshift-elt-test-s3-policy
    • redshift-elt-test-sampledata-s3-read-policy
    • AWSGlueServiceRole
    • AWSGlueConsoleFullAccess
  9. Make a note of the ARN for redshift-elt-test-role IAM role.
  10. You have an existing Amazon Redshift cluster with the following parameters:
    • Cluster name as rseltblogpost.
    • Database name as rselttest.
    • Four dc2.large nodes.
    • An associated IAM role named redshift-elt-test-role.
    • A publicly available endpoint.
    • A cluster parameter group named eltblogpost-parameter-group, which you use to change the Concurrency Scaling mode later in this walkthrough.
    • Cluster workload management set to manual.
  11. You have SQL Workbench/J (or another tool of your choice) and can connect successfully to the cluster.
  12. You have an EC2 instance in the same Region with PostgreSQL client CLI (psql) and can connect successfully to the cluster.
  13. You have an AWS Glue catalog database named eltblogpost as the metadata catalog for Amazon Athena and Redshift Spectrum queries.

Loading data to Amazon Redshift local storage

This post uses the Star Schema Benchmark (SSB) dataset. It is provided publicly in an S3 bucket, which any authenticated AWS user with access to Amazon S3 can access.

To load data to Amazon Redshift local storage, complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J.
  2. Execute the CREATE TABLE statements from the Github repo from your SQL Workbench/J to create tables from the SSB dataset. The following diagram shows the list of tables.
  3. Execute the COPY statements from the Github repo. This step loads data into the tables you created using the sample data available in s3://awssampledbuswest2/ssbgz/. Remember to replace the IAM role ARN in the COPY statements with the IAM role ARN you noted previously.
  4. To verify that each table loaded correctly, run the following commands:
    select count(*) from LINEORDER; 
    select count(*) from PART;
    select count(*) from CUSTOMER;
    select count(*) from SUPPLIER;
    select count(*) from DWDATE;

    The following results table shows the number of rows for each table in the SSB dataset:

    Table Name    Record Count
    LINEORDER     600,037,902
    PART            1,400,000
    CUSTOMER        3,000,000
    SUPPLIER        1,000,000
    DWDATE              2,556

In addition to the record counts, you can also check for a few sample records from each table.
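
For example, a quick spot check could look like the following:

select * from lineorder limit 5;
select * from customer limit 5;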

Performing ELT and ETL using Amazon Redshift and unload to S3

The following are the high-level steps for this walkthrough:

  1. You are looking to pre-aggregate some commonly asked measures by your end-users on the point of sales (POS) data you loaded in Amazon Redshift local storage.
  2. You want to then unload the aggregated data from Amazon Redshift to your data lake (S3) in an open, analytics-optimized, and compressed Parquet file format. You also want to choose an optimized partitioning scheme for the unloaded data in your data lake to help with end-user query performance and eventually lower cost (a sketch of the UNLOAD pattern follows this list).
  3. You want to query the unloaded data from your data lake with Redshift Spectrum. You also want to share the data with other AWS services such as Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets (such as ERP, finance, or third-party data) stored in your data lake, and Amazon SageMaker for machine learning.
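
The exact UNLOAD statements are in the Github repo; the following is only a rough sketch of the pattern, where the source table name and the partition columns are assumptions based on the catalog tables and sample queries shown later:

-- Sketch only: table name and partition columns are assumptions; use the UNLOAD queries from the Github repo
unload ('select * from monthly_revenue_by_region_city_brand')
to 's3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand/'
iam_role 'arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role'
format as parquet
partition by (supplier_region, year);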

Complete the following steps:

  1. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – This query summarizes the revenue by manufacturer, category, and brand per month per year per supplier region.
    • ELT Query 2 – This query summarizes the revenue by brand per month per year by supplier region and city.
    • ELT Query 3 – This query drills down in time by customer city, supplier city, month, and year.
  2. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J. To use Redshift Spectrum for querying the unloaded data, you need the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue. You are now ready to create an AWS Glue crawler.
  3. From AWS CLI, run the following code (replace <Your AWS Account>):
    aws glue create-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                }
            ]
        }
    }

    You can also schedule the crawler to run periodically based on your use case. For example, you can schedule the crawler every 35 minutes to keep the AWS Glue catalog tables up to date with the data being unloaded every 30 minutes. However, this post does not configure any scheduling.

  4. After you create the AWS Glue crawler, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  5. When the AWS Glue crawler run is complete, go to the AWS Glue console to see the following three AWS Glue catalog tables under the database eltblogpost:
    • monthly_revenue_by_region_manufacturer_category_brand
    • monthly_revenue_by_region_city_brand
    • yearly_revenue_by_city
  6. Now that you have an external data catalog in AWS Glue named eltblogpost, create an external schema named spectrum_eltblogpost in the persistent cluster using the following SQL from your SQL Workbench/J (replace <Your AWS Account>):
    create external schema spectrum_eltblogpost 
    from data catalog 
    database 'eltblogpost' 
    iam_role 'arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role'
    create external database if not exists;

    Using Spectrum, you can now query the three AWS Glue catalog tables you set up earlier.

  7. Go to SQL Workbench/J and run the following sample queries:
    • Top 10 brands by category and manufacturer contributing to revenue for the region AFRICA for the year of 1992 and month of March:
      SELECT brand, category, manufacturer, revenue 
      from "spectrum_eltblogpost"."monthly_revenue_by_region_manufacturer_category_brand"
      where year = '1992'
      and month = 'March' 
      and supplier_region = 'AFRICA'
      order by revenue desc
      limit 10;
      
      brand | category | manufacturer | revenue
      ----------+----------+--------------+-----------
      MFGR#1313 | MFGR#13 | MFGR#1 | 5170356068
      MFGR#5325 | MFGR#53 | MFGR#5 | 5106463527
      MFGR#3428 | MFGR#34 | MFGR#3 | 5055551376
      MFGR#2425 | MFGR#24 | MFGR#2 | 5046250790
      MFGR#4126 | MFGR#41 | MFGR#4 | 5037843130
      MFGR#219 | MFGR#21 | MFGR#2 | 5018018040
      MFGR#159 | MFGR#15 | MFGR#1 | 5009626205
      MFGR#5112 | MFGR#51 | MFGR#5 | 4994133558
      MFGR#5534 | MFGR#55 | MFGR#5 | 4984369900
      MFGR#5332 | MFGR#53 | MFGR#5 | 4980619214

    • Monthly revenue for the region AMERICA for the year 1992 across all brands:
      SELECT month, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."monthly_revenue_by_region_city_brand"
      where year = '1992'
      and supplier_region = 'AMERICA'
      group by month;
      
      month | revenue
      ----------+--------------
      April | 4347703599195
      January | 4482598782080
      September | 4332911671240
      December | 4489411782480
      May | 4479764212732
      August | 4485519151803
      October | 4493509053843
      June | 4339267242387
      March | 4477659286311
      February | 4197523905580
      November | 4337368695526
      July | 4492092583189

    • Yearly revenue for supplier city ETHIOPIA 4 for the years 1992–1995 and month of December:
      SELECT year, supplier_city, sum(revenue) revenue
      FROM "spectrum_eltblogpost"."yearly_revenue_by_city"
      where supplier_city in ('ETHIOPIA 4')
      and year between '1992' and '1995'
      and month = 'December'
      group by year, supplier_city
      order by year, supplier_city;
      
      year | supplier_city | revenue
      -----+---------------+------------
      1992 | ETHIOPIA 4 | 91006583025
      1993 | ETHIOPIA 4 | 90617597590
      1994 | ETHIOPIA 4 | 92015649529
      1995 | ETHIOPIA 4 | 89732644163

When the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Accelerating ELT and ETL using Redshift Spectrum and unload to S3

Assume that you need to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption.

The following are the high-level steps for this walkthrough:

  1. This is a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured data. You want to use the power of Redshift Spectrum to perform the required SQL transformations on the data stored in S3 and unload the transformed results back to S3.
  2. You want to query the unloaded data from your data lake using Redshift Spectrum if you have an existing cluster, Athena with its pay-per-use and serverless ad hoc and on-demand query model, AWS Glue and Amazon EMR for performing ETL operations on the unloaded data and data integration with your other datasets in your data lake, and Amazon SageMaker for machine learning.

Because Redshift Spectrum allows you to query the data directly from your data lake without needing to load into Amazon Redshift local storage, you can spin up a short-lived cluster to perform ELT at a massive scale using Redshift Spectrum and terminate the cluster when the work is complete. You can automate spinning up and terminating the short-lived cluster using AWS CloudFormation. That way, you only pay for the few minutes or hours of use. A short-lived cluster can also avoid overloading the current persistent cluster serving interactive queries from live users. For this post, use your existing cluster rseltblogpost.

This post uses a publicly available sample dataset named tickit provided by AWS, which any authenticated AWS user with access to S3 can access:

  • Sales – s3://awssampledbuswest2/tickit/spectrum/sales/
  • Event – s3://awssampledbuswest2/tickit/allevents_pipe.txt
  • Date – s3://awssampledbuswest2/tickit/date2008_pipe.txt
  • Users – s3://awssampledbuswest2/tickit/allusers_pipe.txt

It is a best practice of Redshift Spectrum for performance reasons to load the dimension tables in the local storage of your short-lived cluster and use an external table for the fact table Sales.
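
For orientation, the external table DDL for the fact table referenced in step 2 below is similar in shape to the following sketch; the column list, types, and tab delimiter are assumptions based on the public tickit dataset, so use the SQL in the Github repo for the actual walkthrough:

-- Sketch of the Sales external table (columns and delimiter are assumptions)
create external table spectrum_eltblogpost.sales(
  salesid integer,
  listid integer,
  sellerid integer,
  buyerid integer,
  eventid integer,
  dateid smallint,
  qtysold smallint,
  pricepaid decimal(8,2),
  commission decimal(8,2),
  saletime timestamp)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://awssampledbuswest2/tickit/spectrum/sales/';
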

Complete the following steps:

  1. Connect to the cluster from the SQL Workbench/J. To use Redshift Spectrum for querying data from the data lake (S3), you need the following:
    • An Amazon Redshift cluster and a SQL client (SQL Workbench/J or another tool of your choice) that can connect to your cluster and execute SQL commands. The cluster and the data files in S3 must be in the same Region.
    • An external schema in Amazon Redshift that references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access S3 on your behalf. It is a best practice to have an external data catalog in AWS Glue.
    • An AWS Glue catalog database named eltblogpost that you already created.
    • An external schema in your Redshift cluster named spectrum_eltblogpost that you already created.
  2. Execute the SQL available on the Github repo to create an external table named sales in the same external schema named spectrum_eltblogpost. As shown in the previous section, you can also use an AWS Glue crawler to create the external table.
  3. Execute the SQLs available on the Github repo to create the dimension tables to load the data into Amazon Redshift local storage for Redshift Spectrum performance best practices.
  4. Execute the COPY statements available on the Github repo to load the dimension tables using the sample data available in s3://awssampledbuswest2/tickit/. Replace the IAM role ARN with the IAM role ARN you noted earlier, which is associated with your cluster.
  5. To verify that each table has the correct record count, execute the following commands:
    select count(*) from date;
    select count(*) from users;
    select count(*) from event;
    select count(*) from spectrum_eltblogpost.sales;

    The following results table shows the number of rows for each table in the tickit dataset:

    Table Name                    Record Count
    DATE                           365
    USERS                       49,990
    EVENT                        8,798
    spectrum_eltblogpost.sales 172,456

    In addition to the record counts, you can also check for a few sample records from each table.

  6. To compute the necessary pre-aggregates, execute the following three ELT queries available on the Github repo from your SQL Workbench/J:
    • ELT Query 1 – Total quantity sold on a given calendar date.
    • ELT Query 2 – Total quantity sold to each buyer.
    • ELT Query 3 – Events in the 99.9 percentile in terms of all-time gross sales.
  7. To unload the aggregated data to S3 with Parquet file format and proper partitioning to help with the access patterns of the unloaded data in the data lake, execute the three UNLOAD queries available on the Github repo from your SQL Workbench/J.
  8. To use Redshift Spectrum for querying the unloaded data, you can either create a new AWS Glue crawler or modify the previous crawler named eltblogpost_redshift_spectrum_etl_elt_glue_crawler. Update the existing crawler using the following code from AWS CLI (replace <Your AWS Account>):
    aws glue update-crawler --cli-input-json file://mycrawler.json --region us-west-2
    Where the file mycrawler.json contains:
    {
        "Name": "eltblogpost_redshift_spectrum_etl_elt_glue_crawler",
        "Role": "arn:aws:iam::<Your AWS Account>:role/redshift-elt-test-role",
        "DatabaseName": "eltblogpost",
        "Description": "",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_manufacturer_category_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/monthly_revenue_by_region_city_brand"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/yearly_revenue_by_city"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_quantity_sold_by_buyer_by_date"
                },
                {
                    "Path": "s3://eltblogpost/unload_parquet/total_price_by_eventname"
                }
            ]
        }
    }

  9. After you create the crawler successfully, run it manually from AWS CLI with the following command:
    aws glue start-crawler --name "eltblogpost_redshift_spectrum_etl_elt_glue_crawler" --region us-west-2
  10. When the crawler run is complete, go to the AWS Glue console. The following additional catalog tables are in the catalog database eltblogpost:
    • total_quantity_sold_by_date
    • total_quantity_sold_by_buyer_by_date
    • total_price_by_eventname
  11. Using Spectrum, you can now query the three preceding catalog tables. Go to SQL Workbench/J and run the following sample queries:
      • Top 10 days for quantities sold in February and March 2008:
        SELECT caldate, total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_date"
        where caldate between '2008-02-01' and '2008-03-30'
        order by total_quantity desc
        limit 10;
        
        caldate | total_quantity
        -----------+---------------
        2008-02-20 | 1170
        2008-02-25 | 1146
        2008-02-19 | 1145
        2008-02-24 | 1141
        2008-03-26 | 1138
        2008-03-22 | 1136
        2008-03-17 | 1129
        2008-03-08 | 1129
        2008-02-16 | 1127
        2008-03-23 | 1121

      • Top 10 buyers for quantities sold in February and March 2008:
        SELECT firstname,lastname,total_quantity
        FROM "spectrum_eltblogpost"."total_quantity_sold_by_buyer_by_date"
        where caldate between '2008-02-01' and '2008-03-31'
        order by total_quantity desc
        limit 10;
        
        firstname | lastname | total_quantity
        ----------+------------+---------------
        Laurel | Clay | 9
        Carolyn | Valentine | 8
        Amelia | Osborne | 8
        Kai | Gill | 8
        Gannon | Summers | 8
        Ignacia | Nichols | 8
        Ahmed | Mcclain | 8
        Amanda | Mccullough | 8
        Blair | Medina | 8
        Hadley | Bennett | 8

      • Top 10 event names for total price:
        SELECT eventname, total_price
        FROM "spectrum_eltblogpost"."total_price_by_eventname"
        order by total_price desc
        limit 10;
        
        eventname | total_price
        ---------------------+------------
        Adriana Lecouvreur | 51846.00
        Janet Jackson | 51049.00
        Phantom of the Opera | 50301.00
        The Little Mermaid | 49956.00
        Citizen Cope | 49823.00
        Sevendust | 48020.00
        Electra | 47883.00
        Mary Poppins | 46780.00
        Live | 46661.00

After the data is in S3 and cataloged in the AWS Glue catalog, you can query the same catalog tables using Amazon Athena, AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and many more AWS services that have seamless integration with S3.

Scaling ELT and unload running in parallel using Concurrency Scaling

Assume that you have a mixed workload under concurrency when the UNLOAD queries and the ELT jobs run in parallel in your cluster with Concurrency Scaling turned on. When Concurrency Scaling is enabled, Amazon Redshift automatically adds additional cluster capacity when you need to process an increase in concurrent read queries including UNLOAD queries. By default, Concurrency Scaling mode is turned off for your cluster. In this post, you enable the Concurrency Scaling mode for your cluster.

Complete the following steps:

  1. Go to your cluster parameter group named eltblogpost-parameter-group and complete the following:
    • Update max_concurrency_scaling_clusters to 5.
    • Create a new queue named Queue 1 with Concurrency Scaling mode set to Auto and a query group named unload_query for the UNLOAD jobs in the next steps.
  2. After you make these changes, reboot your cluster for the changes to take effect.
  3. For this post, use psql client to connect to your cluster rseltblogpost from the EC2 instance that you set up earlier.
  4. Open an SSH session to your EC2 instance and copy the nine files as shown below from the concurrency folder in the Github repo to /home/ec2-user/eltblogpost/ in your EC2 instance.
  5. Review the concurrency-elt-unload.sh script that runs the following eight jobs in parallel:
    • An ELT script for SSB dataset, which kicks off one query at a time.
    • An ELT script for the tickit dataset, which kicks off one query at a time.
    • Three unload queries for the SSB datasets kicked off in parallel.
    • Three unload queries for the tickit datasets kicked off in parallel.
  6. Run the concurrency-elt-unload.sh script. While the script is running, you see sample output from the script. The following are the response times taken by the script:
    real 2m40.245s
    user 0m0.104s
    sys 0m0.000s
  7. Run the following query to validate that some of the UNLOAD queries ran in the Concurrency Scaling clusters (look for “which_cluster = Concurrency Scaling” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 22:53:00' and '2019-10-20 22:56:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query: 

  8. Comment out the following SET statement in the six UNLOAD query files (ssb-unload<1-3>.sql and tickit-unload<1-3>.sql) to force all six UNLOAD queries to run in the main cluster:
    set query_group to 'unload_query';

    In other words, disable Concurrency Scaling mode for the UNLOAD queries.

  9. Run the concurrency-elt-unload.sh script. While the script is running, you see sample output from the script. The following are the response times taken by the script:
    real 3m40.328s
    user 0m0.104s
    sys 0m0.000s

    The following shows the Workload Management settings for the Redshift cluster: 

  10. Run the following query to validate that all the queries ran in the main cluster (look for “which_cluster = Main” in the query output below):
    SELECT query,
    Substring(querytxt,1,90) query_text,
    starttime starttime_utc,
    (endtime-starttime)/(1000*1000) elapsed_time_secs,
    case when aborted= 0 then 'complete' else 'error' end status,
    case when concurrency_scaling_status = 1 then 'Concurrency Scaling' else 'Main' end which_cluster
    FROM stl_query
    WHERE database = 'rselttest'
    AND starttime between '2019-10-20 23:19:00' and '2019-10-20 23:24:00'
    AND userid=100
    AND querytxt NOT LIKE 'padb_fetch_sample%'
    AND (querytxt LIKE 'create%' or querytxt LIKE 'UNLOAD%')
    ORDER BY query DESC;

    See the following output from the query. With Concurrency Scaling, the end-to-end runtime improved by about 60 seconds (the run without Concurrency Scaling took roughly 37.5% longer).

Summary

This post provided a step-by-step walkthrough of a few straightforward examples of the common ELT and ETL design patterns of Amazon Redshift using some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 1

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/etl-and-elt-design-patterns-for-lake-house-architecture-using-amazon-redshift-part-1/

Part 1 of this multi-post series discusses design best practices for building scalable ETL (extract, transform, load) and ELT (extract, load, transform) data processing pipelines using both primary and short-lived Amazon Redshift clusters. You also learn about related use cases for some key Amazon Redshift features such as Amazon Redshift Spectrum, Concurrency Scaling, and recent support for data lake export.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows a step-by-step walkthrough to get started using Amazon Redshift for your ETL and ELT use cases.

ETL and ELT

There are two common design patterns when moving data from source systems to a data warehouse. The primary difference between the two patterns is the point in the data-processing pipeline at which transformations happen. This also determines the set of tools used to ingest and transform the data, along with the underlying data structures, queries, and optimization engines used to analyze the data. The first pattern is ETL, which transforms the data before it is loaded into the data warehouse. The second pattern is ELT, which loads the data into the data warehouse and uses the familiar SQL semantics and power of the Massively Parallel Processing (MPP) architecture to perform the transformations within the data warehouse.

In the following diagram, the first represents ETL, in which data transformation is performed outside of the data warehouse with tools such as Apache Spark or Apache Hive on Amazon EMR or AWS Glue. This pattern allows you to select your preferred tools for data transformations. The second diagram is ELT, in which the data transformation engine is built into the data warehouse for relational and SQL workloads. This pattern is powerful because it uses the highly optimized and scalable data storage and compute power of MPP architecture.

Redshift Spectrum

Amazon Redshift is a fully managed data warehouse service on AWS. It uses a distributed, shared-nothing MPP architecture. Redshift Spectrum is a native feature of Amazon Redshift that enables you to run the familiar SQL of Amazon Redshift with the BI application and SQL client tools you currently use against all your data stored in open file formats in your data lake (Amazon S3).

A common pattern you may follow is to run queries that span both the frequently accessed hot data stored locally in Amazon Redshift and the warm or cold data stored cost-effectively in Amazon S3, using views with no schema binding for external tables. This enables you to independently scale your compute resources and storage across your cluster and S3 for various use cases.
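
For example, a late-binding view that spans hot local data and cold S3 data could look like the following minimal sketch, in which all schema, table, and column names are hypothetical:

-- Sketch of a view with no schema binding over local (hot) and Spectrum (cold) tables.
-- public.sales_hot, spectrum_schema.sales_cold, and the column list are hypothetical.
create view sales_all as
select salesid, pricepaid, saletime from public.sales_hot
union all
select salesid, pricepaid, saletime from spectrum_schema.sales_cold
with no schema binding;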

Redshift Spectrum supports a variety of structured and semi-structured file formats such as Apache Parquet, Avro, CSV, ORC, and JSON, to name a few. Because the data stored in S3 is in open file formats, the same data can serve as your single source of truth and other services such as Amazon Athena, Amazon EMR, and Amazon SageMaker can access it directly from your S3 data lake.

For more information, see Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required.

Concurrency Scaling

Using Concurrency Scaling, Amazon Redshift automatically and elastically scales query processing power to provide consistently fast performance for hundreds of concurrent queries. Concurrency Scaling resources are added to your Amazon Redshift cluster transparently in seconds, as concurrency increases, to serve sudden spikes in concurrent requests with fast performance without wait time. When the workload demand subsides, Amazon Redshift automatically shuts down Concurrency Scaling resources to save you cost.

The following diagram shows how the Concurrency Scaling works at a high-level:

For more information, see New – Concurrency Scaling for Amazon Redshift – Peak Performance at All Times.

Data lake export

Amazon Redshift now supports unloading the result of a query to your data lake on S3 in Apache Parquet, an efficient open columnar storage format for analytics. The Parquet format is up to two times faster to unload and consumes up to six times less storage in S3, compared to text formats. You can also specify one or more partition columns, so that unloaded data is automatically partitioned into folders in your S3 bucket to improve query performance and lower the cost for downstream consumption of the unloaded data. For example, you can choose to unload your marketing data and partition it by year, month, and day columns. This enables your queries to take advantage of partition pruning and skip scanning of non-relevant partitions when filtered by the partitioned columns, thereby improving query performance and lowering cost. For more information, see UNLOAD.

Use cases

You may be using Amazon Redshift either partially or fully as part of your data management and data integration needs. You likely transitioned from an ETL to an ELT approach with the advent of MPP databases because your workload is primarily relational, the SQL syntax is familiar, and the MPP architecture is massively scalable.

This section presents common use cases for ELT and ETL for designing data processing pipelines using Amazon Redshift.

ELT

Consider a batch data processing workload that requires standard SQL joins and aggregations on a modest amount of relational and structured data. You initially selected a Hadoop-based solution to accomplish your SQL needs. However, over time, as data continued to grow, your system didn't scale well. You now find it difficult to meet your required performance SLA goals and face ever-increasing hardware and maintenance costs. Relational MPP databases bring an advantage in terms of performance and cost, and lower the technical barriers to processing data by using familiar SQL.

Amazon Redshift offers significant benefits here: its massively scalable, fully managed compute can process structured and semi-structured data directly from your data lake in S3.

The following diagram shows how Redshift Spectrum allows you to simplify and accelerate your data processing pipeline from a four-step to a one-step process with the CTAS (Create Table As) command.
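As a rough sketch of that one-step pattern, the following CTAS statement reads raw data through a Redshift Spectrum external table and materializes an aggregated table inside Amazon Redshift in a single statement. The external schema, table, and column names are hypothetical placeholders, not part of the reference architecture itself.

-- One-step transform: read from the data lake via Spectrum and
-- materialize the aggregated result as a local Amazon Redshift table
CREATE TABLE sales_summary
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (sale_date)
AS
SELECT customer_id,
       sale_date,
       SUM(amount) AS total_amount
FROM spectrum_schema.sales_raw
WHERE sale_date >= '2020-01-01'
GROUP BY customer_id, sale_date;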

The preceding architecture enables seamless interoperability between your Amazon Redshift data warehouse solution and your existing data lake solution on S3, which hosts other enterprise datasets such as ERP, finance, and third-party data for a variety of data integration use cases.

The following diagram shows the seamless interoperability between your Amazon Redshift and your data lake on S3:

When you use an ELT pattern, you can also reuse your existing ELT-optimized SQL workload while migrating from your on-premises data warehouse to Amazon Redshift. This eliminates the need to rewrite relational and complex SQL workloads in a new compute framework from scratch. With Amazon Redshift, you can load, transform, and enrich your data efficiently using familiar SQL with advanced and robust SQL support, simplicity, and seamless integration with your existing SQL tools. You also get the monitoring capabilities that Amazon Redshift provides for your clusters.
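For example, beyond the console and CloudWatch metrics, you can inspect recent query runtimes directly from the system catalog. The following is a simple illustrative query against the SVL_QLOG system view and is not tied to any particular workload in this post.

-- Recent queries and their elapsed time (elapsed is reported in microseconds)
SELECT query,
       starttime,
       elapsed / 1000000 AS elapsed_seconds,
       substring
FROM svl_qlog
ORDER BY starttime DESC
LIMIT 20;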

ETL

You have a requirement to unload a subset of the data from Amazon Redshift back to your data lake (S3) in an open and analytics-optimized columnar file format (Parquet). You then want to query the unloaded datasets from the data lake using Redshift Spectrum and other AWS services such as Athena for ad hoc and on-demand analysis, AWS Glue and Amazon EMR for ETL, and Amazon SageMaker for machine learning.

You have a requirement to share a single version of a set of curated metrics (computed in Amazon Redshift) across multiple business processes from the data lake. You can use ELT in Amazon Redshift to compute these metrics and then use the unload operation with optimized file format and partitioning to unload the computed metrics in the data lake.

You also have a requirement to pre-aggregate a set of commonly requested metrics from your end-users on a large dataset stored in the data lake (S3) cold storage using familiar SQL and unload the aggregated metrics in your data lake for downstream consumption. In other words, consider a batch workload that requires standard SQL joins and aggregations on a fairly large volume of relational and structured cold data stored in S3 for a short duration of time. You can use the power of Redshift Spectrum by spinning up one or many short-lived Amazon Redshift clusters that can perform the required SQL transformations on the data stored in S3, unload the transformed results back to S3 in an optimized file format, and terminate the unneeded Amazon Redshift clusters at the end of the processing. This way, you only pay for the duration in which your Amazon Redshift clusters serve your workloads.

As shown in the following diagram, once the transformed results are unloaded to S3, you can query the unloaded data from your data lake using Redshift Spectrum (if you have an existing Amazon Redshift cluster), Athena with its serverless, pay-per-use model for ad hoc and on-demand analysis, AWS Glue and Amazon EMR for ETL operations and integration with your other datasets (such as ERP, finance, and third-party data) stored in your data lake, or Amazon SageMaker for machine learning.

You can also scale the unloading operation by using the Concurrency Scaling feature of Amazon Redshift. This provides a scalable and serverless option to bulk export data in an open and analytics-optimized file format using familiar SQL.

Best practices

The following recommended practices can help you to optimize your ELT and ETL workload using Amazon Redshift.

Analyze requirements to decide ELT versus ETL

The MPP architecture of Amazon Redshift and its Redshift Spectrum feature are efficient and designed for high-volume, relational, SQL-based ELT workloads (joins, aggregations) at massive scale. A common practice for designing an efficient ELT solution using Amazon Redshift is to spend sufficient time analyzing the following:

  • Type of data from source systems (structured, semi-structured, and unstructured)
  • Nature of the transformations required (usually encompassing cleansing, enrichment, harmonization, transformations, and aggregations)
  • Row-by-row, cursor-based processing needs versus batch SQL
  • Performance SLA and scalability requirements considering the data volume growth over time
  • Cost of the solution

This helps to assess if the workload is relational and suitable for SQL at MPP scale.

Key considerations for ELT

For both ELT and ETL, it is important to build a good physical data model for better performance for all tables, including staging tables, with proper data types and distribution methods. A dimensional data model (star schema) with fewer joins works best for MPP architecture, including ELT-based SQL workloads. Where feasible, consider using a TEMPORARY table for intermediate staging tables in the ELT process for better write performance, because temporary tables only write a single copy.
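A minimal sketch of that staging pattern follows. The table names, S3 path, and IAM role ARN are hypothetical; the point is simply that the temporary staging table inherits the target table's column definitions and avoids the duplicate write of a permanent table.

-- Temporary staging table that mirrors the target table's columns
CREATE TEMPORARY TABLE stage_sales (LIKE sales);

-- Load the incremental batch into the staging table
COPY stage_sales
FROM 's3://your-bucket/incoming/sales/'
IAM_ROLE 'arn:aws:iam::111122223333:role/MyRedshiftCopyRole'
FORMAT AS PARQUET;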

A common rule of thumb for ELT workloads is to avoid row-by-row, cursor-based processing (a commonly overlooked finding for stored procedures). This is sub-optimal because such processing needs to happen on the leader node of an MPP database like Amazon Redshift. Instead, the recommendation for such a workload is to look for an alternative distributed processing programming framework, such as Apache Spark.

Several hundred to thousands of single-record inserts, updates, and deletes for highly transactional needs are not efficient with an MPP architecture. Instead, stage those records and apply them to the table with a bulk UPDATE or DELETE/INSERT as a single batch operation.
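The following sketch shows one common way to apply such a staged batch, reusing the hypothetical stage_sales table from the previous example and assuming sale_id is the business key. It illustrates the pattern rather than prescribing your schema.

BEGIN;

-- Remove the rows that are being replaced, in one bulk operation
DELETE FROM sales
USING stage_sales
WHERE sales.sale_id = stage_sales.sale_id;

-- Insert the new and changed rows in one bulk operation
INSERT INTO sales
SELECT * FROM stage_sales;

COMMIT;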

With the external table capability of Redshift Spectrum, you can optimize your transformation logic using a single SQL as opposed to loading data first in Amazon Redshift local storage for staging tables and then doing the transformations on those staging tables.

Key considerations for data lake export

When you unload data from Amazon Redshift to your data lake in S3, pay attention to data skew or processing skew in your Amazon Redshift tables. The UNLOAD command uses the parallelism of the slices in your cluster. Hence, if there is a data skew at rest or processing skew at runtime, unloaded files on S3 may have different file sizes, which impacts your UNLOAD command response time and query response time downstream for the unloaded data in your data lake.
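One quick, illustrative way to spot data skew before unloading is the skew_rows column of the SVV_TABLE_INFO system view, which compares the most and least populated slices for each table; the thresholds that matter depend on your own data.

-- Tables with the highest row skew across slices
SELECT "table",
       diststyle,
       tbl_rows,
       skew_rows
FROM svv_table_info
ORDER BY skew_rows DESC
LIMIT 20;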

You should also keep the maximum file size to approximately 100 MB or less in the UNLOAD command for better performance for downstream consumption. Similarly, for S3 partitioning, a rule of thumb is to keep the number of partitions per table on S3 to a few hundred by choosing low-cardinality partitioning columns (year, quarter, month, and day are good choices) in the UNLOAD command. This avoids creating too many partitions, which in turn creates a large volume of metadata in the AWS Glue catalog, leading to high query times via Athena and Redshift Spectrum.

To get the best throughput and performance under concurrency for multiple UNLOAD commands running in parallel, create a separate queue for unload queries with Concurrency Scaling turned on. This lets Amazon Redshift burst additional Concurrency Scaling clusters as required.

Key considerations for Redshift Spectrum for ELT

To get the best performance from Redshift Spectrum, pay attention to the maximum pushdown operations possible, such as S3 scan, projection, filtering, and aggregation, in your query plans for a performance boost. This is because you want to utilize the powerful infrastructure underneath that supports Redshift Spectrum. Using predicate pushdown also avoids consuming resources in the Amazon Redshift cluster.

In addition, avoid complex operations like DISTINCT or ORDER BY on more than one column and replace them with GROUP BY as applicable. Amazon Redshift can push down a single-column DISTINCT as a GROUP BY to the Spectrum compute layer with its query rewrite capability, whereas multi-column DISTINCT or ORDER BY operations need to happen inside the Amazon Redshift cluster.
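As a small illustration, again using the hypothetical spectrum_schema.sales_raw external table, the two statements below return the same result, but the GROUP BY form is eligible for pushdown to the Spectrum layer:

-- Multi-column DISTINCT is processed inside the Amazon Redshift cluster
SELECT DISTINCT customer_id, region
FROM spectrum_schema.sales_raw;

-- Equivalent result written as GROUP BY, eligible for Spectrum pushdown
SELECT customer_id, region
FROM spectrum_schema.sales_raw
GROUP BY customer_id, region;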

The Amazon Redshift optimizer can use external table statistics to generate more optimal execution plans. Without statistics, it generates an execution plan based on heuristics, assuming that the S3 table is relatively large. It is recommended to set the table statistics (numRows) manually for S3 external tables.
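For example, if you know a table in your data lake holds roughly 170 million rows, you can record that estimate on the external table. The schema, table name, and row count below are placeholders for illustration.

-- Set approximate row-count statistics on a Redshift Spectrum external table
ALTER TABLE spectrum_schema.sales_raw
SET TABLE PROPERTIES ('numRows' = '170000000');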

For more information on Amazon Redshift Spectrum best practices, see Twelve Best Practices for Amazon Redshift Spectrum and How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3.

Summary

This post discussed the common use cases and design best practices for building ELT and ETL data processing pipelines for data lake architecture using a few key features of Amazon Redshift: Redshift Spectrum, Concurrency Scaling, and the recently released support for data lake export with partitioning.

Part 2 of this series, ETL and ELT design patterns for lake house architecture using Amazon Redshift: Part 2, shows you how to get started with a step-by-step walkthrough of a few simple examples using AWS sample datasets.

As always, AWS welcomes feedback. Please submit thoughts or questions in the comments.

 


About the Authors

Asim Kumar Sasmal is a senior data architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

 

Maor Kleider is a principal product manager for Amazon Redshift, a fast, simple and cost-effective data warehouse. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/how-to-enable-cross-account-amazon-redshift-copy-and-redshift-spectrum-query-for-aws-kms-encrypted-data-in-amazon-s3/

This post shows a step-by-step walkthrough of how to set up a cross-account Amazon Redshift COPY and Spectrum query using a sample dataset in Amazon S3. The sample dataset is encrypted at rest using AWS KMS-managed keys (SSE-KMS).

About AWS Key Management Service (AWS KMS)

With AWS Key Management Service (AWS KMS), you can have centralized control over the encryption keys used to protect your data at rest. You can create, import, rotate, disable, delete, define usage policies, and audit the use of encryption keys used to encrypt your data. AWS KMS uses FIPS 140-2 validated cryptographic modules to protect the confidentiality and integrity of your master keys.

AWS KMS is seamlessly integrated with most AWS services. This integration means that you can easily use customer master keys (CMKs) to control the encryption of the data you store within these services. When deciding to encrypt data in a service such as Amazon Redshift, you can choose to use an AWS-managed CMK that Amazon Redshift automatically creates in KMS. You can track the usage of the key, but it’s managed by the service on your behalf. In some cases, you might need direct control over the lifecycle of a CMK or want to allow other accounts to use it. In these cases, you can create and manage your own CMK that AWS services such as Amazon Redshift can use on your behalf. These customer-managed CMKs enable you to have full control over the access permissions that determine who can use the key and under which conditions. AWS KMS is integrated with AWS CloudTrail, a service that provides a record of actions performed by a user, role, or AWS service in AWS KMS.

About Amazon Redshift and Redshift Spectrum

Amazon Redshift is a petabyte scale, fully managed data warehouse service on AWS. It uses a distributed, massively parallel processing (MPP), shared-nothing architecture that scales horizontally to meet usage requirements.

Amazon Redshift Spectrum is a feature of Amazon Redshift that extends the analytic power of Amazon Redshift beyond the data that is stored on local disks in the data warehouse. In other words, Amazon Redshift Spectrum enables you to use the same ANSI SQL syntax of Amazon Redshift on the data that is stored in an Amazon S3 data lake. You do so using external tables, without having to ingest the data into Amazon Redshift first. A common pattern is to run queries that span both the frequently accessed “hot” data stored locally in Amazon Redshift and the “warm/cold” data stored cost-effectively in Amazon S3. That pattern separates compute and storage by enabling independent scaling of both to match the use case. This means you don’t have to pay for unused compute capacity just to add more storage. More importantly, this approach enables seamless interoperability between your data lake and Amazon Redshift.

The Amazon Redshift COPY command supports the following types of Amazon S3 encryption:

  • Server-side encryption with Amazon S3-managed keys (SSE-S3)
  • Server-side encryption with AWS KMS-managed keys (SSE-KMS)
  • Client-side encryption using a client-side symmetric master key

The Amazon Redshift COPY command doesn’t support the following types of Amazon S3 encryption:

  • Server-side encryption with customer-provided keys (SSE-C)
  • Client-side encryption using an AWS KMS–managed customer master key
  • Client-side encryption using a customer-provided asymmetric master key

About the use case

A multiple-account AWS environment is a common pattern across our customers for a variety of reasons. A common reason for data lake customers on AWS is to separate ownership of data assets across different business units in the company. At the same time, business units might need to grant access to some of their data assets to each other for new business insights.

As illustrated in the following drawing, in our example Account A owns an S3 bucket with SSE-KMS encrypted data and Account B owns an Amazon Redshift cluster with Redshift Spectrum enabled. Account B needs access to the same data to load to the Amazon Redshift cluster using the COPY command and also to query using Redshift Spectrum.

Solution walkthrough

Following, we walk through a couple different options to support this use case.

Prerequisites

The solution assumes that you already have the following set up:

    1. Access to two AWS accounts (we call them Account A and B) in the same AWS Region.*
    2. Grant the AdministratorAccess policy to the AWS accounts (which should be restricted further for production).
    3. Account A has a customer-managed CMK in AWS KMS with the following attributes:
      • Alias as kms_key_account_a
      • Description as Cross Account KMS Key in Account A
      • Administrator as current IAM user using which you signed in to the AWS console and created the KMS key
      • Account B added as External Accounts

      Copy and save the CMK Amazon Resource Name (ARN) to be used shortly

    4. Account A uses the following sample dataset from AWS:
      Customer - s3://awssampledbuswest2/ssbgz/customer0002_part_00.gz

    5. Account A has an S3 bucket called rs-xacct-kms-bucket with bucket encryption option set to AWS KMS using the KMS key kms_key_account_a created earlier.
    6. Use the following AWS CLI command to copy the customer table data from the AWS sample dataset SSB (Star Schema Benchmark), found in the Amazon Redshift documentation. Note: Because bucket names are global across all AWS customers, you need a unique bucket name for your test run. Be sure to replace rs-xacct-kms-bucket with your own bucket name in the following command:
      aws s3 cp s3://awssampledbuswest2/ssbgz/ s3://rs-xacct-kms-bucket/customer/ --recursive --exclude '*' --include 'customer*'

    7. After the copy is complete, check the KMS key ID for the file from S3 console, as shown following.
    8. Account B has an Amazon Redshift cluster:
      • The cluster name is rstest
      • It’s publicly accessible
      • It has an IAM role attached called redshift_role_account_b with the following two managed IAM policies:
        • AmazonS3ReadOnlyAccess
        • AWSGlueConsoleFullAccess

            Note: Be sure to update redshift_role_account_b with your own IAM role.

            You can set up a database session successfully from a client tool, such as SQL Workbench from your laptop.

* This walkthrough uses a publicly available AWS sample dataset from the US-West-2 (Oregon) Region. Hence, we recommend that you use the US-West-2 (Oregon) Region for your test run to reduce cross-region network latency and cost due to data movement.

Step-by-step walkthrough

Depending on which account’s AWS Glue Data Catalog you want to use for Redshift Spectrum, there are two solution options to choose from:

  1. AWS Glue Data Catalog in Account B
  2. AWS Glue Data Catalog in Account A

Option 1: AWS Glue Data Catalog in Account B

Set up permissions

  1. Sign in to Account A’s AWS console. Then, change the AWS Region to us-west-2 (Oregon). Add the following bucket policy for the rs-xacct-kms-bucket bucket so that Account B (which owns the Amazon Redshift cluster – rstest) can access the bucket.

Note: Replace <Account B> with AWS Account ID for Account B and rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<Account B>:root"
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}
    2. Sign in to Account B’s AWS console. Then, change the AWS Region to us-west-2 (Oregon). Create IAM policies and roles as described following:

a) Create the following two IAM permission policies: rs_xacct_bucket_policy to give Account B access to the S3 bucket in Account A, and rs_xacct_kms_policy to give Account B access to the CMK in Account A.

Policy name: rs_xacct_kms_policy

Note: Replace <ARN of kms_key_account_a from Account A> with your KMS key ARN from Account A.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowUseOfTheKey",
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ]
        },
        {
            "Sid": "AllowAttachmentOfPersistentResources",
            "Effect": "Allow",
            "Action": [
                "kms:CreateGrant",
                "kms:ListGrants",
                "kms:RevokeGrant"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ],
            "Condition": {
                "Bool": {
                    "kms:GrantIsForAWSResource": true
                }
            }
        }
    ]
}

Policy name: rs_xacct_bucket_policy

Note: Replace rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}

b) Create a new IAM role called xacct_kms_role_account_b for the Amazon Redshift service with the following IAM policies attached:

rs_xacct_bucket_policy
rs_xacct_kms_policy
AWSGlueConsoleFullAccess

Save the Amazon Resource Name (ARN) of the IAM role. You’ll use it soon.

c) Now let’s set up the IAM role chaining for Amazon Redshift between the two IAM roles, redshift_role_account_b and xacct_kms_role_account_b.

To chain roles, you establish a trust relationship between the roles. A role that assumes another role (for example, Role A) must have a permission policy that allows it to assume the next chained role (for example, Role B). Similarly, the role that passes permissions (Role B) must have a trust policy that allows it to pass its permissions to the previous chained role (Role A).

The first role in the chain must be a role attached to the Amazon Redshift cluster. The first role and each subsequent role that assumes the next role in the chain must have a policy that includes a specific statement. This statement has the Allow effect on the sts:AssumeRole action and the ARN of the next role in a Resource element.

In our example, Role A is redshift_role_account_b, which needs the permission policy rs_xacct_assume_role_policy, which  allows it to assume Role B (which is xacct_kms_role_account_b). Both IAM roles are owned by AWS Account B.

d) Let’s create the IAM permission policy rs_xacct_assume_role_policy and attach the policy to the IAM role redshift_role_account_b.

Policy name: rs_xacct_assume_role_policy

Note: Replace <ARN for IAM role xacct_kms_role_account_b from Account B>.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
"<ARN for IAM role xacct_kms_role_account_b from Account B>"
            ]
        }
    ]
}

e) Change the trust relationship for IAM role xacct_kms_role_account_b by choosing Edit trust relationship and replacing the existing trust policy with the following:

Note: Replace <Account B> with the AWS Account ID for Account B.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Account B>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
} 

f) Create an AWS Glue service IAM role called glue_service_role_account_b with the following policies attached:

• AWSGlueServiceRole (AWS managed policy)
• rs_xacct_bucket_policy (managed policy created earlier)
• rs_xacct_kms_policy (managed policy created earlier)

Note: Be sure to update glue_service_role_account_b with your own IAM role.

Perform the Amazon Redshift COPY

  1. Log in to the Amazon Redshift cluster from your query tool and create the customer table using the DDL following.
CREATE TABLE customer 
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

2. Now you can run the COPY statement following successfully.

copy customer from 's3://rs-xacct-kms-bucket/customer/' 
iam_role '<IAM role ARN of redshift_role_account_b,IAM role ARN of xacct_kms_role_account_b>'
gzip
region 'us-west-2';

Note: Replace the IAM role ARNs from Account B separated by a comma without any spaces around it.

3. Run the following sample query to verify that the data was loaded successfully.

select * from customer limit 10;

Set up an AWS Glue Data Catalog table for Redshift Spectrum to query

Let’s now create an AWS Glue crawler in Account B to crawl the same customer data and create a table called customer in the AWS Glue Data Catalog database spectrumdb_account_b following these steps:

  1. Navigate to Databases on the AWS Glue console and choose Add database to create an AWS Glue Data Catalog database called spectrumdb_account_b, as shown following.

  2. Navigate to Crawlers on the AWS Glue console and choose Add crawler, as shown following.

  3. Create a crawler called customerxacct, as shown following.

Note: The crawler job name (customerxacct in this case) is not the same as the table name created by the crawler (a common point of confusion). The table name is picked up automatically from the prefix and folder name in your S3 bucket and folder structure. You also have the option to add a table name prefix if you want to.

  4. Choose Next to enter the Data store details of the customer table, as shown following.

  5. Choose Next to get to the Add another data store page. We leave the default, No, because we don’t have any other data stores to add.

  6. Choose Next to choose the IAM role created earlier, glue_service_role_account_b, for the crawler to use, as shown following.

  7. Choose Next to go to the Schedule page and choose the schedule that you want this crawler job to run on. For this example, we can choose Run on demand.

  8. Choose Next to choose the AWS Glue Data Catalog database spectrumdb_account_b (created earlier) as the crawler output location.

  9. Choose Next to get to the review page.

  10. After reviewing the details, choose Finish to finish creating the crawler.

  11. Now, let's run the crawler job by selecting the job, as shown following, and choosing Run crawler.

  12. Wait and watch for the job to complete. Its status changes from Starting to Stopping to Ready. You can choose the refresh button for the latest status.

  13. If the job fails, the failure is recorded in Amazon CloudWatch Logs. To view the logs, choose Logs, shown in the screenshot preceding, which takes you to the CloudWatch logs.
  14. Now, let's go to the AWS Glue Data Catalog database to make sure that the table exists.

Choose Databases, choose the spectrumdb_account_b database, and then choose View Tables, or choose the hyperlink of the database name. You should see the customer table, as shown following.

  15. Choose the customer hyperlink to get to the external table details, shown following.

Because the data file didn't have a header record, the AWS Glue crawler assigned a default column naming convention, as shown preceding. For the customer table, this naming is c0 through c7.

  16. Choose Edit Schema and assign appropriate column names, as per the mapping following.

c0 => c_custkey

c1 => c_name

c2 => c_address

c3 => c_city

c4 => c_nation

c5 => c_region

c6 => c_phone

c7 => c_mktsegment

When you are done, choose Save.

Perform the Redshift Spectrum query

Now that the customer table is created in AWS Glue Data Catalog, let’s query the table using Redshift Spectrum.

  1. Log in to the Amazon Redshift cluster from your query tool.
  2. Run the statements following to create an external schema called spectrumxacct for Redshift Spectrum pointing to the AWS Glue Data Catalog database. This database is spectrumdb_account_b in Account B, already created on the AWS Glue console.
    drop schema if exists spectrumxacct;
    create external schema spectrumxacct
    from data catalog 
    database 'spectrumdb_account_b'
    iam_role '<IAM role ARN of redshift_role_account_b,IAM role ARN of xacct_kms_role_account_b>'
    create external database if not exists;
    

    Note: Replace the IAM role ARNs from Account B separated by a comma without any spaces around it.

  3. Run the following sample query to verify that Redshift Spectrum can query the data successfully.
    select * from spectrumxacct.customer limit 10;

Note: Redshift Spectrum uses the AWS Glue Data Catalog in Account B, not Account A.

Option 2: AWS Glue Data Catalog in Account A

 

Set up permissions

1. Sign in to the Account A AWS console, then change the AWS Region to us-west-2 (Oregon).

a) Create the following two IAM policies:

• rs_xacct_bucket_policy to give access to the S3 bucket in Account A
• rs_xacct_kms_policy to give access to the CMK in Account A

Policy name: rs_xacct_bucket_policy

Note: Replace the bucket name rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}

Policy name: rs_xacct_kms_policy

Note: Replace <ARN of kms_key_account_a from Account A> with your KMS key ARN from Account A.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowUseOfTheKey",
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ]
        },
        {
            "Sid": "AllowAttachmentOfPersistentResources",
            "Effect": "Allow",
            "Action": [
                "kms:CreateGrant",
                "kms:ListGrants",
                "kms:RevokeGrant"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ],
            "Condition": {
                "Bool": {
                    "kms:GrantIsForAWSResource": true
                }
            }
        }
    ]
}

b) Create a new IAM role called xacct_kms_role_account_a for the Amazon Redshift service with the following IAM policies:

rs_xacct_bucket_policy
rs_xacct_kms_policy
AWSGlueConsoleFullAccess (this managed policy provides the required permissions for the AWS Glue Data Catalog)

Save the IAM role ARN to be used shortly.

c) Change the trust relationship for the IAM role xacct_kms_role_account_a by choosing Edit trust relationship and replacing the existing trust policy with the following:

Note: Replace <Account B> with the AWS account ID for Account B.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Account B>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

d) Create an AWS Glue service IAM role called glue_service_role_account_a with the following policies attached:

AWSGlueServiceRole (AWS managed policy)
rs_xacct_bucket_policy (managed policy created earlier)
rs_xacct_kms_policy (managed policy created earlier)

Note: Be sure to update glue_service_role_account_a with your own IAM role

2. Sign in to Account B’s AWS console and change the AWS Region to us-west-2 (Oregon) if it’s not already selected.

a) Modify the existing IAM policy rs_xacct_assume_role_policy and replace the existing JSON policy with the following:

 Note: Replace <ARN for IAM role xacct_kms_role_account_a from Account A>.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
"<ARN for IAM role xacct_kms_role_account_a from Account A>"
            ]
        }
    ]
}

Perform the Amazon Redshift COPY

1. Log in to the Amazon Redshift cluster from your query tool and create the customer table using the DDL following.

CREATE TABLE customer 
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

2. Now you should be able to run the COPY statement following successfully.

copy customer from 's3://rs-xacct-kms-bucket/customer/' 
iam_role '<ARN for IAM role redshift_role_account_b from Account B>,<ARN for IAM role xacct_kms_role_account_a from Account A>'
gzip
region 'us-west-2';

Note: Replace the IAM role ARNs separated by a comma without any spaces around it.

3. Run the sample query following to validate that the data was loaded successfully.

select * from customer limit 10;

Set up AWS Glue Data Catalog table for Redshift Spectrum to query

Let’s now create an AWS Glue crawler in Account A to crawl the same customer data and create a table called customer in the AWS Glue Data Catalog database spectrumdb_account_a in Account A following these steps:

Follow the same steps as outlined in Option 1 to create and run a crawler with the following changes:

  1. This time, create the crawler in Account A (as opposed to Account B for Option 1).
  2. Create an AWS Glue Data Catalog database spectrumdb_account_a in Account A (as opposed to spectrumdb_account_b in Account B), and choose that database for the crawler to create the customer table in.
  3. While providing the S3 path, choose the option Specified path in my account (unlike Specified path in another account chosen for Option 1).
  4. Make sure to use glue_service_role_account_a, created earlier, as the AWS Glue service IAM role.

Perform the Redshift Spectrum query

Now that the customer table is created in the AWS Glue Data Catalog, let’s query the table using Redshift Spectrum.

1. Log in to the Amazon Redshift cluster from your query tool and run the statements following. These create an external schema called spectrumxacct2 for Redshift Spectrum pointing to the AWS Glue Data Catalog database spectrumdb_account_a (created earlier from AWS Glue console) in Account A.

drop schema if exists spectrumxacct2;
create external schema spectrumxacct2
from data catalog 
database 'spectrumdb_account_a' 
iam_role '<ARN for IAM role redshift_role_account_b from Account B>,<ARN for IAM role xacct_kms_role_account_a from Account A>'
create external database if not exists;

Note: Replace the IAM role ARNs separated by a comma without any spaces around it.

2. Run the following query, which should run successfully.

select * from spectrumxacct2.customer limit 10;

Note: Spectrum uses the AWS Glue Data Catalog in Account A, not Account B.

Summary

This post shows a step-by-step walkthrough of how to set up a cross-account Amazon Redshift COPY and query using Redshift Spectrum for a sample KMS encrypted dataset in Amazon S3. It demonstrates two solution options to choose from depending on which account’s AWS Glue Catalog you want to use for Redshift Spectrum.

If you have questions or suggestions, please leave a comment.

 


About the Author

Asim Kumar Sasmal is a Sr. Data Architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping customers think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

Closing the customer journey loop with Amazon Redshift at Equinox Fitness Clubs

Post Syndicated from Ryan Kelly original https://aws.amazon.com/blogs/big-data/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/

Clickstream analysis tools handle their data well, and some even have impressive BI interfaces. However, analyzing clickstream data in isolation comes with many limitations. For example, a customer is interested in a product or service on your website. They go to your physical store to purchase it. The clickstream analyst asks, “What happened after they viewed the product?” and the commerce analyst asks themselves, “What happened before they bought it?”

It’s no surprise that clickstream data can enhance your other data sources. When used with purchase data, it helps you determine abandoned carts or optimize marketing spending. Similarly, it helps you analyze offline and online behavior, and also behavior before customers even registered an account. However, once the benefits of clickstream data feeds are evident, you must accommodate new requests quickly.

This blog post shows how we, at Equinox Fitness Clubs, moved our data from Amazon Redshift to Amazon S3 to use a late-binding view strategy with our clickstream data. Expect such fun things as Apache Spark, Apache Parquet, data lakes, hive partitioning, and external tables, all of which we will talk about extensively in this post!

When we first started passing our clickstream data from its own tool to our Amazon Redshift data warehouse, speed was the primary concern. Our initial use case was to tie Salesforce data and Adobe Analytics data together to get a better understanding about our lead process. Adobe Analytics could tell us what channel and campaigns people came from, what pages they looked at during the visit, and if they submitted a lead form on our site. Salesforce could tell us if a lead was qualified, if they ever met an advisor, and ultimately if they became a member. Tying these two datasets together helped us better understand and optimize our marketing.

To begin, we knew the steps involved to centralize Salesforce and Adobe Analytics data into Amazon Redshift. However, even when joined together in Redshift, they needed a common identifier to talk to each other. Our first step involved generating and sending the same GUID to both Salesforce and Adobe Analytics when a person submitted a lead form on our website.

Next, we had to pass Salesforce data to Redshift. Luckily, those feeds already existed, so we could add this new GUID attribute in the feed and describe it in Redshift.

Similarly, we had to generate data feeds from Adobe Analytics to Amazon Redshift. Adobe Analytics provides Amazon S3 as a destination option for our data, so we passed the data to S3 and then created a job to send it to Redshift. The job involved taking the daily Adobe Analytics feed – which comes with a data file containing hundreds of columns and hundreds of thousands of rows, a collection of lookup files like the headers for the data, and a manifest file that describes the files that were sent – and passing it all to Amazon S3 in its raw state. From there, we used Amazon EMR with Apache Spark to process the data feed files into a single CSV file, and then save it to S3 so that we could perform the COPY command to send the data to Amazon Redshift.

The job ran for a few weeks and worked well until we started to use the data more frequently. While the job was effective, backdating data with new columns (schema evolution) started to occur. That’s when we decided we needed greater flexibility because of the nature of the data.

Data lake to the rescue

When we decided to refactor the job, we had two things lined up for us. First, we were already moving towards more of a data lake strategy. Second, Redshift Spectrum had been recently released. It would enable us to query these flat files of clickstream data in our data lake without ever having to run the COPY command and store it in Redshift. Also, we could more efficiently join the clickstream data to other data sources stored inside of Redshift.

We wanted to take advantage of self-describing data which combines the schema of the data with the data itself. Converting the data to self-describing data would help us manage the wide clickstream datasets and prevent the schema-evolution related challenges. We could pull every column we desired into the data lake file and then use only the important columns in our queries to speed up processing. To accomplish this flexibility, we used the Apache Parquet file format, which is both self-describing and blazing fast due to its columnar storage technology. We used Apache Spark on Amazon EMR to convert from CSV to Parquet, and partition the data for scanning performance, as shown in the following code.

 

from datetime import date, timedelta
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json
import argparse
	
# Usage
# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios
# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31

parser = argparse.ArgumentParser()
parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')
parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')
parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')
parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')
parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')
parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')
parser.add_argument('application', help='App name for job',type=str, default='XXX')
args = parser.parse_args()

spark = SparkSession\
        .builder\
        .appName(args.application)\
        .getOrCreate()

sc = spark.sparkContext

def manifest_toJSON(file, location):
	text = sc.textFile(file).collect()
	manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0}
	for x in text:
		if 'Lookup-File:' in x:
			manifest['lookup_files'].append(location+x.split(': ')[1])
		elif 'Data-File: 01' in x:
			wildcard_path = x.replace('Data-File: 01','*')
			manifest['data_files'] += wildcard_path
		elif 'Record-Count:' in x:
			manifest['total_rows'] += int(x.split(': ')[1])
	return manifest

# Create metadata by stitching together the file paths for
# the header file and the data file from the manifest file
# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'
base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)
manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)
metadata = manifest_toJSON(manifest_filepath, base_filepath)

# Create a list of files and their data
# Look specifically for the column_headers.tsv data
# Split on \x00 to remove the garbage encoding and return a string of headers
lookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()
encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')
header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\
			.replace('\n', '')\
			.replace('(', '')\
			.replace(')', '')\
			.replace(' ', '-')

# Create a schema for the list from the header file splitting on tabs
# Cast everything as a string to avoid data type failures
schema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])

# Bypass RDD and write data file as a dataframe
# then save as parquet to tie headers to their respective values
df = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)
destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)
df.write.mode('overwrite').parquet(destination_filepath)

# Gracefully exit out of spark and this file
sc.stop()
exit()

 

Using the AWS Glue Data Catalog allowed us to make our clickstream data available to be queried within Amazon Redshift and other query tools like Amazon Athena and Apache Spark. This is accomplished by mapping the Parquet files to a relational schema. Because schema changes can occur in real time, AWS Glue enables querying additional data in mere seconds. This means you can delete and add columns, reorder column indices, and change column types all at once, and then query the data immediately after saving the schema. Additionally, the Parquet format prevents failures when the shape of the data changes, or when certain columns are deprecated and removed from the dataset.

We used the following query to create our first AWS Glue table for our Adobe Analytics website data. We ran this query in Amazon Redshift in SQL Workbench.

 

--First create your schema
create external schema omniture_prod
from data catalog 
database 'omniture' 
iam_role 'arn:aws:iam::<your-account-id>:role/<your-spectrum-role>';

--Then create your “table” 
CREATE EXTERNAL TABLE omniture_prod.eqx_web (
  date_time		VARCHAR,
  va_closer_id		VARCHAR,
  va_closer_detail	VARCHAR,
  va_finder_detail	VARCHAR,
  va_finder_id		VARCHAR,
  ip			VARCHAR,
  domain		VARCHAR,
  post_evar1		VARCHAR
)
STORED AS PARQUET
LOCATION 's3://eqxdl-prod/omniture/eqx_web/'
table properties ('parquet.compress'='SNAPPY');

--Check your databases, schemas, and tables
select * from pg_catalog.svv_external_databases;
select * from pg_catalog.svv_external_schemas;
select * from pg_catalog.svv_external_tables;

 

After running this query, we added additional columns to the schema upon request through the AWS Glue interface. We also used partitioning to make our queries faster and cheaper.
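For reference, the following is a minimal sketch of how a dt-partitioned version of this table could be declared and how a daily partition (like the dt=2017-10-31 folders written by the Spark job above) could be registered. The table name, the abbreviated column list, the PARTITIONED BY clause, and the ALTER TABLE statement are illustrative assumptions rather than the exact DDL we ran; once declared this way, the partition column dt becomes queryable like any other column.

--Declare the external table with a dt partition column (abbreviated column list)
CREATE EXTERNAL TABLE omniture_prod.eqx_web_partitioned (
  date_time      VARCHAR,
  va_closer_id   VARCHAR,
  post_evar1     VARCHAR
)
PARTITIONED BY (dt VARCHAR)
STORED AS PARQUET
LOCATION 's3://eqxdl-prod/omniture/eqx_web/';

--Register one daily partition written by the Spark job
ALTER TABLE omniture_prod.eqx_web_partitioned
ADD IF NOT EXISTS PARTITION (dt='2017-10-31')
LOCATION 's3://eqxdl-prod/omniture/eqx_web/dt=2017-10-31/';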

At this point, we had a new schema folder in our database. It contained the external table that could be queried, but we wanted to take it a step further. We needed to add some transformations to the data such as:

  • Renaming IDs to strings
  • Concatenating values
  • Manipulating strings
  • Excluding bot traffic that we send from AWS to test the website
  • Changing the column names to be more user-friendly

To do this, we created a view of the external table, as follows:

 

create view edw_t.f_omniture_web as 
select
    REPLACE(dt, '-', '') as hive_date_key,
    va_closer_id,
    va_closer_detail as last_touch_campaign,
    CASE
        WHEN (va_closer_id) = '1' THEN 'Paid Search'
        WHEN (va_closer_id) = '2' THEN 'Natural Search'
        WHEN (va_closer_id) = '3' THEN 'Display'
        WHEN (va_closer_id) = '4' THEN 'Email Acq'
        WHEN (va_closer_id) = '5' THEN 'Direct'
        WHEN (va_closer_id) = '6' THEN 'Session Refresh'
        WHEN (va_closer_id) = '7' THEN 'Social Media'
        WHEN (va_closer_id) = '8' THEN 'Referring Domains'
        WHEN (va_closer_id) = '9' THEN 'Email Memb'
        WHEN (va_closer_id) = '10' THEN 'Social Placement'
        WHEN (va_closer_id) = '11' THEN 'Other Placement'
        WHEN (va_closer_id) = '12' THEN 'Partnership'
        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_closer_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS last_touch_channel,
    va_finder_detail as first_touch_campaign,
    va_finder_id as va_finder_id,
    CASE
        WHEN (va_finder_id) = '1' THEN 'Paid Search'
        WHEN (va_finder_id) = '2' THEN 'Natural Search'
        WHEN (va_finder_id) = '3' THEN 'Display'
        WHEN (va_finder_id) = '4' THEN 'Email Acq'
        WHEN (va_finder_id) = '5' THEN 'Direct'
        WHEN (va_finder_id) = '6' THEN 'Session Refresh'
        WHEN (va_finder_id) = '7' THEN 'Social Media'
        WHEN (va_finder_id) = '8' THEN 'Referring Domains'
        WHEN (va_finder_id) = '9' THEN 'Email Memb'
        WHEN (va_finder_id) = '10' THEN 'Social Placement'
        WHEN (va_finder_id) = '11' THEN 'Other Placement'
        WHEN (va_finder_id) = '12' THEN 'Partnership'
        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'
        WHEN (va_finder_id) = '14' THEN 'Influencers'
        ELSE NULL
    END AS first_touch_channel,
    ip as ip_address,
    domain as domain,
    post_evar1 AS internal_compaign,
    post_evar10 as site_subsection_nm,
    post_evar11 as IOS_app_view_txt,
    post_evar12 AS site_section_nm,
    post_evar15 AS transaction_id,
    post_evar23 as join_barcode_id,
    post_evar3 AS page_nm,
    post_evar32 as host_nm,
    post_evar41 as class_category_id,
    post_evar42 as class_id,
    post_evar43 as class_instance_id,
    post_evar60 AS referral_source_txt,
    post_evar69 as adwords_gclid,
    post_evar7 as usersec_tracking_id, 
    post_evar8 as facility_id,
    post_event_list as post_event_list,  
    post_visid_low||post_visid_high as unique_adobe_id,
    post_visid_type as post_visid_type,
    post_page_event as hit_type,
    visit_num as visit_number,
    visit_start_time_gmt,
    post_evar25 as login_status,
    exclude_hit as exclude_hit,
    hit_source as hit_source,
    geo_zip,
    geo_city,
    geo_region,
    geo_country,
    post_evar64 as api_error_msg,
    post_evar70 as page_load_time,
    post_evar78 as join_transaction_id,
    post_evar9 as page_url,
    visit_start_pagename as entry_pg,
    post_tnt as abtest_campaign,
    post_tnt_action as abtest_experience,
    user_agent as user_agent,
    mobile_id as mobile_id,
    cast(date_time as timestamp) as date_time,
    CONVERT_TIMEZONE(
        'America/New_York', -- timezone of origin
        (cast(
            case 
            when post_t_time_info like '%undefined%' then '0'
            when post_t_time_info is null then '0'
            when post_t_time_info = '' then '0'
            when cast(split_part(post_t_time_info,' ',4) as int) < 0
              then left(split_part(post_t_time_info,' ',4),4)
            else left(split_part(post_t_time_info,' ',4),3) end as int
        )/60),
        cast(date_time as timestamp)
    ) as date_time_local,
    post_t_time_info as local_timezone
from omniture_prod.eqx_web
where exclude_hit = '0'
and hit_source not in ('5','7','8','9')

and domain <> 'amazonaws.com'
and domain <> 'amazon.com'

WITH NO SCHEMA BINDING;

 

Now we can perform queries from Amazon Redshift, blending our structured Salesforce data with our semi-structured, dynamic Adobe Analytics data. With these changes, our data became extremely flexible, friendly on storage size, and very performant when queried. Since then, we have started using Redshift Spectrum for many use cases, from data quality checks, machine data, and historical data archiving to empowering our data analysts and scientists to more easily blend and onboard data.

 

with web_leads as (
  select transaction_id,last_touch_channel
  from edw_t.f_omniture_web
  where hive_date_key = '20170301'
      and post_event_list like '%201%'
      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'
),
opp_lifecycle as (
  SELECT lifecycle,weblead_transactionid
  FROM edw_t.f_opportunity
  where weblead_transactionid is not null
  and created_date_key between '20170220'and '20170310'
)
select
  web_leads.transaction_id,
  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecycle
from web_leads
left join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

Conclusion

We were able to set up an efficient and flexible analytics platform for clickstream data by combining the Amazon S3 data lake with Amazon Redshift. This has eliminated the need to always load clickstream data into the data warehouse, and also made the platform adaptable to schema changes in the incoming data. You can read the Redshift documentation to get started with Redshift Spectrum, and also watch our presentation from the AWS Chicago Summit 2018.


Additional Reading

If you found this post helpful, be sure to check out From Data Lake to Data Warehouse: Enhancing Customer 360 with Amazon Redshift Spectrum, and Narrativ is helping producers monetize their digital content with Amazon Redshift.

 


About the Author

Ryan Kelly is a data architect at Equinox, where he helps outline and implement frameworks for data initiatives. He also leads clickstream tracking which helps aid teams with insights on their digital initiatives. Ryan loves making it easier for people to reach and ingest their data for the purposes of business intelligence, analytics, and product/service enrichment. He also loves exploring and vetting new technologies to see how they can enhance what they do at Equinox.

 

 

 

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

 

 

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that incoming events often contain only some of the expected fields, depending on which values the producers send. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash ("/") so that Kinesis Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API to update the OutputFormatConfiguration.

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though the documentation describes them as complex structures. The reason for doing that was simply to show some of the flexibility of Athena to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file, as shown in the sketch that follows.
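
If you prefer nested types, here is a minimal sketch of an alternative table definition. It types only the subset of the currentagentsnapshot fields queried in this post; the full agent-event structure contains more fields, and the table name and S3 location are placeholders:

CREATE EXTERNAL TABLE default.kfhconnectblog_nested (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot struct<
    agentstatus:struct<name:string,starttimestamp:string>,
    configuration:struct<firstname:string,lastname:string,username:string>
  >,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog_nested/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

With a definition like this, the status lookup becomes a simple dotted reference such as currentagentsnapshot.agentstatus.name instead of a json_extract_scalar call.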

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged with. It tells us whether they were incoming or outgoing, whether they were completed, and whether there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.
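
For reference, here is a minimal sketch of that one-time setup. The external schema name matches the query below, and the Data Catalog database matches the Athena table created in Step 1; the IAM role ARN is a placeholder for a role in your account that grants Amazon Redshift access to the Data Catalog and the S3 data:

CREATE EXTERNAL SCHEMA default_schema
FROM DATA CATALOG
DATABASE 'default'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<spectrum-role>'
CREATE EXTERNAL DATABASE IF NOT EXISTS;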

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 


Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

 

 

 

How to retain system tables’ data spanning multiple Amazon Redshift clusters and run cross-cluster diagnostic queries

Post Syndicated from Karthik Sonti original https://aws.amazon.com/blogs/big-data/how-to-retain-system-tables-data-spanning-multiple-amazon-redshift-clusters-and-run-cross-cluster-diagnostic-queries/

Amazon Redshift is a data warehouse service that logs the history of the system in STL log tables. The STL log tables manage disk space by retaining only two to five days of log history, depending on log usage and available disk space.

To retain STL tables’ data for an extended period, you usually have to create a replica table for every system table. Then, for each one, you load the data from the system table into the replica at regular intervals. By maintaining replica tables for STL tables, you can run diagnostic queries on historical data from the STL tables. You then can derive insights from query execution times, query plans, and disk-spill patterns, and make better cluster-sizing decisions. However, refreshing replica tables with live data from STL tables at regular intervals requires schedulers such as Cron or AWS Data Pipeline. Also, these tables are specific to one cluster and they are not accessible after the cluster is terminated. This is especially true for transient Amazon Redshift clusters that last for only a finite period of ad hoc query execution.

In this blog post, I present a solution that exports system tables from multiple Amazon Redshift clusters into an Amazon S3 bucket. This solution is serverless, and you can schedule it as frequently as every five minutes. The AWS CloudFormation deployment template that I provide automates the solution setup in your environment. The system tables’ data in the Amazon S3 bucket is partitioned by cluster name and query execution date to enable efficient joins in cross-cluster diagnostic queries.

I also provide another CloudFormation template later in this post. This second template helps to automate the creation of tables in the AWS Glue Data Catalog for the system tables’ data stored in Amazon S3. After the system tables are exported to Amazon S3, you can run cross-cluster diagnostic queries on the system tables’ data and derive insights about query executions in each Amazon Redshift cluster. You can do this using Amazon QuickSight, Amazon Athena, Amazon EMR, or Amazon Redshift Spectrum.

You can find all the code examples in this post, including the CloudFormation templates, AWS Glue extract, transform, and load (ETL) scripts, and the resolution steps for common errors you might encounter in this GitHub repository.

Solution overview

The solution in this post uses AWS Glue to export system tables’ log data from Amazon Redshift clusters into Amazon S3. The AWS Glue ETL jobs are invoked at a scheduled interval by AWS Lambda. AWS Systems Manager, which provides secure, hierarchical storage for configuration data management and secrets management, maintains the details of Amazon Redshift clusters for which the solution is enabled. The last-fetched time stamp values for the respective cluster-table combination are maintained in an Amazon DynamoDB table.

The following diagram covers the key steps involved in this solution.

The solution as illustrated in the preceding diagram flows like this:

  1. The Lambda function, invoke_rs_stl_export_etl, is triggered at regular intervals, as controlled by Amazon CloudWatch. It’s triggered to look up the AWS Systems Manager parameter store to get the details of the Amazon Redshift clusters for which the system table export is enabled.
  2. The same Lambda function, based on the Amazon Redshift cluster details obtained in step 1, invokes the AWS Glue ETL job designated for the Amazon Redshift cluster. If an ETL job for the cluster is not found, the Lambda function creates one.
  3. The ETL job invoked for the Amazon Redshift cluster gets the cluster credentials from the parameter store. It gets from the DynamoDB table the last exported time stamp of when each of the system tables was exported from the respective Amazon Redshift cluster.
  4. The ETL job unloads the system tables’ data from the Amazon Redshift cluster into an Amazon S3 bucket.
  5. The ETL job updates the DynamoDB table with the last exported time stamp value for each system table exported from the Amazon Redshift cluster.
  6. The Amazon Redshift cluster system tables’ data is available in Amazon S3 and is partitioned by cluster name and date for running cross-cluster diagnostic queries.

Understanding the configuration data

This solution uses AWS Systems Manager parameter store to store the Amazon Redshift cluster credentials securely. The parameter store also securely stores other configuration information that the AWS Glue ETL job needs for extracting and storing system tables’ data in Amazon S3. Systems Manager comes with a default AWS Key Management Service (AWS KMS) key that it uses to encrypt the password component of the Amazon Redshift cluster credentials.

The following table explains the global parameters and cluster-specific parameters required in this solution. The global parameters are defined once and applicable at the overall solution level. The cluster-specific parameters are specific to an Amazon Redshift cluster and repeat for each cluster for which you enable this post’s solution. The CloudFormation template explained later in this post creates these parameters as part of the deployment process.

Parameter name | Type | Description
Global parameters (defined once and applied to all jobs)
redshift_query_logs.global.s3_prefix | String | The Amazon S3 path where the query logs are exported. Under this path, each exported table is partitioned by cluster name and date.
redshift_query_logs.global.tempdir | String | The Amazon S3 path that AWS Glue ETL jobs use for temporarily staging the data.
redshift_query_logs.global.role | String | The name of the role that the AWS Glue ETL jobs assume. Just the role name is sufficient; the complete Amazon Resource Name (ARN) is not required.
redshift_query_logs.global.enabled_cluster_list | StringList | A comma-separated list of cluster names for which system tables’ data export is enabled. This gives you the flexibility to exclude certain clusters.
Cluster-specific parameters (one set for each cluster specified in the enabled_cluster_list parameter)
redshift_query_logs.<<cluster_name>>.connection | String | The name of the AWS Glue Data Catalog connection to the Amazon Redshift cluster. For example, if the cluster name is product_warehouse, the entry is redshift_query_logs.product_warehouse.connection.
redshift_query_logs.<<cluster_name>>.user | String | The user name that AWS Glue uses to connect to the Amazon Redshift cluster.
redshift_query_logs.<<cluster_name>>.password | Secure String | The password that AWS Glue uses to connect to the Amazon Redshift cluster, encrypted with a key managed in AWS KMS.

For example, suppose that you have two Amazon Redshift clusters, product-warehouse and category-management, for which the solution described in this post is enabled. In this case, the parameters shown in the following screenshot are created by the solution deployment CloudFormation template in the AWS Systems Manager parameter store.

Solution deployment

To make it easier for you to get started, I created a CloudFormation template that automatically configures and deploys the solution—only one step is required after deployment.

Prerequisites

To deploy the solution, you must have one or more Amazon Redshift clusters in a private subnet. This subnet must have a network address translation (NAT) gateway or a NAT instance configured, and also a security group with a self-referencing inbound rule for all TCP ports. For more information about why AWS Glue ETL needs the configuration it does, described previously, see Connecting to a JDBC Data Store in a VPC in the AWS Glue documentation.

To start the deployment, launch the CloudFormation template:

CloudFormation stack parameters

The following table lists and describes the parameters for deploying the solution to export query logs from multiple Amazon Redshift clusters.

Property | Default | Description
S3Bucket | mybucket | The bucket this solution uses to store the exported query logs, stage code artifacts, and perform unloads from Amazon Redshift. For example, the mybucket/extract_rs_logs/data prefix is used for storing all the exported query logs for each system table, partitioned by cluster. The mybucket/extract_rs_logs/temp/ prefix is used for temporarily staging the unloaded data from Amazon Redshift. The mybucket/extract_rs_logs/code prefix is used for storing all the code artifacts required for Lambda and the AWS Glue ETL jobs.
ExportEnabledRedshiftClusters | Requires Input | A comma-separated list of cluster names from which the system table logs need to be exported.
DataStoreSecurityGroups | Requires Input | A list of security groups with an inbound rule to the Amazon Redshift clusters provided in the ExportEnabledRedshiftClusters parameter. These security groups should also have a self-referencing inbound rule on all TCP ports, as explained in Connecting to a JDBC Data Store in a VPC.

After you launch the template and create the stack, you see that the following resources have been created:

  1. AWS Glue connections for each Amazon Redshift cluster you provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  2. All parameters required for this solution created in the parameter store.
  3. The Lambda function that invokes the AWS Glue ETL jobs for each configured Amazon Redshift cluster at a regular interval of five minutes.
  4. The DynamoDB table that captures the last exported time stamps for each exported cluster-table combination.
  5. The AWS Glue ETL jobs to export query logs from each Amazon Redshift cluster provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  6. The IAM roles and policies required for the Lambda function and AWS Glue ETL jobs.

After the deployment

For each Amazon Redshift cluster for which you enabled the solution through the CloudFormation stack parameter, ExportEnabledRedshiftClusters, the automated deployment includes temporary credentials that you must update after the deployment:

  1. Go to the parameter store.
  2. Note the parameters redshift_query_logs.<<cluster_name>>.user and redshift_query_logs.<<cluster_name>>.password that correspond to each Amazon Redshift cluster for which you enabled this solution. Edit these parameters to replace the placeholder values with the right credentials.

For example, if product-warehouse is one of the clusters for which you enabled system table export, you edit these two parameters with the right user name and password and choose Save parameter.

Querying the exported system tables

Within a few minutes after the solution deployment, you should see Amazon Redshift query logs being exported to the Amazon S3 location, <<S3Bucket_you_provided>>/extract_redshift_query_logs/data/. In that location, you should see the eight system tables partitioned by cluster name and date: stl_alert_event_log, stl_ddltext, stl_explain, stl_query, stl_querytext, stl_scan, stl_utilitytext, and stl_wlm_query.

To run cross-cluster diagnostic queries on the exported system tables, create external tables in the AWS Glue Data Catalog. To make it easier for you to get started, I provide a CloudFormation template that creates an AWS Glue crawler, which crawls the exported system tables stored in Amazon S3 and builds the external tables in the AWS Glue Data Catalog.

Launch this CloudFormation template to create external tables that correspond to the Amazon Redshift system tables. S3Bucket is the only input parameter required for this stack deployment. Provide the same Amazon S3 bucket name where the system tables’ data is being exported. After you successfully create the stack, you can see the eight tables in the database, redshift_query_logs_db, as shown in the following screenshot.

Now, navigate to the Athena console to run cross-cluster diagnostic queries. The following screenshot shows a diagnostic query executed in Athena that retrieves query alerts logged across multiple Amazon Redshift clusters.
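
As a sketch of what such a diagnostic query can look like, the following aggregates alerts by cluster from the exported stl_alert_event_log table. It assumes the crawler registered the cluster-name partition as a column named cluster; check the generated table definition in the Data Catalog for the actual partition column names:

SELECT cluster,
       trim(event) AS alert_event,
       count(*) AS occurrences
FROM redshift_query_logs_db.stl_alert_event_log
GROUP BY cluster, trim(event)
ORDER BY occurrences DESC;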

You can build the following example Amazon QuickSight dashboard by running cross-cluster diagnostic queries on Athena to identify the hourly query count and the key query alert events across multiple Amazon Redshift clusters.

How to extend the solution

You can extend this post’s solution in two ways:

  • Add any new Amazon Redshift clusters that you spin up after you deploy the solution.
  • Add other system tables or custom query results to the list of exports from an Amazon Redshift cluster.

Extend the solution to other Amazon Redshift clusters

To extend the solution to more Amazon Redshift clusters, add the three cluster-specific parameters in the AWS Systems Manager parameter store following the guidelines earlier in this post. Modify the redshift_query_logs.global.enabled_cluster_list parameter to append the new cluster to the comma-separated string.

Extend the solution to add other tables or custom queries to an Amazon Redshift cluster

The current solution ships with the export functionality for the following Amazon Redshift system tables:

  • stl_alert_event_log
  • stl_ddltext
  • stl_explain
  • stl_query
  • stl_querytext
  • stl_scan
  • stl_utilitytext
  • stl_wlm_query

You can easily add another system table or custom query by adding a few lines of code to the AWS Glue ETL job, <<cluster-name>>_extract_rs_query_logs. For example, suppose that from the product-warehouse Amazon Redshift cluster you want to export orders greater than $2,000. To do so, add the following five lines of code to the AWS Glue ETL job product-warehouse_extract_rs_query_logs, where product-warehouse is your cluster name:

  1. Get the last-processed time-stamp value. The function creates a value if it doesn’t already exist.

salesLastProcessTSValue = functions.getLastProcessedTSValue(trackingEntry="mydb.sales_2000", job_configs=job_configs)

  2. Run the custom query with the time stamp.

returnDF = functions.runQuery(query="select * from sales s join orders o on s.order_id = o.order_id where o.order_amnt > 2000 and s.sale_timestamp > '{}'".format(salesLastProcessTSValue), tableName="mydb.sales_2000", job_configs=job_configs)

  3. Save the results to Amazon S3.

functions.saveToS3(dataframe=returnDF,s3Prefix=s3Prefix,tableName="mydb.sales_2000",partitionColumns=["sale_date"],job_configs=job_configs)

  4. Get the latest time-stamp value from the data frame returned in Step 2.

latestTimestampVal=functions.getMaxValue(returnDF,"sale_timestamp",job_configs)

  5. Update the last-processed time-stamp value in the DynamoDB table.

functions.updateLastProcessedTSValue("mydb.sales_2000", latestTimestampVal[0], job_configs)

Conclusion

In this post, I demonstrate a serverless solution to retain the system tables’ log data across multiple Amazon Redshift clusters. By using this solution, you can incrementally export the data from system tables into Amazon S3. By performing this export, you can build cross-cluster diagnostic queries, build audit dashboards, and derive insights into capacity planning by using services such as Athena. I also demonstrate how you can extend this solution to other ad hoc query use cases or tables other than system tables by adding a few lines of code.


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production and Amazon Redshift – 2017 Recap.


About the Author

Karthik Sonti is a senior big data architect at Amazon Web Services. He helps AWS customers build big data and analytical solutions and provides guidance on architecture and best practices.

 

 

 

 

Amazon Redshift – 2017 Recap

Post Syndicated from Larry Heathcote original https://aws.amazon.com/blogs/big-data/amazon-redshift-2017-recap/

We have been busy adding new features and capabilities to Amazon Redshift, and we wanted to give you a glimpse of what we’ve been doing over the past year. In this article, we recap a few of our enhancements and provide a set of resources that you can use to learn more and get the most out of your Amazon Redshift implementation.

In 2017, we made more than 30 announcements about Amazon Redshift. We listened to you, our customers, and delivered Redshift Spectrum, a feature of Amazon Redshift, that gives you the ability to extend analytics to your data lake—without moving data. We launched new DC2 nodes, doubling performance at the same price. We also announced many new features that provide greater scalability, better performance, more automation, and easier ways to manage your analytics workloads.

To see a full list of our launches, visit our what’s new page—and be sure to subscribe to our RSS feed.

Major launches in 2017

Amazon Redshift Spectrum: extend analytics to your data lake, without moving data

We launched Amazon Redshift Spectrum to give you the freedom to store data in Amazon S3, in open file formats, and have it available for analytics without the need to load it into your Amazon Redshift cluster. It enables you to easily join datasets across Redshift clusters and S3 to provide unique insights that you would not be able to obtain by querying independent data silos.

With Redshift Spectrum, you can run SQL queries against data in an Amazon S3 data lake as easily as you analyze data stored in Amazon Redshift. And you can do it without loading data or resizing the Amazon Redshift cluster based on growing data volumes. Redshift Spectrum separates compute and storage to meet workload demands for data size, concurrency, and performance. Redshift Spectrum scales processing across thousands of nodes, so results are fast, even with massive datasets and complex queries. You can query open file formats that you already use—such as Apache Avro, CSV, Grok, ORC, Apache Parquet, RCFile, RegexSerDe, SequenceFile, TextFile, and TSV—directly in Amazon S3, without any data movement.

“For complex queries, Redshift Spectrum provided a 67 percent performance gain,” said Rafi Ton, CEO, NUVIAD. “Using the Parquet data format, Redshift Spectrum delivered an 80 percent performance improvement. For us, this was substantial.”

To learn more about Redshift Spectrum, watch our AWS Summit session Intro to Amazon Redshift Spectrum: Now Query Exabytes of Data in S3, and read our announcement blog post Amazon Redshift Spectrum – Exabyte-Scale In-Place Queries of S3 Data.

DC2 nodes—twice the performance of DC1 at the same price

We launched second-generation Dense Compute (DC2) nodes to provide low latency and high throughput for demanding data warehousing workloads. DC2 nodes feature powerful Intel E5-2686 v4 (Broadwell) CPUs, fast DDR4 memory, and NVMe-based solid state disks (SSDs). We’ve tuned Amazon Redshift to take advantage of the better CPU, network, and disk on DC2 nodes, providing up to twice the performance of DC1 at the same price. Our DC2.8xlarge instances now provide twice the memory per slice of data and an optimized storage layout with 30 percent better storage utilization.

“Redshift allows us to quickly spin up clusters and provide our data scientists with a fast and easy method to access data and generate insights,” said Bradley Todd, technology architect at Liberty Mutual. “We saw a 9x reduction in month-end reporting time with Redshift DC2 nodes as compared to DC1.”

Read our customer testimonials to see the performance gains our customers are experiencing with DC2 nodes. To learn more, read our blog post Amazon Redshift Dense Compute (DC2) Nodes Deliver Twice the Performance as DC1 at the Same Price.

Performance enhancements: 3x-5x faster queries

On average, our customers are seeing 3x to 5x performance gains for most of their critical workloads.

We introduced short query acceleration to speed up execution of queries such as reports, dashboards, and interactive analysis. Short query acceleration uses machine learning to predict the execution time of a query, and to move short running queries to an express short query queue for faster processing.

We launched results caching to deliver sub-second response times for queries that are repeated, such as dashboards, visualizations, and those from BI tools. Results caching has an added benefit of freeing up resources to improve the performance of all other queries.

We also introduced late materialization to reduce the amount of data scanned for queries with predicate filters by batching and factoring in the filtering of predicates before fetching data blocks in the next column. For example, if only 10 percent of the table rows satisfy the predicate filters, Amazon Redshift can potentially save 90 percent of the I/O for the remaining columns to improve query performance.

We launched query monitoring rules and pre-defined rule templates. These features make it easier for you to set metrics-based performance boundaries for workload management (WLM) queries, and specify what action to take when a query goes beyond those boundaries. For example, for a queue that’s dedicated to short-running queries, you might create a rule that aborts queries that run for more than 60 seconds. To track poorly designed queries, you might have another rule that logs queries that contain nested loops.

Customer insights

Amazon Redshift and Redshift Spectrum serve customers across a variety of industries and sizes, from startups to large enterprises. Visit our customer page to see the success that customers are having with our recent enhancements. Learn how companies like Liberty Mutual Insurance saw a 9x reduction in month-end reporting time using DC2 nodes. On this page, you can find case studies, videos, and other content that show how our customers are using Amazon Redshift to drive innovation and business results.

In addition, check out these resources to learn about the success our customers are having building out a data warehouse and data lake integration solution with Amazon Redshift:

Partner solutions

You can enhance your Amazon Redshift data warehouse by working with industry-leading experts. Our AWS Partner Network (APN) Partners have certified their solutions to work with Amazon Redshift. They offer software, tools, integration, and consulting services to help you at every step. Visit our Amazon Redshift Partner page and choose an APN Partner. Or, use AWS Marketplace to find and immediately start using third-party software.

To see what our Partners are saying about Amazon Redshift Spectrum and our DC2 nodes mentioned earlier, read these blog posts:

Resources

Blog posts

Visit the AWS Big Data Blog for a list of all Amazon Redshift articles.

YouTube videos

GitHub

Our community of experts contribute on GitHub to provide tips and hints that can help you get the most out of your deployment. Visit GitHub frequently to get the latest technical guidance, code samples, administrative task automation utilities, the analyze & vacuum schema utility, and more.

Customer support

If you are evaluating or considering a proof of concept with Amazon Redshift, or you need assistance migrating your on-premises or other cloud-based data warehouse to Amazon Redshift, our team of product experts and solutions architects can help you with architecting, sizing, and optimizing your data warehouse. Contact us using this support request form, and let us know how we can assist you.

If you are an Amazon Redshift customer, we offer a no-cost health check program. Our team of database engineers and solutions architects give you recommendations for optimizing Amazon Redshift and Amazon Redshift Spectrum for your specific workloads. To learn more, email us at [email protected].

If you have any questions, email us at [email protected].

 


Additional Reading

If you found this post useful, be sure to check out Amazon Redshift Spectrum – Exabyte-Scale In-Place Queries of S3 Data, Using Amazon Redshift for Fast Analytical Reports and How to Migrate Your Oracle Data Warehouse to Amazon Redshift Using AWS SCT and AWS DMS.


About the Author

Larry Heathcote is a Principal Product Marketing Manager at Amazon Web Services for data warehousing and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys family time, home projects, grilling out, and the taste of classic barbeque.

 

 

 

How I built a data warehouse using Amazon Redshift and AWS services in record time

Post Syndicated from Stephen Borg original https://aws.amazon.com/blogs/big-data/how-i-built-a-data-warehouse-using-amazon-redshift-and-aws-services-in-record-time/

This is a customer post by Stephen Borg, the Head of Big Data and BI at Cerberus Technologies.

Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.

Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.

In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing at many tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment for hardware and software licenses, and also significant operational costs for maintenance and technical staff to keep it running and performing well. Unfortunately, I couldn’t get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottlenecks of infrastructure resources, capital expense, and operational costs have been significantly reduced or have gone away entirely. There is no longer an excuse for allowing obstacles of the past to delay delivering timely insights to decision makers, no matter how much data you have.

With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.

Handling data feeds

I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.

To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information. My system then routes it to be stored where I need on Amazon S3 by Amazon Kinesis Data Firehose. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the infrastructure underneath it for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.

Setting up a Kinesis stream can be done with a few clicks, and the same for Kinesis Firehose. Firehose can be configured to automatically consume data from a Kinesis Data Stream, and then write compressed data every N minutes to Amazon S3. When I want to process a Kinesis data stream, it’s very easy to set up a Lambda function to be executed on each message received. I can just set a trigger from the AWS Lambda Management Console, as shown following.

I also monitor the duration of function execution using Amazon CloudWatch and AWS X-Ray.

Regardless of the format I receive the data from our partners, I can send it to Kinesis as JSON data using my own formatters. After Firehose writes this to Amazon S3, I have everything in nearly the same structure I received but compressed, encrypted, and optimized for reading.

This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.

Working with Amazon Athena and Amazon Redshift for analysis

I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to understand new datasets that we receive and to understand what transformations will be needed for long-term query efficiency.

For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.

To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also helps with recommendations and personalization use cases for various business users, and I find this easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.

Each AWS service in this architecture plays its part in saving precious time that’s crucial for delivery and getting different departments in the business on board. I found the services easy to set up and use, and all have proven to be highly reliable in our production environments. When the architecture was in place, scaling out was either completely handled by the service or a matter of a simple API call, and crucially didn’t require me to change a single line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the megabytes allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).

In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.


Additional Reading

If you found this post useful, be sure to check out Deploy a Data Warehouse Quickly with Amazon Redshift, Amazon RDS for PostgreSQL and Tableau Server and Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift.


About the Author

Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering, and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize the business and add personalization to products. He is now the Head of Big Data and BI at Cerberus Technologies.

 

 

 

Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift

Post Syndicated from Thiyagarajan Arumugam original https://aws.amazon.com/blogs/big-data/top-8-best-practices-for-high-performance-etl-processing-using-amazon-redshift/

An ETL (Extract, Transform, Load) process enables you to load data from source systems into your data warehouse. This is typically executed as a batch or near-real-time ingest process to keep the data warehouse current and provide up-to-date analytical data to end users.

Amazon Redshift is a fast, petabyte-scale data warehouse that enables you to easily make data-driven decisions. With Amazon Redshift, you can get insights into your big data in a cost-effective fashion using standard SQL. You can set up any type of data model, from star and snowflake schemas to simple de-normalized tables, for running any analytical queries.

To operate a robust ETL platform and deliver data to Amazon Redshift in a timely manner, design your ETL processes to take account of Amazon Redshift’s architecture. When migrating from a legacy data warehouse to Amazon Redshift, it is tempting to adopt a lift-and-shift approach, but this can result in performance and scale issues long term. This post guides you through the following best practices for ensuring optimal, consistent runtimes for your ETL processes:

  • COPY data from multiple, evenly sized files.
  • Use workload management to improve ETL runtimes.
  • Perform table maintenance regularly.
  • Perform multiple steps in a single transaction.
  • Loading data in bulk.
  • Use UNLOAD to extract large result sets.
  • Use Amazon Redshift Spectrum for ad hoc ETL processing.
  • Monitor daily ETL health using diagnostic queries.

1. COPY data from multiple, evenly sized files

Amazon Redshift is an MPP (massively parallel processing) database, where all the compute nodes divide and parallelize the work of ingesting data. Each node is further subdivided into slices, with each slice having one or more dedicated cores, equally dividing the processing capacity. The number of slices per node depends on the node type of the cluster. For example, each DS2.XLARGE compute node has two slices, whereas each DS2.8XLARGE compute node has 16 slices.

When you load data into Amazon Redshift, you should aim to have each slice do an equal amount of work. When you load the data from a single large file or from files split into uneven sizes, some slices do more work than others. As a result, the process runs only as fast as the slowest, or most heavily loaded, slice. In the example shown below, a single large file is loaded into a two-node cluster, resulting in only one of the nodes, “Compute-0”, performing all the data ingestion:

When splitting your data files, ensure that they are of approximately equal size – between 1 MB and 1 GB after compression. The number of files should be a multiple of the number of slices in your cluster. Also, I strongly recommend that you individually compress the load files using gzip, lzop, or bzip2 to efficiently load large datasets.

When loading multiple files into a single table, use a single COPY command for the table, rather than multiple COPY commands. Amazon Redshift automatically parallelizes the data ingestion. Using a single COPY command to bulk load data into a table ensures optimal use of cluster resources, and quickest possible throughput.
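
As a minimal sketch of this pattern, the following COPY ingests a set of gzipped, pipe-delimited files from an S3 prefix in a single command; the table name, bucket path, and IAM role ARN are placeholders:

COPY daily_sales
FROM 's3://<your-bucket>/batch/2017/07/02/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-copy-role>'
GZIP
DELIMITER '|';

Because all files under the prefix are loaded by one COPY, Amazon Redshift spreads the work evenly across the slices in the cluster.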

2. Use workload management to improve ETL runtimes

Use Amazon Redshift’s workload management (WLM) to define multiple queues dedicated to different workloads (for example, ETL versus reporting) and to manage the runtimes of queries. As you migrate more workloads into Amazon Redshift, your ETL runtimes can become inconsistent if WLM is not appropriately set up.

I recommend limiting the overall concurrency of WLM across all queues to around 15 or less. This WLM guide helps you organize and monitor the different queues for your Amazon Redshift cluster.

When managing different workloads on your Amazon Redshift cluster, consider the following for the queue setup:

  • Create a queue dedicated to your ETL processes. Configure this queue with a small number of slots (5 or fewer). Amazon Redshift is designed for analytics queries, rather than transaction processing. The cost of COMMIT is relatively high, and excessive use of COMMIT can result in queries waiting for access to the commit queue. Because ETL is a commit-intensive process, having a separate queue with a small number of slots helps mitigate this issue.
  • Claim extra memory available in a queue. When executing an ETL query, you can take advantage of the wlm_query_slot_count to claim the extra memory available in a particular queue. For example, a typical ETL process might involve COPYing raw data into a staging table so that downstream ETL jobs can run transformations that calculate daily, weekly, and monthly aggregates. To speed up the COPY process (so that the downstream tasks can start in parallel sooner), the wlm_query_slot_count can be increased for this step, as shown in the sketch after this list.
  • Create a separate queue for reporting queries. Configure query monitoring rules on this queue to further manage long-running and expensive queries.
  • Take advantage of the dynamic memory parameters. They swap the memory from your ETL to your reporting queue after the ETL job has completed.
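
Here is a minimal sketch of temporarily claiming extra slots for a COPY step, assuming the session is routed to the ETL queue; the table, S3 path, and IAM role are placeholders:

-- Claim three slots (and their memory) from the ETL queue for this session
SET wlm_query_slot_count TO 3;

COPY stage_sales
FROM 's3://<your-bucket>/incoming/sales/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-copy-role>'
GZIP;

-- Return to the default of one slot per query
SET wlm_query_slot_count TO 1;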

3. Perform table maintenance regularly

Amazon Redshift is a columnar database, which enables fast transformations for aggregating data. Performing regular table maintenance ensures that transformation ETLs are predictable and performant. To get the best performance from your Amazon Redshift database, you must ensure that database tables regularly are VACUUMed and ANALYZEd. The Analyze & Vacuum schema utility helps you automate the table maintenance task and have VACUUM & ANALYZE executed in a regular fashion.

  • Use VACUUM to sort tables and remove deleted blocks

During a typical ETL refresh process, tables receive new incoming records using COPY, and unneeded data (cold data) is removed using DELETE. New rows are added to the unsorted region in a table. Deleted rows are simply marked for deletion.

DELETE does not automatically reclaim the space occupied by the deleted rows. Adding and removing large numbers of rows can therefore cause the unsorted region and the number of deleted blocks to grow. This can degrade the performance of queries executed against these tables.

After an ETL process completes, perform VACUUM to ensure that user queries execute in a consistent manner. The complete list of tables that need VACUUMing can be found using the Amazon Redshift Util’s table_info script.

Use the following approaches to ensure that VACUUM is completed in a timely manner:

  • Use wlm_query_slot_count to claim all the memory allocated in the ETL WLM queue during the VACUUM process.
  • DROP or TRUNCATE intermediate or staging tables, thereby eliminating the need to VACUUM them.
  • If your table has a compound sort key with only one sort column, try to load your data in sort key order. This helps reduce or eliminate the need to VACUUM the table.
  • Consider using time series tables. This helps reduce the amount of data you need to VACUUM.
  • Use ANALYZE to update database statistics

Amazon Redshift uses a cost-based query planner and optimizer using statistics about tables to make good decisions about the query plan for the SQL statements. Regular statistics collection after the ETL completion ensures that user queries run fast, and that daily ETL processes are performant. The Amazon Redshift utility table_info script provides insights into the freshness of the statistics. Keeping the statistics off (pct_stats_off) less than 20% ensures effective query plans for the SQL queries.
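
As a minimal sketch of this post-ETL maintenance step (the table name is a placeholder):

-- Re-sort rows and reclaim space; the sort phase is skipped if the table is already at least 99 percent sorted
VACUUM FULL daily_sales TO 99 PERCENT;

-- Refresh planner statistics for the columns used in filters and joins
ANALYZE daily_sales PREDICATE COLUMNS;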

4. Perform multiple steps in a single transaction

ETL transformation logic often spans multiple steps. Because commits in Amazon Redshift are expensive, if each ETL step performs a commit, multiple concurrent ETL processes can take a long time to execute.

To minimize the number of commits in a process, the steps in an ETL script should be surrounded by a BEGIN…END statement so that a single commit is performed only after all the transformation logic has been executed. For example, here is an example multi-step ETL script that performs one commit at the end:

BEGIN;
CREATE TEMPORARY TABLE staging_table (..);
INSERT INTO staging_table SELECT .. FROM source;        -- transformation logic
DELETE FROM daily_table WHERE dataset_date = ?;
INSERT INTO daily_table SELECT .. FROM staging_table;   -- daily aggregate
DELETE FROM weekly_table WHERE weekending_date = ?;
INSERT INTO weekly_table SELECT .. FROM staging_table;  -- weekly aggregate
COMMIT;

5. Loading data in bulk

Amazon Redshift is designed to store and query petabyte-scale datasets. Using Amazon S3 you can stage and accumulate data from multiple source systems before executing a bulk COPY operation. The following methods allow efficient and fast transfer of these bulk datasets into Amazon Redshift:

  • Use a manifest file to ingest large datasets that span multiple files. The manifest file is a JSON file that lists all the files to be loaded into Amazon Redshift. Using a manifest file ensures that Amazon Redshift has a consistent view of the data to be loaded from S3, while also ensuring that duplicate files do not result in the same data being loaded more than one time.
  • Use temporary staging tables to hold the data for transformation. These tables are automatically dropped after the ETL session is complete. Temporary tables can be created using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. Explicitly specifying the CREATE TEMPORARY TABLE statement allows you to control the DISTRIBUTION KEY, SORT KEY, and compression settings to further improve performance.
  • Use ALTER TABLE APPEND to swap data from the staging tables to the target table. Data in the source table is moved to matching columns in the target table. Column order doesn’t matter. After data is successfully appended to the target table, the source table is empty. ALTER TABLE APPEND is much faster than a similar CREATE TABLE AS or INSERT INTO operation because it doesn’t involve copying or moving data. A sketch of this staging pattern follows this list.
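
Here is a minimal sketch that ties these ideas together. It uses a regular (non-temporary) staging table so that its rows can be moved into the target with ALTER TABLE APPEND; the table names, manifest path, and IAM role are placeholders:

-- Stage the incoming batch in a table with the same column definitions as the target
CREATE TABLE stage_sales (LIKE sales);

-- Load every file listed in the manifest with a single COPY
COPY stage_sales
FROM 's3://<your-bucket>/incoming/sales.manifest'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-copy-role>'
GZIP
MANIFEST;

-- Move the staged rows into the target table without physically copying the data
ALTER TABLE sales APPEND FROM stage_sales;

DROP TABLE stage_sales;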

6. Use UNLOAD to extract large result sets

Fetching a large number of rows using SELECT is expensive and takes a long time. When a large amount of data is fetched from the Amazon Redshift cluster, the leader node has to hold the data temporarily until the fetches are complete. Further, data is streamed out sequentially, which results in longer elapsed time. As a result, the leader node can become hot, which not only affects the SELECT that is being executed, but also throttles resources for creating execution plans and managing the overall cluster resources. Here is an example of a large SELECT statement. Notice that the leader node is doing most of the work to stream out the rows:

Use UNLOAD to extract large results sets directly to S3. After it’s in S3, the data can be shared with multiple downstream systems. By default, UNLOAD writes data in parallel to multiple files according to the number of slices in the cluster. All the compute nodes participate to quickly offload the data into S3.

If you are extracting data for use with Amazon Redshift Spectrum, you should make use of the MAXFILESIZE parameter and keep the files around 150 MB. Similar to item 1 above, having many evenly sized files ensures that Redshift Spectrum can do the maximum amount of work in parallel.
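
A minimal sketch of such an UNLOAD follows; the query, S3 prefix, and IAM role are placeholders:

UNLOAD ('SELECT * FROM daily_sales WHERE sale_date = ''2017-07-02''')
TO 's3://<your-bucket>/unload/daily_sales/2017-07-02/part_'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-unload-role>'
GZIP
MAXFILESIZE 150 MB;

By default, the files are written in parallel from all the slices, so you end up with many compressed, evenly sized files under the prefix rather than one large extract.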

7. Use Redshift Spectrum for ad hoc ETL processing

Events such as data backfill, promotional activity, and special calendar days can trigger additional data volumes that affect the data refresh times in your Amazon Redshift cluster. To help address these spikes in data volumes and throughput, I recommend staging data in S3. After data is organized in S3, Redshift Spectrum enables you to query it directly using standard SQL. In this way, you gain the benefits of additional capacity without having to resize your cluster.

For tips on getting started with and optimizing the use of Redshift Spectrum, see the previous post, 10 Best Practices for Amazon Redshift Spectrum.

8. Monitor daily ETL health using diagnostic queries

Monitoring the health of your ETL processes on a regular basis helps identify the early onset of performance issues before they have a significant impact on your cluster. The following monitoring scripts can be used to provide insights into the health of your ETL processes:

Script | Use when… | Solution
commit_stats.sql – Commit queue statistics from past days, showing largest queue length and queue time first | DML statements such as INSERT/UPDATE/COPY/DELETE operations take several times longer to execute when multiple of these operations are in progress | Set up separate WLM queues for the ETL process and limit the concurrency to < 5.
copy_performance.sql – Copy command statistics for the past days | Daily COPY operations take longer to execute | Follow the best practices for the COPY command. Analyze data growth with the incoming datasets and consider a cluster resize to meet the expected SLA.
table_info.sql – Table skew and unsorted statistics along with storage and key information | Transformation steps take longer to execute | Set up regular VACUUM jobs to address unsorted rows and reclaim the deleted blocks so that transformation SQL executes optimally. Consider a table redesign to avoid data skewness.
v_check_transaction_locks.sql – Monitor transaction locks | INSERT/UPDATE/COPY/DELETE operations on particular tables do not respond in a timely manner, compared to when run after the ETL | Multiple DML statements are operating on the same target table at the same moment from different transactions. Set up ETL job dependencies so that they execute serially for the same target table.
v_get_schema_priv_by_user.sql – Get the schema that the user has access to | Reporting users can view intermediate tables | Set up separate database groups for reporting and ETL users, and grant access to objects using GRANT.
v_generate_tbl_ddl.sql – Get the table DDL | You need to create an empty table with the same structure as the target table for data backfill | Generate the DDL using this script for data backfill.
v_space_used_per_tbl.sql – Monitor space used by individual tables | Amazon Redshift data warehouse space growth is trending upwards more than normal | Analyze the individual tables that are growing at a higher rate than normal. Consider data archival using UNLOAD to S3 and Redshift Spectrum for later analysis. Use unscanned_table_summary.sql to find unused tables and archive or drop them.
top_queries.sql – Return the top 50 time-consuming statements aggregated by their text | ETL transformations are taking longer to execute | Analyze the top transformation SQL and use EXPLAIN to find opportunities for tuning the query plan.

There are several other useful scripts available in the amazon-redshift-utils repository. The AWS Lambda Utility Runner runs a subset of these scripts on a scheduled basis, allowing you to automate much of the monitoring of your ETL processes.

Example ETL process

The following ETL process reinforces some of the best practices discussed in this post. Consider the following four-step daily ETL workflow where data from an RDBMS source system is staged in S3 and then loaded into Amazon Redshift. Amazon Redshift is used to calculate daily, weekly, and monthly aggregations, which are then unloaded to S3, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.

Step 1: Extract from the RDBMS source to an S3 bucket

In this ETL process, the data extract job fetches change data every hour and stages it into multiple hourly files. For example, the staged S3 folder looks like the following:

$ aws s3 ls s3://<<S3 Bucket>>/batch/2017/07/02/
2017-07-02 01:59:58   81900220 20170702T01.export.gz
2017-07-02 02:59:56   84926844 20170702T02.export.gz
2017-07-02 03:59:54   78990356 20170702T03.export.gz
…
2017-07-02 22:00:03   75966745 20170702T21.export.gz
2017-07-02 23:00:02   89199874 20170702T22.export.gz
2017-07-02 00:59:59   71161715 20170702T23.export.gz

Organizing the data into multiple, evenly sized files enables the COPY command to ingest this data using all available resources in the Amazon Redshift cluster. Further, the files are compressed (gzipped) to further reduce COPY times.

Step 2: Stage data to the Amazon Redshift table for cleansing

Ingesting the data can be accomplished using a JSON-based manifest file. Using the manifest file ensures that S3 eventual consistency issues can be eliminated and also provides an opportunity to dedupe any files if needed. A sample manifest20170702.json file looks like the following:

{
  "entries": [
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T01.export.gz", "mandatory":true},
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T02.export.gz", "mandatory":true},
    …
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T23.export.gz", "mandatory":true}
  ]
}

The data can be ingested using the following command:

SET wlm_query_slot_count TO <<max available concurrency in the ETL queue>>;
COPY stage_tbl FROM 's3:// <<S3 Bucket>>/batch/manifest20170702.json' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' manifest;

Because the downstream ETL processes depend on this COPY command to complete, the wlm_query_slot_count is used to claim all the memory available to the queue. This helps the COPY command complete as quickly as possible.

Step 3: Transform data to create daily, weekly, and monthly datasets and load into target tables

Data is staged in the “stage_tbl” from where it can be transformed into the daily, weekly, and monthly aggregates and loaded into target tables. The following job illustrates a typical weekly process:

Begin
INSERT into ETL_LOG (..) values (..);
DELETE from weekly_tbl where dataset_week = <<current week>>;
INSERT into weekly_tbl (..)
  SELECT date_trunc('week', dataset_day) AS week_begin_dataset_date, SUM(C1) AS C1, SUM(C2) AS C2
	FROM   stage_tbl
GROUP BY date_trunc('week', dataset_day);
INSERT into AUDIT_LOG values (..);
COMMIT;
End;

As shown above, multiple steps are combined into one transaction to perform a single commit, reducing contention on the commit queue.

Step 4: Unload the daily dataset to populate the S3 data lake bucket

The transformed results are now unloaded into another S3 bucket, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.

unload ('SELECT * FROM weekly_tbl WHERE dataset_week = <<current week>>') TO 's3://<<S3 Bucket>>/datalake/weekly/20170526/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole';

Summary

Amazon Redshift lets you easily operate petabyte-scale data warehouses on the cloud. This post summarized the best practices for operating scalable ETL natively within Amazon Redshift. I demonstrated efficient ways to ingest and transform data, along with close monitoring. I also demonstrated the best practices being used in a typical sample ETL workload to transform the data into Amazon Redshift.

If you have questions or suggestions, please comment below.

 


About the Author

Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.

 

Combine Transactional and Analytical Data Using Amazon Aurora and Amazon Redshift

Post Syndicated from Re Alvarez-Parmar original https://aws.amazon.com/blogs/big-data/combine-transactional-and-analytical-data-using-amazon-aurora-and-amazon-redshift/

A few months ago, we published a blog post about capturing data changes in an Amazon Aurora database and sending it to Amazon Athena and Amazon QuickSight for fast analysis and visualization. In this post, I want to demonstrate how easy it can be to take the data in Aurora and combine it with data in Amazon Redshift using Amazon Redshift Spectrum.

With Amazon Redshift, you can build petabyte-scale data warehouses that unify data from a variety of internal and external sources. Because Amazon Redshift is optimized for complex queries (often involving multiple joins) across large tables, it can handle large volumes of retail, inventory, and financial data without breaking a sweat.

In this post, we describe how to combine data in Aurora with data in Amazon Redshift. Here’s an overview of the solution:

  • Use AWS Lambda functions with Amazon Aurora to capture data changes in a table.
  • Save the data in an Amazon S3 bucket.
  • Query data using Amazon Redshift Spectrum.

We use the following services: Amazon Aurora, AWS Lambda, Amazon Kinesis Data Firehose, Amazon S3, Amazon Redshift (with Redshift Spectrum), and Amazon QuickSight.

Serverless architecture for capturing and analyzing Aurora data changes

Consider a scenario in which an e-commerce web application uses Amazon Aurora for a transactional database layer. The company has a sales table that captures every single sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it.

In this example, you take the changes in data in an Aurora database table and save it in Amazon S3. After the data is captured in Amazon S3, you combine it with data in your existing Amazon Redshift cluster for analysis.

By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using AWS Lambda.

The following diagram shows the flow of data as it occurs in this tutorial:

The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis data delivery stream. Kinesis Data Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Redshift Spectrum.

Creating an Aurora database

First, create a database by following these steps in the Amazon RDS console:

  1. Sign in to the AWS Management Console, and open the Amazon RDS console.
  2. Choose Launch a DB instance, and choose Next.
  3. For Engine, choose Amazon Aurora.
  4. Choose a DB instance class. This example uses a small instance class, because this is not a production database.
  5. In Multi-AZ deployment, choose No.
  6. Configure DB instance identifier, Master username, and Master password.
  7. Launch the DB instance.

After you create the database, use MySQL Workbench to connect to the database using the CNAME from the console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.

The following screenshot shows the MySQL Workbench configuration:

Next, create a table in the database by running the following SQL statement:

CREATE TABLE Sales (
InvoiceID int NOT NULL AUTO_INCREMENT,
ItemID int NOT NULL,
Category varchar(255),
Price double(10,2), 
Quantity int not NULL,
OrderDate timestamp,
DestinationState varchar(2),
ShippingType varchar(255),
Referral varchar(255),
PRIMARY KEY (InvoiceID)
)

You can now populate the table with some sample data. To generate sample data in your table, copy and run the following script. Ensure that the placeholder values (AURORA_CNAME, DBUSER, DBPASSWORD, and DB) are replaced with appropriate values for your environment.

#!/usr/bin/python
import MySQLdb
import random
import datetime

db = MySQLdb.connect(host="AURORA_CNAME",
                     user="DBUSER",
                     passwd="DBPASSWORD",
                     db="DB")

states = ("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN",
"IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
"NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA",
"WA","WV","WI","WY")

shipping_types = ("Free", "3-Day", "2-Day")

product_categories = ("Garden", "Kitchen", "Office", "Household")
referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad")

for i in range(0,10):
    item_id = random.randint(1,100)
    state = states[random.randint(0,len(states)-1)]
    shipping_type = shipping_types[random.randint(0,len(shipping_types)-1)]
    product_category = product_categories[random.randint(0,len(product_categories)-1)]
    quantity = random.randint(1,4)
    referral = referrals[random.randint(0,len(referrals)-1)]
    price = random.randint(1,100)
    order_date = datetime.date(2016,random.randint(1,12),random.randint(1,28)).isoformat()  # cap the day at 28 so every month produces a valid date

    data_order = (item_id, product_category, price, quantity, order_date, state,
    shipping_type, referral)

    add_order = ("INSERT INTO Sales "
                   "(ItemID, Category, Price, Quantity, OrderDate, DestinationState, \
                   ShippingType, Referral) "
                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")

    cursor = db.cursor()
    cursor.execute(add_order, data_order)

    db.commit()

cursor.close()
db.close() 

The following screenshot shows how the table appears with the sample data:

Sending data from Amazon Aurora to Amazon S3

There are two methods available to send data from Amazon Aurora to Amazon S3:

  • Using a Lambda function
  • Using SELECT INTO OUTFILE S3

To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Amazon Kinesis Data Firehose.

Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly in text files that are stored in an Amazon S3 bucket. However, with this method, there is a delay between the time that the database transaction occurs and the time that the data is exported to Amazon S3 because the default file size threshold is 6 GB.
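
A minimal sketch of that alternative follows, using the Sales table created earlier; the S3 path is a placeholder, the exact URI format can vary by Aurora version, and the cluster must have an IAM role that allows writing to the bucket.

SELECT InvoiceID, ItemID, Category, Price, Quantity, OrderDate,
       DestinationState, ShippingType, Referral
FROM Sales
INTO OUTFILE S3 's3://<<S3 Bucket>>/aurora-export/sales'
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';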

Creating a Kinesis data delivery stream

The next step is to create a Kinesis data delivery stream, since it’s a dependency of the Lambda function.

To create a delivery stream:

  1. Open the Kinesis Data Firehose console
  2. Choose Create delivery stream.
  3. For Delivery stream name, type AuroraChangesToS3.
  4. For Source, choose Direct PUT.
  5. For Record transformation, choose Disabled.
  6. For Destination, choose Amazon S3.
  7. In the S3 bucket drop-down list, choose an existing bucket, or create a new one.
  8. Enter a prefix if needed, and choose Next.
  9. For Data compression, choose GZIP.
  10. In IAM role, choose either an existing role that has access to write to Amazon S3, or choose to generate one automatically. Choose Next.
  11. Review all the details on the screen, and choose Create delivery stream when you’re finished.

 

Creating a Lambda function

Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis data delivery stream that you created earlier.

To create the Lambda function:

  1. Open the AWS Lambda console.
  2. Ensure that you are in the AWS Region where your Amazon Aurora database is located.
  3. If you have no Lambda functions yet, choose Get started now. Otherwise, choose Create function.
  4. Choose Author from scratch.
  5. Give your function a name and select Python 3.6 for Runtime.
  6. Choose an existing role or create a new one; the role needs permission to call firehose:PutRecord.
  7. Choose Next on the trigger selection screen.
  8. Paste the following code in the code window. Change the stream_name variable to the Kinesis data delivery stream that you created in the previous step.
  9. Choose File -> Save in the code editor and then choose Save.
import boto3
import json

firehose = boto3.client('firehose')
stream_name = 'AuroraChangesToS3'


def Kinesis_publish_message(event, context):
    
    firehose_data = (("%s,%s,%s,%s,%s,%s,%s,%s\n") %(event['ItemID'], 
    event['Category'], event['Price'], event['Quantity'],
    event['OrderDate'], event['DestinationState'], event['ShippingType'], 
    event['Referral']))
    
    firehose_data = {'Data': str(firehose_data)}
    print(firehose_data)
    
    firehose.put_record(DeliveryStreamName=stream_name,
    Record=firehose_data)

Note the Amazon Resource Name (ARN) of this Lambda function.

Giving Aurora permissions to invoke a Lambda function

To give Amazon Aurora permissions to invoke a Lambda function, you must attach an IAM role with appropriate permissions to the cluster. For more information, see Invoking a Lambda Function from an Amazon Aurora DB Cluster.

Once you are finished, the Amazon Aurora database has access to invoke a Lambda function.

Creating a stored procedure and a trigger in Amazon Aurora

Now, go back to MySQL Workbench, and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.

DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255), 
									IN Category varchar(255), 
									IN Price double(10,2),
                                    IN Quantity int(11),
                                    IN OrderDate timestamp,
                                    IN DestinationState varchar(2),
                                    IN ShippingType varchar(255),
                                    IN Referral  varchar(255)) LANGUAGE SQL 
BEGIN
  CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis', 
     CONCAT('{ "ItemID" : "', ItemID, 
            '", "Category" : "', Category,
            '", "Price" : "', Price,
            '", "Quantity" : "', Quantity, 
            '", "OrderDate" : "', OrderDate, 
            '", "DestinationState" : "', DestinationState, 
            '", "ShippingType" : "', ShippingType, 
            '", "Referral" : "', Referral, '"}')
     );
END
;;
DELIMITER ;

Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.

DROP TRIGGER IF EXISTS TR_Sales_CDC;
 
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
  AFTER INSERT ON Sales
  FOR EACH ROW
BEGIN
  SELECT  NEW.ItemID , NEW.Category, New.Price, New.Quantity, New.OrderDate
  , New.DestinationState, New.ShippingType, New.Referral
  INTO @ItemID , @Category, @Price, @Quantity, @OrderDate
  , @DestinationState, @ShippingType, @Referral;
  CALL  CDC_TO_FIREHOSE(@ItemID , @Category, @Price, @Quantity, @OrderDate
  , @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;

If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.

Verify that data is being sent from the Lambda function to Kinesis Data Firehose to Amazon S3 successfully. You might have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Data Firehose buffering. To learn more about Kinesis Data Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Data Firehose Data Delivery.

Every time a new record is inserted in the sales table, a stored procedure is called, and it updates data in Amazon S3.

Querying data in Amazon Redshift

In this section, you use the data you produced from Amazon Aurora and consume it as-is in Amazon Redshift. In order to allow you to process your data as-is, where it is, while taking advantage of the power and flexibility of Amazon Redshift, you use Amazon Redshift Spectrum. You can use Redshift Spectrum to run complex queries on data stored in Amazon S3, with no need for loading or other data prep.

Just create a data source and issue your queries to your Amazon Redshift cluster as usual. Behind the scenes, Redshift Spectrum scales to thousands of instances on a per-query basis, ensuring that you get fast, consistent performance even as your dataset grows to beyond an exabyte! Being able to query data that is stored in Amazon S3 means that you can scale your compute and your storage independently. You have the full power of the Amazon Redshift query model and all the reporting and business intelligence tools at your disposal. Your queries can reference any combination of data stored in Amazon Redshift tables and in Amazon S3.

Redshift Spectrum supports open, common data types, including CSV/TSV, Apache Parquet, SequenceFile, and RCFile. Files can be compressed using gzip or Snappy, with other data types and compression methods in the works.

First, create an Amazon Redshift cluster. Follow the steps in Launch a Sample Amazon Redshift Cluster.

Next, create an IAM role that has access to Amazon S3 and Athena. By default, Amazon Redshift Spectrum uses the Amazon Athena data catalog. Your cluster needs authorization to access your external data catalog in AWS Glue or Athena and your data files in Amazon S3.

In the demo setup, I attached AmazonS3FullAccess and AmazonAthenaFullAccess. In a production environment, the IAM roles should follow the standard security of granting least privilege. For more information, see IAM Policies for Amazon Redshift Spectrum.

Attach the newly created role to the Amazon Redshift cluster. For more information, see Associate the IAM Role with Your Cluster.

Next, connect to the Amazon Redshift cluster, and create an external schema and database:

create external schema if not exists spectrum_schema
from data catalog 
database 'spectrum_db' 
region 'us-east-1'
IAM_ROLE 'arn:aws:iam::XXXXXXXXXXXX:role/RedshiftSpectrumRole'
create external database if not exists;

Don’t forget to replace the IAM role in the statement.

Then create an external table within the database:

 CREATE EXTERNAL TABLE IF NOT EXISTS spectrum_schema.ecommerce_sales(
  ItemID int,
  Category varchar,
  Price DOUBLE PRECISION,
  Quantity int,
  OrderDate TIMESTAMP,
  DestinationState varchar,
  ShippingType varchar,
  Referral varchar)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/'

Query the table, and it should contain data. This is a fact table.

select top 10 * from spectrum_schema.ecommerce_sales

 

Next, create a dimension table. For this example, we create a date/time dimension table. Create the table:

CREATE TABLE date_dimension (
  d_datekey           integer       not null sortkey,
  d_dayofmonth        integer       not null,
  d_monthnum          integer       not null,
  d_dayofweek                varchar(10)   not null,
  d_prettydate        date       not null,
  d_quarter           integer       not null,
  d_half              integer       not null,
  d_year              integer       not null,
  d_season            varchar(10)   not null,
  d_fiscalyear        integer       not null)
diststyle all;

Populate the table with data:

copy date_dimension from 's3://reparmar-lab/2016dates' 
iam_role 'arn:aws:iam::XXXXXXXXXXXX:role/redshiftspectrum'
DELIMITER ','
dateformat 'auto';

The date dimension table should look like the following:

Querying data in local and external tables using Amazon Redshift

Now that you have the fact and dimension table populated with data, you can combine the two and run analysis. For example, if you want to query the total sales amount by season, you can run the following:

select sum(quantity*price) as total_sales, date_dimension.d_season
from spectrum_schema.ecommerce_sales 
join date_dimension on spectrum_schema.ecommerce_sales.orderdate = date_dimension.d_prettydate 
group by date_dimension.d_season

You get the following results:

Similarly, you can replace d_season with d_dayofweek to get sales figures by weekday:

With Amazon Redshift Spectrum, you pay only for the queries you run against the data that you actually scan. We encourage you to use file partitioning, columnar data formats, and data compression to significantly minimize the amount of data scanned in Amazon S3. This is important for data warehousing because it dramatically improves query performance and reduces cost.

Partitioning your data in Amazon S3 by date, time, or any other custom keys enables Amazon Redshift Spectrum to dynamically prune nonrelevant partitions to minimize the amount of data processed. If you store data in a columnar format, such as Parquet, Amazon Redshift Spectrum scans only the columns needed by your query, rather than processing entire rows. Similarly, if you compress your data using one of the supported compression algorithms in Amazon Redshift Spectrum, less data is scanned.
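
A minimal sketch of these ideas, assuming a hypothetical Parquet copy of the sales data partitioned by order date (the table name and S3 paths are placeholders):

CREATE EXTERNAL TABLE spectrum_schema.ecommerce_sales_parquet (
  ItemID int,
  Category varchar(255),
  Price DOUBLE PRECISION,
  Quantity int)
PARTITIONED BY (order_date date)
STORED AS PARQUET
LOCATION 's3://{BUCKET_NAME}/CDC-parquet/';

ALTER TABLE spectrum_schema.ecommerce_sales_parquet
ADD PARTITION (order_date='2017-08-01')
LOCATION 's3://{BUCKET_NAME}/CDC-parquet/order_date=2017-08-01/';

-- Filtering on the partition column lets Redshift Spectrum prune all other prefixes,
-- and the Parquet format means only the referenced columns are read.
SELECT Category, SUM(Price * Quantity) AS total_sales
FROM spectrum_schema.ecommerce_sales_parquet
WHERE order_date = '2017-08-01'
GROUP BY Category;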

Analyzing and visualizing Amazon Redshift data in Amazon QuickSight

Modify the Amazon Redshift security group to allow an Amazon QuickSight connection. For more information, see Authorizing Connections from Amazon QuickSight to Amazon Redshift Clusters.

After modifying the Amazon Redshift security group, go to Amazon QuickSight. Create a new analysis, and choose Amazon Redshift as the data source.

Enter the database connection details, validate the connection, and create the data source.

Choose the schema to be analyzed. In this case, choose spectrum_schema, and then choose the ecommerce_sales table.

Next, we add a custom field for Total Sales = Price*Quantity. In the drop-down list for the ecommerce_sales table, choose Edit analysis data sets.

On the next screen, choose Edit.

In the data prep screen, choose New Field. Add a new calculated field Total Sales $, which is the product of the Price*Quantity fields. Then choose Create. Save and visualize it.

Next, to visualize total sales figures by month, create a graph with Total Sales on the x-axis and Order Date formatted as month on the y-axis.

After you’ve finished, you can use Amazon QuickSight to add different columns from your Amazon Redshift tables and perform different types of visualizations. You can build operational dashboards that continuously monitor your transactional and analytical data. You can publish these dashboards and share them with others.

Final notes

Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources or Amazon Redshift tables before visualizing it in Amazon QuickSight.

In this example, we dealt with data being inserted, but triggers can be activated in response to INSERT, UPDATE, or DELETE statements.

Keep the following in mind:

  • Be careful when invoking a Lambda function from triggers on tables that experience high write traffic. This would result in a large number of calls to your Lambda function. Although calls to the lambda_async procedure are asynchronous, triggers are synchronous.
  • A statement that results in a large number of trigger activations does not wait for the call to the AWS Lambda function to complete. But it does wait for the triggers to complete before returning control to the client.
  • Similarly, you must account for Amazon Kinesis Data Firehose limits. By default, Kinesis Data Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Data Firehose.

In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don’t need to transform data from Amazon Aurora. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture has the benefits of being serverless, whereas AWS DMS requires an Amazon EC2 instance for replication.

For design considerations while using Redshift Spectrum, see Using Amazon Redshift Spectrum to Query External Data.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Capturing Data Changes in Amazon Aurora Using AWS Lambda and 10 Best Practices for Amazon Redshift Spectrum.


About the Authors

Re Alvarez-Parmar is a solutions architect for Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.

 

 

 

Simplify Querying Nested JSON with the AWS Glue Relationalize Transform

Post Syndicated from Trevor Roberts original https://aws.amazon.com/blogs/big-data/simplify-querying-nested-json-with-the-aws-glue-relationalize-transform/

AWS Glue has a transform called Relationalize that simplifies the extract, transform, load (ETL) process by converting nested JSON into columns that you can easily import into relational databases. Relationalize transforms the nested JSON into key-value pairs at the outermost level of the JSON document. The transformed data maintains a list of the original keys from the nested JSON separated by periods.

Let’s look at how Relationalize can help you with a sample use case.

An example of Relationalize in action

Suppose that the developers of a video game want to use a data warehouse like Amazon Redshift to run reports on player behavior based on data that is stored in JSON. Sample 1 shows example user data from the game. The player named “user1” has characteristics such as race, class, and location in nested JSON data. Further down, the player’s arsenal information includes additional nested JSON data. If the developers want to ETL this data into their data warehouse, they might have to resort to nested loops or recursive functions in their code.

Sample 1: Nested JSON

{
	"player": {
		"username": "user1",
		"characteristics": {
			"race": "Human",
			"class": "Warlock",
			"subclass": "Dawnblade",
			"power": 300,
			"playercountry": "USA"
		},
		"arsenal": {
			"kinetic": {
				"name": "Sweet Business",
				"type": "Auto Rifle",
				"power": 300,
				"element": "Kinetic"
			},
			"energy": {
				"name": "MIDA Mini-Tool",
				"type": "Submachine Gun",
				"power": 300,
				"element": "Solar"
			},
			"power": {
				"name": "Play of the Game",
				"type": "Grenade Launcher",
				"power": 300,
				"element": "Arc"
			}
		},
		"armor": {
			"head": "Eye of Another World",
			"arms": "Philomath Gloves",
			"chest": "Philomath Robes",
			"leg": "Philomath Boots",
			"classitem": "Philomath Bond"
		},
		"location": {
			"map": "Titan",
			"waypoint": "The Rig"
		}
	}
}

Instead, the developers can use the Relationalize transform. Sample 2 shows what the transformed data looks like.

Sample 2: Flattened JSON

{
    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"
}

You can then write the data to a database or to a data warehouse. You can also write it to delimited text files, such as in comma-separated value (CSV) format, or columnar file formats such as Optimized Row Columnar (ORC) format. You can use either of these format types for long-term storage in Amazon S3. Storing the transformed files in S3 provides the additional benefit of being able to query this data using Amazon Athena or Amazon Redshift Spectrum. You can further extend the usefulness of the data by performing joins between data stored in S3 and the data stored in an Amazon Redshift data warehouse.

Before we get started…

In my example, I took two preparatory steps that save some time in your ETL code development:

  1. I stored my data in an Amazon S3 bucket and used an AWS Glue crawler to make my data available in the AWS Glue data catalog. You can find instructions on how to do that in Cataloging Tables with a Crawler in the AWS Glue documentation. The AWS Glue database name I used was “blog,” and the table name was “players.” You can see these values in use in the sample code that follows.
  2. I deployed a Zeppelin notebook using the automated deployment available within AWS Glue. If you already used an AWS Glue development endpoint to deploy a Zeppelin notebook, you can skip the deployment instructions. Otherwise, let’s quickly review how to deploy Zeppelin.

Deploying a Zeppelin notebook with AWS Glue

The following steps are outlined in the AWS Glue documentation, and I include a few screenshots here for clarity.

First, create two IAM roles: one for the AWS Glue service and one for the Amazon EC2 instance that runs the Zeppelin notebook server.

Next, in the AWS Glue Management Console, choose Dev endpoints, and then choose Add endpoint.

Specify a name for the endpoint and the AWS Glue IAM role that you created.

On the networking screen, choose Skip Networking because our code only communicates with S3.

Complete the development endpoint process by providing a Secure Shell (SSH) public key and confirming your settings.

When your new development endpoint’s Provisioning status changes from PROVISIONING to READY, choose your endpoint, and then for Actions choose Create notebook server.

Enter the notebook server details, including the role you previously created and a security group with inbound access allowed on TCP port 443.

Doing this automatically launches an AWS CloudFormation template. The output specifies the URL that you can use to access your Zeppelin notebook with the username and password you specified in the wizard.

How do we flatten nested JSON?

With my data loaded and my notebook server ready, I accessed Zeppelin, created a new note, and set my interpreter to spark. I used some Python code that AWS Glue previously generated for another job that outputs to ORC. Then I added the Relationalize transform. You can see the resulting Python code in Sample 3.

Sample 3: Python code to transform the nested JSON and output it to ORC

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
#from awsglue.transforms import Relationalize

# Begin variables to customize with your information
glue_source_database = "blog"
glue_source_table = "players"
glue_temp_storage = "s3://blog-example-edz/temp"
glue_relationalize_output_s3_path = "s3://blog-example-edz/output-flat"
dfc_root_table_name = "root" #default value is "roottable"
# End variables to customize with your information

glueContext = GlueContext(spark.sparkContext)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
blogdata = dfc.select(dfc_root_table_name)
blogdataoutput = glueContext.write_dynamic_frame.from_options(frame = blogdata, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "orc", transformation_ctx = "blogdataoutput")

What exactly is going on in this script?

After the import statements, we instantiate a GlueContext object, which allows us to work with the data in AWS Glue. Next, we create a DynamicFrame (datasource0) from the “players” table in the AWS Glue “blog” database. We use this DynamicFrame to perform any necessary operations on the data structure before it’s written to our desired output format. The source files remain unchanged.

We then run the Relationalize transform (Relationalize.apply()) with our datasource0 as one of the parameters. Another important parameter is the name parameter, which is a key that identifies our data after the transformation completes.

The Relationalize.apply() method returns a DynamicFrameCollection, and this is stored in the dfc variable. Before we can write our data to S3, we need to select the DynamicFrame from the DynamicFrameCollection object. We do this with the dfc.select() method. The correct DynamicFrame is stored in the blogdata variable.

You might be curious why a DynamicFrameCollection was returned when we started with a single DynamicFrame. This return value comes from the way Relationalize treats arrays in the JSON document: A DynamicFrame is created for each array. Together with the root data structure, each generated DynamicFrame is added to a DynamicFrameCollection when Relationalize completes its work. Although we didn’t have any arrays in our data, it’s good to keep this in mind. Finally, we output (blogdataoutput) the root DynamicFrame to ORC files in S3.

Using the transformed data

One of the use cases we discussed earlier was using Amazon Athena or Amazon Redshift Spectrum to query the ORC files.

I used the following SQL DDL statements to create external tables in both services to enable queries of my data stored in Amazon S3.

Sample 4: Amazon Athena DDL

CREATE EXTERNAL TABLE IF NOT EXISTS blog.blog_data_athena_test (
  `characteristics_race` string,
  `characteristics_class` string,
  `characteristics_subclass` string,
  `characteristics_power` int,
  `characteristics_playercountry` string,
  `kinetic_name` string,
  `kinetic_type` string,
  `kinetic_power` int,
  `kinetic_element` string,
  `energy_name` string,
  `energy_type` string,
  `energy_power` int,
  `energy_element` string,
  `power_name` string,
  `power_type` string,
  `power_power` int,
  `power_element` string,
  `armor_head` string,
  `armor_arms` string,
  `armor_chest` string,
  `armor_leg` string,
  `armor_classitem` string,
  `map` string,
  `waypoint` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://blog-example-edz/output-flat/'
TBLPROPERTIES ('has_encrypted_data'='false');

 

Sample 5: Amazon Redshift Spectrum DDL

-- Create a Schema
-- A single schema can be used with multiple external tables.
-- This step is only required once for the external tables you create.
create external schema spectrum 
from data catalog 
database 'blog' 
iam_role 'arn:aws:iam::0123456789:role/redshift-role'
create external database if not exists;

-- Create an external table in the schema
create external table spectrum.blog(
  username VARCHAR,
  characteristics_race VARCHAR,
  characteristics_class VARCHAR,
  characteristics_subclass VARCHAR,
  characteristics_power INTEGER,
  characteristics_playercountry VARCHAR,
  kinetic_name VARCHAR,
  kinetic_type VARCHAR,
  kinetic_power INTEGER,
  kinetic_element VARCHAR,
  energy_name VARCHAR,
  energy_type VARCHAR,
  energy_power INTEGER,
  energy_element VARCHAR,
  power_name VARCHAR,
  power_type VARCHAR,
  power_power INTEGER,
  power_element VARCHAR,
  armor_head VARCHAR,
  armor_arms VARCHAR,
  armor_chest VARCHAR,
  armor_leg VARCHAR,
  armor_classItem VARCHAR,
  map VARCHAR,
  waypoint VARCHAR)
stored as orc
location 's3://blog-example-edz/output-flat';

I even ran a query, shown in Sample 6, that joined my Redshift Spectrum table (spectrum.playerdata) with data in an Amazon Redshift table (public.raids) to generate advanced reports. In the where clause, I join the two tables based on the username values that are common to both data sources.

Sample 6: Select statement with a join of Redshift Spectrum data with Amazon Redshift data

-- Get Total Raid Completions for the Hunter Class.
select spectrum.playerdata.characteristics_class as class, sum(public.raids."completions.val.raids.leviathan") as "Total Hunter Leviathan Raid Completions" from spectrum.playerdata, public.raids
where spectrum.playerdata.username = public.raids."completions.val.username"
and spectrum.playerdata.characteristics_class = 'Hunter'
group by spectrum.playerdata.characteristics_class;

Summary

This post demonstrated how simple it can be to flatten nested JSON data with AWS Glue, using the Relationalize transform to automate the conversion of nested JSON. AWS Glue also automates the deployment of Zeppelin notebooks that you can use to develop your Python automation script. Finally, AWS Glue can output the transformed data directly to a relational database, or to files in Amazon S3 for further analysis with tools such as Amazon Athena and Amazon Redshift Spectrum.

As great as Relationalize is, it’s not the only transform available with AWS Glue. You can see a complete list of available transforms in Built-In Transforms in the AWS Glue documentation. Try them out today!


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena and AWS Glue with Node.js in Production and Build a Data Lake Foundation with AWS Glue and Amazon S3.


About the Author

Trevor Roberts Jr is a Solutions Architect with AWS. He provides architectural guidance to help customers achieve success in the cloud. In his spare time, Trevor enjoys traveling to new places and spending time with family.

Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production

Post Syndicated from Rafi Ton original https://aws.amazon.com/blogs/big-data/using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/

This is a guest post by Rafi Ton, founder and CEO of NUVIAD. NUVIAD is, in their own words, “a mobile marketing platform providing professional marketers, agencies and local businesses state of the art tools to promote their products and services through hyper targeting, big data analytics and advanced machine learning tools.”

At NUVIAD, we’ve been using Amazon Redshift as our main data warehouse solution for more than 3 years.

We store massive amounts of ad transaction data that our users and partners analyze to determine ad campaign strategies. When running real-time bidding (RTB) campaigns in large scale, data freshness is critical so that our users can respond rapidly to changes in campaign performance. We chose Amazon Redshift because of its simplicity, scalability, performance, and ability to load new data in near real time.

Over the past three years, our customer base grew significantly and so did our data. We saw our Amazon Redshift cluster grow from three nodes to 65 nodes. To balance cost and analytics performance, we looked for a way to store large amounts of less-frequently analyzed data at a lower cost. Yet, we still wanted to have the data immediately available for user queries and to meet their expectations for fast performance. We turned to Amazon Redshift Spectrum.

In this post, I explain the reasons why we extended Amazon Redshift with Redshift Spectrum as our modern data warehouse. I cover how our data growth and the need to balance cost and performance led us to adopt Redshift Spectrum. I also share key performance metrics in our environment, and discuss the additional AWS services that provide a scalable and fast environment, with data available for immediate querying by our growing user base.

Amazon Redshift as our foundation

The ability to provide fresh, up-to-the-minute data to our customers and partners was always a main goal with our platform. We saw other solutions provide data that was a few hours old, but this was not good enough for us. We insisted on providing the freshest data possible. For us, that meant loading Amazon Redshift in frequent micro batches and allowing our customers to query Amazon Redshift directly to get results in near real time.

The benefits were immediately evident. Our customers could see how their campaigns performed faster than with other solutions, and react sooner to the ever-changing media supply pricing and availability. They were very happy.

However, this approach required Amazon Redshift to store a lot of data for long periods, and our data grew substantially. In our peak, we maintained a cluster running 65 DC1.large nodes. The impact on our Amazon Redshift cluster was evident, and we saw our CPU utilization grow to 90%.

Why we extended Amazon Redshift to Redshift Spectrum

Redshift Spectrum gives us the ability to run SQL queries using the powerful Amazon Redshift query engine against data stored in Amazon S3, without needing to load the data. With Redshift Spectrum, we store data where we want, at the cost that we want. We have the data available for analytics when our users need it with the performance they expect.

Seamless scalability, high performance, and unlimited concurrency

Scaling Redshift Spectrum is a simple process. First, it allows us to leverage Amazon S3 as the storage engine and get practically unlimited data capacity.

Second, if we need more compute power, we can leverage Redshift Spectrum’s distributed compute engine over thousands of nodes to provide superior performance – perfect for complex queries running against massive amounts of data.

Third, all Redshift Spectrum clusters access the same data catalog so that we don’t have to worry about data migration at all, making scaling effortless and seamless.

Lastly, since Redshift Spectrum distributes queries across potentially thousands of nodes, they are not affected by other queries, providing much more stable performance and unlimited concurrency.

Keeping it SQL

Redshift Spectrum uses the same query engine as Amazon Redshift. This means that we did not need to change our BI tools or query syntax, whether we used complex queries across a single table or joins across multiple tables.

An interesting capability introduced recently is the ability to create a view that spans both Amazon Redshift and Redshift Spectrum external tables. With this feature, you can query frequently accessed data in your Amazon Redshift cluster and less-frequently accessed data in Amazon S3, using a single view.
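
A minimal sketch of such a view, assuming a hypothetical local recent_events table and an external spectrum.events_archive table (late binding is required for views that reference external tables, and the table names must be schema-qualified):

CREATE VIEW all_events AS
SELECT user_id, ts, billing FROM public.recent_events       -- local Amazon Redshift table
UNION ALL
SELECT user_id, ts, billing FROM spectrum.events_archive    -- Redshift Spectrum external table
WITH NO SCHEMA BINDING;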

Leveraging Parquet for higher performance

Parquet is a columnar data format that provides superior performance and allows Redshift Spectrum (or Amazon Athena) to scan significantly less data. With less I/O, queries run faster and we pay less per query. You can read all about Parquet at https://parquet.apache.org/ or https://en.wikipedia.org/wiki/Apache_Parquet.

Lower cost

From a cost perspective, we pay standard rates for our data in Amazon S3, and only small amounts per query to analyze data with Redshift Spectrum. Using the Parquet format, we can significantly reduce the amount of data scanned. Our costs are now lower, and our users get fast results even for large complex queries.

What we learned about Amazon Redshift vs. Redshift Spectrum performance

When we first started looking at Redshift Spectrum, we wanted to put it to the test. We wanted to know how it would compare to Amazon Redshift, so we looked at two key questions:

  1. What is the performance difference between Amazon Redshift and Redshift Spectrum on simple and complex queries?
  2. Does the data format impact performance?

During the migration phase, we had our dataset stored in Amazon Redshift and S3 as CSV/GZIP and as Parquet file formats. We tested three configurations:

  • Amazon Redshift cluster with 28 DC1.large nodes
  • Redshift Spectrum using CSV/GZIP
  • Redshift Spectrum using Parquet

We performed benchmarks for simple and complex queries on one month’s worth of data. We tested how much time it took to perform the query, and how consistent the results were when running the same query multiple times. The data we used for the tests was already partitioned by date and hour. Properly partitioning the data improves performance significantly and reduces query times.

Simple query

First, we tested a simple query aggregating billing data across a month:

SELECT 
  user_id, 
  count(*) AS impressions, 
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;

We ran the same query seven times and measured the response times:

Execution Time (seconds)

Run       Amazon Redshift    Redshift Spectrum CSV    Redshift Spectrum Parquet
Run #1    39.65              45.11                    11.92
Run #2    15.26              43.13                    12.05
Run #3    15.27              46.47                    13.38
Run #4    21.22              51.02                    12.74
Run #5    17.27              43.35                    11.76
Run #6    16.67              44.23                    13.67
Run #7    25.37              40.39                    12.75
Average   21.53              44.82                    12.61

For simple queries, Amazon Redshift performed better than Redshift Spectrum, as we thought, because the data is local to Amazon Redshift.

What was surprising was that using Parquet data format in Redshift Spectrum significantly beat ‘traditional’ Amazon Redshift performance. For our queries, using Parquet data format with Redshift Spectrum delivered an average 40% performance gain over traditional Amazon Redshift. Furthermore, Redshift Spectrum showed high consistency in execution time with a smaller difference between the slowest run and the fastest run.

Comparing the amount of data scanned when using CSV/GZIP and Parquet, the difference was also significant:

Format        Data Scanned (GB)
CSV (GZIP)    135.49
Parquet       2.83

Because we pay only for the data scanned by Redshift Spectrum, the cost saving of using Parquet is evident and substantial.

Complex query

Next, we compared the same three configurations with a complex query.

Execution Time (seconds)

Run       Amazon Redshift    Redshift Spectrum CSV    Redshift Spectrum Parquet
Run #1    329.80             84.20                    42.40
Run #2    167.60             65.30                    35.10
Run #3    165.20             62.20                    23.90
Run #4    273.90             74.90                    55.90
Run #5    167.70             69.00                    58.40
Average   220.84             71.12                    43.14

This time, Redshift Spectrum using Parquet cut the average query time by 80% compared to traditional Amazon Redshift!

Bottom line: For complex queries, Redshift Spectrum provided a 67% performance gain over Amazon Redshift. Using the Parquet data format, Redshift Spectrum delivered an 80% performance improvement over Amazon Redshift. For us, this was substantial.

Optimizing the data structure for different workloads

Because the cost of S3 is relatively inexpensive and we pay only for the data scanned by each query, we believe that it makes sense to keep our data in different formats for different workloads and different analytics engines. It is important to note that we can have any number of tables pointing to the same data on S3. It all depends on how we partition the data and update the table partitions.

Data permutations

For example, we have a process that runs every minute and generates statistics for the last minute of data collected. With Amazon Redshift, this would be done by running the query on the table with something as follows:

SELECT 
  user, 
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN '2017-08-01 14:00:00' AND '2017-08-01 14:00:59'
GROUP BY 
  user;

(Assuming ‘ts’ is your column storing the time stamp for each event.)

With Redshift Spectrum, we pay for the data scanned in each query. If the data is partitioned by the minute instead of the hour, a query looking at one minute would be 1/60th the cost. If we use a temporary table that points only to the data of the last minute, we save that unnecessary cost.
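
A minimal sketch of such a table, assuming a hypothetical S3 prefix that always holds only the last processed minute of pipe-delimited CSV data:

CREATE EXTERNAL TABLE spectrum.events_last_minute (
    user_id     varchar(50),
    campaign_id varchar(50),
    os          varchar(50),
    ua          varchar(255),
    ts          bigint,
    billing     float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
LOCATION 's3://nuviad-temp/events/last-minute/';

-- Each run of the per-minute statistics job scans only this small slice of data.
SELECT user_id, COUNT(*)
FROM spectrum.events_last_minute
GROUP BY user_id;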

Creating Parquet data efficiently

On the average, we have 800 instances that process our traffic. Each instance sends events that are eventually loaded into Amazon Redshift. When we started three years ago, we would offload data from each server to S3 and then perform a periodic copy command from S3 to Amazon Redshift.

Recently, Amazon Kinesis Firehose added the capability to offload data directly to Amazon Redshift. While this is now a viable option, we kept the same collection process that worked flawlessly and efficiently for three years.

This changed, however, when we incorporated Redshift Spectrum. With Redshift Spectrum, we needed to find a way to:

  • Collect the event data from the instances.
  • Save the data in Parquet format.
  • Partition the data effectively.

To accomplish this, we save the data as CSV and then transform it to Parquet. The most effective method to generate the Parquet files is to:

  1. Send the data in one-minute intervals from the instances to Kinesis Firehose with an S3 temporary bucket as the destination.
  2. Aggregate hourly data and convert it to Parquet using AWS Lambda and AWS Glue.
  3. Add the Parquet data to S3 by updating the table partitions.

With this new process, we had to give more attention to validating the data before we sent it to Kinesis Firehose, because a single corrupted record in a partition fails queries on that partition.

Data validation

To store our click data in a table, we considered the following SQL create table command:

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)  
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

The above statement defines a new external table (all Redshift Spectrum tables are external tables) with a few attributes. We stored ‘ts’ as a Unix time stamp and not as Timestamp, and billing data is stored as float and not decimal (more on that later). We also said that the data is partitioned by date and hour, and then stored as Parquet on S3.

First, we need to get the table definitions. This can be achieved by running the following query:

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';

This query lists all the columns in the table with their respective definitions:

schemaname   tablename     columnname     external_type   columnnum   part_key
spectrum     blog_clicks   user_id        varchar(50)     1           0
spectrum     blog_clicks   campaign_id    varchar(50)     2           0
spectrum     blog_clicks   os             varchar(50)     3           0
spectrum     blog_clicks   ua             varchar(255)    4           0
spectrum     blog_clicks   ts             bigint          5           0
spectrum     blog_clicks   billing        double          6           0
spectrum     blog_clicks   date           date            7           1
spectrum     blog_clicks   hour           smallint        8           2

Now we can use this data to create a validation schema for our data:

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};

Next, we create a function that uses this schema to validate data:

function valueIsValid(value, item_schema) {
    if (item_schema.type == 'string') {
        return (typeof value == 'string' && value.length <= item_schema.max_length);
    }
    else if (item_schema.type == 'integer') {
        return (typeof value == 'number' && value >= item_schema.min_value && value <= item_schema.max_value);
    }
    else if (item_schema.type == 'float' || item_schema.type == 'double') {
        return (typeof value == 'number' && value >= item_schema.min_value && value <= item_schema.max_value);
    }
    else if (item_schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (item_schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}

Near real-time data loading with Kinesis Firehose

On Kinesis Firehose, we created a new delivery stream to handle the events as follows:

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

This delivery stream aggregates event data every minute, or up to 100 MB, and writes the data to an S3 bucket as a CSV/GZIP compressed file. Next, after we have the data validated, we can safely send it to our Kinesis Firehose API:

if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line

    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // Continue to your next step 
        }
    });
}

Now, we have a single CSV file representing one minute of event data stored in S3. The files are named automatically by Kinesis Firehose by adding a UTC time prefix in the format YYYY/MM/DD/HH before writing objects to S3. Because we use the date and hour as partitions, we need to change the file naming and location to fit our Redshift Spectrum schema.

Automating data distribution using AWS Lambda

We created a simple Lambda function triggered by an S3 put event that copies the file to a different location (or locations), while renaming it to fit our data structure and processing flow. As mentioned before, the files generated by Kinesis Firehose are structured in a pre-defined hierarchy, such as:

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

All we need to do is parse the object name and restructure it as we see fit. In our case, we did the following (the event is an object received in the Lambda function with all the data about the object written to S3):

/*
  Object key structure in the event object:
  your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
*/

let key_parts = event.Records[0].s3.object.key.split('/');

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];

let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
    hour = parseInt(hour, 10) + ''; // strip the leading zero so '06' becomes '6'
}

// The file name follows the pattern DeliveryStreamName-Version-YYYY-MM-DD-HH-MM-SS-RandomString,
// so after splitting on '-' the minute sits at index 6.
let parts1 = key_parts[5].split('-');
let minute = parts1[6];
if (minute.indexOf('0') == 0) {
    minute = parseInt(minute, 10) + ''; // strip the leading zero here as well
}

Now, we can redistribute the file to the two destinations we need—one for the minute processing task and the other for hourly aggregation:

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 

Kinesis Firehose stores the data in a temporary folder. We copy the object to another folder that holds the data for the last processed minute. This folder is connected to a small Redshift Spectrum table where the data is being processed without needing to scan a much larger dataset. We also copy the data to a folder that holds the data for the entire hour, to be later aggregated and converted to Parquet.
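
As an illustration, one of these helpers might look roughly like the following sketch. It is not the production code: the destination prefix events-minute/ is our placeholder for the per-minute folder described above.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical implementation: copy the object written by Kinesis Firehose into the
// per-minute folder that the small Redshift Spectrum table points to.
function copyObjectToMinuteFolder(event, date, hour, minute) {
    const bucket = event.Records[0].s3.bucket.name;
    const key = event.Records[0].s3.object.key;
    const fileName = key.split('/').pop();
    return s3.copyObject({
        Bucket: bucket,
        CopySource: bucket + '/' + key,
        Key: 'events-minute/' + date + '/' + hour + '/' + minute + '/' + fileName
    }).promise();
}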

Because we partition the data by date and hour, we create a new partition on the Redshift Spectrum table whenever the processed minute is the first minute of the hour (that is, minute 0). To do so, we run the following:

ALTER TABLE 
  spectrum.events 
ADD partition
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';

After the data is processed and added to the table, we delete the processed data from the temporary Kinesis Firehose storage and from the minute storage folder.
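
The cleanup helpers at the end of the chain can be equally simple. Here is a sketch of what deleteStreamObject might look like (ours, not the original implementation): it removes the temporary object written by Kinesis Firehose once it has been copied.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical implementation: delete the temporary Kinesis Firehose object
// after it has been copied to the minute and hourly folders.
function deleteStreamObject(event) {
    return s3.deleteObject({
        Bucket: event.Records[0].s3.bucket.name,
        Key: event.Records[0].s3.object.key
    }).promise();
}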

Migrating CSV to Parquet using AWS Glue and Amazon EMR

The simplest way we found to run an hourly job converting our CSV data to Parquet is using Lambda and AWS Glue (and thanks to the awesome AWS Big Data team for their help with this).

Creating AWS Glue jobs

What this simple AWS Glue script does:

  • Gets parameters for the job, date, and hour to be processed
  • Creates a Spark EMR context allowing us to run Spark code
  • Reads CSV data into a DataFrame
  • Writes the data as Parquet to the destination S3 bucket
  • Adds or modifies the Redshift Spectrum / Amazon Athena table partition for the table

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()

Note: Because Redshift Spectrum and Athena both use the AWS Glue Data Catalog, we could use the Athena client to add the partition to the table.

A few words about float, decimal, and double: using decimal proved more challenging than we expected, because Redshift Spectrum and Spark seem to handle it differently. Whenever we defined a column as decimal in both Redshift Spectrum and Spark, we kept getting errors such as:

S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

We had to experiment with a few floating-point formats until we found that the only combination that worked was to define the column as double in the Spark code and float in Spectrum. This is the reason you see billing defined as float in Spectrum and double in the Spark code.

Creating a Lambda function to trigger conversion

Next, we created a simple Lambda function that triggers the AWS Glue job hourly, using a few lines of Python:

import boto3
from datetime import datetime, timedelta

client = boto3.client('glue')

def lambda_handler(event, context):
    # Run the conversion job for the previous hour
    last_hour_date_time = datetime.now() - timedelta(hours=1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
    hour_partition_value = last_hour_date_time.strftime("%-H")
    response = client.start_job_run(
        JobName='convertEventsParquetHourly',
        Arguments={
            '--day_partition_key': 'date',
            '--hour_partition_key': 'hour',
            '--day_partition_value': day_partition_value,
            '--hour_partition_value': hour_partition_value
        }
    )

Using Amazon CloudWatch Events, we trigger this function hourly. It starts the AWS Glue job named ‘convertEventsParquetHourly’ for the previous hour, passing the partition keys and values to process as job arguments.
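
For reference, the hourly schedule itself can also be set up from code rather than the console. The following is a minimal sketch using the AWS SDK for JavaScript; the rule name and Lambda function ARN are placeholders.

const AWS = require('aws-sdk');
const cwEvents = new AWS.CloudWatchEvents();
const lambda = new AWS.Lambda();

const ruleName = 'hourly-parquet-conversion'; // placeholder
const functionArn = 'arn:aws:lambda:us-east-1:123456789012:function:triggerGlueHourly'; // placeholder

// Create (or update) a rule that fires once an hour.
cwEvents.putRule({ Name: ruleName, ScheduleExpression: 'rate(1 hour)' }).promise()
    // Allow CloudWatch Events to invoke the Lambda function.
    .then(rule => lambda.addPermission({
        FunctionName: functionArn,
        StatementId: 'allow-cloudwatch-events-hourly',
        Action: 'lambda:InvokeFunction',
        Principal: 'events.amazonaws.com',
        SourceArn: rule.RuleArn
    }).promise())
    // Point the rule at the function.
    .then(() => cwEvents.putTargets({
        Rule: ruleName,
        Targets: [{ Id: 'hourly-conversion-target', Arn: functionArn }]
    }).promise())
    .catch(err => console.error(err));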

Redshift Spectrum and Node.js

Our development stack is based on Node.js, which is well-suited for high-speed, light servers that need to process a huge number of transactions. However, a few limitations of the Node.js environment required us to create workarounds and use other tools to complete the process.

Node.js and Parquet

The lack of Parquet modules for Node.js required us to implement an AWS Glue/Amazon EMR process to effectively migrate data from CSV to Parquet. We would rather save directly to Parquet, but we couldn’t find an effective way to do it.

One interesting project in the works is the development of a Parquet NPM by Marc Vertes called node-parquet (https://www.npmjs.com/package/node-parquet). It is not in a production state yet, but we think it would be well worth following the progress of this package.

Timestamp data type

According to the Parquet documentation, Timestamp data are stored in Parquet as 64-bit integers. However, JavaScript does not support 64-bit integers, because the native number type is a 64-bit double, giving only 53 bits of integer range.

The result is that you cannot store Timestamp correctly in Parquet using Node.js. The solution is to store Timestamp as string and cast the type to Timestamp in the query. Using this method, we did not witness any performance degradation whatsoever.

Lessons learned

You can benefit from our trial-and-error experience.

Lesson #1: Data validation is critical

As mentioned earlier, a single corrupt entry in a partition can fail queries running against this partition, especially when using Parquet, which is harder to edit than a simple CSV file. Make sure that you validate your data before scanning it with Redshift Spectrum.

Lesson #2: Structure and partition data effectively

One of the biggest benefits of using Redshift Spectrum (or Athena for that matter) is that you don’t need to keep nodes up and running all the time. You pay only for the queries you perform and only for the data scanned per query.

Keeping different permutations of your data for different queries makes a lot of sense in this case. For example, you can partition your data by date and hour to run time-based queries, and also have another set partitioned by user_id and date to run user-based queries. This results in faster and more efficient performance of your data warehouse.

Storing data in the right format

Use Parquet whenever you can. The benefits are substantial: faster performance, less data to scan, and a much more efficient columnar format. However, Kinesis Firehose does not support Parquet out of the box, so you need to implement your own ETL. AWS Glue is a great option.

Creating small tables for frequent tasks

When we started using Redshift Spectrum, we saw our Amazon Redshift costs jump by hundreds of dollars per day. Then we realized that we were unnecessarily scanning a full day’s worth of data every minute. Take advantage of the ability to define multiple tables on the same S3 bucket or folder, and create temporary and small tables for frequent queries.

Lesson #3: Combine Athena and Redshift Spectrum for optimal performance

Moving to Redshift Spectrum also allowed us to take advantage of Athena, because both use the AWS Glue Data Catalog. Run fast and simple queries using Athena while taking advantage of the advanced Amazon Redshift query engine for complex queries using Redshift Spectrum.

Redshift Spectrum excels when running complex queries. It can push many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer, so that queries use much less of your cluster’s processing capacity.

Lesson #4: Sort your Parquet data within the partition

We achieved another performance improvement by sorting data within the partition using sortWithinPartitions(sort_field). For example:

df.repartition(1).sortWithinPartitions("campaign_id")…

Conclusion

We were extremely pleased with using Amazon Redshift as our core data warehouse for over three years. But as our client base and volume of data grew substantially, we extended Amazon Redshift to take advantage of scalability, performance, and cost with Redshift Spectrum.

Redshift Spectrum lets us scale to virtually unlimited storage, scale compute transparently, and deliver super-fast results for our users. With Redshift Spectrum, we store data where we want at the cost we want, and have the data available for analytics when our users need it with the performance they expect.


About the Author

With 7 years of experience in the AdTech industry and 15 years in leading technology companies, Rafi Ton is the founder and CEO of NUVIAD. He enjoys exploring new technologies and putting them to use in cutting-edge products and services that generate real money in the real world. As an experienced entrepreneur, Rafi believes in practical programming and the fast adoption of new technologies to achieve a significant market advantage.